Merge pull request #9281 from yudai/fix_unrestored_watchers

mvcc: restore unsynced watchers
This commit is contained in:
Gyuho Lee 2018-02-06 11:33:13 -08:00 committed by GitHub
commit 63183f8c18
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 34 additions and 25 deletions

View File

@ -192,7 +192,7 @@ func (s *watchableStore) Restore(b backend.Backend) error {
} }
for wa := range s.synced.watchers { for wa := range s.synced.watchers {
s.unsynced.watchers.add(wa) s.unsynced.add(wa)
} }
s.synced = newWatcherGroup() s.synced = newWatcherGroup()
return nil return nil

View File

@ -297,36 +297,45 @@ func TestWatchFutureRev(t *testing.T) {
} }
func TestWatchRestore(t *testing.T) { func TestWatchRestore(t *testing.T) {
b, tmpPath := backend.NewDefaultTmpBackend() test := func(delay time.Duration) func(t *testing.T) {
s := newWatchableStore(b, &lease.FakeLessor{}, nil) return func(t *testing.T) {
defer cleanup(s, b, tmpPath) b, tmpPath := backend.NewDefaultTmpBackend()
s := newWatchableStore(b, &lease.FakeLessor{}, nil)
defer cleanup(s, b, tmpPath)
testKey := []byte("foo") testKey := []byte("foo")
testValue := []byte("bar") testValue := []byte("bar")
rev := s.Put(testKey, testValue, lease.NoLease) rev := s.Put(testKey, testValue, lease.NoLease)
newBackend, newPath := backend.NewDefaultTmpBackend() newBackend, newPath := backend.NewDefaultTmpBackend()
newStore := newWatchableStore(newBackend, &lease.FakeLessor{}, nil) newStore := newWatchableStore(newBackend, &lease.FakeLessor{}, nil)
defer cleanup(newStore, newBackend, newPath) defer cleanup(newStore, newBackend, newPath)
w := newStore.NewWatchStream() w := newStore.NewWatchStream()
w.Watch(0, testKey, nil, rev-1) w.Watch(0, testKey, nil, rev-1)
newStore.Restore(b) time.Sleep(delay)
select {
case resp := <-w.Chan(): newStore.Restore(b)
if resp.Revision != rev { select {
t.Fatalf("rev = %d, want %d", resp.Revision, rev) case resp := <-w.Chan():
if resp.Revision != rev {
t.Fatalf("rev = %d, want %d", resp.Revision, rev)
}
if len(resp.Events) != 1 {
t.Fatalf("failed to get events from the response")
}
if resp.Events[0].Kv.ModRevision != rev {
t.Fatalf("kv.rev = %d, want %d", resp.Events[0].Kv.ModRevision, rev)
}
case <-time.After(time.Second):
t.Fatal("failed to receive event in 1 second.")
}
} }
if len(resp.Events) != 1 {
t.Fatalf("failed to get events from the response")
}
if resp.Events[0].Kv.ModRevision != rev {
t.Fatalf("kv.rev = %d, want %d", resp.Events[0].Kv.ModRevision, rev)
}
case <-time.After(time.Second):
t.Fatal("failed to receive event in 1 second.")
} }
t.Run("Normal", test(0))
t.Run("RunSyncWatchLoopBeforeRestore", test(time.Millisecond*120)) // longer than default waitDuration
} }
// TestWatchBatchUnsynced tests batching on unsynced watchers // TestWatchBatchUnsynced tests batching on unsynced watchers