diff --git a/clientv3/integration/watch_test.go b/clientv3/integration/watch_test.go index 9939eafd4..5ca6160d8 100644 --- a/clientv3/integration/watch_test.go +++ b/clientv3/integration/watch_test.go @@ -297,3 +297,28 @@ func putAndWatch(t *testing.T, wctx *watchctx, key, val string) { } } } + +func TestWatchInvalidFutureRevision(t *testing.T) { + defer testutil.AfterTest(t) + + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) + defer clus.Terminate(t) + + w := clientv3.NewWatcher(clus.RandClient()) + defer w.Close() + + rch := w.Watch(context.Background(), "foo", clientv3.WithRev(100)) + + wresp, ok := <-rch // WatchResponse from canceled one + if !ok { + t.Fatalf("expected wresp 'open'(ok true), but got ok %v", ok) + } + if !wresp.Canceled { + t.Fatalf("wresp.Canceled expected 'true', but got %v", wresp.Canceled) + } + + _, ok = <-rch // ensure the channel is closed + if ok != false { + t.Fatalf("expected wresp 'closed'(ok false), but got ok %v", ok) + } +} diff --git a/clientv3/watch.go b/clientv3/watch.go index 149497896..dd3dd7cc5 100644 --- a/clientv3/watch.go +++ b/clientv3/watch.go @@ -44,6 +44,9 @@ type WatchResponse struct { // CompactRevision is set to the compaction revision that // caused the watcher to cancel. CompactRevision int64 + + // Canceled is 'true' when it has received wrong watch start revision. + Canceled bool } // watcher implements the Watcher interface @@ -165,12 +168,13 @@ func (w *watcher) addStream(resp *pb.WatchResponse, pendingReq *watchRequest) { // no pending request; ignore return } - if resp.CompactRevision != 0 { + if resp.Canceled || resp.CompactRevision != 0 { // compaction after start revision ret := make(chan WatchResponse, 1) ret <- WatchResponse{ Header: *resp.Header, - CompactRevision: resp.CompactRevision} + CompactRevision: resp.CompactRevision, + Canceled: resp.Canceled} close(ret) pendingReq.retc <- ret return @@ -251,13 +255,13 @@ func (w *watcher) run() { // New events from the watch client case pbresp := <-w.respc: switch { - case pbresp.Canceled: - delete(cancelSet, pbresp.WatchId) case pbresp.Created: // response to pending req, try to add w.addStream(pbresp, pendingReq) pendingReq = nil curReqC = w.reqc + case pbresp.Canceled: + delete(cancelSet, pbresp.WatchId) default: // dispatch to appropriate watch stream if ok := w.dispatchEvent(pbresp); ok { @@ -317,7 +321,8 @@ func (w *watcher) dispatchEvent(pbresp *pb.WatchResponse) bool { wr := &WatchResponse{ Header: *pbresp.Header, Events: pbresp.Events, - CompactRevision: pbresp.CompactRevision} + CompactRevision: pbresp.CompactRevision, + Canceled: pbresp.Canceled} ws.recvc <- wr } return ok diff --git a/etcdserver/api/v3rpc/watch.go b/etcdserver/api/v3rpc/watch.go index 8487cc46a..31dcfe328 100644 --- a/etcdserver/api/v3rpc/watch.go +++ b/etcdserver/api/v3rpc/watch.go @@ -108,6 +108,13 @@ func (sws *serverWatchStream) recvLoop() error { if rev == 0 { // rev 0 watches past the current revision rev = wsrev + 1 + } else if rev > wsrev { // do not allow watching future revision. + sws.ctrlStream <- &pb.WatchResponse{ + Header: sws.newResponseHeader(wsrev), + Created: true, + Canceled: true, + } + continue } id := sws.watchStream.Watch(toWatch, prefix, rev) sws.ctrlStream <- &pb.WatchResponse{ diff --git a/integration/v3_watch_test.go b/integration/v3_watch_test.go index fc5e8df25..65772b7a6 100644 --- a/integration/v3_watch_test.go +++ b/integration/v3_watch_test.go @@ -782,3 +782,32 @@ func waitResponse(wc pb.Watch_WatchClient, timeout time.Duration) (bool, *pb.Wat } return true, nil } + +// TestV3WatchFutureRevision ensures invalid future revision to Watch APIs +// returns WatchResponse of true Created and true Canceled. +func TestV3WatchInvalidFutureRevision(t *testing.T) { + defer testutil.AfterTest(t) + clus := NewClusterV3(t, &ClusterConfig{Size: 3}) + defer clus.Terminate(t) + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + wStream, wErr := clus.RandClient().Watch.Watch(ctx) + if wErr != nil { + t.Fatalf("wAPI.Watch error: %v", wErr) + } + + wreq := &pb.WatchRequest{RequestUnion: &pb.WatchRequest_CreateRequest{ + CreateRequest: &pb.WatchCreateRequest{Key: []byte("foo"), StartRevision: 100}}} + if err := wStream.Send(wreq); err != nil { + t.Fatalf("watch request failed (%v)", err) + } + + resp, err := wStream.Recv() + if err != nil { + t.Errorf("wStream.Recv error: %v", err) + } + if !resp.Created || !resp.Canceled || len(resp.Events) != 0 { + t.Errorf("invalid start rev should return true, true, 0, but got %v, %v, %d", resp.Created, resp.Canceled, len(resp.Events)) + } +}