diff --git a/etcdserver/server.go b/etcdserver/server.go index 187d82e13..1eb5bcad2 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -599,6 +599,7 @@ func (s *EtcdServer) ReportSnapshot(id uint64, status raft.SnapshotStatus) { type etcdProgress struct { confState raftpb.ConfState snapi uint64 + appliedt uint64 appliedi uint64 } @@ -676,6 +677,7 @@ func (s *EtcdServer) run() { ep := etcdProgress{ confState: sn.Metadata.ConfState, snapi: sn.Metadata.Index, + appliedt: sn.Metadata.Term, appliedi: sn.Metadata.Index, } @@ -777,7 +779,7 @@ func (s *EtcdServer) applyAll(ep *etcdProgress, apply *apply) { select { // snapshot requested via send() case m := <-s.r.msgSnapC: - merged := s.createMergedSnapshotMessage(m, ep.appliedi, ep.confState) + merged := s.createMergedSnapshotMessage(m, ep.appliedt, ep.appliedi, ep.confState) s.sendMergedSnap(merged) default: } @@ -879,6 +881,7 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) { } plog.Info("finished adding peers from new cluster configuration into network...") + ep.appliedt = apply.snapshot.Metadata.Term ep.appliedi = apply.snapshot.Metadata.Index ep.snapi = ep.appliedi ep.confState = apply.snapshot.Metadata.ConfState @@ -900,7 +903,7 @@ func (s *EtcdServer) applyEntries(ep *etcdProgress, apply *apply) { return } var shouldstop bool - if ep.appliedi, shouldstop = s.apply(ents, &ep.confState); shouldstop { + if ep.appliedt, ep.appliedi, shouldstop = s.apply(ents, &ep.confState); shouldstop { go s.stopWithDelay(10*100*time.Millisecond, fmt.Errorf("the member has been permanently removed from the cluster")) } } @@ -1254,9 +1257,7 @@ func (s *EtcdServer) sendMergedSnap(merged snap.Message) { // apply takes entries received from Raft (after it has been committed) and // applies them to the current state of the EtcdServer. // The given entries should not be empty. -func (s *EtcdServer) apply(es []raftpb.Entry, confState *raftpb.ConfState) (uint64, bool) { - var applied uint64 - var shouldstop bool +func (s *EtcdServer) apply(es []raftpb.Entry, confState *raftpb.ConfState) (appliedt uint64, appliedi uint64, shouldStop bool) { for i := range es { e := es[i] switch e.Type { @@ -1266,16 +1267,17 @@ func (s *EtcdServer) apply(es []raftpb.Entry, confState *raftpb.ConfState) (uint var cc raftpb.ConfChange pbutil.MustUnmarshal(&cc, e.Data) removedSelf, err := s.applyConfChange(cc, confState) - shouldstop = shouldstop || removedSelf + shouldStop = shouldStop || removedSelf s.w.Trigger(cc.ID, err) default: plog.Panicf("entry type should be either EntryNormal or EntryConfChange") } atomic.StoreUint64(&s.r.index, e.Index) atomic.StoreUint64(&s.r.term, e.Term) - applied = e.Index + appliedt = e.Term + appliedi = e.Index } - return applied, shouldstop + return appliedt, appliedi, shouldStop } // applyEntryNormal apples an EntryNormal type raftpb request to the EtcdServer diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index af79517e0..57b1aebd8 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -615,7 +615,7 @@ func TestApplyMultiConfChangeShouldStop(t *testing.T) { ents = append(ents, ent) } - _, shouldStop := srv.apply(ents, &raftpb.ConfState{}) + _, _, shouldStop := srv.apply(ents, &raftpb.ConfState{}) if !shouldStop { t.Errorf("shouldStop = %t, want %t", shouldStop, true) } diff --git a/etcdserver/snapshot_merge.go b/etcdserver/snapshot_merge.go index 1de996c50..9cfc85216 100644 --- a/etcdserver/snapshot_merge.go +++ b/etcdserver/snapshot_merge.go @@ -16,7 +16,6 @@ package etcdserver import ( "io" - "log" "github.com/coreos/etcd/mvcc/backend" "github.com/coreos/etcd/raft/raftpb" @@ -26,12 +25,7 @@ import ( // createMergedSnapshotMessage creates a snapshot message that contains: raft status (term, conf), // a snapshot of v2 store inside raft.Snapshot as []byte, a snapshot of v3 KV in the top level message // as ReadCloser. -func (s *EtcdServer) createMergedSnapshotMessage(m raftpb.Message, snapi uint64, confState raftpb.ConfState) snap.Message { - snapt, err := s.r.raftStorage.Term(snapi) - if err != nil { - log.Panicf("get term should never fail: %v", err) - } - +func (s *EtcdServer) createMergedSnapshotMessage(m raftpb.Message, snapt, snapi uint64, confState raftpb.ConfState) snap.Message { // get a snapshot of v2 store as []byte clone := s.store.Clone() d, err := clone.SaveNoCopy()