diff --git a/tools/functional-tester/etcd-runner/main.go b/tools/functional-tester/etcd-runner/main.go index 11a87dd10..617009b9c 100644 --- a/tools/functional-tester/etcd-runner/main.go +++ b/tools/functional-tester/etcd-runner/main.go @@ -54,7 +54,7 @@ func main() { case "lease-renewer": runLeaseRenewer(eps) case "watcher": - runWatcher(eps) + runWatcher(eps, *round) default: fmt.Fprintf(os.Stderr, "unsupported mode %v\n", *mode) } @@ -219,7 +219,14 @@ func runRacer(eps []string, round int) { doRounds(rcs, round) } -func runWatcher(eps []string) { +func runWatcher(eps []string, limit int) { + ctx := context.Background() + for round := 0; round < limit; round++ { + performWatchOnPrefixes(ctx, eps, round) + } +} + +func performWatchOnPrefixes(ctx context.Context, eps []string, round int) { runningTime := 60 * time.Second // time for which operation should be performed noOfPrefixes := 36 // total number of prefixes which will be watched upon watchPerPrefix := 10 // number of watchers per prefix @@ -229,6 +236,8 @@ func runWatcher(eps []string) { prefixes := generateUniqueKeys(5, noOfPrefixes) keys := generateRandomKeys(10, keyPrePrefix) + roundPrefix := fmt.Sprint("%16x", round) + var ( revision int64 wg sync.WaitGroup @@ -236,7 +245,6 @@ func runWatcher(eps []string) { err error ) - ctx := context.Background() // create client for performing get and put operations client := randClient(eps) defer client.Close() @@ -253,9 +261,9 @@ func runWatcher(eps []string) { go func() { var modrevision int64 - for i := 0; i < len(keys); i++ { - for j := 0; j < len(prefixes); j++ { - key := prefixes[j] + "-" + keys[i] + for _, key := range keys { + for _, prefix := range prefixes { + key := roundPrefix + "-" + prefix + "-" + key // limit key put as per reqRate if err = limiter.Wait(ctxt); err != nil { @@ -285,16 +293,22 @@ func runWatcher(eps []string) { } }() + ctxc, cancelc := context.WithCancel(ctx) + + wcs := make([]clientv3.WatchChan, 0) + rcs := make([]*clientv3.Client, 0) + wg.Add(noOfPrefixes * watchPerPrefix) - for i := 0; i < noOfPrefixes; i++ { + for _, prefix := range prefixes { for j := 0; j < watchPerPrefix; j++ { go func(prefix string) { defer wg.Done() rc := randClient(eps) - defer rc.Close() + rcs = append(rcs, rc) - wc := rc.Watch(ctx, prefix, clientv3.WithPrefix(), clientv3.WithRev(revision)) + wc := rc.Watch(ctxc, prefix, clientv3.WithPrefix(), clientv3.WithRev(revision)) + wcs = append(wcs, wc) for n := 0; n < len(keys); { select { case watchChan := <-wc: @@ -310,10 +324,34 @@ func runWatcher(eps []string) { return } } - }(prefixes[i]) + }(roundPrefix + "-" + prefix) } } wg.Wait() + + // cancel all watch channels + cancelc() + + // verify all watch channels are closed + for e, wc := range wcs { + if _, ok := <-wc; ok { + log.Fatalf("expected wc to be closed, but received %v", e) + } + } + + for _, rc := range rcs { + rc.Close() + } + + deletePrefixWithRety(client, ctx, roundPrefix) +} + +func deletePrefixWithRety(client *clientv3.Client, ctx context.Context, key string) { + for { + if _, err := client.Delete(ctx, key, clientv3.WithRange(key+"z")); err == nil { + return + } + } } func getWithRetry(client *clientv3.Client, ctx context.Context, key string) *clientv3.GetResponse {