From 155412bbfaf2c727651d9f4677756761cca726e1 Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Wed, 17 Feb 2016 01:05:47 -0800 Subject: [PATCH] integration: overlapped create and put v3 watcher test --- integration/v3_watch_test.go | 80 ++++++++++++++++++++++++++++++++++++ 1 file changed, 80 insertions(+) diff --git a/integration/v3_watch_test.go b/integration/v3_watch_test.go index a19b14f6f..5814c9f13 100644 --- a/integration/v3_watch_test.go +++ b/integration/v3_watch_test.go @@ -321,6 +321,86 @@ func testV3WatchCancel(t *testing.T, startRev int64) { clus.Terminate(t) } +// TestV3WatchCurrentPutOverlap ensures current watchers receive all events with +// overlapping puts. +func TestV3WatchCurrentPutOverlap(t *testing.T) { + defer testutil.AfterTest(t) + clus := NewClusterV3(t, &ClusterConfig{Size: 3}) + defer clus.Terminate(t) + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + wStream, wErr := clus.RandClient().Watch.Watch(ctx) + if wErr != nil { + t.Fatalf("wAPI.Watch error: %v", wErr) + } + + // last mod_revision that will be observed + nrRevisions := 32 + // first revision already allocated as empty revision + for i := 1; i < nrRevisions; i++ { + go func() { + kvc := clus.RandClient().KV + req := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")} + if _, err := kvc.Put(context.TODO(), req); err != nil { + t.Fatalf("couldn't put key (%v)", err) + } + }() + } + + // maps watcher to current expected revision + progress := make(map[int64]int64) + + wreq := &pb.WatchRequest{RequestUnion: &pb.WatchRequest_CreateRequest{ + CreateRequest: &pb.WatchCreateRequest{Prefix: []byte("foo")}}} + if err := wStream.Send(wreq); err != nil { + t.Fatalf("first watch request failed (%v)", err) + } + + more := true + progress[-1] = 0 // watcher creation pending + for more { + resp, err := wStream.Recv() + if err != nil { + t.Fatalf("wStream.Recv error: %v", err) + } + + if resp.Created { + // accept events > header revision + progress[resp.WatchId] = resp.Header.Revision + 1 + if resp.Header.Revision == int64(nrRevisions) { + // covered all revisions; create no more watchers + progress[-1] = int64(nrRevisions) + 1 + } else if err := wStream.Send(wreq); err != nil { + t.Fatalf("watch request failed (%v)", err) + } + } else if len(resp.Events) == 0 { + t.Fatalf("got events %v, want non-empty", resp.Events) + } else { + wRev, ok := progress[resp.WatchId] + if !ok { + t.Fatalf("got %+v, but watch id shouldn't exist ", resp) + } + if resp.Events[0].Kv.ModRevision != wRev { + t.Fatalf("got %+v, wanted first revision %d", resp, wRev) + } + lastRev := resp.Events[len(resp.Events)-1].Kv.ModRevision + progress[resp.WatchId] = lastRev + 1 + } + more = false + for _, v := range progress { + if v <= int64(nrRevisions) { + more = true + break + } + } + } + + if rok, nr := waitResponse(wStream, time.Second); !rok { + t.Errorf("unexpected pb.WatchResponse is received %+v", nr) + } +} + func TestV3WatchMultipleWatchersSynced(t *testing.T) { defer testutil.AfterTest(t) testV3WatchMultipleWatchers(t, 0)