mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
parent
e9d576c3d6
commit
78d68226e6
@ -179,6 +179,21 @@ func (s *watchableStore) cancelWatcher(wa *watcher) {
|
||||
s.mu.Unlock()
|
||||
}
|
||||
|
||||
func (s *watchableStore) Restore(b backend.Backend) error {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
err := s.store.Restore(b)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for wa := range s.synced.watchers {
|
||||
s.unsynced.watchers.add(wa)
|
||||
}
|
||||
s.synced = newWatcherGroup()
|
||||
return nil
|
||||
}
|
||||
|
||||
// syncWatchersLoop syncs the watcher in the unsynced map every 100ms.
|
||||
func (s *watchableStore) syncWatchersLoop() {
|
||||
defer s.wg.Done()
|
||||
|
@ -294,6 +294,39 @@ func TestWatchFutureRev(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestWatchRestore(t *testing.T) {
|
||||
b, tmpPath := backend.NewDefaultTmpBackend()
|
||||
s := newWatchableStore(b, &lease.FakeLessor{}, nil)
|
||||
defer cleanup(s, b, tmpPath)
|
||||
|
||||
testKey := []byte("foo")
|
||||
testValue := []byte("bar")
|
||||
rev := s.Put(testKey, testValue, lease.NoLease)
|
||||
|
||||
newBackend, newPath := backend.NewDefaultTmpBackend()
|
||||
newStore := newWatchableStore(newBackend, &lease.FakeLessor{}, nil)
|
||||
defer cleanup(newStore, newBackend, newPath)
|
||||
|
||||
w := newStore.NewWatchStream()
|
||||
w.Watch(testKey, nil, rev-1)
|
||||
|
||||
newStore.Restore(b)
|
||||
select {
|
||||
case resp := <-w.Chan():
|
||||
if resp.Revision != rev {
|
||||
t.Fatalf("rev = %d, want %d", resp.Revision, rev)
|
||||
}
|
||||
if len(resp.Events) != 1 {
|
||||
t.Fatalf("failed to get events from the response")
|
||||
}
|
||||
if resp.Events[0].Kv.ModRevision != rev {
|
||||
t.Fatalf("kv.rev = %d, want %d", resp.Events[0].Kv.ModRevision, rev)
|
||||
}
|
||||
case <-time.After(time.Second):
|
||||
t.Fatal("failed to receive event in 1 second.")
|
||||
}
|
||||
}
|
||||
|
||||
// TestWatchBatchUnsynced tests batching on unsynced watchers
|
||||
func TestWatchBatchUnsynced(t *testing.T) {
|
||||
b, tmpPath := backend.NewDefaultTmpBackend()
|
||||
|
Loading…
x
Reference in New Issue
Block a user