From e1bf726bc1845714914c0b98954bd1c80725d058 Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Mon, 4 Jan 2016 11:24:49 -0800 Subject: [PATCH 1/3] *: split out etcdserver's test mockup objects to live in interfaces' packages --- etcdserver/cluster_test.go | 6 +- etcdserver/raft_test.go | 3 +- etcdserver/server_test.go | 337 ++++++++----------------------------- etcdserver/storage.go | 31 ++++ pkg/wait/wait.go | 38 +++++ rafthttp/transport.go | 39 +++++ store/store.go | 124 ++++++++++++++ store/watcher.go | 8 + 8 files changed, 316 insertions(+), 270 deletions(-) diff --git a/etcdserver/cluster_test.go b/etcdserver/cluster_test.go index 9c8550b22..3d2e31606 100644 --- a/etcdserver/cluster_test.go +++ b/etcdserver/cluster_test.go @@ -406,7 +406,7 @@ func TestClusterGenID(t *testing.T) { } previd := cs.ID() - cs.SetStore(&storeRecorder{}) + cs.SetStore(store.NewNop()) cs.AddMember(newTestMember(3, nil, "", nil)) cs.genID() if cs.ID() == previd { @@ -447,7 +447,7 @@ func TestNodeToMemberBad(t *testing.T) { } func TestClusterAddMember(t *testing.T) { - st := &storeRecorder{} + st := store.NewRecorder() c := newTestCluster(nil) c.SetStore(st) c.AddMember(newTestMember(1, nil, "node1", nil)) @@ -492,7 +492,7 @@ func TestClusterMembers(t *testing.T) { } func TestClusterRemoveMember(t *testing.T) { - st := &storeRecorder{} + st := store.NewRecorder() c := newTestCluster(nil) c.SetStore(st) c.RemoveMember(1) diff --git a/etcdserver/raft_test.go b/etcdserver/raft_test.go index 7cce36870..77366715c 100644 --- a/etcdserver/raft_test.go +++ b/etcdserver/raft_test.go @@ -24,6 +24,7 @@ import ( "github.com/coreos/etcd/pkg/types" "github.com/coreos/etcd/raft" "github.com/coreos/etcd/raft/raftpb" + "github.com/coreos/etcd/rafthttp" ) func TestGetIDs(t *testing.T) { @@ -154,7 +155,7 @@ func TestStopRaftWhenWaitingForApplyDone(t *testing.T) { Node: n, storage: &storageRecorder{}, raftStorage: raft.NewMemoryStorage(), - transport: &nopTransporter{}, + transport: rafthttp.NewNopTransporter(), } r.start(&EtcdServer{r: r}) n.readyc <- raft.Ready{} diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index e02391738..4ab7a7c6e 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -18,7 +18,6 @@ import ( "encoding/json" "fmt" "io/ioutil" - "net/http" "os" "path" "reflect" @@ -35,7 +34,7 @@ import ( "github.com/coreos/etcd/pkg/wait" "github.com/coreos/etcd/raft" "github.com/coreos/etcd/raft/raftpb" - "github.com/coreos/etcd/snap" + "github.com/coreos/etcd/rafthttp" dstorage "github.com/coreos/etcd/storage" "github.com/coreos/etcd/store" ) @@ -52,7 +51,7 @@ func TestDoLocalAction(t *testing.T) { }{ { pb.Request{Method: "GET", ID: 1, Wait: true}, - Response{Watcher: &nopWatcher{}}, nil, []testutil.Action{{Name: "Watch"}}, + Response{Watcher: store.NewNopWatcher()}, nil, []testutil.Action{{Name: "Watch"}}, }, { pb.Request{Method: "GET", ID: 1}, @@ -80,7 +79,7 @@ func TestDoLocalAction(t *testing.T) { }, } for i, tt := range tests { - st := &storeRecorder{} + st := store.NewRecorder() srv := &EtcdServer{ store: st, reqIDGen: idutil.NewGenerator(0, time.Time{}), @@ -133,7 +132,7 @@ func TestDoBadLocalAction(t *testing.T) { }, } for i, tt := range tests { - st := &errStoreRecorder{err: storeErr} + st := store.NewErrRecorder(storeErr) srv := &EtcdServer{ store: st, reqIDGen: idutil.NewGenerator(0, time.Time{}), @@ -377,7 +376,7 @@ func TestApplyRequest(t *testing.T) { } for i, tt := range tests { - st := &storeRecorder{} + st := store.NewRecorder() srv := &EtcdServer{store: st} resp := srv.applyRequest(tt.req) @@ -394,7 +393,7 @@ func TestApplyRequest(t *testing.T) { func TestApplyRequestOnAdminMemberAttributes(t *testing.T) { cl := newTestCluster([]*Member{{ID: 1}}) srv := &EtcdServer{ - store: &storeRecorder{}, + store: store.NewRecorder(), cluster: cl, } req := pb.Request{ @@ -452,7 +451,7 @@ func TestApplyConfChangeError(t *testing.T) { }, } for i, tt := range tests { - n := &nodeRecorder{} + n := newNodeRecorder() srv := &EtcdServer{ r: raftNode{Node: n}, cluster: cl, @@ -484,8 +483,8 @@ func TestApplyConfChangeShouldStop(t *testing.T) { srv := &EtcdServer{ id: 1, r: raftNode{ - Node: &nodeRecorder{}, - transport: &nopTransporter{}, + Node: newNodeNop(), + transport: rafthttp.NewNopTransporter(), }, cluster: cl, } @@ -524,8 +523,8 @@ func TestApplyMultiConfChangeShouldStop(t *testing.T) { srv := &EtcdServer{ id: 2, r: raftNode{ - Node: &nodeRecorder{}, - transport: &nopTransporter{}, + Node: newNodeNop(), + transport: rafthttp.NewNopTransporter(), }, cluster: cl, w: wait.New(), @@ -558,14 +557,14 @@ func TestDoProposal(t *testing.T) { {Method: "GET", ID: 1, Quorum: true}, } for i, tt := range tests { - st := &storeRecorder{} + st := store.NewRecorder() srv := &EtcdServer{ cfg: &ServerConfig{TickMs: 1}, r: raftNode{ Node: newNodeCommitter(), storage: &storageRecorder{}, raftStorage: raft.NewMemoryStorage(), - transport: &nopTransporter{}, + transport: rafthttp.NewNopTransporter(), }, store: st, reqIDGen: idutil.NewGenerator(0, time.Time{}), @@ -589,10 +588,10 @@ func TestDoProposal(t *testing.T) { } func TestDoProposalCancelled(t *testing.T) { - wait := &waitRecorder{} + wait := wait.NewRecorder() srv := &EtcdServer{ cfg: &ServerConfig{TickMs: 1}, - r: raftNode{Node: &nodeRecorder{}}, + r: raftNode{Node: newNodeNop()}, w: wait, reqIDGen: idutil.NewGenerator(0, time.Time{}), } @@ -604,16 +603,16 @@ func TestDoProposalCancelled(t *testing.T) { t.Fatalf("err = %v, want %v", err, ErrCanceled) } w := []testutil.Action{{Name: "Register"}, {Name: "Trigger"}} - if !reflect.DeepEqual(wait.action, w) { - t.Errorf("wait.action = %+v, want %+v", wait.action, w) + if !reflect.DeepEqual(wait.Action(), w) { + t.Errorf("wait.action = %+v, want %+v", wait.Action(), w) } } func TestDoProposalTimeout(t *testing.T) { srv := &EtcdServer{ cfg: &ServerConfig{TickMs: 1}, - r: raftNode{Node: &nodeRecorder{}}, - w: &waitRecorder{}, + r: raftNode{Node: newNodeNop()}, + w: wait.NewNop(), reqIDGen: idutil.NewGenerator(0, time.Time{}), } ctx, _ := context.WithTimeout(context.Background(), 0) @@ -626,8 +625,8 @@ func TestDoProposalTimeout(t *testing.T) { func TestDoProposalStopped(t *testing.T) { srv := &EtcdServer{ cfg: &ServerConfig{TickMs: 1}, - r: raftNode{Node: &nodeRecorder{}}, - w: &waitRecorder{}, + r: raftNode{Node: newNodeNop()}, + w: wait.NewNop(), reqIDGen: idutil.NewGenerator(0, time.Time{}), } srv.done = make(chan struct{}) @@ -640,7 +639,7 @@ func TestDoProposalStopped(t *testing.T) { // TestSync tests sync 1. is nonblocking 2. proposes SYNC request. func TestSync(t *testing.T) { - n := &nodeRecorder{} + n := newNodeRecorder() srv := &EtcdServer{ r: raftNode{Node: n}, reqIDGen: idutil.NewGenerator(0, time.Time{}), @@ -680,7 +679,7 @@ func TestSync(t *testing.T) { // TestSyncTimeout tests the case that sync 1. is non-blocking 2. cancel request // after timeout func TestSyncTimeout(t *testing.T) { - n := &nodeProposalBlockerRecorder{} + n := newProposalBlockerRecorder() srv := &EtcdServer{ r: raftNode{Node: n}, reqIDGen: idutil.NewGenerator(0, time.Time{}), @@ -717,10 +716,10 @@ func TestSyncTrigger(t *testing.T) { r: raftNode{ Node: n, raftStorage: raft.NewMemoryStorage(), - transport: &nopTransporter{}, + transport: rafthttp.NewNopTransporter(), storage: &storageRecorder{}, }, - store: &storeRecorder{}, + store: store.NewNop(), SyncTicker: st, reqIDGen: idutil.NewGenerator(0, time.Time{}), } @@ -757,12 +756,12 @@ func TestSyncTrigger(t *testing.T) { func TestSnapshot(t *testing.T) { s := raft.NewMemoryStorage() s.Append([]raftpb.Entry{{Index: 1}}) - st := &storeRecorder{} + st := store.NewRecorder() p := &storageRecorder{} srv := &EtcdServer{ cfg: &ServerConfig{}, r: raftNode{ - Node: &nodeRecorder{}, + Node: newNodeNop(), raftStorage: s, storage: p, }, @@ -792,7 +791,7 @@ func TestSnapshot(t *testing.T) { // Applied > SnapCount should trigger a SaveSnap event func TestTriggerSnap(t *testing.T) { snapc := 10 - st := &storeRecorder{} + st := store.NewRecorder() p := &storageRecorder{} srv := &EtcdServer{ cfg: &ServerConfig{TickMs: 1}, @@ -801,7 +800,7 @@ func TestTriggerSnap(t *testing.T) { Node: newNodeCommitter(), raftStorage: raft.NewMemoryStorage(), storage: p, - transport: &nopTransporter{}, + transport: rafthttp.NewNopTransporter(), }, store: st, reqIDGen: idutil.NewGenerator(0, time.Time{}), @@ -847,7 +846,7 @@ func TestConcurrentApplyAndSnapshotV3(t *testing.T) { } rs := raft.NewMemoryStorage() - tr := newSnapTransporter(testdir) + tr, snapDoneC := rafthttp.NewSnapTransporter(testdir) s := &EtcdServer{ cfg: &ServerConfig{ V3demo: true, @@ -896,7 +895,7 @@ func TestConcurrentApplyAndSnapshotV3(t *testing.T) { n.readyc <- raft.Ready{Messages: []raftpb.Message{{Type: raftpb.MsgSnap}}} // get the snapshot sent by the transport - snapMsg := <-tr.snapDoneC + snapMsg := <-snapDoneC // If the snapshot trails applied records, recovery will panic // since there's no allocated snapshot at the place of the // snapshot record. This only happens when the applier and the @@ -924,7 +923,7 @@ func TestConcurrentApplyAndSnapshotV3(t *testing.T) { // it should trigger storage.SaveSnap and also store.Recover. func TestRecvSnapshot(t *testing.T) { n := newReadyNode() - st := &storeRecorder{} + st := store.NewRecorder() p := &storageRecorder{} cl := newCluster("abc") cl.SetStore(store.New()) @@ -932,7 +931,7 @@ func TestRecvSnapshot(t *testing.T) { cfg: &ServerConfig{}, r: raftNode{ Node: n, - transport: &nopTransporter{}, + transport: rafthttp.NewNopTransporter(), storage: p, raftStorage: raft.NewMemoryStorage(), }, @@ -964,7 +963,7 @@ func TestRecvSnapshot(t *testing.T) { // first and then committed entries. func TestApplySnapshotAndCommittedEntries(t *testing.T) { n := newReadyNode() - st := &storeRecorder{} + st := store.NewRecorder() cl := newCluster("abc") cl.SetStore(store.New()) storage := raft.NewMemoryStorage() @@ -974,7 +973,7 @@ func TestApplySnapshotAndCommittedEntries(t *testing.T) { Node: n, storage: &storageRecorder{}, raftStorage: storage, - transport: &nopTransporter{}, + transport: rafthttp.NewNopTransporter(), }, store: st, cluster: cl, @@ -1018,7 +1017,7 @@ func TestAddMember(t *testing.T) { Node: n, raftStorage: raft.NewMemoryStorage(), storage: &storageRecorder{}, - transport: &nopTransporter{}, + transport: rafthttp.NewNopTransporter(), }, cfg: &ServerConfig{}, store: st, @@ -1058,7 +1057,7 @@ func TestRemoveMember(t *testing.T) { Node: n, raftStorage: raft.NewMemoryStorage(), storage: &storageRecorder{}, - transport: &nopTransporter{}, + transport: rafthttp.NewNopTransporter(), }, cfg: &ServerConfig{}, store: st, @@ -1097,7 +1096,7 @@ func TestUpdateMember(t *testing.T) { Node: n, raftStorage: raft.NewMemoryStorage(), storage: &storageRecorder{}, - transport: &nopTransporter{}, + transport: rafthttp.NewNopTransporter(), }, store: st, cluster: cl, @@ -1124,11 +1123,11 @@ func TestUpdateMember(t *testing.T) { // TODO: test server could stop itself when being removed func TestPublish(t *testing.T) { - n := &nodeRecorder{} + n := newNodeRecorder() ch := make(chan interface{}, 1) // simulate that request has gone through consensus ch <- Response{} - w := &waitWithResponse{ch: ch} + w := wait.NewWithResponse(ch) srv := &EtcdServer{ cfg: &ServerConfig{TickMs: 1}, id: 1, @@ -1173,11 +1172,11 @@ func TestPublishStopped(t *testing.T) { srv := &EtcdServer{ cfg: &ServerConfig{TickMs: 1}, r: raftNode{ - Node: &nodeRecorder{}, - transport: &nopTransporter{}, + Node: newNodeNop(), + transport: rafthttp.NewNopTransporter(), }, cluster: &cluster{}, - w: &waitRecorder{}, + w: wait.NewNop(), done: make(chan struct{}), stop: make(chan struct{}), reqIDGen: idutil.NewGenerator(0, time.Time{}), @@ -1188,11 +1187,11 @@ func TestPublishStopped(t *testing.T) { // TestPublishRetry tests that publish will keep retry until success. func TestPublishRetry(t *testing.T) { - n := &nodeRecorder{} + n := newNodeRecorder() srv := &EtcdServer{ cfg: &ServerConfig{TickMs: 1}, r: raftNode{Node: n}, - w: &waitRecorder{}, + w: wait.NewNop(), done: make(chan struct{}), reqIDGen: idutil.NewGenerator(0, time.Time{}), } @@ -1208,11 +1207,11 @@ func TestPublishRetry(t *testing.T) { } func TestUpdateVersion(t *testing.T) { - n := &nodeRecorder{} + n := newNodeRecorder() ch := make(chan interface{}, 1) // simulate that request has gone through consensus ch <- Response{} - w := &waitWithResponse{ch: ch} + w := wait.NewWithResponse(ch) srv := &EtcdServer{ id: 1, cfg: &ServerConfig{TickMs: 1}, @@ -1312,166 +1311,11 @@ func TestGetOtherPeerURLs(t *testing.T) { } } -// storeRecorder records all the methods it receives. -// storeRecorder DOES NOT work as a actual store. -// It always returns invalid empty response and no error. -type storeRecorder struct{ testutil.Recorder } - -func (s *storeRecorder) Version() int { return 0 } -func (s *storeRecorder) Index() uint64 { return 0 } -func (s *storeRecorder) Get(path string, recursive, sorted bool) (*store.Event, error) { - s.Record(testutil.Action{ - Name: "Get", - Params: []interface{}{path, recursive, sorted}, - }) - return &store.Event{}, nil -} -func (s *storeRecorder) Set(path string, dir bool, val string, expr time.Time) (*store.Event, error) { - s.Record(testutil.Action{ - Name: "Set", - Params: []interface{}{path, dir, val, expr}, - }) - return &store.Event{}, nil -} -func (s *storeRecorder) Update(path, val string, expr time.Time) (*store.Event, error) { - s.Record(testutil.Action{ - Name: "Update", - Params: []interface{}{path, val, expr}, - }) - return &store.Event{}, nil -} -func (s *storeRecorder) Create(path string, dir bool, val string, uniq bool, exp time.Time) (*store.Event, error) { - s.Record(testutil.Action{ - Name: "Create", - Params: []interface{}{path, dir, val, uniq, exp}, - }) - return &store.Event{}, nil -} -func (s *storeRecorder) CompareAndSwap(path, prevVal string, prevIdx uint64, val string, expr time.Time) (*store.Event, error) { - s.Record(testutil.Action{ - Name: "CompareAndSwap", - Params: []interface{}{path, prevVal, prevIdx, val, expr}, - }) - return &store.Event{}, nil -} -func (s *storeRecorder) Delete(path string, dir, recursive bool) (*store.Event, error) { - s.Record(testutil.Action{ - Name: "Delete", - Params: []interface{}{path, dir, recursive}, - }) - return &store.Event{}, nil -} -func (s *storeRecorder) CompareAndDelete(path, prevVal string, prevIdx uint64) (*store.Event, error) { - s.Record(testutil.Action{ - Name: "CompareAndDelete", - Params: []interface{}{path, prevVal, prevIdx}, - }) - return &store.Event{}, nil -} -func (s *storeRecorder) Watch(_ string, _, _ bool, _ uint64) (store.Watcher, error) { - s.Record(testutil.Action{Name: "Watch"}) - return &nopWatcher{}, nil -} -func (s *storeRecorder) Save() ([]byte, error) { - s.Record(testutil.Action{Name: "Save"}) - return nil, nil -} -func (s *storeRecorder) Recovery(b []byte) error { - s.Record(testutil.Action{Name: "Recovery"}) - return nil -} - -func (s *storeRecorder) SaveNoCopy() ([]byte, error) { - s.Record(testutil.Action{Name: "SaveNoCopy"}) - return nil, nil -} - -func (s *storeRecorder) Clone() store.Store { - s.Record(testutil.Action{Name: "Clone"}) - return s -} - -func (s *storeRecorder) JsonStats() []byte { return nil } -func (s *storeRecorder) DeleteExpiredKeys(cutoff time.Time) { - s.Record(testutil.Action{ - Name: "DeleteExpiredKeys", - Params: []interface{}{cutoff}, - }) -} - -type nopWatcher struct{} - -func (w *nopWatcher) EventChan() chan *store.Event { return nil } -func (w *nopWatcher) StartIndex() uint64 { return 0 } -func (w *nopWatcher) Remove() {} - -// errStoreRecorder is a storeRecorder, but returns the given error on -// Get, Watch methods. -type errStoreRecorder struct { - storeRecorder - err error -} - -func (s *errStoreRecorder) Get(path string, recursive, sorted bool) (*store.Event, error) { - s.storeRecorder.Get(path, recursive, sorted) - return nil, s.err -} -func (s *errStoreRecorder) Watch(path string, recursive, sorted bool, index uint64) (store.Watcher, error) { - s.storeRecorder.Watch(path, recursive, sorted, index) - return nil, s.err -} - -type waitRecorder struct { - action []testutil.Action -} - -func (w *waitRecorder) Register(id uint64) <-chan interface{} { - w.action = append(w.action, testutil.Action{Name: "Register"}) - return nil -} -func (w *waitRecorder) Trigger(id uint64, x interface{}) { - w.action = append(w.action, testutil.Action{Name: "Trigger"}) -} - -type waitWithResponse struct { - ch <-chan interface{} -} - -func (w *waitWithResponse) Register(id uint64) <-chan interface{} { - return w.ch -} -func (w *waitWithResponse) Trigger(id uint64, x interface{}) {} - -type storageRecorder struct { - testutil.Recorder - dbPath string // must have '/' suffix if set -} - -func (p *storageRecorder) Save(st raftpb.HardState, ents []raftpb.Entry) error { - p.Record(testutil.Action{Name: "Save"}) - return nil -} - -func (p *storageRecorder) SaveSnap(st raftpb.Snapshot) error { - if !raft.IsEmptySnap(st) { - p.Record(testutil.Action{Name: "SaveSnap"}) - } - return nil -} - -func (p *storageRecorder) DBFilePath(id uint64) (string, error) { - p.Record(testutil.Action{Name: "DBFilePath"}) - path := p.dbPath - if path != "" { - path = path + "/" - } - return fmt.Sprintf("%s%016x.snap.db", path, id), nil -} - -func (p *storageRecorder) Close() error { return nil } - type nodeRecorder struct{ testutil.Recorder } +func newNodeRecorder() *nodeRecorder { return &nodeRecorder{} } +func newNodeNop() raft.Node { return newNodeRecorder() } + func (n *nodeRecorder) Tick() { n.Record(testutil.Action{Name: "Tick"}) } func (n *nodeRecorder) Campaign(ctx context.Context) error { n.Record(testutil.Action{Name: "Campaign"}) @@ -1513,21 +1357,34 @@ type nodeProposalBlockerRecorder struct { nodeRecorder } +func newProposalBlockerRecorder() *nodeProposalBlockerRecorder { + return &nodeProposalBlockerRecorder{*newNodeRecorder()} +} + func (n *nodeProposalBlockerRecorder) Propose(ctx context.Context, data []byte) error { <-ctx.Done() n.Record(testutil.Action{Name: "Propose blocked"}) return nil } -type nodeConfChangeCommitterRecorder struct { +// readyNode is a nodeRecorder with a user-writeable ready channel +type readyNode struct { nodeRecorder readyc chan raft.Ready - index uint64 +} + +func newReadyNode() *readyNode { + return &readyNode{*newNodeRecorder(), make(chan raft.Ready, 1)} +} +func (n *readyNode) Ready() <-chan raft.Ready { return n.readyc } + +type nodeConfChangeCommitterRecorder struct { + readyNode + index uint64 } func newNodeConfChangeCommitterRecorder() *nodeConfChangeCommitterRecorder { - readyc := make(chan raft.Ready, 1) - return &nodeConfChangeCommitterRecorder{readyc: readyc} + return &nodeConfChangeCommitterRecorder{*newReadyNode(), 0} } func (n *nodeConfChangeCommitterRecorder) ProposeConfChange(ctx context.Context, conf raftpb.ConfChange) error { data, err := conf.Marshal() @@ -1549,14 +1406,12 @@ func (n *nodeConfChangeCommitterRecorder) ApplyConfChange(conf raftpb.ConfChange // nodeCommitter commits proposed data immediately. type nodeCommitter struct { - nodeRecorder - readyc chan raft.Ready - index uint64 + readyNode + index uint64 } -func newNodeCommitter() *nodeCommitter { - readyc := make(chan raft.Ready, 1) - return &nodeCommitter{readyc: readyc} +func newNodeCommitter() raft.Node { + return &nodeCommitter{*newReadyNode(), 0} } func (n *nodeCommitter) Propose(ctx context.Context, data []byte) error { n.index++ @@ -1567,53 +1422,3 @@ func (n *nodeCommitter) Propose(ctx context.Context, data []byte) error { } return nil } -func (n *nodeCommitter) Ready() <-chan raft.Ready { - return n.readyc -} - -type readyNode struct { - nodeRecorder - readyc chan raft.Ready -} - -func newReadyNode() *readyNode { - readyc := make(chan raft.Ready, 1) - return &readyNode{readyc: readyc} -} -func (n *readyNode) Ready() <-chan raft.Ready { return n.readyc } - -type nopTransporter struct{} - -func (s *nopTransporter) Start() error { return nil } -func (s *nopTransporter) Handler() http.Handler { return nil } -func (s *nopTransporter) Send(m []raftpb.Message) {} -func (s *nopTransporter) SendSnapshot(m snap.Message) {} -func (s *nopTransporter) AddRemote(id types.ID, us []string) {} -func (s *nopTransporter) AddPeer(id types.ID, us []string) {} -func (s *nopTransporter) RemovePeer(id types.ID) {} -func (s *nopTransporter) RemoveAllPeers() {} -func (s *nopTransporter) UpdatePeer(id types.ID, us []string) {} -func (s *nopTransporter) ActiveSince(id types.ID) time.Time { return time.Time{} } -func (s *nopTransporter) Stop() {} -func (s *nopTransporter) Pause() {} -func (s *nopTransporter) Resume() {} - -type snapTransporter struct { - nopTransporter - snapDoneC chan snap.Message - snapDir string -} - -func newSnapTransporter(snapDir string) *snapTransporter { - return &snapTransporter{ - snapDoneC: make(chan snap.Message, 1), - snapDir: snapDir, - } -} - -func (s *snapTransporter) SendSnapshot(m snap.Message) { - ss := snap.New(s.snapDir) - ss.SaveDBFrom(m.ReadCloser, m.Snapshot.Metadata.Index+1) - m.CloseWithError(nil) - s.snapDoneC <- m -} diff --git a/etcdserver/storage.go b/etcdserver/storage.go index 30891f06b..b1ed47b0e 100644 --- a/etcdserver/storage.go +++ b/etcdserver/storage.go @@ -15,13 +15,16 @@ package etcdserver import ( + "fmt" "io" "os" "path" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" "github.com/coreos/etcd/pkg/pbutil" + "github.com/coreos/etcd/pkg/testutil" "github.com/coreos/etcd/pkg/types" + "github.com/coreos/etcd/raft" "github.com/coreos/etcd/raft/raftpb" "github.com/coreos/etcd/snap" "github.com/coreos/etcd/version" @@ -144,3 +147,31 @@ func makeMemberDir(dir string) error { } return nil } + +type storageRecorder struct { + testutil.Recorder + dbPath string // must have '/' suffix if set +} + +func (p *storageRecorder) Save(st raftpb.HardState, ents []raftpb.Entry) error { + p.Record(testutil.Action{Name: "Save"}) + return nil +} + +func (p *storageRecorder) SaveSnap(st raftpb.Snapshot) error { + if !raft.IsEmptySnap(st) { + p.Record(testutil.Action{Name: "SaveSnap"}) + } + return nil +} + +func (p *storageRecorder) DBFilePath(id uint64) (string, error) { + p.Record(testutil.Action{Name: "DBFilePath"}) + path := p.dbPath + if path != "" { + path = path + "/" + } + return fmt.Sprintf("%s%016x.snap.db", path, id), nil +} + +func (p *storageRecorder) Close() error { return nil } diff --git a/pkg/wait/wait.go b/pkg/wait/wait.go index 141f24981..8957d3a6a 100644 --- a/pkg/wait/wait.go +++ b/pkg/wait/wait.go @@ -18,6 +18,8 @@ package wait import ( "sync" + + "github.com/coreos/etcd/pkg/testutil" ) type Wait interface { @@ -55,3 +57,39 @@ func (w *List) Trigger(id uint64, x interface{}) { close(ch) } } + +type WaitRecorder struct { + Wait + *testutil.Recorder +} + +type waitRecorder struct { + testutil.Recorder +} + +func NewRecorder() *WaitRecorder { + wr := &waitRecorder{} + return &WaitRecorder{Wait: wr, Recorder: &wr.Recorder} +} +func NewNop() Wait { return NewRecorder() } + +func (w *waitRecorder) Register(id uint64) <-chan interface{} { + w.Record(testutil.Action{Name: "Register"}) + return nil +} +func (w *waitRecorder) Trigger(id uint64, x interface{}) { + w.Record(testutil.Action{Name: "Trigger"}) +} + +type waitWithResponse struct { + ch <-chan interface{} +} + +func NewWithResponse(ch <-chan interface{}) Wait { + return &waitWithResponse{ch: ch} +} + +func (w *waitWithResponse) Register(id uint64) <-chan interface{} { + return w.ch +} +func (w *waitWithResponse) Trigger(id uint64, x interface{}) {} diff --git a/rafthttp/transport.go b/rafthttp/transport.go index 5ae947c6c..25a7cf5ed 100644 --- a/rafthttp/transport.go +++ b/rafthttp/transport.go @@ -308,3 +308,42 @@ func (t *Transport) Resume() { p.(Pausable).Resume() } } + +type nopTransporter struct{} + +func NewNopTransporter() Transporter { + return &nopTransporter{} +} + +func (s *nopTransporter) Start() error { return nil } +func (s *nopTransporter) Handler() http.Handler { return nil } +func (s *nopTransporter) Send(m []raftpb.Message) {} +func (s *nopTransporter) SendSnapshot(m snap.Message) {} +func (s *nopTransporter) AddRemote(id types.ID, us []string) {} +func (s *nopTransporter) AddPeer(id types.ID, us []string) {} +func (s *nopTransporter) RemovePeer(id types.ID) {} +func (s *nopTransporter) RemoveAllPeers() {} +func (s *nopTransporter) UpdatePeer(id types.ID, us []string) {} +func (s *nopTransporter) ActiveSince(id types.ID) time.Time { return time.Time{} } +func (s *nopTransporter) Stop() {} +func (s *nopTransporter) Pause() {} +func (s *nopTransporter) Resume() {} + +type snapTransporter struct { + nopTransporter + snapDoneC chan snap.Message + snapDir string +} + +func NewSnapTransporter(snapDir string) (Transporter, <-chan snap.Message) { + ch := make(chan snap.Message, 1) + tr := &snapTransporter{snapDoneC: ch, snapDir: snapDir} + return tr, ch +} + +func (s *snapTransporter) SendSnapshot(m snap.Message) { + ss := snap.New(s.snapDir) + ss.SaveDBFrom(m.ReadCloser, m.Snapshot.Metadata.Index+1) + m.CloseWithError(nil) + s.snapDoneC <- m +} diff --git a/store/store.go b/store/store.go index 94b91f582..15307b38d 100644 --- a/store/store.go +++ b/store/store.go @@ -25,6 +25,7 @@ import ( "github.com/coreos/etcd/Godeps/_workspace/src/github.com/jonboulle/clockwork" etcdErr "github.com/coreos/etcd/error" + "github.com/coreos/etcd/pkg/testutil" "github.com/coreos/etcd/pkg/types" ) @@ -743,3 +744,126 @@ func (s *store) JsonStats() []byte { s.Stats.Watchers = uint64(s.WatcherHub.count) return s.Stats.toJson() } + +// StoreRecorder provides a Store interface with a testutil.Recorder +type StoreRecorder struct { + Store + *testutil.Recorder +} + +// storeRecorder records all the methods it receives. +// storeRecorder DOES NOT work as a actual store. +// It always returns invalid empty response and no error. +type storeRecorder struct { + Store + testutil.Recorder +} + +func NewNop() Store { return &storeRecorder{} } +func NewRecorder() *StoreRecorder { + sr := &storeRecorder{} + return &StoreRecorder{Store: sr, Recorder: &sr.Recorder} +} + +func (s *storeRecorder) Version() int { return 0 } +func (s *storeRecorder) Index() uint64 { return 0 } +func (s *storeRecorder) Get(path string, recursive, sorted bool) (*Event, error) { + s.Record(testutil.Action{ + Name: "Get", + Params: []interface{}{path, recursive, sorted}, + }) + return &Event{}, nil +} +func (s *storeRecorder) Set(path string, dir bool, val string, expr time.Time) (*Event, error) { + s.Record(testutil.Action{ + Name: "Set", + Params: []interface{}{path, dir, val, expr}, + }) + return &Event{}, nil +} +func (s *storeRecorder) Update(path, val string, expr time.Time) (*Event, error) { + s.Record(testutil.Action{ + Name: "Update", + Params: []interface{}{path, val, expr}, + }) + return &Event{}, nil +} +func (s *storeRecorder) Create(path string, dir bool, val string, uniq bool, exp time.Time) (*Event, error) { + s.Record(testutil.Action{ + Name: "Create", + Params: []interface{}{path, dir, val, uniq, exp}, + }) + return &Event{}, nil +} +func (s *storeRecorder) CompareAndSwap(path, prevVal string, prevIdx uint64, val string, expr time.Time) (*Event, error) { + s.Record(testutil.Action{ + Name: "CompareAndSwap", + Params: []interface{}{path, prevVal, prevIdx, val, expr}, + }) + return &Event{}, nil +} +func (s *storeRecorder) Delete(path string, dir, recursive bool) (*Event, error) { + s.Record(testutil.Action{ + Name: "Delete", + Params: []interface{}{path, dir, recursive}, + }) + return &Event{}, nil +} +func (s *storeRecorder) CompareAndDelete(path, prevVal string, prevIdx uint64) (*Event, error) { + s.Record(testutil.Action{ + Name: "CompareAndDelete", + Params: []interface{}{path, prevVal, prevIdx}, + }) + return &Event{}, nil +} +func (s *storeRecorder) Watch(_ string, _, _ bool, _ uint64) (Watcher, error) { + s.Record(testutil.Action{Name: "Watch"}) + return NewNopWatcher(), nil +} +func (s *storeRecorder) Save() ([]byte, error) { + s.Record(testutil.Action{Name: "Save"}) + return nil, nil +} +func (s *storeRecorder) Recovery(b []byte) error { + s.Record(testutil.Action{Name: "Recovery"}) + return nil +} + +func (s *storeRecorder) SaveNoCopy() ([]byte, error) { + s.Record(testutil.Action{Name: "SaveNoCopy"}) + return nil, nil +} + +func (s *storeRecorder) Clone() Store { + s.Record(testutil.Action{Name: "Clone"}) + return s +} + +func (s *storeRecorder) JsonStats() []byte { return nil } +func (s *storeRecorder) DeleteExpiredKeys(cutoff time.Time) { + s.Record(testutil.Action{ + Name: "DeleteExpiredKeys", + Params: []interface{}{cutoff}, + }) +} + +// errStoreRecorder is a storeRecorder, but returns the given error on +// Get, Watch methods. +type errStoreRecorder struct { + storeRecorder + err error +} + +func NewErrRecorder(err error) *StoreRecorder { + sr := &errStoreRecorder{err: err} + return &StoreRecorder{Store: sr, Recorder: &sr.Recorder} +} + +func (s *errStoreRecorder) Get(path string, recursive, sorted bool) (*Event, error) { + s.storeRecorder.Get(path, recursive, sorted) + return nil, s.err +} +func (s *errStoreRecorder) Watch(path string, recursive, sorted bool, index uint64) (Watcher, error) { + s.storeRecorder.Watch(path, recursive, sorted, index) + return nil, s.err +} diff --git a/store/watcher.go b/store/watcher.go index 3c9f70ec4..252cdb1c6 100644 --- a/store/watcher.go +++ b/store/watcher.go @@ -85,3 +85,11 @@ func (w *watcher) Remove() { w.remove() } } + +// nopWatcher is a watcher that receives nothing, always blocking. +type nopWatcher struct{} + +func NewNopWatcher() Watcher { return &nopWatcher{} } +func (w *nopWatcher) EventChan() chan *Event { return nil } +func (w *nopWatcher) StartIndex() uint64 { return 0 } +func (w *nopWatcher) Remove() {} From 384cc762991d387b03be2d1716bb1c27df783818 Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Mon, 4 Jan 2016 11:30:07 -0800 Subject: [PATCH 2/3] pkg/testutil: make Recorder an interface Provides two implementations of Recorder-- one that is non-blocking like the original version and one that provides a blocking channel to avoid busy waiting or racing in tests when no other synchronization is available. --- etcdserver/server_test.go | 2 +- etcdserver/storage.go | 2 +- pkg/testutil/recorder.go | 101 ++++++++++++++++++++++++++++++++++++-- pkg/wait/wait.go | 6 +-- storage/kvstore_test.go | 5 +- store/store.go | 8 +-- 6 files changed, 110 insertions(+), 14 deletions(-) diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index 4ab7a7c6e..2e4b85b6f 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -1313,7 +1313,7 @@ func TestGetOtherPeerURLs(t *testing.T) { type nodeRecorder struct{ testutil.Recorder } -func newNodeRecorder() *nodeRecorder { return &nodeRecorder{} } +func newNodeRecorder() *nodeRecorder { return &nodeRecorder{&testutil.RecorderBuffered{}} } func newNodeNop() raft.Node { return newNodeRecorder() } func (n *nodeRecorder) Tick() { n.Record(testutil.Action{Name: "Tick"}) } diff --git a/etcdserver/storage.go b/etcdserver/storage.go index b1ed47b0e..b4cff7bfb 100644 --- a/etcdserver/storage.go +++ b/etcdserver/storage.go @@ -149,7 +149,7 @@ func makeMemberDir(dir string) error { } type storageRecorder struct { - testutil.Recorder + testutil.RecorderBuffered dbPath string // must have '/' suffix if set } diff --git a/pkg/testutil/recorder.go b/pkg/testutil/recorder.go index 0a49788a0..04b5357f2 100644 --- a/pkg/testutil/recorder.go +++ b/pkg/testutil/recorder.go @@ -14,27 +14,120 @@ package testutil -import "sync" +import ( + "errors" + "fmt" + "sync" + "time" +) type Action struct { Name string Params []interface{} } -type Recorder struct { +type Recorder interface { + // Record publishes an Action (e.g., function call) which will + // be reflected by Wait() or Chan() + Record(a Action) + // Wait waits until at least n Actions are availble or returns with error + Wait(n int) ([]Action, error) + // Action returns immediately available Actions + Action() []Action + // Chan returns the channel for actions published by Record + Chan() <-chan Action +} + +// RecorderBuffered appends all Actions to a slice +type RecorderBuffered struct { sync.Mutex actions []Action } -func (r *Recorder) Record(a Action) { +func (r *RecorderBuffered) Record(a Action) { r.Lock() r.actions = append(r.actions, a) r.Unlock() } -func (r *Recorder) Action() []Action { +func (r *RecorderBuffered) Action() []Action { r.Lock() cpy := make([]Action, len(r.actions)) copy(cpy, r.actions) r.Unlock() return cpy } +func (r *RecorderBuffered) Wait(n int) (acts []Action, err error) { + // legacy racey behavior + WaitSchedule() + acts = r.Action() + if len(acts) < n { + err = newLenErr(n, len(r.actions)) + } + return acts, err +} + +func (r *RecorderBuffered) Chan() <-chan Action { + ch := make(chan Action) + go func() { + acts := r.Action() + for i := range acts { + ch <- acts[i] + } + close(ch) + }() + return ch +} + +// RecorderStream writes all Actions to an unbuffered channel +type recorderStream struct { + ch chan Action +} + +func NewRecorderStream() Recorder { + return &recorderStream{ch: make(chan Action)} +} + +func (r *recorderStream) Record(a Action) { + r.ch <- a +} + +func (r *recorderStream) Action() (acts []Action) { + for { + select { + case act := <-r.ch: + acts = append(acts, act) + default: + return acts + } + } + return acts +} + +func (r *recorderStream) Chan() <-chan Action { + return r.ch +} + +func (r *recorderStream) Wait(n int) ([]Action, error) { + acts := make([]Action, n) + timeoutC := time.After(5 * time.Second) + for i := 0; i < n; i++ { + select { + case acts[i] = <-r.ch: + case <-timeoutC: + acts = acts[:i] + return acts, newLenErr(n, i) + } + } + // extra wait to catch any Action spew + select { + case act := <-r.ch: + acts = append(acts, act) + case <-time.After(10 * time.Millisecond): + } + return acts, nil +} + +func newLenErr(expected int, actual int) error { + s := fmt.Sprintf("len(actions) = %d, expected >= %d", actual, expected) + return errors.New(s) +} diff --git a/pkg/wait/wait.go b/pkg/wait/wait.go index 8957d3a6a..fc343af3e 100644 --- a/pkg/wait/wait.go +++ b/pkg/wait/wait.go @@ -60,16 +60,16 @@ func (w *List) Trigger(id uint64, x interface{}) { type WaitRecorder struct { Wait - *testutil.Recorder + testutil.Recorder } type waitRecorder struct { - testutil.Recorder + testutil.RecorderBuffered } func NewRecorder() *WaitRecorder { wr := &waitRecorder{} - return &WaitRecorder{Wait: wr, Recorder: &wr.Recorder} + return &WaitRecorder{Wait: wr, Recorder: wr} } func NewNop() Wait { return NewRecorder() } diff --git a/storage/kvstore_test.go b/storage/kvstore_test.go index 4700d2707..ab7ef9f10 100644 --- a/storage/kvstore_test.go +++ b/storage/kvstore_test.go @@ -473,8 +473,11 @@ func newTestKeyBytes(rev revision, tombstone bool) []byte { } func newFakeStore() *store { - b := &fakeBackend{&fakeBatchTx{rangeRespc: make(chan rangeResp, 5)}} + b := &fakeBackend{&fakeBatchTx{ + Recorder: &testutil.RecorderBuffered{}, + rangeRespc: make(chan rangeResp, 5)}} fi := &fakeIndex{ + Recorder: &testutil.RecorderBuffered{}, indexGetRespc: make(chan indexGetResp, 1), indexRangeRespc: make(chan indexRangeResp, 1), indexRangeEventsRespc: make(chan indexRangeEventsResp, 1), diff --git a/store/store.go b/store/store.go index 15307b38d..2d4b6c40a 100644 --- a/store/store.go +++ b/store/store.go @@ -748,7 +748,7 @@ func (s *store) JsonStats() []byte { // StoreRecorder provides a Store interface with a testutil.Recorder type StoreRecorder struct { Store - *testutil.Recorder + testutil.Recorder } // storeRecorder records all the methods it receives. @@ -756,13 +756,13 @@ type StoreRecorder struct { // It always returns invalid empty response and no error. type storeRecorder struct { Store - testutil.Recorder + testutil.RecorderBuffered } func NewNop() Store { return &storeRecorder{} } func NewRecorder() *StoreRecorder { sr := &storeRecorder{} - return &StoreRecorder{Store: sr, Recorder: &sr.Recorder} + return &StoreRecorder{Store: sr, Recorder: sr} } func (s *storeRecorder) Version() int { return 0 } @@ -856,7 +856,7 @@ type errStoreRecorder struct { func NewErrRecorder(err error) *StoreRecorder { sr := &errStoreRecorder{err: err} - return &StoreRecorder{Store: sr, Recorder: &sr.Recorder} + return &StoreRecorder{Store: sr, Recorder: sr} } func (s *errStoreRecorder) Get(path string, recursive, sorted bool) (*Event, error) { From 838328b0570a4cf5874143ff7719639b11b813ad Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Mon, 4 Jan 2016 11:39:02 -0800 Subject: [PATCH 3/3] etcdserver: fix racey WaitSchedule() tests to wait for recorder actions Fixes #4119 --- etcdserver/raft_test.go | 2 +- etcdserver/server_test.go | 73 +++++++++++++++++++++------------------ 2 files changed, 41 insertions(+), 34 deletions(-) diff --git a/etcdserver/raft_test.go b/etcdserver/raft_test.go index 77366715c..d3f5cbde5 100644 --- a/etcdserver/raft_test.go +++ b/etcdserver/raft_test.go @@ -150,7 +150,7 @@ func TestCreateConfigChangeEnts(t *testing.T) { } func TestStopRaftWhenWaitingForApplyDone(t *testing.T) { - n := newReadyNode() + n := newNopReadyNode() r := raftNode{ Node: n, storage: &storageRecorder{}, diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index 2e4b85b6f..a6743cc16 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -468,7 +468,7 @@ func TestApplyConfChangeError(t *testing.T) { Params: []interface{}{cc}, }, } - if g := n.Action(); !reflect.DeepEqual(g, w) { + if g, _ := n.Wait(1); !reflect.DeepEqual(g, w) { t.Errorf("#%d: action = %+v, want %+v", i, g, w) } } @@ -657,9 +657,7 @@ func TestSync(t *testing.T) { t.Fatal("sync should be non-blocking but did not return after 1s!") } - testutil.WaitSchedule() - - action := n.Action() + action, _ := n.Wait(1) if len(action) != 1 { t.Fatalf("len(action) = %d, want 1", len(action)) } @@ -697,10 +695,8 @@ func TestSyncTimeout(t *testing.T) { t.Fatal("sync should be non-blocking but did not return after 1s!") } - // give time for goroutine in sync to cancel - testutil.WaitSchedule() w := []testutil.Action{{Name: "Propose blocked"}} - if g := n.Action(); !reflect.DeepEqual(g, w) { + if g, _ := n.Wait(1); !reflect.DeepEqual(g, w) { t.Errorf("action = %v, want %v", g, w) } } @@ -723,19 +719,22 @@ func TestSyncTrigger(t *testing.T) { SyncTicker: st, reqIDGen: idutil.NewGenerator(0, time.Time{}), } - srv.start() - defer srv.Stop() - // trigger the server to become a leader and accept sync requests - n.readyc <- raft.Ready{ - SoftState: &raft.SoftState{ - RaftState: raft.StateLeader, - }, - } - // trigger a sync request - st <- time.Time{} - testutil.WaitSchedule() - action := n.Action() + // trigger the server to become a leader and accept sync requests + go func() { + srv.start() + n.readyc <- raft.Ready{ + SoftState: &raft.SoftState{ + RaftState: raft.StateLeader, + }, + } + // trigger a sync request + st <- time.Time{} + }() + + action, _ := n.Wait(1) + go srv.Stop() + if len(action) != 1 { t.Fatalf("len(action) = %d, want 1", len(action)) } @@ -750,6 +749,9 @@ func TestSyncTrigger(t *testing.T) { if req.Method != "SYNC" { t.Fatalf("unexpected proposed request: %#v", req.Method) } + + // wait on stop message + <-n.Chan() } // snapshot should snapshot the store and cut the persistent @@ -768,8 +770,7 @@ func TestSnapshot(t *testing.T) { store: st, } srv.snapshot(1, raftpb.ConfState{Nodes: []uint64{1}}) - testutil.WaitSchedule() - gaction := st.Action() + gaction, _ := st.Wait(2) if len(gaction) != 2 { t.Fatalf("len(action) = %d, want 1", len(gaction)) } @@ -809,14 +810,14 @@ func TestTriggerSnap(t *testing.T) { for i := 0; i < snapc+1; i++ { srv.Do(context.Background(), pb.Request{Method: "PUT"}) } - srv.Stop() - // wait for snapshot goroutine to finish - testutil.WaitSchedule() - gaction := p.Action() + wcnt := 2 + snapc + gaction, _ := p.Wait(wcnt) + + srv.Stop() + // each operation is recorded as a Save // (SnapCount+1) * Puts + SaveSnap = (SnapCount+1) * Save + SaveSnap - wcnt := 2 + snapc if len(gaction) != wcnt { t.Fatalf("len(action) = %d, want %d", len(gaction), wcnt) } @@ -832,7 +833,7 @@ func TestConcurrentApplyAndSnapshotV3(t *testing.T) { // snapshots that may queue up at once without dropping maxInFlightMsgSnap = 16 ) - n := newReadyNode() + n := newNopReadyNode() cl := newCluster("abc") cl.SetStore(store.New()) @@ -922,7 +923,7 @@ func TestConcurrentApplyAndSnapshotV3(t *testing.T) { // TestRecvSnapshot tests when it receives a snapshot from raft leader, // it should trigger storage.SaveSnap and also store.Recover. func TestRecvSnapshot(t *testing.T) { - n := newReadyNode() + n := newNopReadyNode() st := store.NewRecorder() p := &storageRecorder{} cl := newCluster("abc") @@ -962,7 +963,7 @@ func TestRecvSnapshot(t *testing.T) { // TestApplySnapshotAndCommittedEntries tests that server applies snapshot // first and then committed entries. func TestApplySnapshotAndCommittedEntries(t *testing.T) { - n := newReadyNode() + n := newNopReadyNode() st := store.NewRecorder() cl := newCluster("abc") cl.SetStore(store.New()) @@ -988,10 +989,9 @@ func TestApplySnapshotAndCommittedEntries(t *testing.T) { }, } // make goroutines move forward to receive snapshot - testutil.WaitSchedule() + actions, _ := st.Wait(2) s.Stop() - actions := st.Action() if len(actions) != 2 { t.Fatalf("len(action) = %d, want 2", len(actions)) } @@ -1374,8 +1374,14 @@ type readyNode struct { } func newReadyNode() *readyNode { + return &readyNode{ + nodeRecorder{testutil.NewRecorderStream()}, + make(chan raft.Ready, 1)} +} +func newNopReadyNode() *readyNode { return &readyNode{*newNodeRecorder(), make(chan raft.Ready, 1)} } + func (n *readyNode) Ready() <-chan raft.Ready { return n.readyc } type nodeConfChangeCommitterRecorder struct { @@ -1384,8 +1390,9 @@ type nodeConfChangeCommitterRecorder struct { } func newNodeConfChangeCommitterRecorder() *nodeConfChangeCommitterRecorder { - return &nodeConfChangeCommitterRecorder{*newReadyNode(), 0} + return &nodeConfChangeCommitterRecorder{*newNopReadyNode(), 0} } + func (n *nodeConfChangeCommitterRecorder) ProposeConfChange(ctx context.Context, conf raftpb.ConfChange) error { data, err := conf.Marshal() if err != nil { @@ -1411,7 +1418,7 @@ type nodeCommitter struct { } func newNodeCommitter() raft.Node { - return &nodeCommitter{*newReadyNode(), 0} + return &nodeCommitter{*newNopReadyNode(), 0} } func (n *nodeCommitter) Propose(ctx context.Context, data []byte) error { n.index++