diff --git a/server/etcdserver/api/v3rpc/watch.go b/server/etcdserver/api/v3rpc/watch.go
index 515300725..a8d37efa0 100644
--- a/server/etcdserver/api/v3rpc/watch.go
+++ b/server/etcdserver/api/v3rpc/watch.go
@@ -145,6 +145,10 @@ type serverWatchStream struct {
 	// records fragmented watch IDs
 	fragment map[mvcc.WatchID]bool
 
+	// indicates whether we have an outstanding global progress
+	// notification to send
+	deferredProgress bool
+
 	// closec indicates the stream is closed.
 	closec chan struct{}
 
@@ -174,6 +178,8 @@ func (ws *watchServer) Watch(stream pb.Watch_WatchServer) (err error) {
 		prevKV:   make(map[mvcc.WatchID]bool),
 		fragment: make(map[mvcc.WatchID]bool),
 
+		deferredProgress: false,
+
 		closec: make(chan struct{}),
 	}
 
@@ -360,10 +366,16 @@ func (sws *serverWatchStream) recvLoop() error {
 			}
 		case *pb.WatchRequest_ProgressRequest:
 			if uv.ProgressRequest != nil {
-				sws.ctrlStream <- &pb.WatchResponse{
-					Header:  sws.newResponseHeader(sws.watchStream.Rev()),
-					WatchId: clientv3.InvalidWatchID, // response is not associated with any WatchId and will be broadcast to all watch channels
+				sws.mu.Lock()
+				// Ignore if deferred progress notification is already in progress
+				if !sws.deferredProgress {
+					// Request progress for all watchers,
+					// force generation of a response
+					if !sws.watchStream.RequestProgressAll() {
+						sws.deferredProgress = true
+					}
 				}
+				sws.mu.Unlock()
 			}
 		default:
 			// we probably should not shutdown the entire stream when
@@ -432,11 +444,15 @@ func (sws *serverWatchStream) sendLoop() {
 				Canceled:        canceled,
 			}
 
-			if _, okID := ids[wresp.WatchID]; !okID {
-				// buffer if id not yet announced
-				wrs := append(pending[wresp.WatchID], wr)
-				pending[wresp.WatchID] = wrs
-				continue
+			// Progress notifications can have WatchID -1
+			// if they announce on behalf of multiple watchers
+			if wresp.WatchID != clientv3.InvalidWatchID {
+				if _, okID := ids[wresp.WatchID]; !okID {
+					// buffer if id not yet announced
+					wrs := append(pending[wresp.WatchID], wr)
+					pending[wresp.WatchID] = wrs
+					continue
+				}
 			}
 
 			mvcc.ReportEventReceived(len(evs))
@@ -467,6 +483,11 @@ func (sws *serverWatchStream) sendLoop() {
 				// elide next progress update if sent a key update
 				sws.progress[wresp.WatchID] = false
 			}
+			if sws.deferredProgress {
+				if sws.watchStream.RequestProgressAll() {
+					sws.deferredProgress = false
+				}
+			}
 			sws.mu.Unlock()
 
 		case c, ok := <-sws.ctrlStream:
diff --git a/server/storage/mvcc/watchable_store.go b/server/storage/mvcc/watchable_store.go
index f0d056f28..4e7b5a714 100644
--- a/server/storage/mvcc/watchable_store.go
+++ b/server/storage/mvcc/watchable_store.go
@@ -19,6 +19,7 @@ import (
 	"time"
 
 	"go.etcd.io/etcd/api/v3/mvccpb"
+	clientv3 "go.etcd.io/etcd/client/v3"
 	"go.etcd.io/etcd/pkg/v3/traceutil"
 	"go.etcd.io/etcd/server/v3/lease"
 	"go.etcd.io/etcd/server/v3/storage/backend"
@@ -41,6 +42,7 @@ var (
 
 type watchable interface {
 	watch(key, end []byte, startRev int64, id WatchID, ch chan<- WatchResponse, fcs ...FilterFunc) (*watcher, cancelFunc)
 	progress(w *watcher)
+	progressAll(watchers map[WatchID]*watcher) bool
 	rev() int64
 }
@@ -475,14 +477,34 @@ func (s *watchableStore) addVictim(victim watcherBatch) {
 func (s *watchableStore) rev() int64 { return s.store.Rev() }
 
 func (s *watchableStore) progress(w *watcher) {
+	s.progressIfSync(map[WatchID]*watcher{w.id: w}, w.id)
+}
+
+func (s *watchableStore) progressAll(watchers map[WatchID]*watcher) bool {
+	return s.progressIfSync(watchers, clientv3.InvalidWatchID)
+}
+
+func (s *watchableStore) progressIfSync(watchers map[WatchID]*watcher, responseWatchID WatchID) bool {
 	s.mu.RLock()
 	defer s.mu.RUnlock()
 
-	if _, ok := s.synced.watchers[w]; ok {
-		w.send(WatchResponse{WatchID: w.id, Revision: s.rev()})
-		// If the ch is full, this watcher is receiving events.
-		// We do not need to send progress at all.
+	// Any watcher unsynced?
+	for _, w := range watchers {
+		if _, ok := s.synced.watchers[w]; !ok {
+			return false
+		}
 	}
+
+	// If all watchers are synchronised, send out progress
+	// notification on first watcher. Note that all watchers
+	// should have the same underlying stream, and the progress
+	// notification will be broadcasted client-side if required
+	// (see dispatchEvent in client/v3/watch.go)
+	for _, w := range watchers {
+		w.send(WatchResponse{WatchID: responseWatchID, Revision: s.rev()})
+		return true
+	}
+	return true
 }
 
 type watcher struct {
diff --git a/server/storage/mvcc/watcher.go b/server/storage/mvcc/watcher.go
index 7d2490b1d..c67c21d61 100644
--- a/server/storage/mvcc/watcher.go
+++ b/server/storage/mvcc/watcher.go
@@ -58,6 +58,13 @@ type WatchStream interface {
 	// of the watchers since the watcher is currently synced.
 	RequestProgress(id WatchID)
 
+	// RequestProgressAll requests a progress notification for all
+	// watchers sharing the stream. If all watchers are synced, a
+	// progress notification with watch ID -1 will be sent to an
+	// arbitrary watcher of this stream, and the function returns
+	// true.
+	RequestProgressAll() bool
+
 	// Cancel cancels a watcher by giving its ID. If watcher does not exist, an error will be
 	// returned.
 	Cancel(id WatchID) error
@@ -188,3 +195,9 @@ func (ws *watchStream) RequestProgress(id WatchID) {
 	}
 	ws.watchable.progress(w)
 }
+
+func (ws *watchStream) RequestProgressAll() bool {
+	ws.mu.Lock()
+	defer ws.mu.Unlock()
+	return ws.watchable.progressAll(ws.watchers)
+}
diff --git a/server/storage/mvcc/watcher_test.go b/server/storage/mvcc/watcher_test.go
index b86e31a55..41bbb5108 100644
--- a/server/storage/mvcc/watcher_test.go
+++ b/server/storage/mvcc/watcher_test.go
@@ -25,6 +25,7 @@ import (
 	"go.uber.org/zap/zaptest"
 
 	"go.etcd.io/etcd/api/v3/mvccpb"
+	clientv3 "go.etcd.io/etcd/client/v3"
 	"go.etcd.io/etcd/server/v3/lease"
 	betesting "go.etcd.io/etcd/server/v3/storage/backend/testing"
 )
@@ -342,6 +343,55 @@ func TestWatcherRequestProgress(t *testing.T) {
 	}
 }
 
+func TestWatcherRequestProgressAll(t *testing.T) {
+	b, _ := betesting.NewDefaultTmpBackend(t)
+
+	// manually create watchableStore instead of newWatchableStore
+	// because newWatchableStore automatically calls syncWatchers
+	// method to sync watchers in unsynced map. We want to keep watchers
+	// in unsynced to test if syncWatchers works as expected.
+	s := &watchableStore{
+		store:    NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}),
+		unsynced: newWatcherGroup(),
+		synced:   newWatcherGroup(),
+		stopc:    make(chan struct{}),
+	}
+
+	defer cleanup(s, b)
+
+	testKey := []byte("foo")
+	notTestKey := []byte("bad")
+	testValue := []byte("bar")
+	s.Put(testKey, testValue, lease.NoLease)
+
+	// Create watch stream with watcher. We will not actually get
+	// any notifications on it specifically, but there needs to be
+	// at least one Watch for progress notifications to get
+	// generated.
+	w := s.NewWatchStream()
+	w.Watch(0, notTestKey, nil, 1)
+
+	w.RequestProgressAll()
+	select {
+	case resp := <-w.Chan():
+		t.Fatalf("unexpected %+v", resp)
+	default:
+	}
+
+	s.syncWatchers()
+
+	w.RequestProgressAll()
+	wrs := WatchResponse{WatchID: clientv3.InvalidWatchID, Revision: 2}
+	select {
+	case resp := <-w.Chan():
+		if !reflect.DeepEqual(resp, wrs) {
+			t.Fatalf("got %+v, expect %+v", resp, wrs)
+		}
+	case <-time.After(time.Second):
+		t.Fatal("failed to receive progress")
+	}
+}
+
 func TestWatcherWatchWithFilter(t *testing.T) {
 	b, _ := betesting.NewDefaultTmpBackend(t)
 	s := WatchableKV(newWatchableStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}))
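
Not part of the patch above; a minimal clientv3 sketch, for illustration only, of how the new path is exercised end to end (the endpoint and key are placeholders). The client opens a watch and then asks for a progress notification on the shared watch stream; with this change the server answers with watch ID -1 once every watcher on the stream is synced, and the client fans that notification out to all watchers (see dispatchEvent in client/v3/watch.go).

package main

import (
	"context"
	"fmt"
	"time"

	clientv3 "go.etcd.io/etcd/client/v3"
)

func main() {
	// Placeholder endpoint; point this at a real cluster.
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"localhost:2379"},
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		panic(err)
	}
	defer cli.Close()

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Register a watcher; it shares a gRPC watch stream with any other
	// watchers created on this client.
	wch := cli.Watch(ctx, "foo")

	// Ask the server for a progress notification. The server replies only
	// once all watchers on the stream are synced; the response carries
	// watch ID -1 and is broadcast to every watcher on the client side.
	if err := cli.RequestProgress(ctx); err != nil {
		panic(err)
	}

	for resp := range wch {
		if resp.IsProgressNotify() {
			fmt.Println("progress notification at revision", resp.Header.Revision)
			return
		}
	}
}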