From 13814c9d7db850024dd6790d6495db62ff9cd618 Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Mon, 8 Dec 2014 16:13:16 -0800 Subject: [PATCH] etcdserver: init state before run loop correctly --- etcdserver/server.go | 29 +++++++++++++++-------------- etcdserver/server_test.go | 4 +++- 2 files changed, 18 insertions(+), 15 deletions(-) diff --git a/etcdserver/server.go b/etcdserver/server.go index 46736c7e1..83dd9619c 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -382,12 +382,19 @@ func (s *EtcdServer) Process(ctx context.Context, m raftpb.Message) error { func (s *EtcdServer) run() { var syncC <-chan time.Time - // snapi indicates the index of the last submitted snapshot request - var snapi, appliedi uint64 - var nodes []uint64 var shouldstop bool shouldstopC := s.sendhub.ShouldStopNotify() + // load initial state from raft storage + snap, err := s.raftStorage.Snapshot() + if err != nil { + log.Panicf("etcdserver: get snapshot from raft storage error: %v", err) + } + // snapi indicates the index of the last submitted snapshot request + snapi := snap.Metadata.Index + appliedi := snap.Metadata.Index + nodes := snap.Metadata.ConfState.Nodes + defer func() { s.node.Stop() s.sendhub.Stop() @@ -430,24 +437,18 @@ func (s *EtcdServer) run() { // recover from snapshot if it is more updated than current applied if !raft.IsEmptySnap(rd.Snapshot) && rd.Snapshot.Metadata.Index > appliedi { - { - if err := s.store.Recovery(rd.Snapshot.Data); err != nil { - log.Panicf("recovery store error: %v", err) - } - s.Cluster.Recover() - appliedi = rd.Snapshot.Metadata.Index - log.Printf("etcdserver: recovered from incoming snapshot at index %d", snapi) + if err := s.store.Recovery(rd.Snapshot.Data); err != nil { + log.Panicf("recovery store error: %v", err) } + s.Cluster.Recover() + appliedi = rd.Snapshot.Metadata.Index + log.Printf("etcdserver: recovered from incoming snapshot at index %d", snapi) } // TODO(bmizerany): do this in the background, but take // care to apply entries in a single goroutine, and not // race them. if len(rd.CommittedEntries) != 0 { firsti := rd.CommittedEntries[0].Index - if appliedi == 0 { - appliedi = firsti - 1 - snapi = appliedi - } if firsti > appliedi+1 { log.Panicf("etcdserver: first index of committed entry[%d] should <= appliedi[%d] + 1", firsti, appliedi) } diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index d6a89cdb4..14574b541 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -1621,6 +1621,7 @@ func (n *nodeProposalBlockerRecorder) Propose(ctx context.Context, data []byte) type nodeConfChangeCommitterRecorder struct { nodeRecorder readyc chan raft.Ready + index uint64 } func newNodeConfChangeCommitterRecorder() *nodeConfChangeCommitterRecorder { @@ -1632,7 +1633,8 @@ func (n *nodeConfChangeCommitterRecorder) ProposeConfChange(ctx context.Context, if err != nil { return err } - n.readyc <- raft.Ready{CommittedEntries: []raftpb.Entry{{Type: raftpb.EntryConfChange, Data: data}}} + n.index++ + n.readyc <- raft.Ready{CommittedEntries: []raftpb.Entry{{Index: n.index, Type: raftpb.EntryConfChange, Data: data}}} n.record(action{name: "ProposeConfChange:" + conf.Type.String()}) return nil }