mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
mvcc: restore unsynced watchers
In case syncWatchersLoop() starts before Restore() is called, watchers already added by that moment are moved to s.synced by the loop. However, there is a broken logic that moves watchers from s.synced to s.uncyned without setting keyWatchers of the watcherGroup. Eventually syncWatchers() fails to pickup those watchers from s.unsynced and no events are sent to the watchers, because newWatcherBatch() called in the function uses wg.watcherSetByKey() internally that requires a proper keyWatchers value.
This commit is contained in:
parent
4178b75411
commit
b6373f1625
@ -267,7 +267,7 @@ func (s *watchableStore) Restore(b backend.Backend) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
for wa := range s.synced.watchers {
|
for wa := range s.synced.watchers {
|
||||||
s.unsynced.watchers.add(wa)
|
s.unsynced.add(wa)
|
||||||
}
|
}
|
||||||
s.synced = newWatcherGroup()
|
s.synced = newWatcherGroup()
|
||||||
return nil
|
return nil
|
||||||
|
@ -295,36 +295,45 @@ func TestWatchFutureRev(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestWatchRestore(t *testing.T) {
|
func TestWatchRestore(t *testing.T) {
|
||||||
b, tmpPath := backend.NewDefaultTmpBackend()
|
test := func(delay time.Duration) func(t *testing.T) {
|
||||||
s := newWatchableStore(b, &lease.FakeLessor{}, nil)
|
return func(t *testing.T) {
|
||||||
defer cleanup(s, b, tmpPath)
|
b, tmpPath := backend.NewDefaultTmpBackend()
|
||||||
|
s := newWatchableStore(b, &lease.FakeLessor{}, nil)
|
||||||
|
defer cleanup(s, b, tmpPath)
|
||||||
|
|
||||||
testKey := []byte("foo")
|
testKey := []byte("foo")
|
||||||
testValue := []byte("bar")
|
testValue := []byte("bar")
|
||||||
rev := s.Put(testKey, testValue, lease.NoLease)
|
rev := s.Put(testKey, testValue, lease.NoLease)
|
||||||
|
|
||||||
newBackend, newPath := backend.NewDefaultTmpBackend()
|
newBackend, newPath := backend.NewDefaultTmpBackend()
|
||||||
newStore := newWatchableStore(newBackend, &lease.FakeLessor{}, nil)
|
newStore := newWatchableStore(newBackend, &lease.FakeLessor{}, nil)
|
||||||
defer cleanup(newStore, newBackend, newPath)
|
defer cleanup(newStore, newBackend, newPath)
|
||||||
|
|
||||||
w := newStore.NewWatchStream()
|
w := newStore.NewWatchStream()
|
||||||
w.Watch(testKey, nil, rev-1)
|
w.Watch(0, testKey, nil, rev-1)
|
||||||
|
|
||||||
newStore.Restore(b)
|
time.Sleep(delay)
|
||||||
select {
|
|
||||||
case resp := <-w.Chan():
|
newStore.Restore(b)
|
||||||
if resp.Revision != rev {
|
select {
|
||||||
t.Fatalf("rev = %d, want %d", resp.Revision, rev)
|
case resp := <-w.Chan():
|
||||||
|
if resp.Revision != rev {
|
||||||
|
t.Fatalf("rev = %d, want %d", resp.Revision, rev)
|
||||||
|
}
|
||||||
|
if len(resp.Events) != 1 {
|
||||||
|
t.Fatalf("failed to get events from the response")
|
||||||
|
}
|
||||||
|
if resp.Events[0].Kv.ModRevision != rev {
|
||||||
|
t.Fatalf("kv.rev = %d, want %d", resp.Events[0].Kv.ModRevision, rev)
|
||||||
|
}
|
||||||
|
case <-time.After(time.Second):
|
||||||
|
t.Fatal("failed to receive event in 1 second.")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if len(resp.Events) != 1 {
|
|
||||||
t.Fatalf("failed to get events from the response")
|
|
||||||
}
|
|
||||||
if resp.Events[0].Kv.ModRevision != rev {
|
|
||||||
t.Fatalf("kv.rev = %d, want %d", resp.Events[0].Kv.ModRevision, rev)
|
|
||||||
}
|
|
||||||
case <-time.After(time.Second):
|
|
||||||
t.Fatal("failed to receive event in 1 second.")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
t.Run("Normal", test(0))
|
||||||
|
t.Run("RunSyncWatchLoopBeforeRestore", test(time.Millisecond*120)) // longer than default waitDuration
|
||||||
}
|
}
|
||||||
|
|
||||||
// TestWatchBatchUnsynced tests batching on unsynced watchers
|
// TestWatchBatchUnsynced tests batching on unsynced watchers
|
||||||
|
Loading…
x
Reference in New Issue
Block a user