From a78604dacbb8f9f446d29aa4dd6f3def6bf68edb Mon Sep 17 00:00:00 2001 From: Gyu-Ho Lee Date: Wed, 24 Feb 2016 16:30:20 -0800 Subject: [PATCH] *: watch true cancel, created for wrong rev This sets Created and Cancel true in pb.WatchResponse when it has received wrong start revision instead of panic. So that clientv3 can set 'Canceled' in WatchResponse as well. Fix https://github.com/coreos/etcd/issues/4610. --- clientv3/integration/watch_test.go | 25 +++++++++++++++++++++++++ clientv3/watch.go | 15 ++++++++++----- etcdserver/api/v3rpc/watch.go | 7 +++++++ integration/v3_watch_test.go | 29 +++++++++++++++++++++++++++++ 4 files changed, 71 insertions(+), 5 deletions(-) diff --git a/clientv3/integration/watch_test.go b/clientv3/integration/watch_test.go index 9939eafd4..5ca6160d8 100644 --- a/clientv3/integration/watch_test.go +++ b/clientv3/integration/watch_test.go @@ -297,3 +297,28 @@ func putAndWatch(t *testing.T, wctx *watchctx, key, val string) { } } } + +func TestWatchInvalidFutureRevision(t *testing.T) { + defer testutil.AfterTest(t) + + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) + defer clus.Terminate(t) + + w := clientv3.NewWatcher(clus.RandClient()) + defer w.Close() + + rch := w.Watch(context.Background(), "foo", clientv3.WithRev(100)) + + wresp, ok := <-rch // WatchResponse from canceled one + if !ok { + t.Fatalf("expected wresp 'open'(ok true), but got ok %v", ok) + } + if !wresp.Canceled { + t.Fatalf("wresp.Canceled expected 'true', but got %v", wresp.Canceled) + } + + _, ok = <-rch // ensure the channel is closed + if ok != false { + t.Fatalf("expected wresp 'closed'(ok false), but got ok %v", ok) + } +} diff --git a/clientv3/watch.go b/clientv3/watch.go index 149497896..dd3dd7cc5 100644 --- a/clientv3/watch.go +++ b/clientv3/watch.go @@ -44,6 +44,9 @@ type WatchResponse struct { // CompactRevision is set to the compaction revision that // caused the watcher to cancel. CompactRevision int64 + + // Canceled is 'true' when it has received wrong watch start revision. + Canceled bool } // watcher implements the Watcher interface @@ -165,12 +168,13 @@ func (w *watcher) addStream(resp *pb.WatchResponse, pendingReq *watchRequest) { // no pending request; ignore return } - if resp.CompactRevision != 0 { + if resp.Canceled || resp.CompactRevision != 0 { // compaction after start revision ret := make(chan WatchResponse, 1) ret <- WatchResponse{ Header: *resp.Header, - CompactRevision: resp.CompactRevision} + CompactRevision: resp.CompactRevision, + Canceled: resp.Canceled} close(ret) pendingReq.retc <- ret return @@ -251,13 +255,13 @@ func (w *watcher) run() { // New events from the watch client case pbresp := <-w.respc: switch { - case pbresp.Canceled: - delete(cancelSet, pbresp.WatchId) case pbresp.Created: // response to pending req, try to add w.addStream(pbresp, pendingReq) pendingReq = nil curReqC = w.reqc + case pbresp.Canceled: + delete(cancelSet, pbresp.WatchId) default: // dispatch to appropriate watch stream if ok := w.dispatchEvent(pbresp); ok { @@ -317,7 +321,8 @@ func (w *watcher) dispatchEvent(pbresp *pb.WatchResponse) bool { wr := &WatchResponse{ Header: *pbresp.Header, Events: pbresp.Events, - CompactRevision: pbresp.CompactRevision} + CompactRevision: pbresp.CompactRevision, + Canceled: pbresp.Canceled} ws.recvc <- wr } return ok diff --git a/etcdserver/api/v3rpc/watch.go b/etcdserver/api/v3rpc/watch.go index 8487cc46a..31dcfe328 100644 --- a/etcdserver/api/v3rpc/watch.go +++ b/etcdserver/api/v3rpc/watch.go @@ -108,6 +108,13 @@ func (sws *serverWatchStream) recvLoop() error { if rev == 0 { // rev 0 watches past the current revision rev = wsrev + 1 + } else if rev > wsrev { // do not allow watching future revision. + sws.ctrlStream <- &pb.WatchResponse{ + Header: sws.newResponseHeader(wsrev), + Created: true, + Canceled: true, + } + continue } id := sws.watchStream.Watch(toWatch, prefix, rev) sws.ctrlStream <- &pb.WatchResponse{ diff --git a/integration/v3_watch_test.go b/integration/v3_watch_test.go index fc5e8df25..65772b7a6 100644 --- a/integration/v3_watch_test.go +++ b/integration/v3_watch_test.go @@ -782,3 +782,32 @@ func waitResponse(wc pb.Watch_WatchClient, timeout time.Duration) (bool, *pb.Wat } return true, nil } + +// TestV3WatchFutureRevision ensures invalid future revision to Watch APIs +// returns WatchResponse of true Created and true Canceled. +func TestV3WatchInvalidFutureRevision(t *testing.T) { + defer testutil.AfterTest(t) + clus := NewClusterV3(t, &ClusterConfig{Size: 3}) + defer clus.Terminate(t) + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + wStream, wErr := clus.RandClient().Watch.Watch(ctx) + if wErr != nil { + t.Fatalf("wAPI.Watch error: %v", wErr) + } + + wreq := &pb.WatchRequest{RequestUnion: &pb.WatchRequest_CreateRequest{ + CreateRequest: &pb.WatchCreateRequest{Key: []byte("foo"), StartRevision: 100}}} + if err := wStream.Send(wreq); err != nil { + t.Fatalf("watch request failed (%v)", err) + } + + resp, err := wStream.Recv() + if err != nil { + t.Errorf("wStream.Recv error: %v", err) + } + if !resp.Created || !resp.Canceled || len(resp.Events) != 0 { + t.Errorf("invalid start rev should return true, true, 0, but got %v, %v, %d", resp.Created, resp.Canceled, len(resp.Events)) + } +}