From 29d7a2a5589bf8120ca6a17d5d74aaa0e76d1644 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Mon, 8 Dec 2014 21:12:13 -0800 Subject: [PATCH] etcd: update conf when apply the confChange entry --- etcdserver/server.go | 19 +++++++++---------- etcdserver/server_test.go | 8 ++++---- 2 files changed, 13 insertions(+), 14 deletions(-) diff --git a/etcdserver/server.go b/etcdserver/server.go index 94cf22ff5..b8ab8f72f 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -395,7 +395,7 @@ func (s *EtcdServer) run() { // snapi indicates the index of the last submitted snapshot request snapi := snap.Metadata.Index appliedi := snap.Metadata.Index - nodes := snap.Metadata.ConfState.Nodes + confState := &snap.Metadata.ConfState defer func() { s.node.Stop() @@ -412,7 +412,6 @@ func (s *EtcdServer) run() { case rd := <-s.node.Ready(): if rd.SoftState != nil { atomic.StoreUint64(&s.raftLead, rd.SoftState.Lead) - nodes = rd.SoftState.Nodes if rd.RaftState == raft.StateLeader { syncC = s.SyncTicker } else { @@ -459,7 +458,7 @@ func (s *EtcdServer) run() { ents = rd.CommittedEntries[appliedi+1-firsti:] } if len(ents) > 0 { - if appliedi, shouldstop = s.apply(ents); shouldstop { + if appliedi, shouldstop = s.apply(ents, confState); shouldstop { return } } @@ -469,7 +468,7 @@ func (s *EtcdServer) run() { if appliedi-snapi > s.snapCount { log.Printf("etcdserver: start to snapshot (applied: %d, lastsnap: %d)", appliedi, snapi) - s.snapshot(appliedi, nodes) + s.snapshot(appliedi, confState) snapi = appliedi } case <-syncC: @@ -701,7 +700,7 @@ func getExpirationTime(r *pb.Request) time.Time { // apply takes entries received from Raft (after it has been committed) and // applies them to the current state of the EtcdServer. // The given entries should not be empty. -func (s *EtcdServer) apply(es []raftpb.Entry) (uint64, bool) { +func (s *EtcdServer) apply(es []raftpb.Entry, confState *raftpb.ConfState) (uint64, bool) { var applied uint64 for i := range es { e := es[i] @@ -713,7 +712,7 @@ func (s *EtcdServer) apply(es []raftpb.Entry) (uint64, bool) { case raftpb.EntryConfChange: var cc raftpb.ConfChange pbutil.MustUnmarshal(&cc, e.Data) - shouldstop, err := s.applyConfChange(cc) + shouldstop, err := s.applyConfChange(cc, confState) s.w.Trigger(cc.ID, err) if shouldstop { return applied, true @@ -779,13 +778,13 @@ func (s *EtcdServer) applyRequest(r pb.Request) Response { // applyConfChange applies a ConfChange to the server. It is only // invoked with a ConfChange that has already passed through Raft -func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange) (bool, error) { +func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.ConfState) (bool, error) { if err := s.Cluster.ValidateConfigurationChange(cc); err != nil { cc.NodeID = raft.None s.node.ApplyConfChange(cc) return false, err } - s.node.ApplyConfChange(cc) + confState = s.node.ApplyConfChange(cc) switch cc.Type { case raftpb.ConfChangeAddNode: m := new(Member) @@ -833,14 +832,14 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange) (bool, error) { } // TODO: non-blocking snapshot -func (s *EtcdServer) snapshot(snapi uint64, snapnodes []uint64) { +func (s *EtcdServer) snapshot(snapi uint64, confState *raftpb.ConfState) { d, err := s.store.Save() // TODO: current store will never fail to do a snapshot // what should we do if the store might fail? if err != nil { log.Panicf("etcdserver: store save should never fail: %v", err) } - err = s.raftStorage.Compact(snapi, &raftpb.ConfState{Nodes: snapnodes}, d) + err = s.raftStorage.Compact(snapi, confState, d) if err != nil { // the snapshot was done asynchronously with the progress of raft. // raft might have already got a newer snapshot and called compact. diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index 14574b541..60846a0cb 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -475,7 +475,7 @@ func TestApplyConfChangeError(t *testing.T) { node: n, Cluster: cl, } - _, err := srv.applyConfChange(tt.cc) + _, err := srv.applyConfChange(tt.cc, nil) if err != tt.werr { t.Errorf("#%d: applyConfChange error = %v, want %v", i, err, tt.werr) } @@ -509,7 +509,7 @@ func TestApplyConfChangeShouldStop(t *testing.T) { NodeID: 2, } // remove non-local member - shouldStop, err := srv.applyConfChange(cc) + shouldStop, err := srv.applyConfChange(cc, nil) if err != nil { t.Fatalf("unexpected error %v", err) } @@ -519,7 +519,7 @@ func TestApplyConfChangeShouldStop(t *testing.T) { // remove local member cc.NodeID = 1 - shouldStop, err = srv.applyConfChange(cc) + shouldStop, err = srv.applyConfChange(cc, nil) if err != nil { t.Fatalf("unexpected error %v", err) } @@ -917,7 +917,7 @@ func TestSnapshot(t *testing.T) { raftStorage: s, } - srv.snapshot(1, []uint64{1}) + srv.snapshot(1, &raftpb.ConfState{Nodes: []uint64{1}}) gaction := st.Action() if len(gaction) != 1 { t.Fatalf("len(action) = %d, want 1", len(gaction))