storage: update watch.cur and fix tests

This commit is contained in:
Xiang Li 2016-02-03 00:54:07 -08:00
parent 3ed404633a
commit b09214df32
2 changed files with 34 additions and 32 deletions

View File

@ -366,6 +366,7 @@ func (s *watchableStore) syncWatchers() {
// will be processed next time and hopefully it will not be full. // will be processed next time and hopefully it will not be full.
continue continue
} }
w.cur = curRev
s.synced.add(w) s.synced.add(w)
s.unsynced.delete(w) s.unsynced.delete(w)
} }

View File

@ -157,51 +157,52 @@ func TestSyncWatchers(t *testing.T) {
watcherN := 100 watcherN := 100
for i := 0; i < watcherN; i++ { for i := 0; i < watcherN; i++ {
// use 1 to keep watchers in unsynced // specify rev as 1 to keep watchers in unsynced
w.Watch(testKey, true, 1) w.Watch(testKey, true, 1)
} }
// Before running s.syncWatchers() // Before running s.syncWatchers() synced should be empty because we manually
// // populate unsynced only
// synced should be empty sws, _ := s.synced.getSetByKey(string(testKey))
// because we manually populate unsynced only uws, _ := s.unsynced.getSetByKey(string(testKey))
if len(s.synced[string(testKey)]) != 0 {
t.Fatalf("synced[string(testKey)] size = %d, want 0", len(s.synced[string(testKey)])) if len(sws) != 0 {
t.Fatalf("synced[string(testKey)] size = %d, want 0", len(sws))
} }
// unsynced should not be empty // unsynced should not be empty because we manually populated unsynced only
// because we manually populated unsynced only if len(uws) != watcherN {
if len(s.unsynced) == 0 { t.Errorf("unsynced size = %d, want %d", len(uws), watcherN)
t.Errorf("unsynced size = %d, want %d", len(s.unsynced), watcherN)
} }
// this should move all unsynced watchers // this should move all unsynced watchers to synced ones
// to synced ones
s.syncWatchers() s.syncWatchers()
// After running s.syncWatchers() sws, _ = s.synced.getSetByKey(string(testKey))
// uws, _ = s.unsynced.getSetByKey(string(testKey))
// synced should not be empty
// because syncwatchers populates synced // After running s.syncWatchers(), synced should not be empty because syncwatchers
// in this test case // populates synced in this test case
if len(s.synced[string(testKey)]) == 0 { if len(sws) != watcherN {
t.Errorf("synced[string(testKey)] size = 0, want %d", len(s.synced[string(testKey)])) t.Errorf("synced[string(testKey)] size = %d, want %d", len(sws), watcherN)
} }
// unsynced should be empty
// because syncwatchers is expected to move // unsynced should be empty because syncwatchers is expected to move all watchers
// all watchers from unsynced to synced // from unsynced to synced in this test case
// in this test case if len(uws) != 0 {
if len(s.unsynced) != 0 { t.Errorf("unsynced size = %d, want 0", len(uws))
t.Errorf("unsynced size = %d, want 0", len(s.unsynced)) }
for w := range sws {
if w.cur != s.Rev() {
t.Errorf("w.cur = %d, want %d", w.cur, s.Rev())
}
} }
// All of the watchers actually share one channel
// so we only need to check one shared channel
// (See watcher.go for more detail).
if len(w.(*watchStream).ch) != watcherN { if len(w.(*watchStream).ch) != watcherN {
t.Errorf("watched event size = %d, want %d", len(w.(*watchStream).ch), watcherN) t.Errorf("watched event size = %d, want %d", len(w.(*watchStream).ch), watcherN)
} }
wr := <-w.(*watchStream).ch
evs := wr.Events evs := (<-w.(*watchStream).ch).Events
if len(evs) != 1 { if len(evs) != 1 {
t.Errorf("len(evs) got = %d, want = 1", len(evs)) t.Errorf("len(evs) got = %d, want = 1", len(evs))
} }