From 45ebfb4217bdec5934004f55199adff81a2153ce Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Mon, 6 Oct 2014 15:22:35 -0700 Subject: [PATCH] raft: refine initial entries logic in StartNode --- etcdserver/cluster.go | 1 + etcdserver/cluster_test.go | 8 ++------ etcdserver/server.go | 11 ++--------- etcdserver/server_test.go | 24 ++++++++++++------------ raft/example_test.go | 2 +- raft/node.go | 16 +++++++++------- raft/node_test.go | 2 +- 7 files changed, 28 insertions(+), 36 deletions(-) diff --git a/etcdserver/cluster.go b/etcdserver/cluster.go index eaace6217..c43d437e0 100644 --- a/etcdserver/cluster.go +++ b/etcdserver/cluster.go @@ -100,6 +100,7 @@ func (c Cluster) IDs() []int64 { for _, m := range c { ids = append(ids, m.ID) } + sort.Sort(types.Int64Slice(ids)) return ids } diff --git a/etcdserver/cluster_test.go b/etcdserver/cluster_test.go index ccdb6e2c0..89529c72c 100644 --- a/etcdserver/cluster_test.go +++ b/etcdserver/cluster_test.go @@ -2,10 +2,7 @@ package etcdserver import ( "reflect" - "sort" "testing" - - "github.com/coreos/etcd/pkg/types" ) func TestClusterAddSlice(t *testing.T) { @@ -210,9 +207,8 @@ func TestClusterIDs(t *testing.T) { {ID: 4}, {ID: 100}, }) - w := types.Int64Slice([]int64{1, 4, 100}) - g := types.Int64Slice(cs.IDs()) - sort.Sort(g) + w := []int64{1, 4, 100} + g := cs.IDs() if !reflect.DeepEqual(w, g) { t.Errorf("IDs=%+v, want %+v", g, w) } diff --git a/etcdserver/server.go b/etcdserver/server.go index 4354d68a7..8f8a6f3b9 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -8,7 +8,6 @@ import ( "net/http" "os" "path" - "sort" "sync/atomic" "time" @@ -130,14 +129,8 @@ func NewServer(cfg *ServerConfig) *EtcdServer { if w, err = wal.Create(waldir); err != nil { log.Fatal(err) } - ids := cfg.Cluster.IDs() - sort.Sort(types.Int64Slice(ids)) - ccs := make([]raftpb.ConfChange, len(ids)) - for i, id := range ids { - // TODO: add context for PeerURLs - ccs[i] = raftpb.ConfChange{Type: raftpb.ConfChangeAddNode, NodeID: id} - } - n = raft.StartNode(m.ID, cfg.Cluster.IDs(), 10, 1, ccs) + // TODO: add context for PeerURLs + n = raft.StartNode(m.ID, cfg.Cluster.IDs(), 10, 1) } else { var index int64 snapshot, err := ss.Load() diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index 165f43fe9..22dae63fe 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -391,7 +391,7 @@ func testServer(t *testing.T, ns int64) { for i := int64(0); i < ns; i++ { id := i + 1 - n := raft.StartNode(id, members, 10, 1, nil) + n := raft.StartNode(id, members, 10, 1) tk := time.NewTicker(10 * time.Millisecond) defer tk.Stop() srv := &EtcdServer{ @@ -458,7 +458,7 @@ func TestDoProposal(t *testing.T) { for i, tt := range tests { ctx, _ := context.WithCancel(context.Background()) - n := raft.StartNode(0xBAD0, []int64{0xBAD0}, 10, 1, nil) + n := raft.StartNode(0xBAD0, []int64{0xBAD0}, 10, 1) st := &storeRecorder{} tk := make(chan time.Time) // this makes <-tk always successful, which accelerates internal clock @@ -491,7 +491,7 @@ func TestDoProposal(t *testing.T) { func TestDoProposalCancelled(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) // node cannot make any progress because there are two nodes - n := raft.StartNode(0xBAD0, []int64{0xBAD0, 0xBAD1}, 10, 1, nil) + n := raft.StartNode(0xBAD0, []int64{0xBAD0, 0xBAD1}, 10, 1) st := &storeRecorder{} wait := &waitRecorder{} srv := &EtcdServer{ @@ -527,7 +527,7 @@ func TestDoProposalStopped(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() // node cannot make any progress because there are two nodes - n := raft.StartNode(0xBAD0, []int64{0xBAD0, 0xBAD1}, 10, 1, nil) + n := raft.StartNode(0xBAD0, []int64{0xBAD0, 0xBAD1}, 10, 1) st := &storeRecorder{} tk := make(chan time.Time) // this makes <-tk always successful, which accelarates internal clock @@ -668,7 +668,7 @@ func TestSyncTrigger(t *testing.T) { // snapshot should snapshot the store and cut the persistent // TODO: node.Compact is called... we need to make the node an interface func TestSnapshot(t *testing.T) { - n := raft.StartNode(0xBAD0, []int64{0xBAD0}, 10, 1, nil) + n := raft.StartNode(0xBAD0, []int64{0xBAD0}, 10, 1) defer n.Stop() st := &storeRecorder{} p := &storageRecorder{} @@ -699,7 +699,7 @@ func TestSnapshot(t *testing.T) { // Applied > SnapCount should trigger a SaveSnap event func TestTriggerSnap(t *testing.T) { ctx := context.Background() - n := raft.StartNode(0xBAD0, []int64{0xBAD0}, 10, 1, nil) + n := raft.StartNode(0xBAD0, []int64{0xBAD0}, 10, 1) n.Campaign(ctx) st := &storeRecorder{} p := &storageRecorder{} @@ -712,7 +712,7 @@ func TestTriggerSnap(t *testing.T) { } s.start() - for i := 0; int64(i) < s.snapCount; i++ { + for i := 0; int64(i) < s.snapCount-1; i++ { s.Do(ctx, pb.Request{Method: "PUT", ID: 1}) } time.Sleep(time.Millisecond) @@ -720,12 +720,12 @@ func TestTriggerSnap(t *testing.T) { gaction := p.Action() // each operation is recorded as a Save - // Nop + SnapCount * Puts + Cut + SaveSnap = Save + SnapCount * Save + Cut + SaveSnap - if len(gaction) != 3+int(s.snapCount) { - t.Fatalf("len(action) = %d, want %d", len(gaction), 3+int(s.snapCount)) + // BootstrapConfig/Nop + (SnapCount - 1) * Puts + Cut + SaveSnap = Save + (SnapCount - 1) * Save + Cut + SaveSnap + if len(gaction) != 2+int(s.snapCount) { + t.Fatalf("len(action) = %d, want %d", len(gaction), 2+int(s.snapCount)) } - if !reflect.DeepEqual(gaction[12], action{name: "SaveSnap"}) { - t.Errorf("action = %s, want SaveSnap", gaction[12]) + if !reflect.DeepEqual(gaction[11], action{name: "SaveSnap"}) { + t.Errorf("action = %s, want SaveSnap", gaction[11]) } } diff --git a/raft/example_test.go b/raft/example_test.go index c957068b4..289b970fc 100644 --- a/raft/example_test.go +++ b/raft/example_test.go @@ -10,7 +10,7 @@ func saveStateToDisk(st pb.HardState) {} func saveToDisk(ents []pb.Entry) {} func Example_Node() { - n := StartNode(0, nil, 0, 0, nil) + n := StartNode(0, nil, 0, 0) // stuff to n happens in other goroutines diff --git a/raft/node.go b/raft/node.go index f755e4bd5..28c3e797c 100644 --- a/raft/node.go +++ b/raft/node.go @@ -101,21 +101,23 @@ type Node interface { // StartNode returns a new Node given a unique raft id, a list of raft peers, and // the election and heartbeat timeouts in units of ticks. -// It also wraps ConfChanges in entry and puts them at the head of the log. -func StartNode(id int64, peers []int64, election, heartbeat int, ccs []pb.ConfChange) Node { +// It also builds ConfChangeAddNode entry for each peer and puts them at the head of the log. +func StartNode(id int64, peers []int64, election, heartbeat int) Node { n := newNode() r := newRaft(id, peers, election, heartbeat) - ents := make([]pb.Entry, len(ccs)) - for i, cc := range ccs { + + ents := make([]pb.Entry, len(peers)) + for i, peer := range peers { + cc := pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: peer} data, err := cc.Marshal() if err != nil { panic("unexpected marshal error") } ents[i] = pb.Entry{Type: pb.EntryConfChange, Term: 1, Index: int64(i + 1), Data: data} } - if !r.raftLog.maybeAppend(0, 0, int64(len(ccs)), ents...) { - panic("unexpected append failure") - } + r.raftLog.append(0, ents...) + r.raftLog.committed = int64(len(ents)) + go n.run(r) return &n } diff --git a/raft/node_test.go b/raft/node_test.go index 9e93a7ef6..0822c1e91 100644 --- a/raft/node_test.go +++ b/raft/node_test.go @@ -175,7 +175,7 @@ func TestNode(t *testing.T) { }, } - n := StartNode(1, []int64{1}, 0, 0, []raftpb.ConfChange{cc}) + n := StartNode(1, []int64{1}, 0, 0) n.Campaign(ctx) if g := <-n.Ready(); !reflect.DeepEqual(g, wants[0]) { t.Errorf("#%d: g = %+v,\n w %+v", 1, g, wants[0])