diff --git a/clientv3/integration/watch_test.go b/clientv3/integration/watch_test.go index 816c73fb7..c63866a8c 100644 --- a/clientv3/integration/watch_test.go +++ b/clientv3/integration/watch_test.go @@ -709,3 +709,21 @@ func TestWatchWithFilter(t *testing.T) { case <-time.After(100 * time.Millisecond): } } + +// TestWatchWithCreatedNotification checks that createdNotification works. +func TestWatchWithCreatedNotification(t *testing.T) { + cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + defer cluster.Terminate(t) + + client := cluster.RandClient() + + ctx := context.Background() + + createC := client.Watch(ctx, "a", clientv3.WithCreatedNotify()) + + resp := <-createC + + if !resp.Created { + t.Fatalf("expected created event, got %v", resp) + } +} diff --git a/clientv3/op.go b/clientv3/op.go index 1a176b345..41ab52c96 100644 --- a/clientv3/op.go +++ b/clientv3/op.go @@ -50,6 +50,8 @@ type Op struct { // progressNotify is for progress updates. progressNotify bool + // createdNotify is for created event + createdNotify bool // filters for watchers filterPut bool filterDelete bool @@ -116,6 +118,8 @@ func OpDelete(key string, opts ...OpOption) Op { panic("unexpected countOnly in delete") case ret.filterDelete, ret.filterPut: panic("unexpected filter in delete") + case ret.createdNotify: + panic("unexpected createdNotify in delete") } return ret } @@ -138,6 +142,8 @@ func OpPut(key, val string, opts ...OpOption) Op { panic("unexpected countOnly in put") case ret.filterDelete, ret.filterPut: panic("unexpected filter in put") + case ret.createdNotify: + panic("unexpected createdNotify in put") } return ret } @@ -281,6 +287,13 @@ func WithProgressNotify() OpOption { } } +// WithCreatedNotify makes watch server sends the created event. +func WithCreatedNotify() OpOption { + return func(op *Op) { + op.createdNotify = true + } +} + // WithFilterPut discards PUT events from the watcher. func WithFilterPut() OpOption { return func(op *Op) { op.filterPut = true } diff --git a/clientv3/watch.go b/clientv3/watch.go index 05f022c38..70aba565f 100644 --- a/clientv3/watch.go +++ b/clientv3/watch.go @@ -61,6 +61,9 @@ type WatchResponse struct { // the channel sends a final response that has Canceled set to true with a non-nil Err(). Canceled bool + // Created is used to indicate the creation of the watcher. + Created bool + closeErr error } @@ -98,6 +101,7 @@ type watcher struct { // mu protects the grpc streams map mu sync.RWMutex + // streams holds all the active grpc streams keyed by ctx value. streams map[string]*watchGrpcStream } @@ -138,6 +142,8 @@ type watchRequest struct { key string end string rev int64 + // send created notification event if this field is true + createdNotify bool // progressNotify is for progress updates progressNotify bool // filters is the list of events to filter out @@ -223,6 +229,7 @@ func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) Watch wr := &watchRequest{ ctx: ctx, + createdNotify: ow.createdNotify, key: string(ow.key), end: string(ow.end), rev: ow.rev, @@ -418,6 +425,7 @@ func (w *watchGrpcStream) run() { w.addStream(pbresp, pendingReq) pendingReq = nil curReqC = w.reqc + w.dispatchEvent(pbresp) case pbresp.Canceled: delete(cancelSet, pbresp.WatchId) // shutdown serveStream, if any @@ -489,19 +497,23 @@ func (w *watchGrpcStream) dispatchEvent(pbresp *pb.WatchResponse) bool { w.mu.RLock() defer w.mu.RUnlock() ws, ok := w.streams[pbresp.WatchId] + if !ok { + return false + } + events := make([]*Event, len(pbresp.Events)) for i, ev := range pbresp.Events { events[i] = (*Event)(ev) } - if ok { - wr := &WatchResponse{ - Header: *pbresp.Header, - Events: events, - CompactRevision: pbresp.CompactRevision, - Canceled: pbresp.Canceled} - ws.recvc <- wr + wr := &WatchResponse{ + Header: *pbresp.Header, + Events: events, + CompactRevision: pbresp.CompactRevision, + Created: pbresp.Created, + Canceled: pbresp.Canceled, } - return ok + ws.recvc <- wr + return true } // serveWatchClient forwards messages from the grpc stream to run() @@ -533,6 +545,14 @@ func (w *watchGrpcStream) serveStream(ws *watcherStream) { for !closing { curWr := emptyWr outc := ws.outc + + // ignore created event if create notify is not requested or + // we already sent the initial created event (when we are on the resume path). + if len(wrs) > 0 && wrs[0].Created && + (!ws.initReq.createdNotify || ws.lastRev != 0) { + wrs = wrs[1:] + } + if len(wrs) > 0 { curWr = wrs[0] } else {