etcdserver: init state before run loop correctly

This commit is contained in:
Yicheng Qin 2014-12-08 16:13:16 -08:00
parent 7e06d85651
commit 13814c9d7d
2 changed files with 18 additions and 15 deletions

View File

@ -382,12 +382,19 @@ func (s *EtcdServer) Process(ctx context.Context, m raftpb.Message) error {
func (s *EtcdServer) run() {
var syncC <-chan time.Time
// snapi indicates the index of the last submitted snapshot request
var snapi, appliedi uint64
var nodes []uint64
var shouldstop bool
shouldstopC := s.sendhub.ShouldStopNotify()
// load initial state from raft storage
snap, err := s.raftStorage.Snapshot()
if err != nil {
log.Panicf("etcdserver: get snapshot from raft storage error: %v", err)
}
// snapi indicates the index of the last submitted snapshot request
snapi := snap.Metadata.Index
appliedi := snap.Metadata.Index
nodes := snap.Metadata.ConfState.Nodes
defer func() {
s.node.Stop()
s.sendhub.Stop()
@ -430,24 +437,18 @@ func (s *EtcdServer) run() {
// recover from snapshot if it is more updated than current applied
if !raft.IsEmptySnap(rd.Snapshot) && rd.Snapshot.Metadata.Index > appliedi {
{
if err := s.store.Recovery(rd.Snapshot.Data); err != nil {
log.Panicf("recovery store error: %v", err)
}
s.Cluster.Recover()
appliedi = rd.Snapshot.Metadata.Index
log.Printf("etcdserver: recovered from incoming snapshot at index %d", snapi)
if err := s.store.Recovery(rd.Snapshot.Data); err != nil {
log.Panicf("recovery store error: %v", err)
}
s.Cluster.Recover()
appliedi = rd.Snapshot.Metadata.Index
log.Printf("etcdserver: recovered from incoming snapshot at index %d", snapi)
}
// TODO(bmizerany): do this in the background, but take
// care to apply entries in a single goroutine, and not
// race them.
if len(rd.CommittedEntries) != 0 {
firsti := rd.CommittedEntries[0].Index
if appliedi == 0 {
appliedi = firsti - 1
snapi = appliedi
}
if firsti > appliedi+1 {
log.Panicf("etcdserver: first index of committed entry[%d] should <= appliedi[%d] + 1", firsti, appliedi)
}

View File

@ -1621,6 +1621,7 @@ func (n *nodeProposalBlockerRecorder) Propose(ctx context.Context, data []byte)
type nodeConfChangeCommitterRecorder struct {
nodeRecorder
readyc chan raft.Ready
index uint64
}
func newNodeConfChangeCommitterRecorder() *nodeConfChangeCommitterRecorder {
@ -1632,7 +1633,8 @@ func (n *nodeConfChangeCommitterRecorder) ProposeConfChange(ctx context.Context,
if err != nil {
return err
}
n.readyc <- raft.Ready{CommittedEntries: []raftpb.Entry{{Type: raftpb.EntryConfChange, Data: data}}}
n.index++
n.readyc <- raft.Ready{CommittedEntries: []raftpb.Entry{{Index: n.index, Type: raftpb.EntryConfChange, Data: data}}}
n.record(action{name: "ProposeConfChange:" + conf.Type.String()})
return nil
}