mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
server: Test watch restore
Signed-off-by: Marek Siarkowicz <siarkowicz@google.com>
This commit is contained in:
@@ -307,32 +307,22 @@ func TestWatchRestore(t *testing.T) {
|
|||||||
|
|
||||||
testKey := []byte("foo")
|
testKey := []byte("foo")
|
||||||
testValue := []byte("bar")
|
testValue := []byte("bar")
|
||||||
rev := s.Put(testKey, testValue, lease.NoLease)
|
w := s.NewWatchStream()
|
||||||
|
defer w.Close()
|
||||||
newBackend, newPath := betesting.NewDefaultTmpBackend(t)
|
w.Watch(0, testKey, nil, 1)
|
||||||
newStore := newWatchableStore(zap.NewExample(), newBackend, &lease.FakeLessor{}, StoreConfig{})
|
|
||||||
defer cleanup(newStore, newBackend, newPath)
|
|
||||||
|
|
||||||
w := newStore.NewWatchStream()
|
|
||||||
w.Watch(0, testKey, nil, rev-1)
|
|
||||||
|
|
||||||
time.Sleep(delay)
|
time.Sleep(delay)
|
||||||
|
wantRev := s.Put(testKey, testValue, lease.NoLease)
|
||||||
|
|
||||||
newStore.Restore(b)
|
s.Restore(b)
|
||||||
select {
|
events := readEventsForSecond(w.Chan())
|
||||||
case resp := <-w.Chan():
|
if len(events) != 1 {
|
||||||
if resp.Revision != rev {
|
t.Errorf("Expected only one event, got %d", len(events))
|
||||||
t.Fatalf("rev = %d, want %d", resp.Revision, rev)
|
|
||||||
}
|
|
||||||
if len(resp.Events) != 1 {
|
|
||||||
t.Fatalf("failed to get events from the response")
|
|
||||||
}
|
|
||||||
if resp.Events[0].Kv.ModRevision != rev {
|
|
||||||
t.Fatalf("kv.rev = %d, want %d", resp.Events[0].Kv.ModRevision, rev)
|
|
||||||
}
|
|
||||||
case <-time.After(time.Second):
|
|
||||||
t.Fatal("failed to receive event in 1 second.")
|
|
||||||
}
|
}
|
||||||
|
if events[0].Kv.ModRevision != wantRev {
|
||||||
|
t.Errorf("Expected revision to match, got %d, want %d", events[0].Kv.ModRevision, wantRev)
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -340,6 +330,17 @@ func TestWatchRestore(t *testing.T) {
|
|||||||
t.Run("RunSyncWatchLoopBeforeRestore", test(time.Millisecond*120)) // longer than default waitDuration
|
t.Run("RunSyncWatchLoopBeforeRestore", test(time.Millisecond*120)) // longer than default waitDuration
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func readEventsForSecond(ws <-chan WatchResponse) (events []mvccpb.Event) {
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case resp := <-ws:
|
||||||
|
events = append(events, resp.Events...)
|
||||||
|
case <-time.After(time.Second):
|
||||||
|
return events
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// TestWatchRestoreSyncedWatcher tests such a case that:
|
// TestWatchRestoreSyncedWatcher tests such a case that:
|
||||||
// 1. watcher is created with a future revision "math.MaxInt64 - 2"
|
// 1. watcher is created with a future revision "math.MaxInt64 - 2"
|
||||||
// 2. watcher with a future revision is added to "synced" watcher group
|
// 2. watcher with a future revision is added to "synced" watcher group
|
||||||
|
|||||||
Reference in New Issue
Block a user