mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
@@ -448,12 +448,15 @@ func (s *watchableStore) notify(rev int64, evs []mvccpb.Event) {
|
||||
pendingEventsGauge.Add(float64(len(eb.evs)))
|
||||
} else {
|
||||
// move slow watcher to victims
|
||||
w.minRev = rev + 1
|
||||
w.victim = true
|
||||
victim[w] = eb
|
||||
s.synced.delete(w)
|
||||
slowWatcherGauge.Inc()
|
||||
}
|
||||
// always update minRev
|
||||
// in case 'send' returns true and watcher stays synced, this is needed for Restore when all watchers become unsynced
|
||||
// in case 'send' returns false, this is needed for syncWatchers
|
||||
w.minRev = rev + 1
|
||||
}
|
||||
s.addVictim(victim)
|
||||
}
|
||||
|
||||
@@ -296,34 +296,22 @@ func TestWatchRestore(t *testing.T) {
|
||||
|
||||
testKey := []byte("foo")
|
||||
testValue := []byte("bar")
|
||||
rev := s.Put(testKey, testValue, lease.NoLease)
|
||||
|
||||
newBackend, _ := betesting.NewDefaultTmpBackend(t)
|
||||
newStore := newWatchableStore(zaptest.NewLogger(t), newBackend, &lease.FakeLessor{}, StoreConfig{})
|
||||
defer cleanup(newStore, newBackend)
|
||||
|
||||
w := newStore.NewWatchStream()
|
||||
w := s.NewWatchStream()
|
||||
defer w.Close()
|
||||
|
||||
w.Watch(0, testKey, nil, rev-1)
|
||||
w.Watch(0, testKey, nil, 1)
|
||||
|
||||
time.Sleep(delay)
|
||||
wantRev := s.Put(testKey, testValue, lease.NoLease)
|
||||
|
||||
newStore.Restore(b)
|
||||
select {
|
||||
case resp := <-w.Chan():
|
||||
if resp.Revision != rev {
|
||||
t.Fatalf("rev = %d, want %d", resp.Revision, rev)
|
||||
}
|
||||
if len(resp.Events) != 1 {
|
||||
t.Fatalf("failed to get events from the response")
|
||||
}
|
||||
if resp.Events[0].Kv.ModRevision != rev {
|
||||
t.Fatalf("kv.rev = %d, want %d", resp.Events[0].Kv.ModRevision, rev)
|
||||
}
|
||||
case <-time.After(time.Second):
|
||||
t.Fatal("failed to receive event in 1 second.")
|
||||
s.Restore(b)
|
||||
events := readEventsForSecond(w.Chan())
|
||||
if len(events) != 1 {
|
||||
t.Errorf("Expected only one event, got %d", len(events))
|
||||
}
|
||||
if events[0].Kv.ModRevision != wantRev {
|
||||
t.Errorf("Expected revision to match, got %d, want %d", events[0].Kv.ModRevision, wantRev)
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@@ -331,6 +319,17 @@ func TestWatchRestore(t *testing.T) {
|
||||
t.Run("RunSyncWatchLoopBeforeRestore", test(time.Millisecond*120)) // longer than default waitDuration
|
||||
}
|
||||
|
||||
func readEventsForSecond(ws <-chan WatchResponse) (events []mvccpb.Event) {
|
||||
for {
|
||||
select {
|
||||
case resp := <-ws:
|
||||
events = append(events, resp.Events...)
|
||||
case <-time.After(time.Second):
|
||||
return events
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TestWatchRestoreSyncedWatcher tests such a case that:
|
||||
// 1. watcher is created with a future revision "math.MaxInt64 - 2"
|
||||
// 2. watcher with a future revision is added to "synced" watcher group
|
||||
|
||||
@@ -29,7 +29,7 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
triggerTimeout = 5 * time.Second
|
||||
triggerTimeout = 10 * time.Second
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -53,7 +53,8 @@ var (
|
||||
CompactBeforeCommitBatchPanic Failpoint = goPanicFailpoint{"compactBeforeCommitBatch", triggerCompact, AnyMember}
|
||||
CompactAfterCommitBatchPanic Failpoint = goPanicFailpoint{"compactAfterCommitBatch", triggerCompact, AnyMember}
|
||||
RaftBeforeLeaderSendPanic Failpoint = goPanicFailpoint{"raftBeforeLeaderSend", nil, Leader}
|
||||
BlackholePeerNetwork Failpoint = blackholePeerNetworkFailpoint{}
|
||||
BlackholePeerNetwork Failpoint = blackholePeerNetworkFailpoint{waitTillSnapshot: false}
|
||||
BlackholeUntilSnapshot Failpoint = blackholePeerNetworkFailpoint{waitTillSnapshot: true}
|
||||
DelayPeerNetwork Failpoint = delayPeerNetworkFailpoint{duration: time.Second, baseLatency: 75 * time.Millisecond, randomizedLatency: 50 * time.Millisecond}
|
||||
oneNodeClusterFailpoints = []Failpoint{
|
||||
KillFailpoint, BeforeCommitPanic, AfterCommitPanic, RaftBeforeSavePanic,
|
||||
@@ -78,6 +79,7 @@ var (
|
||||
RaftAfterSaveSnapPanic Failpoint = goPanicFailpoint{"raftAfterSaveSnap", triggerBlackholeUntilSnapshot, Follower}
|
||||
RandomSnapshotFailpoint Failpoint = randomFailpoint{[]Failpoint{
|
||||
RaftBeforeApplySnapPanic, RaftAfterApplySnapPanic, RaftAfterWALReleasePanic, RaftBeforeSaveSnapPanic, RaftAfterSaveSnapPanic,
|
||||
BlackholeUntilSnapshot,
|
||||
}}
|
||||
)
|
||||
|
||||
@@ -308,11 +310,13 @@ func (f randomFailpoint) Available(e2e.EtcdProcess) bool {
|
||||
return true
|
||||
}
|
||||
|
||||
type blackholePeerNetworkFailpoint struct{}
|
||||
type blackholePeerNetworkFailpoint struct {
|
||||
waitTillSnapshot bool
|
||||
}
|
||||
|
||||
func (f blackholePeerNetworkFailpoint) Trigger(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster) error {
|
||||
member := clus.Procs[rand.Int()%len(clus.Procs)]
|
||||
return triggerBlackhole(t, ctx, member, clus, false)
|
||||
return triggerBlackhole(t, ctx, member, clus, f.waitTillSnapshot)
|
||||
}
|
||||
|
||||
func triggerBlackhole(t *testing.T, ctx context.Context, member e2e.EtcdProcess, clus *e2e.EtcdProcessCluster, shouldWaitTillSnapshot bool) error {
|
||||
|
||||
Reference in New Issue
Block a user