From a524d5bdb731d454ead74e1ec88a2ef86fcd9f7b Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Sun, 21 Feb 2016 22:01:51 -0800 Subject: [PATCH] etcdserver: fix race in TestTriggerSnap Fixes #4584 --- etcdserver/raft_test.go | 2 +- etcdserver/server_test.go | 52 ++++++++++++++++++++++----------------- etcdserver/storage.go | 10 +++++++- 3 files changed, 39 insertions(+), 25 deletions(-) diff --git a/etcdserver/raft_test.go b/etcdserver/raft_test.go index d3f5cbde5..7250839c3 100644 --- a/etcdserver/raft_test.go +++ b/etcdserver/raft_test.go @@ -153,7 +153,7 @@ func TestStopRaftWhenWaitingForApplyDone(t *testing.T) { n := newNopReadyNode() r := raftNode{ Node: n, - storage: &storageRecorder{}, + storage: newStorageRecorder(""), raftStorage: raft.NewMemoryStorage(), transport: rafthttp.NewNopTransporter(), } diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index ebd45c286..dca396044 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -168,7 +168,7 @@ func TestApplyRepeat(t *testing.T) { r: raftNode{ Node: n, raftStorage: raft.NewMemoryStorage(), - storage: &storageRecorder{}, + storage: newStorageRecorder(""), transport: rafthttp.NewNopTransporter(), }, cfg: &ServerConfig{}, @@ -625,7 +625,7 @@ func TestDoProposal(t *testing.T) { cfg: &ServerConfig{TickMs: 1}, r: raftNode{ Node: newNodeCommitter(), - storage: &storageRecorder{}, + storage: newStorageRecorder(""), raftStorage: raft.NewMemoryStorage(), transport: rafthttp.NewNopTransporter(), }, @@ -776,7 +776,7 @@ func TestSyncTrigger(t *testing.T) { Node: n, raftStorage: raft.NewMemoryStorage(), transport: rafthttp.NewNopTransporter(), - storage: &storageRecorder{}, + storage: newStorageRecorder(""), }, store: store.NewNop(), SyncTicker: st, @@ -822,7 +822,7 @@ func TestSnapshot(t *testing.T) { s := raft.NewMemoryStorage() s.Append([]raftpb.Entry{{Index: 1}}) st := store.NewRecorder() - p := &storageRecorder{} + p := newStorageRecorder("") srv := &EtcdServer{ cfg: &ServerConfig{}, r: raftNode{ @@ -856,7 +856,7 @@ func TestSnapshot(t *testing.T) { func TestTriggerSnap(t *testing.T) { snapc := 10 st := store.NewRecorder() - p := &storageRecorder{} + p := newStorageRecorderStream("") srv := &EtcdServer{ cfg: &ServerConfig{TickMs: 1}, snapCount: uint64(snapc), @@ -870,23 +870,29 @@ func TestTriggerSnap(t *testing.T) { reqIDGen: idutil.NewGenerator(0, time.Time{}), } srv.start() + + donec := make(chan struct{}) + go func() { + wcnt := 2 + snapc + gaction, _ := p.Wait(wcnt) + + // each operation is recorded as a Save + // (SnapCount+1) * Puts + SaveSnap = (SnapCount+1) * Save + SaveSnap + if len(gaction) != wcnt { + t.Fatalf("len(action) = %d, want %d", len(gaction), wcnt) + } + if !reflect.DeepEqual(gaction[wcnt-1], testutil.Action{Name: "SaveSnap"}) { + t.Errorf("action = %s, want SaveSnap", gaction[wcnt-1]) + } + close(donec) + }() + for i := 0; i < snapc+1; i++ { srv.Do(context.Background(), pb.Request{Method: "PUT"}) } - wcnt := 2 + snapc - gaction, _ := p.Wait(wcnt) - srv.Stop() - - // each operation is recorded as a Save - // (SnapCount+1) * Puts + SaveSnap = (SnapCount+1) * Save + SaveSnap - if len(gaction) != wcnt { - t.Fatalf("len(action) = %d, want %d", len(gaction), wcnt) - } - if !reflect.DeepEqual(gaction[wcnt-1], testutil.Action{Name: "SaveSnap"}) { - t.Errorf("action = %s, want SaveSnap", gaction[wcnt-1]) - } + <-donec } // TestConcurrentApplyAndSnapshotV3 will send out snapshots concurrently with @@ -919,7 +925,7 @@ func TestConcurrentApplyAndSnapshotV3(t *testing.T) { r: raftNode{ Node: n, transport: tr, - storage: &storageRecorder{dbPath: testdir}, + storage: newStorageRecorder(testdir), raftStorage: rs, }, store: cl.store, @@ -991,7 +997,7 @@ func TestConcurrentApplyAndSnapshotV3(t *testing.T) { func TestRecvSnapshot(t *testing.T) { n := newNopReadyNode() st := store.NewRecorder() - p := &storageRecorder{} + p := newStorageRecorder("") cl := newCluster("abc") cl.SetStore(store.New()) s := &EtcdServer{ @@ -1038,7 +1044,7 @@ func TestApplySnapshotAndCommittedEntries(t *testing.T) { cfg: &ServerConfig{}, r: raftNode{ Node: n, - storage: &storageRecorder{}, + storage: newStorageRecorder(""), raftStorage: storage, transport: rafthttp.NewNopTransporter(), }, @@ -1082,7 +1088,7 @@ func TestAddMember(t *testing.T) { r: raftNode{ Node: n, raftStorage: raft.NewMemoryStorage(), - storage: &storageRecorder{}, + storage: newStorageRecorder(""), transport: rafthttp.NewNopTransporter(), }, cfg: &ServerConfig{}, @@ -1122,7 +1128,7 @@ func TestRemoveMember(t *testing.T) { r: raftNode{ Node: n, raftStorage: raft.NewMemoryStorage(), - storage: &storageRecorder{}, + storage: newStorageRecorder(""), transport: rafthttp.NewNopTransporter(), }, cfg: &ServerConfig{}, @@ -1161,7 +1167,7 @@ func TestUpdateMember(t *testing.T) { r: raftNode{ Node: n, raftStorage: raft.NewMemoryStorage(), - storage: &storageRecorder{}, + storage: newStorageRecorder(""), transport: rafthttp.NewNopTransporter(), }, store: st, diff --git a/etcdserver/storage.go b/etcdserver/storage.go index e7a0d58c3..3a69d54e6 100644 --- a/etcdserver/storage.go +++ b/etcdserver/storage.go @@ -149,10 +149,18 @@ func makeMemberDir(dir string) error { } type storageRecorder struct { - testutil.RecorderBuffered + testutil.Recorder dbPath string // must have '/' suffix if set } +func newStorageRecorder(db string) *storageRecorder { + return &storageRecorder{&testutil.RecorderBuffered{}, db} +} + +func newStorageRecorderStream(db string) *storageRecorder { + return &storageRecorder{testutil.NewRecorderStream(), db} +} + func (p *storageRecorder) Save(st raftpb.HardState, ents []raftpb.Entry) error { p.Record(testutil.Action{Name: "Save"}) return nil