diff --git a/etcdserver/server.go b/etcdserver/server.go index 552e0caff..401b0c011 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -8,6 +8,7 @@ import ( "net/http" "os" "path" + "sort" "sync/atomic" "time" @@ -129,7 +130,14 @@ func NewServer(cfg *ServerConfig) *EtcdServer { if w, err = wal.Create(waldir); err != nil { log.Fatal(err) } - n = raft.StartNode(m.ID, cfg.Cluster.IDs(), 10, 1) + ids := cfg.Cluster.IDs() + sort.Sort(int64Slice(ids)) + ccs := make([]raftpb.ConfChange, len(ids)) + for i, id := range ids { + // TODO: add context for PeerURLs + ccs[i] = raftpb.ConfChange{Type: raftpb.ConfChangeAddNode, NodeID: id} + } + n = raft.StartNode(m.ID, cfg.Cluster.IDs(), 10, 1, ccs) } else { var index int64 snapshot, err := ss.Load() @@ -552,3 +560,10 @@ func getBool(v *bool) (vv bool, set bool) { } return *v, true } + +// int64Slice implements sort interface +type int64Slice []int64 + +func (p int64Slice) Len() int { return len(p) } +func (p int64Slice) Less(i, j int) bool { return p[i] < p[j] } +func (p int64Slice) Swap(i, j int) { p[i], p[j] = p[j], p[i] } diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index 5ff006332..165f43fe9 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -391,7 +391,7 @@ func testServer(t *testing.T, ns int64) { for i := int64(0); i < ns; i++ { id := i + 1 - n := raft.StartNode(id, members, 10, 1) + n := raft.StartNode(id, members, 10, 1, nil) tk := time.NewTicker(10 * time.Millisecond) defer tk.Stop() srv := &EtcdServer{ @@ -458,7 +458,7 @@ func TestDoProposal(t *testing.T) { for i, tt := range tests { ctx, _ := context.WithCancel(context.Background()) - n := raft.StartNode(0xBAD0, []int64{0xBAD0}, 10, 1) + n := raft.StartNode(0xBAD0, []int64{0xBAD0}, 10, 1, nil) st := &storeRecorder{} tk := make(chan time.Time) // this makes <-tk always successful, which accelerates internal clock @@ -491,7 +491,7 @@ func TestDoProposal(t *testing.T) { func TestDoProposalCancelled(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) // node cannot make any progress because there are two nodes - n := raft.StartNode(0xBAD0, []int64{0xBAD0, 0xBAD1}, 10, 1) + n := raft.StartNode(0xBAD0, []int64{0xBAD0, 0xBAD1}, 10, 1, nil) st := &storeRecorder{} wait := &waitRecorder{} srv := &EtcdServer{ @@ -527,7 +527,7 @@ func TestDoProposalStopped(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() // node cannot make any progress because there are two nodes - n := raft.StartNode(0xBAD0, []int64{0xBAD0, 0xBAD1}, 10, 1) + n := raft.StartNode(0xBAD0, []int64{0xBAD0, 0xBAD1}, 10, 1, nil) st := &storeRecorder{} tk := make(chan time.Time) // this makes <-tk always successful, which accelarates internal clock @@ -668,7 +668,7 @@ func TestSyncTrigger(t *testing.T) { // snapshot should snapshot the store and cut the persistent // TODO: node.Compact is called... we need to make the node an interface func TestSnapshot(t *testing.T) { - n := raft.StartNode(0xBAD0, []int64{0xBAD0}, 10, 1) + n := raft.StartNode(0xBAD0, []int64{0xBAD0}, 10, 1, nil) defer n.Stop() st := &storeRecorder{} p := &storageRecorder{} @@ -699,7 +699,7 @@ func TestSnapshot(t *testing.T) { // Applied > SnapCount should trigger a SaveSnap event func TestTriggerSnap(t *testing.T) { ctx := context.Background() - n := raft.StartNode(0xBAD0, []int64{0xBAD0}, 10, 1) + n := raft.StartNode(0xBAD0, []int64{0xBAD0}, 10, 1, nil) n.Campaign(ctx) st := &storeRecorder{} p := &storageRecorder{} diff --git a/raft/example_test.go b/raft/example_test.go index 289b970fc..c957068b4 100644 --- a/raft/example_test.go +++ b/raft/example_test.go @@ -10,7 +10,7 @@ func saveStateToDisk(st pb.HardState) {} func saveToDisk(ents []pb.Entry) {} func Example_Node() { - n := StartNode(0, nil, 0, 0) + n := StartNode(0, nil, 0, 0, nil) // stuff to n happens in other goroutines diff --git a/raft/node.go b/raft/node.go index e853758ff..f755e4bd5 100644 --- a/raft/node.go +++ b/raft/node.go @@ -101,9 +101,21 @@ type Node interface { // StartNode returns a new Node given a unique raft id, a list of raft peers, and // the election and heartbeat timeouts in units of ticks. -func StartNode(id int64, peers []int64, election, heartbeat int) Node { +// It also wraps ConfChanges in entry and puts them at the head of the log. +func StartNode(id int64, peers []int64, election, heartbeat int, ccs []pb.ConfChange) Node { n := newNode() r := newRaft(id, peers, election, heartbeat) + ents := make([]pb.Entry, len(ccs)) + for i, cc := range ccs { + data, err := cc.Marshal() + if err != nil { + panic("unexpected marshal error") + } + ents[i] = pb.Entry{Type: pb.EntryConfChange, Term: 1, Index: int64(i + 1), Data: data} + } + if !r.raftLog.maybeAppend(0, 0, int64(len(ccs)), ents...) { + panic("unexpected append failure") + } go n.run(r) return &n } diff --git a/raft/node_test.go b/raft/node_test.go index 4843f1ccb..9e93a7ef6 100644 --- a/raft/node_test.go +++ b/raft/node_test.go @@ -149,21 +149,33 @@ func TestNode(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() + cc := raftpb.ConfChange{Type: raftpb.ConfChangeAddNode, NodeID: 1} + ccdata, err := cc.Marshal() + if err != nil { + t.Fatalf("unexpected marshal error: %v", err) + } wants := []Ready{ { - SoftState: &SoftState{Lead: 1, RaftState: StateLeader}, - HardState: raftpb.HardState{Term: 1, Commit: 1}, - Entries: []raftpb.Entry{{}, {Term: 1, Index: 1}}, - CommittedEntries: []raftpb.Entry{{Term: 1, Index: 1}}, + SoftState: &SoftState{Lead: 1, RaftState: StateLeader}, + HardState: raftpb.HardState{Term: 1, Commit: 2}, + Entries: []raftpb.Entry{ + {}, + {Type: raftpb.EntryConfChange, Term: 1, Index: 1, Data: ccdata}, + {Term: 1, Index: 2}, + }, + CommittedEntries: []raftpb.Entry{ + {Type: raftpb.EntryConfChange, Term: 1, Index: 1, Data: ccdata}, + {Term: 1, Index: 2}, + }, }, { - HardState: raftpb.HardState{Term: 1, Commit: 2}, - Entries: []raftpb.Entry{{Term: 1, Index: 2, Data: []byte("foo")}}, - CommittedEntries: []raftpb.Entry{{Term: 1, Index: 2, Data: []byte("foo")}}, + HardState: raftpb.HardState{Term: 1, Commit: 3}, + Entries: []raftpb.Entry{{Term: 1, Index: 3, Data: []byte("foo")}}, + CommittedEntries: []raftpb.Entry{{Term: 1, Index: 3, Data: []byte("foo")}}, }, } - n := StartNode(1, []int64{1}, 0, 0) + n := StartNode(1, []int64{1}, 0, 0, []raftpb.ConfChange{cc}) n.Campaign(ctx) if g := <-n.Ready(); !reflect.DeepEqual(g, wants[0]) { t.Errorf("#%d: g = %+v,\n w %+v", 1, g, wants[0])