Merge pull request #2015 from yichengq/265

etcdserver: cleanup server tests
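The cluster-style tests move into their own file, and the remaining unit tests replace real raft nodes with lightweight fakes (nodeRecorder, nodeCommitter, readyNode), so they no longer drive a real election or ticker.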
Yicheng Qin 2014-12-30 13:52:58 -08:00
commit 8088440e1d
2 changed files with 254 additions and 333 deletions


@@ -0,0 +1,142 @@
/*
Copyright 2014 CoreOS, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package etcdserver
import (
"encoding/json"
"io/ioutil"
"log"
"math/rand"
"os"
"reflect"
"testing"
"time"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/etcdserver/idutil"
"github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/raft"
"github.com/coreos/etcd/raft/raftpb"
"github.com/coreos/etcd/store"
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
)
func TestClusterOf1(t *testing.T) { testServer(t, 1) }
func TestClusterOf3(t *testing.T) { testServer(t, 3) }
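// testServer boots ns in-process EtcdServers wired together through a
// fakeTransporter, drives ten PUTs through randomly chosen members, and
// finally checks that every member's store converged to the same root.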
func testServer(t *testing.T, ns uint64) {
log.SetOutput(ioutil.Discard)
defer log.SetOutput(os.Stderr)
ss := make([]*EtcdServer, ns)
ids := make([]uint64, ns)
for i := uint64(0); i < ns; i++ {
ids[i] = i + 1
}
members := mustMakePeerSlice(t, ids...)
for i := uint64(0); i < ns; i++ {
id := i + 1
s := raft.NewMemoryStorage()
n := raft.StartNode(id, members, 10, 1, s)
tk := time.NewTicker(10 * time.Millisecond)
defer tk.Stop()
st := store.New()
cl := newCluster("abc")
cl.SetStore(st)
srv := &EtcdServer{
node: n,
raftStorage: s,
store: st,
transport: &fakeTransporter{ss: ss},
storage: &storageRecorder{},
Ticker: tk.C,
Cluster: cl,
reqIDGen: idutil.NewGenerator(uint8(i), time.Time{}),
}
ss[i] = srv
}
for i := uint64(0); i < ns; i++ {
ss[i].start()
}
for i := 1; i <= 10; i++ {
r := pb.Request{
Method: "PUT",
Path: "/foo",
Val: "bar",
}
j := rand.Intn(len(ss))
t.Logf("ss = %d", j)
resp, err := ss[j].Do(context.Background(), r)
if err != nil {
t.Fatal(err)
}
g, w := resp.Event.Node, &store.NodeExtern{
Key: "/foo",
ModifiedIndex: uint64(i) + ns,
CreatedIndex: uint64(i) + ns,
Value: stringp("bar"),
}
if !reflect.DeepEqual(g, w) {
t.Error("value:", *g.Value)
t.Errorf("g = %+v, w %+v", g, w)
}
}
time.Sleep(10 * time.Millisecond)
var last interface{}
for i, sv := range ss {
sv.Stop()
g, _ := sv.store.Get("/", true, true)
if last != nil && !reflect.DeepEqual(last, g) {
t.Errorf("server %d: Root = %#v, want %#v", i, g, last)
}
last = g
}
}
// TODO: test wait trigger correctness in multi-server case
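// fakeTransporter short-circuits the network: Send steps each message
// directly into the destination server's raft node.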
type fakeTransporter struct {
nopTransporter
ss []*EtcdServer
}
func (s *fakeTransporter) Send(msgs []raftpb.Message) {
for _, m := range msgs {
s.ss[m.To-1].node.Step(context.TODO(), m)
}
}
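// mustMakePeerSlice builds raft.Peers whose Context carries the
// JSON-encoded Member, failing the test if marshalling fails.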
func mustMakePeerSlice(t *testing.T, ids ...uint64) []raft.Peer {
peers := make([]raft.Peer, len(ids))
for i, id := range ids {
m := Member{ID: types.ID(id)}
b, err := json.Marshal(m)
if err != nil {
t.Fatal(err)
}
peers[i] = raft.Peer{ID: id, Context: b}
}
return peers
}


@@ -21,8 +21,8 @@ import (
"fmt"
"io/ioutil"
"log"
"math/rand"
"net/http"
"os"
"path"
"reflect"
"strconv"
@@ -41,10 +41,6 @@ import (
"github.com/coreos/etcd/store"
)
func init() {
log.SetOutput(ioutil.Discard)
}
// TestDoLocalAction tests requests which do not need to go through raft to be applied,
// and are served through local data.
func TestDoLocalAction(t *testing.T) {
@@ -407,7 +403,6 @@ func TestApplyRequestOnAdminMemberAttributes(t *testing.T) {
}
}
// TODO: test ErrIDRemoved
func TestApplyConfChangeError(t *testing.T) {
cl := newCluster("")
cl.SetStore(store.New())
@@ -508,104 +503,6 @@ func TestApplyConfChangeShouldStop(t *testing.T) {
}
}
func TestClusterOf1(t *testing.T) { testServer(t, 1) }
func TestClusterOf3(t *testing.T) { testServer(t, 3) }
type fakeTransporter struct {
ss []*EtcdServer
}
func (s *fakeTransporter) Handler() http.Handler { return nil }
func (s *fakeTransporter) Send(msgs []raftpb.Message) {
for _, m := range msgs {
s.ss[m.To-1].node.Step(context.TODO(), m)
}
}
func (s *fakeTransporter) AddPeer(id types.ID, us []string) {}
func (s *fakeTransporter) UpdatePeer(id types.ID, us []string) {}
func (s *fakeTransporter) RemovePeer(id types.ID) {}
func (s *fakeTransporter) Stop() {}
func (s *fakeTransporter) ShouldStopNotify() <-chan struct{} { return nil }
func (s *fakeTransporter) Pause() {}
func (s *fakeTransporter) Resume() {}
func testServer(t *testing.T, ns uint64) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ss := make([]*EtcdServer, ns)
ids := make([]uint64, ns)
for i := uint64(0); i < ns; i++ {
ids[i] = i + 1
}
members := mustMakePeerSlice(t, ids...)
for i := uint64(0); i < ns; i++ {
id := i + 1
s := raft.NewMemoryStorage()
n := raft.StartNode(id, members, 10, 1, s)
tk := time.NewTicker(10 * time.Millisecond)
defer tk.Stop()
st := store.New()
cl := newCluster("abc")
cl.SetStore(st)
srv := &EtcdServer{
node: n,
raftStorage: s,
store: st,
transport: &fakeTransporter{ss},
storage: &storageRecorder{},
Ticker: tk.C,
Cluster: cl,
reqIDGen: idutil.NewGenerator(uint8(i), time.Time{}),
}
ss[i] = srv
}
// Start the servers after they're all created to avoid races in send().
for i := uint64(0); i < ns; i++ {
ss[i].start()
}
for i := 1; i <= 10; i++ {
r := pb.Request{
Method: "PUT",
Path: "/foo",
Val: "bar",
}
j := rand.Intn(len(ss))
t.Logf("ss = %d", j)
resp, err := ss[j].Do(ctx, r)
if err != nil {
t.Fatal(err)
}
g, w := resp.Event.Node, &store.NodeExtern{
Key: "/foo",
ModifiedIndex: uint64(i) + ns,
CreatedIndex: uint64(i) + ns,
Value: stringp("bar"),
}
if !reflect.DeepEqual(g, w) {
t.Error("value:", *g.Value)
t.Errorf("g = %+v, w %+v", g, w)
}
}
time.Sleep(10 * time.Millisecond)
var last interface{}
for i, sv := range ss {
sv.Stop()
g, _ := sv.store.Get("/", true, true)
if last != nil && !reflect.DeepEqual(last, g) {
t.Errorf("server %d: Root = %#v, want %#v", i, g, last)
}
last = g
}
}
func TestDoProposal(t *testing.T) {
tests := []pb.Request{
pb.Request{Method: "POST", ID: 1},
@@ -613,29 +510,18 @@ func TestDoProposal(t *testing.T) {
pb.Request{Method: "DELETE", ID: 1},
pb.Request{Method: "GET", ID: 1, Quorum: true},
}
for i, tt := range tests {
ctx, _ := context.WithCancel(context.Background())
s := raft.NewMemoryStorage()
n := raft.StartNode(0xBAD0, mustMakePeerSlice(t, 0xBAD0), 10, 1, s)
st := &storeRecorder{}
tk := make(chan time.Time)
// this makes <-tk always successful, which accelerates the internal clock
close(tk)
cl := newCluster("abc")
cl.SetStore(store.New())
srv := &EtcdServer{
node: n,
raftStorage: s,
node: newNodeCommitter(),
raftStorage: raft.NewMemoryStorage(),
store: st,
transport: &nopTransporter{},
storage: &storageRecorder{},
Ticker: tk,
Cluster: cl,
reqIDGen: idutil.NewGenerator(0, time.Time{}),
}
srv.start()
resp, err := srv.Do(ctx, tt)
resp, err := srv.Do(context.Background(), tt)
srv.Stop()
action := st.Action()
@@ -653,34 +539,16 @@
}
func TestDoProposalCancelled(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
// node cannot make any progress because there are two nodes
s := raft.NewMemoryStorage()
n := raft.StartNode(0xBAD0, mustMakePeerSlice(t, 0xBAD0, 0xBAD1), 10, 1, s)
st := &storeRecorder{}
wait := &waitRecorder{}
srv := &EtcdServer{
// TODO: use fake node for better testability
node: n,
raftStorage: s,
store: st,
w: wait,
reqIDGen: idutil.NewGenerator(0, time.Time{}),
node: &nodeRecorder{},
w: wait,
reqIDGen: idutil.NewGenerator(0, time.Time{}),
}
done := make(chan struct{})
var err error
go func() {
_, err = srv.Do(ctx, pb.Request{Method: "PUT"})
close(done)
}()
ctx, cancel := context.WithCancel(context.Background())
cancel()
<-done
_, err := srv.Do(ctx, pb.Request{Method: "PUT"})
gaction := st.Action()
if len(gaction) != 0 {
t.Errorf("len(action) = %v, want 0", len(gaction))
}
if err != ErrCanceled {
t.Fatalf("err = %v, want %v", err, ErrCanceled)
}
@@ -691,88 +559,57 @@ func TestDoProposalCancelled(t *testing.T) {
}
func TestDoProposalTimeout(t *testing.T) {
ctx, _ := context.WithTimeout(context.Background(), 0)
srv := &EtcdServer{
node: &nodeRecorder{},
w: &waitRecorder{},
reqIDGen: idutil.NewGenerator(0, time.Time{}),
}
_, err := srv.Do(ctx, pb.Request{Method: "PUT", ID: 1})
ctx, _ := context.WithTimeout(context.Background(), 0)
_, err := srv.Do(ctx, pb.Request{Method: "PUT"})
if err != ErrTimeout {
t.Fatalf("err = %v, want %v", err, ErrTimeout)
}
}
func TestDoProposalStopped(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// node cannot make any progress because there are two nodes
s := raft.NewMemoryStorage()
n := raft.StartNode(0xBAD0, mustMakePeerSlice(t, 0xBAD0, 0xBAD1), 10, 1, s)
st := &storeRecorder{}
tk := make(chan time.Time)
// this makes <-tk always successful, which accelerates the internal clock
close(tk)
cl := newCluster("abc")
cl.SetStore(store.New())
srv := &EtcdServer{
// TODO: use fake node for better testability
node: n,
raftStorage: s,
store: st,
transport: &nopTransporter{},
storage: &storageRecorder{},
Ticker: tk,
Cluster: cl,
reqIDGen: idutil.NewGenerator(0, time.Time{}),
}
srv.start()
done := make(chan struct{})
var err error
go func() {
_, err = srv.Do(ctx, pb.Request{Method: "PUT", ID: 1})
close(done)
}()
srv.Stop()
<-done
action := st.Action()
if len(action) != 0 {
t.Errorf("len(action) = %v, want 0", len(action))
node: &nodeRecorder{},
w: &waitRecorder{},
reqIDGen: idutil.NewGenerator(0, time.Time{}),
}
srv.done = make(chan struct{})
close(srv.done)
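// a closed done channel makes the server look already stopped, so Do
// must fail fast with ErrStopped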
_, err := srv.Do(context.Background(), pb.Request{Method: "PUT", ID: 1})
if err != ErrStopped {
t.Errorf("err = %v, want %v", err, ErrStopped)
}
}
// TestSync tests that sync 1. is nonblocking 2. sends out a SYNC request.
// TestSync tests that sync 1. is nonblocking 2. proposes a SYNC request.
func TestSync(t *testing.T) {
n := &nodeProposeDataRecorder{}
n := &nodeRecorder{}
srv := &EtcdServer{
node: n,
reqIDGen: idutil.NewGenerator(0, time.Time{}),
}
done := make(chan struct{})
go func() {
srv.sync(10 * time.Second)
close(done)
}()
// check that sync is non-blocking
select {
case <-done:
case <-time.After(time.Second):
timer := time.AfterFunc(time.Second, func() {
t.Fatalf("sync should be non-blocking but did not return after 1s!")
}
})
srv.sync(10 * time.Second)
timer.Stop()
testutil.ForceGosched()
data := n.data()
if len(data) != 1 {
t.Fatalf("len(proposeData) = %d, want 1", len(data))
action := n.Action()
if len(action) != 1 {
t.Fatalf("len(action) = %d, want 1", len(action))
}
if action[0].name != "Propose" {
t.Fatalf("action = %s, want Propose", action[0].name)
}
data := action[0].params[0].([]byte)
var r pb.Request
if err := r.Unmarshal(data[0]); err != nil {
if err := r.Unmarshal(data); err != nil {
t.Fatalf("unmarshal request error: %v", err)
}
if r.Method != "SYNC" {
@@ -788,21 +625,14 @@ func TestSyncTimeout(t *testing.T) {
node: n,
reqIDGen: idutil.NewGenerator(0, time.Time{}),
}
done := make(chan struct{})
go func() {
srv.sync(0)
close(done)
}()
// check that sync is non-blocking
select {
case <-done:
case <-time.After(time.Second):
timer := time.AfterFunc(time.Second, func() {
t.Fatalf("sync should be non-blocking but did not return after 1s!")
}
})
srv.sync(0)
timer.Stop()
// give time for goroutine in sync to cancel
// TODO: use fake clock
testutil.ForceGosched()
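// the proposal blocks until the 0s timeout cancels it, so the node
// records it as blocked rather than proposed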
w := []action{action{name: "Propose blocked"}}
if g := n.Action(); !reflect.DeepEqual(g, w) {
@@ -812,24 +642,9 @@ func TestSyncTimeout(t *testing.T) {
// TODO: TestNoSyncWhenNoLeader
// blockingNodeProposer implements the node interface to allow users to
// block until Propose has been called and then verify the Proposed data
type blockingNodeProposer struct {
ch chan []byte
readyNode
}
func (n *blockingNodeProposer) Propose(_ context.Context, data []byte) error {
n.ch <- data
return nil
}
// TestSyncTrigger tests that the server proposes a SYNC request when its sync timer ticks
func TestSyncTrigger(t *testing.T) {
n := &blockingNodeProposer{
ch: make(chan []byte),
readyNode: *newReadyNode(),
}
n := newReadyNode()
st := make(chan time.Time, 1)
srv := &EtcdServer{
node: n,
@@ -841,6 +656,7 @@ func TestSyncTrigger(t *testing.T) {
reqIDGen: idutil.NewGenerator(0, time.Time{}),
}
srv.start()
defer srv.Stop()
// trigger the server to become a leader and accept sync requests
n.readyc <- raft.Ready{
SoftState: &raft.SoftState{
@@ -849,13 +665,16 @@
}
// trigger a sync request
st <- time.Time{}
var data []byte
select {
case <-time.After(time.Second):
t.Fatalf("did not receive proposed request as expected!")
case data = <-n.ch:
testutil.ForceGosched()
action := n.Action()
if len(action) != 1 {
t.Fatalf("len(action) = %d, want 1", len(action))
}
srv.Stop()
if action[0].name != "Propose" {
t.Fatalf("action = %s, want Propose", action[0].name)
}
data := action[0].params[0].([]byte)
var req pb.Request
if err := req.Unmarshal(data); err != nil {
t.Fatalf("error unmarshalling data: %v", err)
@@ -866,29 +685,17 @@
}
// snapshot should snapshot the store and cut the persistent log
// TODO: node.Compact is called... we need to make the node an interface
func TestSnapshot(t *testing.T) {
s := raft.NewMemoryStorage()
n := raft.StartNode(0xBAD0, mustMakePeerSlice(t, 0xBAD0), 10, 1, s)
defer n.Stop()
// Now we can have an election and persist the rest of the log.
// This causes HardState.Commit to advance. HardState.Commit must
// be > 0 to snapshot.
n.Campaign(context.Background())
rd := <-n.Ready()
s.Append(rd.Entries)
n.Advance()
s.Append([]raftpb.Entry{{Index: 1}})
st := &storeRecorder{}
p := &storageRecorder{}
srv := &EtcdServer{
node: &nodeRecorder{},
raftStorage: s,
store: st,
storage: p,
node: n,
raftStorage: s,
}
srv.snapshot(1, &raftpb.ConfState{Nodes: []uint64{1}})
gaction := st.Action()
if len(gaction) != 1 {
@@ -897,7 +704,6 @@ func TestSnapshot(t *testing.T) {
if !reflect.DeepEqual(gaction[0], action{name: "Save"}) {
t.Errorf("action = %s, want Save", gaction[0])
}
gaction = p.Action()
if len(gaction) != 2 {
t.Fatalf("len(action) = %d, want 2", len(gaction))
@@ -912,39 +718,28 @@
// Applied > SnapCount should trigger a SaveSnap event
func TestTriggerSnap(t *testing.T) {
ctx := context.Background()
s := raft.NewMemoryStorage()
n := raft.StartNode(0xBAD0, mustMakePeerSlice(t, 0xBAD0), 10, 1, s)
n.Campaign(ctx)
snapc := 10
st := &storeRecorder{}
p := &storageRecorder{}
cl := newCluster("abc")
cl.SetStore(store.New())
srv := &EtcdServer{
node: newNodeCommitter(),
raftStorage: raft.NewMemoryStorage(),
store: st,
transport: &nopTransporter{},
storage: p,
node: n,
raftStorage: s,
snapCount: 10,
Cluster: cl,
snapCount: uint64(snapc),
reqIDGen: idutil.NewGenerator(0, time.Time{}),
}
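// snapc+1 proposals push the applied index past snapCount, which must
// trigger one snapshot: (snapc+1) store Saves plus a WAL Cut and SaveSnap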
srv.start()
// wait for saving nop
time.Sleep(time.Millisecond)
for i := 0; uint64(i) < srv.snapCount-1; i++ {
srv.Do(ctx, pb.Request{Method: "PUT", ID: 1})
for i := 0; i < snapc+1; i++ {
srv.Do(context.Background(), pb.Request{Method: "PUT"})
}
// wait for saving the last entry
time.Sleep(time.Millisecond)
srv.Stop()
gaction := p.Action()
// each operation is recorded as a Save
// BootstrapConfig/Nop + (SnapCount - 1) * Puts + Cut + SaveSnap = Save + (SnapCount - 1) * Save + Cut + SaveSnap
wcnt := 2 + int(srv.snapCount)
// (SnapCount+1) * Puts + Cut + SaveSnap = (SnapCount+1) * Save + Cut + SaveSnap
wcnt := 3 + snapc
if len(gaction) != wcnt {
t.Fatalf("len(action) = %d, want %d", len(gaction), wcnt)
}
@@ -1168,10 +963,8 @@ func TestUpdateMember(t *testing.T) {
// TODO: test server could stop itself when being removed
// TODO: test wait trigger correctness in multi-server case
func TestPublish(t *testing.T) {
n := &nodeProposeDataRecorder{}
n := &nodeRecorder{}
ch := make(chan interface{}, 1)
// simulate that request has gone through consensus
ch <- Response{}
@@ -1186,12 +979,16 @@
}
srv.publish(time.Hour)
data := n.data()
if len(data) != 1 {
t.Fatalf("len(proposeData) = %d, want 1", len(data))
action := n.Action()
if len(action) != 1 {
t.Fatalf("len(action) = %d, want 1", len(action))
}
if action[0].name != "Propose" {
t.Fatalf("action = %s, want Propose", action[0].name)
}
data := action[0].params[0].([]byte)
var r pb.Request
if err := r.Unmarshal(data[0]); err != nil {
if err := r.Unmarshal(data); err != nil {
t.Fatalf("unmarshal request error: %v", err)
}
if r.Method != "PUT" {
@@ -1227,6 +1024,8 @@ func TestPublishStopped(t *testing.T) {
// TestPublishRetry tests that publish will keep retrying until it succeeds.
func TestPublishRetry(t *testing.T) {
log.SetOutput(ioutil.Discard)
defer log.SetOutput(os.Stderr)
n := &nodeRecorder{}
srv := &EtcdServer{
node: n,
@@ -1330,6 +1129,10 @@ func TestGetBool(t *testing.T) {
}
}
func boolp(b bool) *bool { return &b }
func stringp(s string) *string { return &s }
type action struct {
name string
params []interface{}
@@ -1461,9 +1264,14 @@ func (w *waitRecorder) Trigger(id uint64, x interface{}) {
w.action = append(w.action, action{name: "Trigger"})
}
func boolp(b bool) *bool { return &b }
type waitWithResponse struct {
ch <-chan interface{}
}
func stringp(s string) *string { return &s }
func (w *waitWithResponse) Register(id uint64) <-chan interface{} {
return w.ch
}
func (w *waitWithResponse) Trigger(id uint64, x interface{}) {}
type storageRecorder struct {
recorder
@@ -1485,39 +1293,17 @@ func (p *storageRecorder) SaveSnap(st raftpb.Snapshot) error {
}
func (p *storageRecorder) Close() error { return nil }
type readyNode struct {
readyc chan raft.Ready
}
func newReadyNode() *readyNode {
readyc := make(chan raft.Ready, 1)
return &readyNode{readyc: readyc}
}
func (n *readyNode) Tick() {}
func (n *readyNode) Campaign(ctx context.Context) error { return nil }
func (n *readyNode) Propose(ctx context.Context, data []byte) error { return nil }
func (n *readyNode) ProposeConfChange(ctx context.Context, conf raftpb.ConfChange) error {
return nil
}
func (n *readyNode) Step(ctx context.Context, msg raftpb.Message) error { return nil }
func (n *readyNode) Ready() <-chan raft.Ready { return n.readyc }
func (n *readyNode) Advance() {}
func (n *readyNode) ApplyConfChange(conf raftpb.ConfChange) *raftpb.ConfState { return nil }
func (n *readyNode) Stop() {}
func (n *readyNode) Compact(index uint64, nodes []uint64, d []byte) {}
type nodeRecorder struct {
recorder
}
func (n *nodeRecorder) Tick() { n.record(action{name: "Tick"}) }
func (n *nodeRecorder) Campaign(ctx context.Context) error {
n.record(action{name: "Campaign"})
return nil
}
func (n *nodeRecorder) Propose(ctx context.Context, data []byte) error {
n.record(action{name: "Propose"})
n.record(action{name: "Propose", params: []interface{}{data}})
return nil
}
func (n *nodeRecorder) ProposeConfChange(ctx context.Context, conf raftpb.ConfChange) error {
@@ -1541,26 +1327,6 @@ func (n *nodeRecorder) Compact(index uint64, nodes []uint64, d []byte) {
n.record(action{name: "Compact"})
}
type nodeProposeDataRecorder struct {
nodeRecorder
sync.Mutex
d [][]byte
}
func (n *nodeProposeDataRecorder) data() [][]byte {
n.Lock()
d := n.d
n.Unlock()
return d
}
func (n *nodeProposeDataRecorder) Propose(ctx context.Context, data []byte) error {
n.nodeRecorder.Propose(ctx, data)
n.Lock()
n.d = append(n.d, data)
n.Unlock()
return nil
}
type nodeProposalBlockerRecorder struct {
nodeRecorder
}
@@ -1599,14 +1365,40 @@ func (n *nodeConfChangeCommitterRecorder) ApplyConfChange(conf raftpb.ConfChange
return &raftpb.ConfState{}
}
type waitWithResponse struct {
ch <-chan interface{}
// nodeCommitter commits proposed data immediately.
type nodeCommitter struct {
nodeRecorder
readyc chan raft.Ready
index uint64
}
func (w *waitWithResponse) Register(id uint64) <-chan interface{} {
return w.ch
func newNodeCommitter() *nodeCommitter {
readyc := make(chan raft.Ready, 1)
return &nodeCommitter{readyc: readyc}
}
func (w *waitWithResponse) Trigger(id uint64, x interface{}) {}
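// Propose surfaces the data as both an unstable and a committed entry in
// a single Ready, so the server applies each proposal immediately.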
func (n *nodeCommitter) Propose(ctx context.Context, data []byte) error {
n.index++
ents := []raftpb.Entry{{Index: n.index, Data: data}}
n.readyc <- raft.Ready{
Entries: ents,
CommittedEntries: ents,
}
return nil
}
func (n *nodeCommitter) Ready() <-chan raft.Ready {
return n.readyc
}
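// readyNode is a nodeRecorder whose Ready channel the test feeds by hand,
// e.g. to tell the server it just became leader.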
type readyNode struct {
nodeRecorder
readyc chan raft.Ready
}
func newReadyNode() *readyNode {
readyc := make(chan raft.Ready, 1)
return &readyNode{readyc: readyc}
}
func (n *readyNode) Ready() <-chan raft.Ready { return n.readyc }
type nopTransporter struct{}
@@ -1619,16 +1411,3 @@ func (s *nopTransporter) Stop() {}
func (s *nopTransporter) ShouldStopNotify() <-chan struct{} { return nil }
func (s *nopTransporter) Pause() {}
func (s *nopTransporter) Resume() {}
func mustMakePeerSlice(t *testing.T, ids ...uint64) []raft.Peer {
peers := make([]raft.Peer, len(ids))
for i, id := range ids {
m := Member{ID: types.ID(id)}
b, err := json.Marshal(m)
if err != nil {
t.Fatal(err)
}
peers[i] = raft.Peer{ID: id, Context: b}
}
return peers
}
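For reference, the call-recorder pattern these fakes share can be shown in isolation. A minimal, self-contained sketch follows; all names below are illustrative rather than etcd's, and the repo's recorder may additionally synchronize access to its action slice:

package main

import "fmt"

type action struct {
	name   string
	params []interface{}
}

// recorder collects the actions performed on a fake, for later assertions.
type recorder struct{ actions []action }

func (r *recorder) record(a action) { r.actions = append(r.actions, a) }

// fakeNode stands in for a real raft node and only records what was asked of it.
type fakeNode struct{ recorder }

func (n *fakeNode) Propose(data []byte) error {
	n.record(action{name: "Propose", params: []interface{}{data}})
	return nil
}

func main() {
	n := &fakeNode{}
	n.Propose([]byte("SYNC"))
	fmt.Println(n.actions[0].name) // a test would assert this is "Propose"
}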