diff --git a/tools/functional-tester/etcd-runner/main.go b/tools/functional-tester/etcd-runner/main.go index 2699e2a13..bdb8d579e 100644 --- a/tools/functional-tester/etcd-runner/main.go +++ b/tools/functional-tester/etcd-runner/main.go @@ -36,12 +36,14 @@ func main() { log.SetFlags(log.Lmicroseconds) endpointStr := flag.String("endpoints", "localhost:2379", "endpoints of etcd cluster") - mode := flag.String("mode", "lock-racer", "test mode (lock-racer, lease-renewer)") + mode := flag.String("mode", "lock-racer", "test mode (election, lock-racer, lease-renewer)") round := flag.Int("rounds", 100, "number of rounds to run") flag.Parse() eps := strings.Split(*endpointStr, ",") switch *mode { + case "election": + runElection(eps, *round) case "lock-racer": runRacer(eps, *round) case "lease-renewer": @@ -51,6 +53,78 @@ func main() { } } +func runElection(eps []string, rounds int) { + rcs := make([]roundClient, 15) + validatec, releasec := make(chan struct{}, len(rcs)), make(chan struct{}, len(rcs)) + for range rcs { + releasec <- struct{}{} + } + + for i := range rcs { + v := fmt.Sprintf("%d", i) + observedLeader := "" + validateWaiters := 0 + + rcs[i].c = randClient(eps) + e := concurrency.NewElection(rcs[i].c, "electors") + + rcs[i].acquire = func() error { + <-releasec + ctx, cancel := context.WithCancel(context.Background()) + go func() { + if ol, ok := <-e.Observe(ctx); ok { + observedLeader = string(ol.Kvs[0].Value) + if observedLeader != v { + cancel() + } + } + }() + err := e.Campaign(ctx, v) + if err == nil { + observedLeader = v + } + if observedLeader == v { + validateWaiters = len(rcs) + } + select { + case <-ctx.Done(): + return nil + default: + cancel() + return err + } + } + rcs[i].validate = func() error { + if l, err := e.Leader(); err == nil && l != observedLeader { + return fmt.Errorf("expected leader %q, got %q", observedLeader, l) + } + validatec <- struct{}{} + return nil + } + rcs[i].release = func() error { + for validateWaiters > 0 { + select { + case <-validatec: + validateWaiters-- + default: + return fmt.Errorf("waiting on followers") + } + } + if err := e.Resign(); err != nil { + return err + } + if observedLeader == v { + for range rcs { + releasec <- struct{}{} + } + } + observedLeader = "" + return nil + } + } + doRounds(rcs, rounds) +} + func runLeaseRenewer(eps []string) { c := randClient(eps) ctx := context.Background()