etcdserver: fix race in TestTriggerSnap

Fixes #4584
This commit is contained in:
Anthony Romano 2016-02-21 22:01:51 -08:00
parent 55c3cf3ce6
commit a524d5bdb7
3 changed files with 39 additions and 25 deletions

View File

@ -153,7 +153,7 @@ func TestStopRaftWhenWaitingForApplyDone(t *testing.T) {
n := newNopReadyNode()
r := raftNode{
Node: n,
storage: &storageRecorder{},
storage: newStorageRecorder(""),
raftStorage: raft.NewMemoryStorage(),
transport: rafthttp.NewNopTransporter(),
}

View File

@ -168,7 +168,7 @@ func TestApplyRepeat(t *testing.T) {
r: raftNode{
Node: n,
raftStorage: raft.NewMemoryStorage(),
storage: &storageRecorder{},
storage: newStorageRecorder(""),
transport: rafthttp.NewNopTransporter(),
},
cfg: &ServerConfig{},
@ -625,7 +625,7 @@ func TestDoProposal(t *testing.T) {
cfg: &ServerConfig{TickMs: 1},
r: raftNode{
Node: newNodeCommitter(),
storage: &storageRecorder{},
storage: newStorageRecorder(""),
raftStorage: raft.NewMemoryStorage(),
transport: rafthttp.NewNopTransporter(),
},
@ -776,7 +776,7 @@ func TestSyncTrigger(t *testing.T) {
Node: n,
raftStorage: raft.NewMemoryStorage(),
transport: rafthttp.NewNopTransporter(),
storage: &storageRecorder{},
storage: newStorageRecorder(""),
},
store: store.NewNop(),
SyncTicker: st,
@ -822,7 +822,7 @@ func TestSnapshot(t *testing.T) {
s := raft.NewMemoryStorage()
s.Append([]raftpb.Entry{{Index: 1}})
st := store.NewRecorder()
p := &storageRecorder{}
p := newStorageRecorder("")
srv := &EtcdServer{
cfg: &ServerConfig{},
r: raftNode{
@ -856,7 +856,7 @@ func TestSnapshot(t *testing.T) {
func TestTriggerSnap(t *testing.T) {
snapc := 10
st := store.NewRecorder()
p := &storageRecorder{}
p := newStorageRecorderStream("")
srv := &EtcdServer{
cfg: &ServerConfig{TickMs: 1},
snapCount: uint64(snapc),
@ -870,23 +870,29 @@ func TestTriggerSnap(t *testing.T) {
reqIDGen: idutil.NewGenerator(0, time.Time{}),
}
srv.start()
donec := make(chan struct{})
go func() {
wcnt := 2 + snapc
gaction, _ := p.Wait(wcnt)
// each operation is recorded as a Save
// (SnapCount+1) * Puts + SaveSnap = (SnapCount+1) * Save + SaveSnap
if len(gaction) != wcnt {
t.Fatalf("len(action) = %d, want %d", len(gaction), wcnt)
}
if !reflect.DeepEqual(gaction[wcnt-1], testutil.Action{Name: "SaveSnap"}) {
t.Errorf("action = %s, want SaveSnap", gaction[wcnt-1])
}
close(donec)
}()
for i := 0; i < snapc+1; i++ {
srv.Do(context.Background(), pb.Request{Method: "PUT"})
}
wcnt := 2 + snapc
gaction, _ := p.Wait(wcnt)
srv.Stop()
// each operation is recorded as a Save
// (SnapCount+1) * Puts + SaveSnap = (SnapCount+1) * Save + SaveSnap
if len(gaction) != wcnt {
t.Fatalf("len(action) = %d, want %d", len(gaction), wcnt)
}
if !reflect.DeepEqual(gaction[wcnt-1], testutil.Action{Name: "SaveSnap"}) {
t.Errorf("action = %s, want SaveSnap", gaction[wcnt-1])
}
<-donec
}
// TestConcurrentApplyAndSnapshotV3 will send out snapshots concurrently with
@ -919,7 +925,7 @@ func TestConcurrentApplyAndSnapshotV3(t *testing.T) {
r: raftNode{
Node: n,
transport: tr,
storage: &storageRecorder{dbPath: testdir},
storage: newStorageRecorder(testdir),
raftStorage: rs,
},
store: cl.store,
@ -991,7 +997,7 @@ func TestConcurrentApplyAndSnapshotV3(t *testing.T) {
func TestRecvSnapshot(t *testing.T) {
n := newNopReadyNode()
st := store.NewRecorder()
p := &storageRecorder{}
p := newStorageRecorder("")
cl := newCluster("abc")
cl.SetStore(store.New())
s := &EtcdServer{
@ -1038,7 +1044,7 @@ func TestApplySnapshotAndCommittedEntries(t *testing.T) {
cfg: &ServerConfig{},
r: raftNode{
Node: n,
storage: &storageRecorder{},
storage: newStorageRecorder(""),
raftStorage: storage,
transport: rafthttp.NewNopTransporter(),
},
@ -1082,7 +1088,7 @@ func TestAddMember(t *testing.T) {
r: raftNode{
Node: n,
raftStorage: raft.NewMemoryStorage(),
storage: &storageRecorder{},
storage: newStorageRecorder(""),
transport: rafthttp.NewNopTransporter(),
},
cfg: &ServerConfig{},
@ -1122,7 +1128,7 @@ func TestRemoveMember(t *testing.T) {
r: raftNode{
Node: n,
raftStorage: raft.NewMemoryStorage(),
storage: &storageRecorder{},
storage: newStorageRecorder(""),
transport: rafthttp.NewNopTransporter(),
},
cfg: &ServerConfig{},
@ -1161,7 +1167,7 @@ func TestUpdateMember(t *testing.T) {
r: raftNode{
Node: n,
raftStorage: raft.NewMemoryStorage(),
storage: &storageRecorder{},
storage: newStorageRecorder(""),
transport: rafthttp.NewNopTransporter(),
},
store: st,

View File

@ -149,10 +149,18 @@ func makeMemberDir(dir string) error {
}
type storageRecorder struct {
testutil.RecorderBuffered
testutil.Recorder
dbPath string // must have '/' suffix if set
}
func newStorageRecorder(db string) *storageRecorder {
return &storageRecorder{&testutil.RecorderBuffered{}, db}
}
func newStorageRecorderStream(db string) *storageRecorder {
return &storageRecorder{testutil.NewRecorderStream(), db}
}
func (p *storageRecorder) Save(st raftpb.HardState, ents []raftpb.Entry) error {
p.Record(testutil.Action{Name: "Save"})
return nil