From 3748088b96ecd59677655fc0c8f3f7d12eb8bda8 Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Mon, 29 Dec 2014 14:38:00 -0800 Subject: [PATCH 1/3] etcdserver: print out log of normal tests --- etcdserver/server_test.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index 9e6031d20..f057e0c92 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -23,6 +23,7 @@ import ( "log" "math/rand" "net/http" + "os" "path" "reflect" "strconv" @@ -41,10 +42,6 @@ import ( "github.com/coreos/etcd/store" ) -func init() { - log.SetOutput(ioutil.Discard) -} - // TestDoLocalAction tests requests which do not need to go through raft to be applied, // and are served through local data. func TestDoLocalAction(t *testing.T) { @@ -530,6 +527,9 @@ func (s *fakeTransporter) Pause() {} func (s *fakeTransporter) Resume() {} func testServer(t *testing.T, ns uint64) { + log.SetOutput(ioutil.Discard) + defer log.SetOutput(os.Stderr) + ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -1227,6 +1227,8 @@ func TestPublishStopped(t *testing.T) { // TestPublishRetry tests that publish will keep retry until success. func TestPublishRetry(t *testing.T) { + log.SetOutput(ioutil.Discard) + defer log.SetOutput(os.Stderr) n := &nodeRecorder{} srv := &EtcdServer{ node: n, From a92bd1d16566be4d75cf5b04346d87269dfd8b37 Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Tue, 30 Dec 2014 00:10:18 -0800 Subject: [PATCH 2/3] etcdserver: add multi_server_test.go --- etcdserver/multi_server_test.go | 142 ++++++++++++++++++++++++++++++++ etcdserver/server_test.go | 115 -------------------------- 2 files changed, 142 insertions(+), 115 deletions(-) create mode 100644 etcdserver/multi_server_test.go diff --git a/etcdserver/multi_server_test.go b/etcdserver/multi_server_test.go new file mode 100644 index 000000000..3f311cd67 --- /dev/null +++ b/etcdserver/multi_server_test.go @@ -0,0 +1,142 @@ +/* + Copyright 2014 CoreOS, Inc. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package etcdserver + +import ( + "encoding/json" + "io/ioutil" + "log" + "math/rand" + "os" + "reflect" + "testing" + "time" + + pb "github.com/coreos/etcd/etcdserver/etcdserverpb" + "github.com/coreos/etcd/etcdserver/idutil" + "github.com/coreos/etcd/pkg/types" + "github.com/coreos/etcd/raft" + "github.com/coreos/etcd/raft/raftpb" + "github.com/coreos/etcd/store" + + "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" +) + +func TestClusterOf1(t *testing.T) { testServer(t, 1) } +func TestClusterOf3(t *testing.T) { testServer(t, 3) } + +func testServer(t *testing.T, ns uint64) { + log.SetOutput(ioutil.Discard) + defer log.SetOutput(os.Stderr) + + ss := make([]*EtcdServer, ns) + + ids := make([]uint64, ns) + for i := uint64(0); i < ns; i++ { + ids[i] = i + 1 + } + members := mustMakePeerSlice(t, ids...) + for i := uint64(0); i < ns; i++ { + id := i + 1 + s := raft.NewMemoryStorage() + n := raft.StartNode(id, members, 10, 1, s) + tk := time.NewTicker(10 * time.Millisecond) + defer tk.Stop() + st := store.New() + cl := newCluster("abc") + cl.SetStore(st) + srv := &EtcdServer{ + node: n, + raftStorage: s, + store: st, + transport: &fakeTransporter{ss: ss}, + storage: &storageRecorder{}, + Ticker: tk.C, + Cluster: cl, + reqIDGen: idutil.NewGenerator(uint8(i), time.Time{}), + } + ss[i] = srv + } + + for i := uint64(0); i < ns; i++ { + ss[i].start() + } + + for i := 1; i <= 10; i++ { + r := pb.Request{ + Method: "PUT", + Path: "/foo", + Val: "bar", + } + j := rand.Intn(len(ss)) + t.Logf("ss = %d", j) + resp, err := ss[j].Do(context.Background(), r) + if err != nil { + t.Fatal(err) + } + + g, w := resp.Event.Node, &store.NodeExtern{ + Key: "/foo", + ModifiedIndex: uint64(i) + ns, + CreatedIndex: uint64(i) + ns, + Value: stringp("bar"), + } + + if !reflect.DeepEqual(g, w) { + t.Error("value:", *g.Value) + t.Errorf("g = %+v, w %+v", g, w) + } + } + + time.Sleep(10 * time.Millisecond) + + var last interface{} + for i, sv := range ss { + sv.Stop() + g, _ := sv.store.Get("/", true, true) + if last != nil && !reflect.DeepEqual(last, g) { + t.Errorf("server %d: Root = %#v, want %#v", i, g, last) + } + last = g + } +} + +// TODO: test wait trigger correctness in multi-server case + +type fakeTransporter struct { + nopTransporter + ss []*EtcdServer +} + +func (s *fakeTransporter) Send(msgs []raftpb.Message) { + for _, m := range msgs { + s.ss[m.To-1].node.Step(context.TODO(), m) + } +} + +func mustMakePeerSlice(t *testing.T, ids ...uint64) []raft.Peer { + peers := make([]raft.Peer, len(ids)) + for i, id := range ids { + m := Member{ID: types.ID(id)} + b, err := json.Marshal(m) + if err != nil { + t.Fatal(err) + } + peers[i] = raft.Peer{ID: id, Context: b} + } + return peers +} diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index f057e0c92..680ac6134 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -21,7 +21,6 @@ import ( "fmt" "io/ioutil" "log" - "math/rand" "net/http" "os" "path" @@ -505,107 +504,6 @@ func TestApplyConfChangeShouldStop(t *testing.T) { } } -func TestClusterOf1(t *testing.T) { testServer(t, 1) } -func TestClusterOf3(t *testing.T) { testServer(t, 3) } - -type fakeTransporter struct { - ss []*EtcdServer -} - -func (s *fakeTransporter) Handler() http.Handler { return nil } -func (s *fakeTransporter) Send(msgs []raftpb.Message) { - for _, m := range msgs { - s.ss[m.To-1].node.Step(context.TODO(), m) - } -} -func (s *fakeTransporter) AddPeer(id types.ID, us []string) {} -func (s *fakeTransporter) UpdatePeer(id types.ID, us []string) {} -func (s *fakeTransporter) RemovePeer(id types.ID) {} -func (s *fakeTransporter) Stop() {} -func (s *fakeTransporter) ShouldStopNotify() <-chan struct{} { return nil } -func (s *fakeTransporter) Pause() {} -func (s *fakeTransporter) Resume() {} - -func testServer(t *testing.T, ns uint64) { - log.SetOutput(ioutil.Discard) - defer log.SetOutput(os.Stderr) - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - ss := make([]*EtcdServer, ns) - - ids := make([]uint64, ns) - for i := uint64(0); i < ns; i++ { - ids[i] = i + 1 - } - members := mustMakePeerSlice(t, ids...) - for i := uint64(0); i < ns; i++ { - id := i + 1 - s := raft.NewMemoryStorage() - n := raft.StartNode(id, members, 10, 1, s) - tk := time.NewTicker(10 * time.Millisecond) - defer tk.Stop() - st := store.New() - cl := newCluster("abc") - cl.SetStore(st) - srv := &EtcdServer{ - node: n, - raftStorage: s, - store: st, - transport: &fakeTransporter{ss}, - storage: &storageRecorder{}, - Ticker: tk.C, - Cluster: cl, - reqIDGen: idutil.NewGenerator(uint8(i), time.Time{}), - } - ss[i] = srv - } - - // Start the servers after they're all created to avoid races in send(). - for i := uint64(0); i < ns; i++ { - ss[i].start() - } - - for i := 1; i <= 10; i++ { - r := pb.Request{ - Method: "PUT", - Path: "/foo", - Val: "bar", - } - j := rand.Intn(len(ss)) - t.Logf("ss = %d", j) - resp, err := ss[j].Do(ctx, r) - if err != nil { - t.Fatal(err) - } - - g, w := resp.Event.Node, &store.NodeExtern{ - Key: "/foo", - ModifiedIndex: uint64(i) + ns, - CreatedIndex: uint64(i) + ns, - Value: stringp("bar"), - } - - if !reflect.DeepEqual(g, w) { - t.Error("value:", *g.Value) - t.Errorf("g = %+v, w %+v", g, w) - } - } - - time.Sleep(10 * time.Millisecond) - - var last interface{} - for i, sv := range ss { - sv.Stop() - g, _ := sv.store.Get("/", true, true) - if last != nil && !reflect.DeepEqual(last, g) { - t.Errorf("server %d: Root = %#v, want %#v", i, g, last) - } - last = g - } -} - func TestDoProposal(t *testing.T) { tests := []pb.Request{ pb.Request{Method: "POST", ID: 1}, @@ -1621,16 +1519,3 @@ func (s *nopTransporter) Stop() {} func (s *nopTransporter) ShouldStopNotify() <-chan struct{} { return nil } func (s *nopTransporter) Pause() {} func (s *nopTransporter) Resume() {} - -func mustMakePeerSlice(t *testing.T, ids ...uint64) []raft.Peer { - peers := make([]raft.Peer, len(ids)) - for i, id := range ids { - m := Member{ID: types.ID(id)} - b, err := json.Marshal(m) - if err != nil { - t.Fatal(err) - } - peers[i] = raft.Peer{ID: id, Context: b} - } - return peers -} From 241a474935f50677cd24a3cd7dfad3bf34b9b53b Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Tue, 30 Dec 2014 00:16:31 -0800 Subject: [PATCH 3/3] etcdserver: refactor server tests 1. remove redundant fake struct 2. use fake node for better testing 3. code clean --- etcdserver/server_test.go | 326 +++++++++++++------------------------- 1 file changed, 109 insertions(+), 217 deletions(-) diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index 680ac6134..97bf49593 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -403,7 +403,6 @@ func TestApplyRequestOnAdminMemberAttributes(t *testing.T) { } } -// TODO: test ErrIDRemoved func TestApplyConfChangeError(t *testing.T) { cl := newCluster("") cl.SetStore(store.New()) @@ -511,29 +510,18 @@ func TestDoProposal(t *testing.T) { pb.Request{Method: "DELETE", ID: 1}, pb.Request{Method: "GET", ID: 1, Quorum: true}, } - for i, tt := range tests { - ctx, _ := context.WithCancel(context.Background()) - s := raft.NewMemoryStorage() - n := raft.StartNode(0xBAD0, mustMakePeerSlice(t, 0xBAD0), 10, 1, s) st := &storeRecorder{} - tk := make(chan time.Time) - // this makes <-tk always successful, which accelerates internal clock - close(tk) - cl := newCluster("abc") - cl.SetStore(store.New()) srv := &EtcdServer{ - node: n, - raftStorage: s, + node: newNodeCommitter(), + raftStorage: raft.NewMemoryStorage(), store: st, transport: &nopTransporter{}, storage: &storageRecorder{}, - Ticker: tk, - Cluster: cl, reqIDGen: idutil.NewGenerator(0, time.Time{}), } srv.start() - resp, err := srv.Do(ctx, tt) + resp, err := srv.Do(context.Background(), tt) srv.Stop() action := st.Action() @@ -551,34 +539,16 @@ func TestDoProposal(t *testing.T) { } func TestDoProposalCancelled(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - // node cannot make any progress because there are two nodes - s := raft.NewMemoryStorage() - n := raft.StartNode(0xBAD0, mustMakePeerSlice(t, 0xBAD0, 0xBAD1), 10, 1, s) - st := &storeRecorder{} wait := &waitRecorder{} srv := &EtcdServer{ - // TODO: use fake node for better testability - node: n, - raftStorage: s, - store: st, - w: wait, - reqIDGen: idutil.NewGenerator(0, time.Time{}), + node: &nodeRecorder{}, + w: wait, + reqIDGen: idutil.NewGenerator(0, time.Time{}), } - - done := make(chan struct{}) - var err error - go func() { - _, err = srv.Do(ctx, pb.Request{Method: "PUT"}) - close(done) - }() + ctx, cancel := context.WithCancel(context.Background()) cancel() - <-done + _, err := srv.Do(ctx, pb.Request{Method: "PUT"}) - gaction := st.Action() - if len(gaction) != 0 { - t.Errorf("len(action) = %v, want 0", len(gaction)) - } if err != ErrCanceled { t.Fatalf("err = %v, want %v", err, ErrCanceled) } @@ -589,88 +559,57 @@ func TestDoProposalCancelled(t *testing.T) { } func TestDoProposalTimeout(t *testing.T) { - ctx, _ := context.WithTimeout(context.Background(), 0) srv := &EtcdServer{ node: &nodeRecorder{}, w: &waitRecorder{}, reqIDGen: idutil.NewGenerator(0, time.Time{}), } - _, err := srv.Do(ctx, pb.Request{Method: "PUT", ID: 1}) + ctx, _ := context.WithTimeout(context.Background(), 0) + _, err := srv.Do(ctx, pb.Request{Method: "PUT"}) if err != ErrTimeout { t.Fatalf("err = %v, want %v", err, ErrTimeout) } } func TestDoProposalStopped(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - // node cannot make any progress because there are two nodes - s := raft.NewMemoryStorage() - n := raft.StartNode(0xBAD0, mustMakePeerSlice(t, 0xBAD0, 0xBAD1), 10, 1, s) - st := &storeRecorder{} - tk := make(chan time.Time) - // this makes <-tk always successful, which accelarates internal clock - close(tk) - cl := newCluster("abc") - cl.SetStore(store.New()) srv := &EtcdServer{ - // TODO: use fake node for better testability - node: n, - raftStorage: s, - store: st, - transport: &nopTransporter{}, - storage: &storageRecorder{}, - Ticker: tk, - Cluster: cl, - reqIDGen: idutil.NewGenerator(0, time.Time{}), - } - srv.start() - - done := make(chan struct{}) - var err error - go func() { - _, err = srv.Do(ctx, pb.Request{Method: "PUT", ID: 1}) - close(done) - }() - srv.Stop() - <-done - - action := st.Action() - if len(action) != 0 { - t.Errorf("len(action) = %v, want 0", len(action)) + node: &nodeRecorder{}, + w: &waitRecorder{}, + reqIDGen: idutil.NewGenerator(0, time.Time{}), } + srv.done = make(chan struct{}) + close(srv.done) + _, err := srv.Do(context.Background(), pb.Request{Method: "PUT", ID: 1}) if err != ErrStopped { t.Errorf("err = %v, want %v", err, ErrStopped) } } -// TestSync tests sync 1. is nonblocking 2. sends out SYNC request. +// TestSync tests sync 1. is nonblocking 2. proposes SYNC request. func TestSync(t *testing.T) { - n := &nodeProposeDataRecorder{} + n := &nodeRecorder{} srv := &EtcdServer{ node: n, reqIDGen: idutil.NewGenerator(0, time.Time{}), } - done := make(chan struct{}) - go func() { - srv.sync(10 * time.Second) - close(done) - }() - // check that sync is non-blocking - select { - case <-done: - case <-time.After(time.Second): + timer := time.AfterFunc(time.Second, func() { t.Fatalf("sync should be non-blocking but did not return after 1s!") - } - + }) + srv.sync(10 * time.Second) + timer.Stop() testutil.ForceGosched() - data := n.data() - if len(data) != 1 { - t.Fatalf("len(proposeData) = %d, want 1", len(data)) + + action := n.Action() + if len(action) != 1 { + t.Fatalf("len(action) = %d, want 1", len(action)) } + if action[0].name != "Propose" { + t.Fatalf("action = %s, want Propose", action[0].name) + } + data := action[0].params[0].([]byte) var r pb.Request - if err := r.Unmarshal(data[0]); err != nil { + if err := r.Unmarshal(data); err != nil { t.Fatalf("unmarshal request error: %v", err) } if r.Method != "SYNC" { @@ -686,21 +625,14 @@ func TestSyncTimeout(t *testing.T) { node: n, reqIDGen: idutil.NewGenerator(0, time.Time{}), } - done := make(chan struct{}) - go func() { - srv.sync(0) - close(done) - }() - // check that sync is non-blocking - select { - case <-done: - case <-time.After(time.Second): + timer := time.AfterFunc(time.Second, func() { t.Fatalf("sync should be non-blocking but did not return after 1s!") - } + }) + srv.sync(0) + timer.Stop() // give time for goroutine in sync to cancel - // TODO: use fake clock testutil.ForceGosched() w := []action{action{name: "Propose blocked"}} if g := n.Action(); !reflect.DeepEqual(g, w) { @@ -710,24 +642,9 @@ func TestSyncTimeout(t *testing.T) { // TODO: TestNoSyncWhenNoLeader -// blockingNodeProposer implements the node interface to allow users to -// block until Propose has been called and then verify the Proposed data -type blockingNodeProposer struct { - ch chan []byte - readyNode -} - -func (n *blockingNodeProposer) Propose(_ context.Context, data []byte) error { - n.ch <- data - return nil -} - // TestSyncTrigger tests that the server proposes a SYNC request when its sync timer ticks func TestSyncTrigger(t *testing.T) { - n := &blockingNodeProposer{ - ch: make(chan []byte), - readyNode: *newReadyNode(), - } + n := newReadyNode() st := make(chan time.Time, 1) srv := &EtcdServer{ node: n, @@ -739,6 +656,7 @@ func TestSyncTrigger(t *testing.T) { reqIDGen: idutil.NewGenerator(0, time.Time{}), } srv.start() + defer srv.Stop() // trigger the server to become a leader and accept sync requests n.readyc <- raft.Ready{ SoftState: &raft.SoftState{ @@ -747,13 +665,16 @@ func TestSyncTrigger(t *testing.T) { } // trigger a sync request st <- time.Time{} - var data []byte - select { - case <-time.After(time.Second): - t.Fatalf("did not receive proposed request as expected!") - case data = <-n.ch: + testutil.ForceGosched() + + action := n.Action() + if len(action) != 1 { + t.Fatalf("len(action) = %d, want 1", len(action)) } - srv.Stop() + if action[0].name != "Propose" { + t.Fatalf("action = %s, want Propose", action[0].name) + } + data := action[0].params[0].([]byte) var req pb.Request if err := req.Unmarshal(data); err != nil { t.Fatalf("error unmarshalling data: %v", err) @@ -764,29 +685,17 @@ func TestSyncTrigger(t *testing.T) { } // snapshot should snapshot the store and cut the persistent -// TODO: node.Compact is called... we need to make the node an interface func TestSnapshot(t *testing.T) { s := raft.NewMemoryStorage() - n := raft.StartNode(0xBAD0, mustMakePeerSlice(t, 0xBAD0), 10, 1, s) - defer n.Stop() - - // Now we can have an election and persist the rest of the log. - // This causes HardState.Commit to advance. HardState.Commit must - // be > 0 to snapshot. - n.Campaign(context.Background()) - rd := <-n.Ready() - s.Append(rd.Entries) - n.Advance() - + s.Append([]raftpb.Entry{{Index: 1}}) st := &storeRecorder{} p := &storageRecorder{} srv := &EtcdServer{ + node: &nodeRecorder{}, + raftStorage: s, store: st, storage: p, - node: n, - raftStorage: s, } - srv.snapshot(1, &raftpb.ConfState{Nodes: []uint64{1}}) gaction := st.Action() if len(gaction) != 1 { @@ -795,7 +704,6 @@ func TestSnapshot(t *testing.T) { if !reflect.DeepEqual(gaction[0], action{name: "Save"}) { t.Errorf("action = %s, want Save", gaction[0]) } - gaction = p.Action() if len(gaction) != 2 { t.Fatalf("len(action) = %d, want 2", len(gaction)) @@ -810,39 +718,28 @@ func TestSnapshot(t *testing.T) { // Applied > SnapCount should trigger a SaveSnap event func TestTriggerSnap(t *testing.T) { - ctx := context.Background() - s := raft.NewMemoryStorage() - n := raft.StartNode(0xBAD0, mustMakePeerSlice(t, 0xBAD0), 10, 1, s) - n.Campaign(ctx) + snapc := 10 st := &storeRecorder{} p := &storageRecorder{} - cl := newCluster("abc") - cl.SetStore(store.New()) srv := &EtcdServer{ + node: newNodeCommitter(), + raftStorage: raft.NewMemoryStorage(), store: st, transport: &nopTransporter{}, storage: p, - node: n, - raftStorage: s, - snapCount: 10, - Cluster: cl, + snapCount: uint64(snapc), reqIDGen: idutil.NewGenerator(0, time.Time{}), } - srv.start() - // wait for saving nop - time.Sleep(time.Millisecond) - for i := 0; uint64(i) < srv.snapCount-1; i++ { - srv.Do(ctx, pb.Request{Method: "PUT", ID: 1}) + for i := 0; i < snapc+1; i++ { + srv.Do(context.Background(), pb.Request{Method: "PUT"}) } - // wait for saving the last entry - time.Sleep(time.Millisecond) srv.Stop() gaction := p.Action() // each operation is recorded as a Save - // BootstrapConfig/Nop + (SnapCount - 1) * Puts + Cut + SaveSnap = Save + (SnapCount - 1) * Save + Cut + SaveSnap - wcnt := 2 + int(srv.snapCount) + // (SnapCount+1) * Puts + Cut + SaveSnap = (SnapCount+1) * Save + Cut + SaveSnap + wcnt := 3 + snapc if len(gaction) != wcnt { t.Fatalf("len(action) = %d, want %d", len(gaction), wcnt) } @@ -1066,10 +963,8 @@ func TestUpdateMember(t *testing.T) { // TODO: test server could stop itself when being removed -// TODO: test wait trigger correctness in multi-server case - func TestPublish(t *testing.T) { - n := &nodeProposeDataRecorder{} + n := &nodeRecorder{} ch := make(chan interface{}, 1) // simulate that request has gone through consensus ch <- Response{} @@ -1084,12 +979,16 @@ func TestPublish(t *testing.T) { } srv.publish(time.Hour) - data := n.data() - if len(data) != 1 { - t.Fatalf("len(proposeData) = %d, want 1", len(data)) + action := n.Action() + if len(action) != 1 { + t.Fatalf("len(action) = %d, want 1", len(action)) } + if action[0].name != "Propose" { + t.Fatalf("action = %s, want Propose", action[0].name) + } + data := action[0].params[0].([]byte) var r pb.Request - if err := r.Unmarshal(data[0]); err != nil { + if err := r.Unmarshal(data); err != nil { t.Fatalf("unmarshal request error: %v", err) } if r.Method != "PUT" { @@ -1230,6 +1129,10 @@ func TestGetBool(t *testing.T) { } } +func boolp(b bool) *bool { return &b } + +func stringp(s string) *string { return &s } + type action struct { name string params []interface{} @@ -1361,9 +1264,14 @@ func (w *waitRecorder) Trigger(id uint64, x interface{}) { w.action = append(w.action, action{name: "Trigger"}) } -func boolp(b bool) *bool { return &b } +type waitWithResponse struct { + ch <-chan interface{} +} -func stringp(s string) *string { return &s } +func (w *waitWithResponse) Register(id uint64) <-chan interface{} { + return w.ch +} +func (w *waitWithResponse) Trigger(id uint64, x interface{}) {} type storageRecorder struct { recorder @@ -1385,39 +1293,17 @@ func (p *storageRecorder) SaveSnap(st raftpb.Snapshot) error { } func (p *storageRecorder) Close() error { return nil } -type readyNode struct { - readyc chan raft.Ready -} - -func newReadyNode() *readyNode { - readyc := make(chan raft.Ready, 1) - return &readyNode{readyc: readyc} -} -func (n *readyNode) Tick() {} -func (n *readyNode) Campaign(ctx context.Context) error { return nil } -func (n *readyNode) Propose(ctx context.Context, data []byte) error { return nil } -func (n *readyNode) ProposeConfChange(ctx context.Context, conf raftpb.ConfChange) error { - return nil -} -func (n *readyNode) Step(ctx context.Context, msg raftpb.Message) error { return nil } -func (n *readyNode) Ready() <-chan raft.Ready { return n.readyc } -func (n *readyNode) Advance() {} -func (n *readyNode) ApplyConfChange(conf raftpb.ConfChange) *raftpb.ConfState { return nil } -func (n *readyNode) Stop() {} -func (n *readyNode) Compact(index uint64, nodes []uint64, d []byte) {} - type nodeRecorder struct { recorder } func (n *nodeRecorder) Tick() { n.record(action{name: "Tick"}) } - func (n *nodeRecorder) Campaign(ctx context.Context) error { n.record(action{name: "Campaign"}) return nil } func (n *nodeRecorder) Propose(ctx context.Context, data []byte) error { - n.record(action{name: "Propose"}) + n.record(action{name: "Propose", params: []interface{}{data}}) return nil } func (n *nodeRecorder) ProposeConfChange(ctx context.Context, conf raftpb.ConfChange) error { @@ -1441,26 +1327,6 @@ func (n *nodeRecorder) Compact(index uint64, nodes []uint64, d []byte) { n.record(action{name: "Compact"}) } -type nodeProposeDataRecorder struct { - nodeRecorder - sync.Mutex - d [][]byte -} - -func (n *nodeProposeDataRecorder) data() [][]byte { - n.Lock() - d := n.d - n.Unlock() - return d -} -func (n *nodeProposeDataRecorder) Propose(ctx context.Context, data []byte) error { - n.nodeRecorder.Propose(ctx, data) - n.Lock() - n.d = append(n.d, data) - n.Unlock() - return nil -} - type nodeProposalBlockerRecorder struct { nodeRecorder } @@ -1499,14 +1365,40 @@ func (n *nodeConfChangeCommitterRecorder) ApplyConfChange(conf raftpb.ConfChange return &raftpb.ConfState{} } -type waitWithResponse struct { - ch <-chan interface{} +// nodeCommitter commits proposed data immediately. +type nodeCommitter struct { + nodeRecorder + readyc chan raft.Ready + index uint64 } -func (w *waitWithResponse) Register(id uint64) <-chan interface{} { - return w.ch +func newNodeCommitter() *nodeCommitter { + readyc := make(chan raft.Ready, 1) + return &nodeCommitter{readyc: readyc} } -func (w *waitWithResponse) Trigger(id uint64, x interface{}) {} +func (n *nodeCommitter) Propose(ctx context.Context, data []byte) error { + n.index++ + ents := []raftpb.Entry{{Index: n.index, Data: data}} + n.readyc <- raft.Ready{ + Entries: ents, + CommittedEntries: ents, + } + return nil +} +func (n *nodeCommitter) Ready() <-chan raft.Ready { + return n.readyc +} + +type readyNode struct { + nodeRecorder + readyc chan raft.Ready +} + +func newReadyNode() *readyNode { + readyc := make(chan raft.Ready, 1) + return &readyNode{readyc: readyc} +} +func (n *readyNode) Ready() <-chan raft.Ready { return n.readyc } type nopTransporter struct{}