etcdserver: apply bootstrap conf change

This commit is contained in:
Yicheng Qin 2014-10-10 17:00:14 -07:00
parent 0319b033ea
commit f693c6ddf2
6 changed files with 60 additions and 66 deletions

View File

@ -29,14 +29,6 @@ type clusterStore struct {
Store store.Store Store store.Store
} }
func NewClusterStore(st store.Store, c Cluster) ClusterStore {
cls := &clusterStore{Store: st}
for _, m := range c {
cls.Add(*m)
}
return cls
}
// Add puts a new Member into the store. // Add puts a new Member into the store.
// A Member with a matching id must not exist. // A Member with a matching id must not exist.
func (s *clusterStore) Add(m Member) { func (s *clusterStore) Add(m Member) {

View File

@ -76,14 +76,14 @@ func TestClusterStoreGet(t *testing.T) {
}, },
} }
for i, tt := range tests { for i, tt := range tests {
c := Cluster{} cs := &clusterStore{Store: newGetAllStore()}
err := c.AddSlice(tt.mems) for _, m := range tt.mems {
if err != nil { cs.Add(m)
t.Error(err) }
c := Cluster{}
if err := c.AddSlice(tt.mems); err != nil {
t.Fatal(err)
} }
cs := NewClusterStore(newGetAllStore(), c)
if g := cs.Get(); !reflect.DeepEqual(g, c) { if g := cs.Get(); !reflect.DeepEqual(g, c) {
t.Errorf("#%d: mems = %v, want %v", i, g, c) t.Errorf("#%d: mems = %v, want %v", i, g, c)
} }
@ -92,9 +92,8 @@ func TestClusterStoreGet(t *testing.T) {
func TestClusterStoreDelete(t *testing.T) { func TestClusterStoreDelete(t *testing.T) {
st := newStoreGetAllAndDeleteRecorder() st := newStoreGetAllAndDeleteRecorder()
c := Cluster{} cs := &clusterStore{Store: st}
c.Add(newTestMember(1, nil, "node1", nil)) cs.Add(newTestMember(1, nil, "node1", nil))
cs := NewClusterStore(st, c)
cs.Remove(1) cs.Remove(1)
wdeletes := []string{machineKVPrefix + "1"} wdeletes := []string{machineKVPrefix + "1"}

View File

@ -125,13 +125,13 @@ func NewServer(cfg *ServerConfig) *EtcdServer {
} }
ids := cfg.Cluster.IDs() ids := cfg.Cluster.IDs()
var peers []raft.Peer peers := make([]raft.Peer, len(ids))
for _, id := range ids { for i, id := range ids {
ctx, err := json.Marshal((*cfg.Cluster)[id]) ctx, err := json.Marshal((*cfg.Cluster)[id])
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
peers = append(peers, raft.Peer{ID: id, Context: ctx}) peers[i] = raft.Peer{ID: id, Context: ctx}
} }
n = raft.StartNode(m.ID, peers, 10, 1) n = raft.StartNode(m.ID, peers, 10, 1)
} else { } else {
@ -165,11 +165,10 @@ func NewServer(cfg *ServerConfig) *EtcdServer {
if info.ID != m.ID { if info.ID != m.ID {
log.Fatalf("unexpected nodeid %x, want %x: nodeid should always be the same until we support name/peerURLs update or dynamic configuration", info.ID, m.ID) log.Fatalf("unexpected nodeid %x, want %x: nodeid should always be the same until we support name/peerURLs update or dynamic configuration", info.ID, m.ID)
} }
n = raft.RestartNode(m.ID, cfg.Cluster.IDs(), 10, 1, snapshot, st, ents) n = raft.RestartNode(m.ID, 10, 1, snapshot, st, ents)
} }
cls := NewClusterStore(st, *cfg.Cluster) cls := &clusterStore{Store: st}
s := &EtcdServer{ s := &EtcdServer{
store: st, store: st,
node: n, node: n,
@ -544,12 +543,6 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange) {
s.node.ApplyConfChange(cc) s.node.ApplyConfChange(cc)
switch cc.Type { switch cc.Type {
case raftpb.ConfChangeAddNode: case raftpb.ConfChangeAddNode:
// TODO(yichengq): this is the hack and should be removed SOON.
// Bootstrap write addNode entries into log, which don't set Context
// value. They don't need to be applied because now we do it explicitly
// before server starts. This hack makes etcd work, and will be removed
// in the following PR.
break
var m Member var m Member
if err := json.Unmarshal(cc.Context, &m); err != nil { if err := json.Unmarshal(cc.Context, &m); err != nil {
panic("unexpected unmarshal error") panic("unexpected unmarshal error")

View File

@ -383,10 +383,11 @@ func testServer(t *testing.T, ns uint64) {
} }
} }
members := make([]raft.Peer, ns) ids := make([]uint64, ns)
for i := uint64(0); i < ns; i++ { for i := uint64(0); i < ns; i++ {
members[i] = raft.Peer{ID: i + 1} ids[i] = i + 1
} }
members := mustMakePeerSlice(t, ids...)
for i := uint64(0); i < ns; i++ { for i := uint64(0); i < ns; i++ {
id := i + 1 id := i + 1
@ -399,11 +400,9 @@ func testServer(t *testing.T, ns uint64) {
send: send, send: send,
storage: &storageRecorder{}, storage: &storageRecorder{},
ticker: tk.C, ticker: tk.C,
ClusterStore: &clusterStoreRecorder{},
} }
srv.start() srv.start()
// TODO(xiangli): randomize election timeout
// then remove this sleep.
time.Sleep(1 * time.Millisecond)
ss[i] = srv ss[i] = srv
} }
@ -457,7 +456,7 @@ func TestDoProposal(t *testing.T) {
for i, tt := range tests { for i, tt := range tests {
ctx, _ := context.WithCancel(context.Background()) ctx, _ := context.WithCancel(context.Background())
n := raft.StartNode(0xBAD0, []raft.Peer{{ID: 0xBAD0}}, 10, 1) n := raft.StartNode(0xBAD0, mustMakePeerSlice(t, 0xBAD0), 10, 1)
st := &storeRecorder{} st := &storeRecorder{}
tk := make(chan time.Time) tk := make(chan time.Time)
// this makes <-tk always successful, which accelerates internal clock // this makes <-tk always successful, which accelerates internal clock
@ -468,6 +467,7 @@ func TestDoProposal(t *testing.T) {
send: func(_ []raftpb.Message) {}, send: func(_ []raftpb.Message) {},
storage: &storageRecorder{}, storage: &storageRecorder{},
ticker: tk, ticker: tk,
ClusterStore: &clusterStoreRecorder{},
} }
srv.start() srv.start()
resp, err := srv.Do(ctx, tt) resp, err := srv.Do(ctx, tt)
@ -490,7 +490,7 @@ func TestDoProposal(t *testing.T) {
func TestDoProposalCancelled(t *testing.T) { func TestDoProposalCancelled(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
// node cannot make any progress because there are two nodes // node cannot make any progress because there are two nodes
n := raft.StartNode(0xBAD0, []raft.Peer{{ID: 0xBAD0}, {ID: 0xBAD1}}, 10, 1) n := raft.StartNode(0xBAD0, mustMakePeerSlice(t, 0xBAD0, 0xBAD1), 10, 1)
st := &storeRecorder{} st := &storeRecorder{}
wait := &waitRecorder{} wait := &waitRecorder{}
srv := &EtcdServer{ srv := &EtcdServer{
@ -526,7 +526,7 @@ func TestDoProposalStopped(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
// node cannot make any progress because there are two nodes // node cannot make any progress because there are two nodes
n := raft.StartNode(0xBAD0, []raft.Peer{{ID: 0xBAD0}, {ID: 0xBAD1}}, 10, 1) n := raft.StartNode(0xBAD0, mustMakePeerSlice(t, 0xBAD0, 0xBAD1), 10, 1)
st := &storeRecorder{} st := &storeRecorder{}
tk := make(chan time.Time) tk := make(chan time.Time)
// this makes <-tk always successful, which accelarates internal clock // this makes <-tk always successful, which accelarates internal clock
@ -667,7 +667,7 @@ func TestSyncTrigger(t *testing.T) {
// snapshot should snapshot the store and cut the persistent // snapshot should snapshot the store and cut the persistent
// TODO: node.Compact is called... we need to make the node an interface // TODO: node.Compact is called... we need to make the node an interface
func TestSnapshot(t *testing.T) { func TestSnapshot(t *testing.T) {
n := raft.StartNode(0xBAD0, []raft.Peer{{ID: 0xBAD0}}, 10, 1) n := raft.StartNode(0xBAD0, mustMakePeerSlice(t, 0xBAD0), 10, 1)
defer n.Stop() defer n.Stop()
st := &storeRecorder{} st := &storeRecorder{}
p := &storageRecorder{} p := &storageRecorder{}
@ -698,7 +698,9 @@ func TestSnapshot(t *testing.T) {
// Applied > SnapCount should trigger a SaveSnap event // Applied > SnapCount should trigger a SaveSnap event
func TestTriggerSnap(t *testing.T) { func TestTriggerSnap(t *testing.T) {
ctx := context.Background() ctx := context.Background()
n := raft.StartNode(0xBAD0, []raft.Peer{{ID: 0xBAD0}}, 10, 1) n := raft.StartNode(0xBAD0, mustMakePeerSlice(t, 0xBAD0), 10, 1)
<-n.Ready()
n.ApplyConfChange(raftpb.ConfChange{Type: raftpb.ConfChangeAddNode, NodeID: 0xBAD0})
n.Campaign(ctx) n.Campaign(ctx)
st := &storeRecorder{} st := &storeRecorder{}
p := &storageRecorder{} p := &storageRecorder{}
@ -708,6 +710,7 @@ func TestTriggerSnap(t *testing.T) {
storage: p, storage: p,
node: n, node: n,
snapCount: 10, snapCount: 10,
ClusterStore: &clusterStoreRecorder{},
} }
s.start() s.start()
@ -787,9 +790,6 @@ func TestRecvSlowSnapshot(t *testing.T) {
// TestAddMember tests AddMember can propose and perform node addition. // TestAddMember tests AddMember can propose and perform node addition.
func TestAddMember(t *testing.T) { func TestAddMember(t *testing.T) {
// This one is broken until hack at ApplyConfChange is removed
t.Skip("")
n := newNodeConfChangeCommitterRecorder() n := newNodeConfChangeCommitterRecorder()
cs := &clusterStoreRecorder{} cs := &clusterStoreRecorder{}
s := &EtcdServer{ s := &EtcdServer{
@ -1252,3 +1252,16 @@ func (cs *clusterStoreRecorder) Get() Cluster {
func (cs *clusterStoreRecorder) Remove(id uint64) { func (cs *clusterStoreRecorder) Remove(id uint64) {
cs.record(action{name: "Remove", params: []interface{}{id}}) cs.record(action{name: "Remove", params: []interface{}{id}})
} }
func mustMakePeerSlice(t *testing.T, ids ...uint64) []raft.Peer {
peers := make([]raft.Peer, len(ids))
for i, id := range ids {
m := Member{ID: id}
b, err := json.Marshal(m)
if err != nil {
t.Fatal(err)
}
peers[i] = raft.Peer{ID: id, Context: b}
}
return peers
}

View File

@ -127,11 +127,7 @@ type Peer struct {
// It also builds ConfChangeAddNode entry for each peer and puts them at the head of the log. // It also builds ConfChangeAddNode entry for each peer and puts them at the head of the log.
func StartNode(id uint64, peers []Peer, election, heartbeat int) Node { func StartNode(id uint64, peers []Peer, election, heartbeat int) Node {
n := newNode() n := newNode()
peerIDs := make([]uint64, len(peers)) r := newRaft(id, nil, election, heartbeat)
for i, peer := range peers {
peerIDs[i] = peer.ID
}
r := newRaft(id, peerIDs, election, heartbeat)
ents := make([]pb.Entry, len(peers)) ents := make([]pb.Entry, len(peers))
for i, peer := range peers { for i, peer := range peers {
@ -152,9 +148,9 @@ func StartNode(id uint64, peers []Peer, election, heartbeat int) Node {
// RestartNode is identical to StartNode but takes an initial State and a slice // RestartNode is identical to StartNode but takes an initial State and a slice
// of entries. Generally this is used when restarting from a stable storage // of entries. Generally this is used when restarting from a stable storage
// log. // log.
func RestartNode(id uint64, peers []uint64, election, heartbeat int, snapshot *pb.Snapshot, st pb.HardState, ents []pb.Entry) Node { func RestartNode(id uint64, election, heartbeat int, snapshot *pb.Snapshot, st pb.HardState, ents []pb.Entry) Node {
n := newNode() n := newNode()
r := newRaft(id, peers, election, heartbeat) r := newRaft(id, nil, election, heartbeat)
if snapshot != nil { if snapshot != nil {
r.restore(*snapshot) r.restore(*snapshot)
} }

View File

@ -176,6 +176,7 @@ func TestNode(t *testing.T) {
} }
n := StartNode(1, []Peer{{ID: 1}}, 10, 1) n := StartNode(1, []Peer{{ID: 1}}, 10, 1)
n.ApplyConfChange(cc)
n.Campaign(ctx) n.Campaign(ctx)
if g := <-n.Ready(); !reflect.DeepEqual(g, wants[0]) { if g := <-n.Ready(); !reflect.DeepEqual(g, wants[0]) {
t.Errorf("#%d: g = %+v,\n w %+v", 1, g, wants[0]) t.Errorf("#%d: g = %+v,\n w %+v", 1, g, wants[0])
@ -207,7 +208,7 @@ func TestNodeRestart(t *testing.T) {
CommittedEntries: entries[1 : st.Commit+1], CommittedEntries: entries[1 : st.Commit+1],
} }
n := RestartNode(1, []uint64{1}, 10, 1, nil, st, entries) n := RestartNode(1, 10, 1, nil, st, entries)
if g := <-n.Ready(); !reflect.DeepEqual(g, want) { if g := <-n.Ready(); !reflect.DeepEqual(g, want) {
t.Errorf("g = %+v,\n w %+v", g, want) t.Errorf("g = %+v,\n w %+v", g, want)
} }