diff --git a/storage/watchable_store.go b/storage/watchable_store.go index 19235e102..6816c9fab 100644 --- a/storage/watchable_store.go +++ b/storage/watchable_store.go @@ -366,6 +366,7 @@ func (s *watchableStore) syncWatchers() { // will be processed next time and hopefully it will not be full. continue } + w.cur = curRev s.synced.add(w) s.unsynced.delete(w) } diff --git a/storage/watchable_store_test.go b/storage/watchable_store_test.go index 0a154ac72..db61b3e7b 100644 --- a/storage/watchable_store_test.go +++ b/storage/watchable_store_test.go @@ -157,51 +157,52 @@ func TestSyncWatchers(t *testing.T) { watcherN := 100 for i := 0; i < watcherN; i++ { - // use 1 to keep watchers in unsynced + // specify rev as 1 to keep watchers in unsynced w.Watch(testKey, true, 1) } - // Before running s.syncWatchers() - // - // synced should be empty - // because we manually populate unsynced only - if len(s.synced[string(testKey)]) != 0 { - t.Fatalf("synced[string(testKey)] size = %d, want 0", len(s.synced[string(testKey)])) + // Before running s.syncWatchers() synced should be empty because we manually + // populate unsynced only + sws, _ := s.synced.getSetByKey(string(testKey)) + uws, _ := s.unsynced.getSetByKey(string(testKey)) + + if len(sws) != 0 { + t.Fatalf("synced[string(testKey)] size = %d, want 0", len(sws)) } - // unsynced should not be empty - // because we manually populated unsynced only - if len(s.unsynced) == 0 { - t.Errorf("unsynced size = %d, want %d", len(s.unsynced), watcherN) + // unsynced should not be empty because we manually populated unsynced only + if len(uws) != watcherN { + t.Errorf("unsynced size = %d, want %d", len(uws), watcherN) } - // this should move all unsynced watchers - // to synced ones + // this should move all unsynced watchers to synced ones s.syncWatchers() - // After running s.syncWatchers() - // - // synced should not be empty - // because syncwatchers populates synced - // in this test case - if len(s.synced[string(testKey)]) == 0 { - t.Errorf("synced[string(testKey)] size = 0, want %d", len(s.synced[string(testKey)])) - } - // unsynced should be empty - // because syncwatchers is expected to move - // all watchers from unsynced to synced - // in this test case - if len(s.unsynced) != 0 { - t.Errorf("unsynced size = %d, want 0", len(s.unsynced)) + sws, _ = s.synced.getSetByKey(string(testKey)) + uws, _ = s.unsynced.getSetByKey(string(testKey)) + + // After running s.syncWatchers(), synced should not be empty because syncwatchers + // populates synced in this test case + if len(sws) != watcherN { + t.Errorf("synced[string(testKey)] size = %d, want %d", len(sws), watcherN) + } + + // unsynced should be empty because syncwatchers is expected to move all watchers + // from unsynced to synced in this test case + if len(uws) != 0 { + t.Errorf("unsynced size = %d, want 0", len(uws)) + } + + for w := range sws { + if w.cur != s.Rev() { + t.Errorf("w.cur = %d, want %d", w.cur, s.Rev()) + } } - // All of the watchers actually share one channel - // so we only need to check one shared channel - // (See watcher.go for more detail). if len(w.(*watchStream).ch) != watcherN { t.Errorf("watched event size = %d, want %d", len(w.(*watchStream).ch), watcherN) } - wr := <-w.(*watchStream).ch - evs := wr.Events + + evs := (<-w.(*watchStream).ch).Events if len(evs) != 1 { t.Errorf("len(evs) got = %d, want = 1", len(evs)) }