diff --git a/tools/functional-tester/etcd-runner/main.go b/tools/functional-tester/etcd-runner/main.go index 1eed59e16..0320f2c36 100644 --- a/tools/functional-tester/etcd-runner/main.go +++ b/tools/functional-tester/etcd-runner/main.go @@ -36,12 +36,14 @@ func main() { log.SetFlags(log.Lmicroseconds) endpointStr := flag.String("endpoints", "localhost:2379", "endpoints of etcd cluster") - mode := flag.String("mode", "lock-racer", "test mode (lock-racer)") + mode := flag.String("mode", "lock-racer", "test mode (election, lock-racer, lease-renewer)") round := flag.Int("rounds", 100, "number of rounds to run") flag.Parse() eps := strings.Split(*endpointStr, ",") switch *mode { + case "election": + runElection(eps, *round) case "lock-racer": runRacer(eps, *round) case "lease-renewer": @@ -51,6 +53,78 @@ func main() { } } +func runElection(eps []string, rounds int) { + rcs := make([]roundClient, 15) + validatec, releasec := make(chan struct{}, len(rcs)), make(chan struct{}, len(rcs)) + for range rcs { + releasec <- struct{}{} + } + + for i := range rcs { + v := fmt.Sprintf("%d", i) + observedLeader := "" + validateWaiters := 0 + + rcs[i].c = randClient(eps) + e := concurrency.NewElection(rcs[i].c, "electors") + + rcs[i].acquire = func() error { + <-releasec + ctx, cancel := context.WithCancel(context.Background()) + go func() { + if ol, ok := <-e.Observe(ctx); ok { + observedLeader = string(ol.Kvs[0].Value) + if observedLeader != v { + cancel() + } + } + }() + err := e.Campaign(ctx, v) + if err == nil { + observedLeader = v + } + if observedLeader == v { + validateWaiters = len(rcs) + } + select { + case <-ctx.Done(): + return nil + default: + cancel() + return err + } + } + rcs[i].validate = func() error { + if l, err := e.Leader(); err == nil && l != observedLeader { + return fmt.Errorf("expected leader %q, got %q", observedLeader, l) + } + validatec <- struct{}{} + return nil + } + rcs[i].release = func() error { + for validateWaiters > 0 { + select { + case <-validatec: + validateWaiters-- + default: + return fmt.Errorf("waiting on followers") + } + } + if err := e.Resign(); err != nil { + return err + } + if observedLeader == v { + for range rcs { + releasec <- struct{}{} + } + } + observedLeader = "" + return nil + } + } + doRounds(rcs, rounds) +} + func runLeaseRenewer(eps []string) { c := randClient(eps) ctx := context.Background() @@ -94,83 +168,28 @@ func runLeaseRenewer(eps []string) { } func runRacer(eps []string, round int) { - nrace := 15 - prefix := "racers" - racers := make([]*concurrency.Mutex, nrace) - clis := make([]*clientv3.Client, nrace) - progress := make([]int, nrace) - finished := make(chan struct{}, 0) - - var ( - mu sync.Mutex - cnt int - ) + rcs := make([]roundClient, 15) ctx := context.Background() - - var wg sync.WaitGroup - - for i := range racers { - clis[i] = randClient(eps) - racers[i] = concurrency.NewMutex(clis[i], prefix) - wg.Add(1) - - go func(i int) { - defer wg.Done() - - for { - if progress[i] >= round { - return - } - - for { - err := racers[i].Lock(ctx) - if err == nil { - break - } - } - - mu.Lock() - if cnt > 0 { - log.Fatalf("bad lock") - } - cnt = 1 - mu.Unlock() - - time.Sleep(10 * time.Millisecond) - progress[i]++ - finished <- struct{}{} - - mu.Lock() - for { - err := racers[i].Unlock() - if err == nil { - break - } - } - cnt = 0 - mu.Unlock() + cnt := 0 + for i := range rcs { + rcs[i].c = randClient(eps) + m := concurrency.NewMutex(rcs[i].c, "racers") + rcs[i].acquire = func() error { return m.Lock(ctx) } + rcs[i].validate = func() error { + if cnt++; cnt != 1 { + return fmt.Errorf("bad lock; count: %d", cnt) } - }(i) - } - - start := time.Now() - for i := 1; i < nrace*round+1; i++ { - select { - case <-finished: - if i%100 == 0 { - fmt.Printf("finished %d, took %v\n", i, time.Since(start)) - start = time.Now() + return nil + } + rcs[i].release = func() error { + if err := m.Unlock(); err != nil { + return err } - case <-time.After(time.Minute): - log.Panic("no progress after 1 minute!") + cnt = 0 + return nil } } - - wg.Wait() - - for _, cli := range clis { - cli.Close() - } + doRounds(rcs, round) } func randClient(eps []string) *clientv3.Client { @@ -191,3 +210,63 @@ func randClient(eps []string) *clientv3.Client { } return c } + +type roundClient struct { + c *clientv3.Client + progress int + acquire func() error + validate func() error + release func() error +} + +func doRounds(rcs []roundClient, rounds int) { + var mu sync.Mutex + var wg sync.WaitGroup + + wg.Add(len(rcs)) + finished := make(chan struct{}, 0) + for i := range rcs { + go func(rc *roundClient) { + defer wg.Done() + for rc.progress < rounds { + for rc.acquire() != nil { /* spin */ + } + + mu.Lock() + if err := rc.validate(); err != nil { + log.Fatal(err) + } + mu.Unlock() + + time.Sleep(10 * time.Millisecond) + rc.progress++ + finished <- struct{}{} + + mu.Lock() + for rc.release() != nil { + mu.Unlock() + mu.Lock() + } + mu.Unlock() + } + }(&rcs[i]) + } + + start := time.Now() + for i := 1; i < len(rcs)*rounds+1; i++ { + select { + case <-finished: + if i%100 == 0 { + fmt.Printf("finished %d, took %v\n", i, time.Since(start)) + start = time.Now() + } + case <-time.After(time.Minute): + log.Panic("no progress after 1 minute!") + } + } + + wg.Wait() + for _, rc := range rcs { + rc.c.Close() + } +}