From 50523e22d856b55f8fa9cfb226208ef2ef95cad7 Mon Sep 17 00:00:00 2001 From: sharat Date: Wed, 19 Oct 2016 18:44:37 +0530 Subject: [PATCH] etcd-runner: make run watcher fail safe --- tools/functional-tester/etcd-runner/main.go | 46 ++++++++++++++------- 1 file changed, 30 insertions(+), 16 deletions(-) diff --git a/tools/functional-tester/etcd-runner/main.go b/tools/functional-tester/etcd-runner/main.go index 9aa9fef5c..11a87dd10 100644 --- a/tools/functional-tester/etcd-runner/main.go +++ b/tools/functional-tester/etcd-runner/main.go @@ -242,10 +242,7 @@ func runWatcher(eps []string) { defer client.Close() // get revision using get request - gr, err = client.Get(ctx, "non-existant") - if err != nil { - log.Fatal("Error occured while trying to get the revision.") - } + gr = getWithRetry(client, ctx, "non-existant") revision = gr.Header.Revision ctxt, cancel := context.WithDeadline(ctx, time.Now().Add(runningTime)) @@ -255,25 +252,34 @@ func runWatcher(eps []string) { limiter := rate.NewLimiter(rate.Limit(reqRate), reqRate) go func() { - count := 0 + var modrevision int64 for i := 0; i < len(keys); i++ { for j := 0; j < len(prefixes); j++ { key := prefixes[j] + "-" + keys[i] + // limit key put as per reqRate if err = limiter.Wait(ctxt); err != nil { break } - // perform the put operation - _, err = client.Put(ctxt, key, key) - count++ - if err == context.DeadlineExceeded { - break + modrevision = 0 + gr = getWithRetry(client, ctxt, key) + kvs := gr.Kvs + if len(kvs) > 0 { + modrevision = gr.Kvs[0].ModRevision } - if err != nil { - log.Printf("Error: %v occured while trying to key: %v, value : %v to kv store.", err, key, key) - continue + for { + txn := client.Txn(ctxt) + _, err = txn.If(clientv3.Compare(clientv3.ModRevision(key), "=", modrevision)).Then(clientv3.OpPut(key, key)).Commit() + + if err == nil { + break + } + + if err == context.DeadlineExceeded { + return + } } } } @@ -289,14 +295,14 @@ func runWatcher(eps []string) { defer rc.Close() wc := rc.Watch(ctx, prefix, clientv3.WithPrefix(), clientv3.WithRev(revision)) - for n := 0; n < len(keys); { select { case watchChan := <-wc: for _, event := range watchChan.Events { expectedKey := prefix + "-" + keys[n] - if expectedKey != string(event.Kv.Key) { - log.Fatalf("expected key %q, got %q", expectedKey, string(event.Kv.Key)) + receivedKey := string(event.Kv.Key) + if expectedKey != receivedKey { + log.Fatalf("expected key %q, got %q for prefix : %q\n", expectedKey, receivedKey, prefix) } n++ } @@ -310,6 +316,14 @@ func runWatcher(eps []string) { wg.Wait() } +func getWithRetry(client *clientv3.Client, ctx context.Context, key string) *clientv3.GetResponse { + for { + if gr, err := client.Get(ctx, key); err == nil { + return gr + } + } +} + func generateUniqueKeys(maxstrlen uint, keynos int) []string { keyMap := make(map[string]bool) keys := make([]string, 0)