From dafdaaedf2ef4e7ef5de2170a90f2477f02b71d9 Mon Sep 17 00:00:00 2001 From: Bogdan Kanivets Date: Thu, 9 Mar 2023 14:11:37 -0800 Subject: [PATCH 1/2] mvcc: update minRev when watcher stays synced Problem: during restore in watchableStore.Restore, synced watchers are moved to unsynced. minRev will be behind since it's not updated when watcher stays synced. Solution: update minRev fixes: https://github.com/etcd-io/etcd/issues/15271 Signed-off-by: Bogdan Kanivets Signed-off-by: Marek Siarkowicz --- server/mvcc/watchable_store.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/server/mvcc/watchable_store.go b/server/mvcc/watchable_store.go index 27a19fae6..3a9fd344c 100644 --- a/server/mvcc/watchable_store.go +++ b/server/mvcc/watchable_store.go @@ -447,7 +447,6 @@ func (s *watchableStore) notify(rev int64, evs []mvccpb.Event) { pendingEventsGauge.Add(float64(len(eb.evs))) } else { // move slow watcher to victims - w.minRev = rev + 1 if victim == nil { victim = make(watcherBatch) } @@ -456,6 +455,10 @@ func (s *watchableStore) notify(rev int64, evs []mvccpb.Event) { s.synced.delete(w) slowWatcherGauge.Inc() } + // always update minRev + // in case 'send' returns true and watcher stays synced, this is needed for Restore when all watchers become unsynced + // in case 'send' returns false, this is needed for syncWatchers + w.minRev = rev + 1 } s.addVictim(victim) } From 92e56ab61e2934b6cf95273ce22e0d0be3f70122 Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Thu, 16 Mar 2023 21:23:52 +0100 Subject: [PATCH 2/2] server: Test watch restore Signed-off-by: Marek Siarkowicz --- server/mvcc/watchable_store_test.go | 45 +++++++++++++++-------------- 1 file changed, 23 insertions(+), 22 deletions(-) diff --git a/server/mvcc/watchable_store_test.go b/server/mvcc/watchable_store_test.go index 46386a44e..34723730f 100644 --- a/server/mvcc/watchable_store_test.go +++ b/server/mvcc/watchable_store_test.go @@ -307,32 +307,22 @@ func TestWatchRestore(t *testing.T) { testKey := []byte("foo") testValue := []byte("bar") - rev := s.Put(testKey, testValue, lease.NoLease) - - newBackend, newPath := betesting.NewDefaultTmpBackend(t) - newStore := newWatchableStore(zap.NewExample(), newBackend, &lease.FakeLessor{}, StoreConfig{}) - defer cleanup(newStore, newBackend, newPath) - - w := newStore.NewWatchStream() - w.Watch(0, testKey, nil, rev-1) + w := s.NewWatchStream() + defer w.Close() + w.Watch(0, testKey, nil, 1) time.Sleep(delay) + wantRev := s.Put(testKey, testValue, lease.NoLease) - newStore.Restore(b) - select { - case resp := <-w.Chan(): - if resp.Revision != rev { - t.Fatalf("rev = %d, want %d", resp.Revision, rev) - } - if len(resp.Events) != 1 { - t.Fatalf("failed to get events from the response") - } - if resp.Events[0].Kv.ModRevision != rev { - t.Fatalf("kv.rev = %d, want %d", resp.Events[0].Kv.ModRevision, rev) - } - case <-time.After(time.Second): - t.Fatal("failed to receive event in 1 second.") + s.Restore(b) + events := readEventsForSecond(w.Chan()) + if len(events) != 1 { + t.Errorf("Expected only one event, got %d", len(events)) } + if events[0].Kv.ModRevision != wantRev { + t.Errorf("Expected revision to match, got %d, want %d", events[0].Kv.ModRevision, wantRev) + } + } } @@ -340,6 +330,17 @@ func TestWatchRestore(t *testing.T) { t.Run("RunSyncWatchLoopBeforeRestore", test(time.Millisecond*120)) // longer than default waitDuration } +func readEventsForSecond(ws <-chan WatchResponse) (events []mvccpb.Event) { + for { + select { + case resp := <-ws: + events = append(events, resp.Events...) + case <-time.After(time.Second): + return events + } + } +} + // TestWatchRestoreSyncedWatcher tests such a case that: // 1. watcher is created with a future revision "math.MaxInt64 - 2" // 2. watcher with a future revision is added to "synced" watcher group