From 0319b033ea4d1dfd5a9153a8f89b1fb8f3ac6064 Mon Sep 17 00:00:00 2001
From: Yicheng Qin
Date: Tue, 7 Oct 2014 17:38:58 -0700
Subject: [PATCH 1/2] etcdserver/raft: set context for bootstrap addnode entries

---
 etcdserver/server.go      | 17 ++++++++++++-----
 etcdserver/server_test.go | 17 ++++++++++-------
 raft/node.go              | 15 ++++++++++++---
 raft/node_test.go         |  2 +-
 4 files changed, 35 insertions(+), 16 deletions(-)

diff --git a/etcdserver/server.go b/etcdserver/server.go
index 4c42d456f..7b5c86d0d 100644
--- a/etcdserver/server.go
+++ b/etcdserver/server.go
@@ -123,8 +123,17 @@ func NewServer(cfg *ServerConfig) *EtcdServer {
 		if w, err = wal.Create(waldir, b); err != nil {
 			log.Fatal(err)
 		}
-		// TODO: add context for PeerURLs
-		n = raft.StartNode(m.ID, cfg.Cluster.IDs(), 10, 1)
+
+		ids := cfg.Cluster.IDs()
+		var peers []raft.Peer
+		for _, id := range ids {
+			ctx, err := json.Marshal((*cfg.Cluster)[id])
+			if err != nil {
+				log.Fatal(err)
+			}
+			peers = append(peers, raft.Peer{ID: id, Context: ctx})
+		}
+		n = raft.StartNode(m.ID, peers, 10, 1)
 	} else {
 		if cfg.DiscoveryURL != "" {
 			log.Printf("etcd: warn: ignoring discovery URL: etcd has already been initialized and has a valid log in %q", waldir)
@@ -540,9 +549,7 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange) {
 		// value. They don't need to be applied because now we do it explicitly
 		// before server starts. This hack makes etcd work, and will be removed
 		// in the following PR.
-		if cc.Context == nil {
-			break
-		}
+		break
 		var m Member
 		if err := json.Unmarshal(cc.Context, &m); err != nil {
 			panic("unexpected unmarshal error")
diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go
index 0b730e093..64ddf43e4 100644
--- a/etcdserver/server_test.go
+++ b/etcdserver/server_test.go
@@ -383,9 +383,9 @@ func testServer(t *testing.T, ns uint64) {
 		}
 	}
 
-	members := make([]uint64, ns)
+	members := make([]raft.Peer, ns)
 	for i := uint64(0); i < ns; i++ {
-		members[i] = i + 1
+		members[i] = raft.Peer{ID: i + 1}
 	}
 
 	for i := uint64(0); i < ns; i++ {
@@ -457,7 +457,7 @@ func TestDoProposal(t *testing.T) {
 
 	for i, tt := range tests {
 		ctx, _ := context.WithCancel(context.Background())
-		n := raft.StartNode(0xBAD0, []uint64{0xBAD0}, 10, 1)
+		n := raft.StartNode(0xBAD0, []raft.Peer{{ID: 0xBAD0}}, 10, 1)
 		st := &storeRecorder{}
 		tk := make(chan time.Time)
 		// this makes <-tk always successful, which accelerates internal clock
@@ -490,7 +490,7 @@ func TestDoProposal(t *testing.T) {
 func TestDoProposalCancelled(t *testing.T) {
 	ctx, cancel := context.WithCancel(context.Background())
 	// node cannot make any progress because there are two nodes
-	n := raft.StartNode(0xBAD0, []uint64{0xBAD0, 0xBAD1}, 10, 1)
+	n := raft.StartNode(0xBAD0, []raft.Peer{{ID: 0xBAD0}, {ID: 0xBAD1}}, 10, 1)
 	st := &storeRecorder{}
 	wait := &waitRecorder{}
 	srv := &EtcdServer{
@@ -526,7 +526,7 @@ func TestDoProposalStopped(t *testing.T) {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 	// node cannot make any progress because there are two nodes
-	n := raft.StartNode(0xBAD0, []uint64{0xBAD0, 0xBAD1}, 10, 1)
+	n := raft.StartNode(0xBAD0, []raft.Peer{{ID: 0xBAD0}, {ID: 0xBAD1}}, 10, 1)
 	st := &storeRecorder{}
 	tk := make(chan time.Time)
 	// this makes <-tk always successful, which accelarates internal clock
@@ -667,7 +667,7 @@ func TestSyncTrigger(t *testing.T) {
 // snapshot should snapshot the store and cut the persistent
 // TODO: node.Compact is called... we need to make the node an interface
 func TestSnapshot(t *testing.T) {
-	n := raft.StartNode(0xBAD0, []uint64{0xBAD0}, 10, 1)
+	n := raft.StartNode(0xBAD0, []raft.Peer{{ID: 0xBAD0}}, 10, 1)
 	defer n.Stop()
 	st := &storeRecorder{}
 	p := &storageRecorder{}
@@ -698,7 +698,7 @@ func TestSnapshot(t *testing.T) {
 // Applied > SnapCount should trigger a SaveSnap event
 func TestTriggerSnap(t *testing.T) {
 	ctx := context.Background()
-	n := raft.StartNode(0xBAD0, []uint64{0xBAD0}, 10, 1)
+	n := raft.StartNode(0xBAD0, []raft.Peer{{ID: 0xBAD0}}, 10, 1)
 	n.Campaign(ctx)
 	st := &storeRecorder{}
 	p := &storageRecorder{}
@@ -787,6 +787,9 @@ func TestRecvSlowSnapshot(t *testing.T) {
 
 // TestAddMember tests AddMember can propose and perform node addition.
 func TestAddMember(t *testing.T) {
+	// This one is broken until hack at ApplyConfChange is removed
+	t.Skip("")
+
 	n := newNodeConfChangeCommitterRecorder()
 	cs := &clusterStoreRecorder{}
 	s := &EtcdServer{
diff --git a/raft/node.go b/raft/node.go
index ccfd287f1..94e3b1cab 100644
--- a/raft/node.go
+++ b/raft/node.go
@@ -117,16 +117,25 @@ type Node interface {
 	Compact(index uint64, nodes []uint64, d []byte)
 }
 
+type Peer struct {
+	ID      uint64
+	Context []byte
+}
+
 // StartNode returns a new Node given a unique raft id, a list of raft peers, and
 // the election and heartbeat timeouts in units of ticks.
 // It also builds ConfChangeAddNode entry for each peer and puts them at the head of the log.
-func StartNode(id uint64, peers []uint64, election, heartbeat int) Node {
+func StartNode(id uint64, peers []Peer, election, heartbeat int) Node {
 	n := newNode()
-	r := newRaft(id, peers, election, heartbeat)
+	peerIDs := make([]uint64, len(peers))
+	for i, peer := range peers {
+		peerIDs[i] = peer.ID
+	}
+	r := newRaft(id, peerIDs, election, heartbeat)
 
 	ents := make([]pb.Entry, len(peers))
 	for i, peer := range peers {
-		cc := pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: peer}
+		cc := pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: peer.ID, Context: peer.Context}
 		data, err := cc.Marshal()
 		if err != nil {
 			panic("unexpected marshal error")
diff --git a/raft/node_test.go b/raft/node_test.go
index ba608076e..72a475d41 100644
--- a/raft/node_test.go
+++ b/raft/node_test.go
@@ -175,7 +175,7 @@ func TestNode(t *testing.T) {
 		},
 	}
 
-	n := StartNode(1, []uint64{1}, 10, 1)
+	n := StartNode(1, []Peer{{ID: 1}}, 10, 1)
 	n.Campaign(ctx)
 	if g := <-n.Ready(); !reflect.DeepEqual(g, wants[0]) {
 		t.Errorf("#%d: g = %+v,\n w %+v", 1, g, wants[0])

From f693c6ddf21539c16cdbf30d5de7052c9bbc39da Mon Sep 17 00:00:00 2001
From: Yicheng Qin
Date: Fri, 10 Oct 2014 17:00:14 -0700
Subject: [PATCH 2/2] etcdserver: apply bootstrap conf change

---
 etcdserver/cluster_store.go      |  8 ----
 etcdserver/cluster_store_test.go | 19 +++++----
 etcdserver/server.go             | 17 +++-----
 etcdserver/server_test.go        | 69 +++++++++++++++++++-------------
 raft/node.go                     | 10 ++---
 raft/node_test.go                |  3 +-
 6 files changed, 60 insertions(+), 66 deletions(-)

diff --git a/etcdserver/cluster_store.go b/etcdserver/cluster_store.go
index 14a17ae60..cf58fb977 100644
--- a/etcdserver/cluster_store.go
+++ b/etcdserver/cluster_store.go
@@ -29,14 +29,6 @@ type clusterStore struct {
 	Store store.Store
 }
 
-func NewClusterStore(st store.Store, c Cluster) ClusterStore {
-	cls := &clusterStore{Store: st}
-	for _, m := range c {
-		cls.Add(*m)
-	}
-	return cls
-}
-
 // Add puts a new Member into the store.
 // A Member with a matching id must not exist.
 func (s *clusterStore) Add(m Member) {
diff --git a/etcdserver/cluster_store_test.go b/etcdserver/cluster_store_test.go
index 3357d38df..fba4b8ef9 100644
--- a/etcdserver/cluster_store_test.go
+++ b/etcdserver/cluster_store_test.go
@@ -76,14 +76,14 @@ func TestClusterStoreGet(t *testing.T) {
 		},
 	}
 	for i, tt := range tests {
-		c := Cluster{}
-		err := c.AddSlice(tt.mems)
-		if err != nil {
-			t.Error(err)
+		cs := &clusterStore{Store: newGetAllStore()}
+		for _, m := range tt.mems {
+			cs.Add(m)
+		}
+		c := Cluster{}
+		if err := c.AddSlice(tt.mems); err != nil {
+			t.Fatal(err)
 		}
-
-		cs := NewClusterStore(newGetAllStore(), c)
-
 		if g := cs.Get(); !reflect.DeepEqual(g, c) {
 			t.Errorf("#%d: mems = %v, want %v", i, g, c)
 		}
@@ -92,9 +92,8 @@ func TestClusterStoreGet(t *testing.T) {
 
 func TestClusterStoreDelete(t *testing.T) {
 	st := newStoreGetAllAndDeleteRecorder()
-	c := Cluster{}
-	c.Add(newTestMember(1, nil, "node1", nil))
-	cs := NewClusterStore(st, c)
+	cs := &clusterStore{Store: st}
+	cs.Add(newTestMember(1, nil, "node1", nil))
 	cs.Remove(1)
 
 	wdeletes := []string{machineKVPrefix + "1"}
diff --git a/etcdserver/server.go b/etcdserver/server.go
index 7b5c86d0d..02dad2224 100644
--- a/etcdserver/server.go
+++ b/etcdserver/server.go
@@ -125,13 +125,13 @@ func NewServer(cfg *ServerConfig) *EtcdServer {
 		}
 
 		ids := cfg.Cluster.IDs()
-		var peers []raft.Peer
-		for _, id := range ids {
+		peers := make([]raft.Peer, len(ids))
+		for i, id := range ids {
 			ctx, err := json.Marshal((*cfg.Cluster)[id])
 			if err != nil {
 				log.Fatal(err)
 			}
-			peers = append(peers, raft.Peer{ID: id, Context: ctx})
+			peers[i] = raft.Peer{ID: id, Context: ctx}
 		}
 		n = raft.StartNode(m.ID, peers, 10, 1)
 	} else {
@@ -165,11 +165,10 @@ func NewServer(cfg *ServerConfig) *EtcdServer {
 		if info.ID != m.ID {
 			log.Fatalf("unexpected nodeid %x, want %x: nodeid should always be the same until we support name/peerURLs update or dynamic configuration", info.ID, m.ID)
 		}
-		n = raft.RestartNode(m.ID, cfg.Cluster.IDs(), 10, 1, snapshot, st, ents)
+		n = raft.RestartNode(m.ID, 10, 1, snapshot, st, ents)
 	}
 
-	cls := NewClusterStore(st, *cfg.Cluster)
-
+	cls := &clusterStore{Store: st}
 	s := &EtcdServer{
 		store:        st,
 		node:         n,
@@ -544,12 +543,6 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange) {
 	s.node.ApplyConfChange(cc)
 	switch cc.Type {
 	case raftpb.ConfChangeAddNode:
-		// TODO(yichengq): this is the hack and should be removed SOON.
-		// Bootstrap write addNode entries into log, which don't set Context
-		// value. They don't need to be applied because now we do it explicitly
-		// before server starts. This hack makes etcd work, and will be removed
-		// in the following PR.
-		break
 		var m Member
 		if err := json.Unmarshal(cc.Context, &m); err != nil {
 			panic("unexpected unmarshal error")
diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go
index 64ddf43e4..a3602c8ba 100644
--- a/etcdserver/server_test.go
+++ b/etcdserver/server_test.go
@@ -383,10 +383,11 @@ func testServer(t *testing.T, ns uint64) {
 		}
 	}
 
-	members := make([]raft.Peer, ns)
+	ids := make([]uint64, ns)
 	for i := uint64(0); i < ns; i++ {
-		members[i] = raft.Peer{ID: i + 1}
+		ids[i] = i + 1
 	}
+	members := mustMakePeerSlice(t, ids...)
 
 	for i := uint64(0); i < ns; i++ {
 		id := i + 1
@@ -394,16 +395,14 @@ func testServer(t *testing.T, ns uint64) {
 		tk := time.NewTicker(10 * time.Millisecond)
 		defer tk.Stop()
 		srv := &EtcdServer{
-			node:    n,
-			store:   store.New(),
-			send:    send,
-			storage: &storageRecorder{},
-			ticker:  tk.C,
+			node:         n,
+			store:        store.New(),
+			send:         send,
+			storage:      &storageRecorder{},
+			ticker:       tk.C,
+			ClusterStore: &clusterStoreRecorder{},
 		}
 		srv.start()
-		// TODO(xiangli): randomize election timeout
-		// then remove this sleep.
-		time.Sleep(1 * time.Millisecond)
 		ss[i] = srv
 	}
 
@@ -457,17 +456,18 @@ func TestDoProposal(t *testing.T) {
 
 	for i, tt := range tests {
 		ctx, _ := context.WithCancel(context.Background())
-		n := raft.StartNode(0xBAD0, []raft.Peer{{ID: 0xBAD0}}, 10, 1)
+		n := raft.StartNode(0xBAD0, mustMakePeerSlice(t, 0xBAD0), 10, 1)
 		st := &storeRecorder{}
 		tk := make(chan time.Time)
 		// this makes <-tk always successful, which accelerates internal clock
 		close(tk)
 		srv := &EtcdServer{
-			node:    n,
-			store:   st,
-			send:    func(_ []raftpb.Message) {},
-			storage: &storageRecorder{},
-			ticker:  tk,
+			node:         n,
+			store:        st,
+			send:         func(_ []raftpb.Message) {},
+			storage:      &storageRecorder{},
+			ticker:       tk,
+			ClusterStore: &clusterStoreRecorder{},
 		}
 		srv.start()
 		resp, err := srv.Do(ctx, tt)
@@ -490,7 +490,7 @@ func TestDoProposal(t *testing.T) {
 func TestDoProposalCancelled(t *testing.T) {
 	ctx, cancel := context.WithCancel(context.Background())
 	// node cannot make any progress because there are two nodes
-	n := raft.StartNode(0xBAD0, []raft.Peer{{ID: 0xBAD0}, {ID: 0xBAD1}}, 10, 1)
+	n := raft.StartNode(0xBAD0, mustMakePeerSlice(t, 0xBAD0, 0xBAD1), 10, 1)
 	st := &storeRecorder{}
 	wait := &waitRecorder{}
 	srv := &EtcdServer{
@@ -526,7 +526,7 @@ func TestDoProposalStopped(t *testing.T) {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 	// node cannot make any progress because there are two nodes
-	n := raft.StartNode(0xBAD0, []raft.Peer{{ID: 0xBAD0}, {ID: 0xBAD1}}, 10, 1)
+	n := raft.StartNode(0xBAD0, mustMakePeerSlice(t, 0xBAD0, 0xBAD1), 10, 1)
 	st := &storeRecorder{}
 	tk := make(chan time.Time)
 	// this makes <-tk always successful, which accelarates internal clock
@@ -667,7 +667,7 @@ func TestSyncTrigger(t *testing.T) {
 // snapshot should snapshot the store and cut the persistent
 // TODO: node.Compact is called... we need to make the node an interface
 func TestSnapshot(t *testing.T) {
-	n := raft.StartNode(0xBAD0, []raft.Peer{{ID: 0xBAD0}}, 10, 1)
+	n := raft.StartNode(0xBAD0, mustMakePeerSlice(t, 0xBAD0), 10, 1)
 	defer n.Stop()
 	st := &storeRecorder{}
 	p := &storageRecorder{}
@@ -698,16 +698,19 @@ func TestSnapshot(t *testing.T) {
 // Applied > SnapCount should trigger a SaveSnap event
 func TestTriggerSnap(t *testing.T) {
 	ctx := context.Background()
-	n := raft.StartNode(0xBAD0, []raft.Peer{{ID: 0xBAD0}}, 10, 1)
+	n := raft.StartNode(0xBAD0, mustMakePeerSlice(t, 0xBAD0), 10, 1)
+	<-n.Ready()
+	n.ApplyConfChange(raftpb.ConfChange{Type: raftpb.ConfChangeAddNode, NodeID: 0xBAD0})
 	n.Campaign(ctx)
 	st := &storeRecorder{}
 	p := &storageRecorder{}
 	s := &EtcdServer{
-		store:     st,
-		send:      func(_ []raftpb.Message) {},
-		storage:   p,
-		node:      n,
-		snapCount: 10,
+		store:        st,
+		send:         func(_ []raftpb.Message) {},
+		storage:      p,
+		node:         n,
+		snapCount:    10,
+		ClusterStore: &clusterStoreRecorder{},
 	}
 
 	s.start()
@@ -787,9 +790,6 @@ func TestRecvSlowSnapshot(t *testing.T) {
 
 // TestAddMember tests AddMember can propose and perform node addition.
 func TestAddMember(t *testing.T) {
-	// This one is broken until hack at ApplyConfChange is removed
-	t.Skip("")
-
 	n := newNodeConfChangeCommitterRecorder()
 	cs := &clusterStoreRecorder{}
 	s := &EtcdServer{
@@ -1252,3 +1252,16 @@ func (cs *clusterStoreRecorder) Get() Cluster {
 func (cs *clusterStoreRecorder) Remove(id uint64) {
 	cs.record(action{name: "Remove", params: []interface{}{id}})
 }
+
+func mustMakePeerSlice(t *testing.T, ids ...uint64) []raft.Peer {
+	peers := make([]raft.Peer, len(ids))
+	for i, id := range ids {
+		m := Member{ID: id}
+		b, err := json.Marshal(m)
+		if err != nil {
+			t.Fatal(err)
+		}
+		peers[i] = raft.Peer{ID: id, Context: b}
+	}
+	return peers
+}
diff --git a/raft/node.go b/raft/node.go
index 94e3b1cab..cac78e00f 100644
--- a/raft/node.go
+++ b/raft/node.go
@@ -127,11 +127,7 @@ type Peer struct {
 // It also builds ConfChangeAddNode entry for each peer and puts them at the head of the log.
 func StartNode(id uint64, peers []Peer, election, heartbeat int) Node {
 	n := newNode()
-	peerIDs := make([]uint64, len(peers))
-	for i, peer := range peers {
-		peerIDs[i] = peer.ID
-	}
-	r := newRaft(id, peerIDs, election, heartbeat)
+	r := newRaft(id, nil, election, heartbeat)
 
 	ents := make([]pb.Entry, len(peers))
 	for i, peer := range peers {
@@ -152,9 +148,9 @@ func StartNode(id uint64, peers []Peer, election, heartbeat int) Node {
 // RestartNode is identical to StartNode but takes an initial State and a slice
 // of entries. Generally this is used when restarting from a stable storage
 // log.
-func RestartNode(id uint64, peers []uint64, election, heartbeat int, snapshot *pb.Snapshot, st pb.HardState, ents []pb.Entry) Node {
+func RestartNode(id uint64, election, heartbeat int, snapshot *pb.Snapshot, st pb.HardState, ents []pb.Entry) Node {
 	n := newNode()
-	r := newRaft(id, peers, election, heartbeat)
+	r := newRaft(id, nil, election, heartbeat)
 	if snapshot != nil {
 		r.restore(*snapshot)
 	}
diff --git a/raft/node_test.go b/raft/node_test.go
index 72a475d41..313f41786 100644
--- a/raft/node_test.go
+++ b/raft/node_test.go
@@ -176,6 +176,7 @@ func TestNode(t *testing.T) {
 	}
 
 	n := StartNode(1, []Peer{{ID: 1}}, 10, 1)
+	n.ApplyConfChange(cc)
 	n.Campaign(ctx)
 	if g := <-n.Ready(); !reflect.DeepEqual(g, wants[0]) {
 		t.Errorf("#%d: g = %+v,\n w %+v", 1, g, wants[0])
@@ -207,7 +208,7 @@ func TestNodeRestart(t *testing.T) {
 		CommittedEntries: entries[1 : st.Commit+1],
 	}
 
-	n := RestartNode(1, []uint64{1}, 10, 1, nil, st, entries)
+	n := RestartNode(1, 10, 1, nil, st, entries)
 	if g := <-n.Ready(); !reflect.DeepEqual(g, want) {
 		t.Errorf("g = %+v,\n w %+v", g, want)
 	}
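
Note on the series: patch 1 makes every bootstrap ConfChangeAddNode entry carry the JSON-encoded Member in its Context, and patch 2 makes the server build its cluster store by actually applying those committed entries (instead of pre-populating it through NewClusterStore), which appears to be why RestartNode no longer takes a peer list. A minimal standalone sketch of that encode/apply round trip follows; Member, Peer, and ConfChange here are simplified stand-ins for the etcdserver and raftpb types, and bootstrapPeers/applyAddNode are illustrative names, not functions from the patches.

package main

import (
	"encoding/json"
	"fmt"
	"log"
)

// Member stands in for the etcdserver Member type; only ID is shown here.
type Member struct {
	ID uint64 `json:"id"`
}

// Peer mirrors the raft.Peer type introduced in patch 1.
type Peer struct {
	ID      uint64
	Context []byte
}

// ConfChange stands in for raftpb.ConfChange, reduced to the fields used here.
type ConfChange struct {
	NodeID  uint64
	Context []byte
}

// bootstrapPeers builds the slice handed to raft.StartNode, as NewServer does
// after patch 1: one Peer per cluster member, carrying the marshaled Member.
func bootstrapPeers(members []Member) []Peer {
	peers := make([]Peer, len(members))
	for i, m := range members {
		b, err := json.Marshal(m)
		if err != nil {
			log.Fatal(err)
		}
		peers[i] = Peer{ID: m.ID, Context: b}
	}
	return peers
}

// applyAddNode mirrors what applyConfChange does once patch 2 removes the
// bootstrap hack: every committed ConfChangeAddNode context must decode to a
// Member, which is then added to the cluster store.
func applyAddNode(cc ConfChange) Member {
	var m Member
	if err := json.Unmarshal(cc.Context, &m); err != nil {
		panic("unexpected unmarshal error")
	}
	return m
}

func main() {
	peers := bootstrapPeers([]Member{{ID: 1}, {ID: 2}, {ID: 3}})
	for _, p := range peers {
		m := applyAddNode(ConfChange{NodeID: p.ID, Context: p.Context})
		fmt.Printf("applied bootstrap add-node entry for member %d\n", m.ID)
	}
}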