etcd: update conf when apply the confChange entry

This commit is contained in:
Xiang Li 2014-12-08 21:12:13 -08:00
parent 22dd3b039c
commit 29d7a2a558
2 changed files with 13 additions and 14 deletions

View File

@ -395,7 +395,7 @@ func (s *EtcdServer) run() {
// snapi indicates the index of the last submitted snapshot request
snapi := snap.Metadata.Index
appliedi := snap.Metadata.Index
nodes := snap.Metadata.ConfState.Nodes
confState := &snap.Metadata.ConfState
defer func() {
s.node.Stop()
@ -412,7 +412,6 @@ func (s *EtcdServer) run() {
case rd := <-s.node.Ready():
if rd.SoftState != nil {
atomic.StoreUint64(&s.raftLead, rd.SoftState.Lead)
nodes = rd.SoftState.Nodes
if rd.RaftState == raft.StateLeader {
syncC = s.SyncTicker
} else {
@ -459,7 +458,7 @@ func (s *EtcdServer) run() {
ents = rd.CommittedEntries[appliedi+1-firsti:]
}
if len(ents) > 0 {
if appliedi, shouldstop = s.apply(ents); shouldstop {
if appliedi, shouldstop = s.apply(ents, confState); shouldstop {
return
}
}
@ -469,7 +468,7 @@ func (s *EtcdServer) run() {
if appliedi-snapi > s.snapCount {
log.Printf("etcdserver: start to snapshot (applied: %d, lastsnap: %d)", appliedi, snapi)
s.snapshot(appliedi, nodes)
s.snapshot(appliedi, confState)
snapi = appliedi
}
case <-syncC:
@ -701,7 +700,7 @@ func getExpirationTime(r *pb.Request) time.Time {
// apply takes entries received from Raft (after it has been committed) and
// applies them to the current state of the EtcdServer.
// The given entries should not be empty.
func (s *EtcdServer) apply(es []raftpb.Entry) (uint64, bool) {
func (s *EtcdServer) apply(es []raftpb.Entry, confState *raftpb.ConfState) (uint64, bool) {
var applied uint64
for i := range es {
e := es[i]
@ -713,7 +712,7 @@ func (s *EtcdServer) apply(es []raftpb.Entry) (uint64, bool) {
case raftpb.EntryConfChange:
var cc raftpb.ConfChange
pbutil.MustUnmarshal(&cc, e.Data)
shouldstop, err := s.applyConfChange(cc)
shouldstop, err := s.applyConfChange(cc, confState)
s.w.Trigger(cc.ID, err)
if shouldstop {
return applied, true
@ -779,13 +778,13 @@ func (s *EtcdServer) applyRequest(r pb.Request) Response {
// applyConfChange applies a ConfChange to the server. It is only
// invoked with a ConfChange that has already passed through Raft
func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange) (bool, error) {
func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.ConfState) (bool, error) {
if err := s.Cluster.ValidateConfigurationChange(cc); err != nil {
cc.NodeID = raft.None
s.node.ApplyConfChange(cc)
return false, err
}
s.node.ApplyConfChange(cc)
confState = s.node.ApplyConfChange(cc)
switch cc.Type {
case raftpb.ConfChangeAddNode:
m := new(Member)
@ -833,14 +832,14 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange) (bool, error) {
}
// TODO: non-blocking snapshot
func (s *EtcdServer) snapshot(snapi uint64, snapnodes []uint64) {
func (s *EtcdServer) snapshot(snapi uint64, confState *raftpb.ConfState) {
d, err := s.store.Save()
// TODO: current store will never fail to do a snapshot
// what should we do if the store might fail?
if err != nil {
log.Panicf("etcdserver: store save should never fail: %v", err)
}
err = s.raftStorage.Compact(snapi, &raftpb.ConfState{Nodes: snapnodes}, d)
err = s.raftStorage.Compact(snapi, confState, d)
if err != nil {
// the snapshot was done asynchronously with the progress of raft.
// raft might have already got a newer snapshot and called compact.

View File

@ -475,7 +475,7 @@ func TestApplyConfChangeError(t *testing.T) {
node: n,
Cluster: cl,
}
_, err := srv.applyConfChange(tt.cc)
_, err := srv.applyConfChange(tt.cc, nil)
if err != tt.werr {
t.Errorf("#%d: applyConfChange error = %v, want %v", i, err, tt.werr)
}
@ -509,7 +509,7 @@ func TestApplyConfChangeShouldStop(t *testing.T) {
NodeID: 2,
}
// remove non-local member
shouldStop, err := srv.applyConfChange(cc)
shouldStop, err := srv.applyConfChange(cc, nil)
if err != nil {
t.Fatalf("unexpected error %v", err)
}
@ -519,7 +519,7 @@ func TestApplyConfChangeShouldStop(t *testing.T) {
// remove local member
cc.NodeID = 1
shouldStop, err = srv.applyConfChange(cc)
shouldStop, err = srv.applyConfChange(cc, nil)
if err != nil {
t.Fatalf("unexpected error %v", err)
}
@ -917,7 +917,7 @@ func TestSnapshot(t *testing.T) {
raftStorage: s,
}
srv.snapshot(1, []uint64{1})
srv.snapshot(1, &raftpb.ConfState{Nodes: []uint64{1}})
gaction := st.Action()
if len(gaction) != 1 {
t.Fatalf("len(action) = %d, want 1", len(gaction))