Merge pull request #3114 from yichengq/clean-raft-init

etcdserver: clean up start and stop logic of raft
This commit is contained in:
Yicheng Qin 2015-07-27 14:19:25 -07:00
commit 6fc9dbfe56
3 changed files with 70 additions and 65 deletions

View File

@ -108,67 +108,77 @@ type raftNode struct {
done chan struct{} done chan struct{}
} }
func (r *raftNode) run() { // start prepares and starts raftNode in a new goroutine. It is no longer safe
var syncC <-chan time.Time // to modify the fields after it has been started.
// TODO: Ideally raftNode should get rid of the passed in server structure.
func (r *raftNode) start(s *EtcdServer) {
r.s = s
r.applyc = make(chan apply)
r.stopped = make(chan struct{})
r.done = make(chan struct{})
defer r.stop() go func() {
for { var syncC <-chan time.Time
select {
case <-r.ticker: defer r.onStop()
r.Tick() for {
case rd := <-r.Ready(): select {
if rd.SoftState != nil { case <-r.ticker:
atomic.StoreUint64(&r.lead, rd.SoftState.Lead) r.Tick()
if rd.RaftState == raft.StateLeader { case rd := <-r.Ready():
syncC = r.s.SyncTicker if rd.SoftState != nil {
// TODO: remove the nil checking atomic.StoreUint64(&r.lead, rd.SoftState.Lead)
// current test utility does not provide the stats if rd.RaftState == raft.StateLeader {
if r.s.stats != nil { syncC = r.s.SyncTicker
r.s.stats.BecomeLeader() // TODO: remove the nil checking
// current test utility does not provide the stats
if r.s.stats != nil {
r.s.stats.BecomeLeader()
}
} else {
syncC = nil
} }
} else {
syncC = nil
} }
}
apply := apply{ apply := apply{
entries: rd.CommittedEntries, entries: rd.CommittedEntries,
snapshot: rd.Snapshot, snapshot: rd.Snapshot,
done: make(chan struct{}), done: make(chan struct{}),
} }
select { select {
case r.applyc <- apply: case r.applyc <- apply:
case <-r.stopped:
return
}
if !raft.IsEmptySnap(rd.Snapshot) {
if err := r.storage.SaveSnap(rd.Snapshot); err != nil {
plog.Fatalf("raft save snapshot error: %v", err)
}
r.raftStorage.ApplySnapshot(rd.Snapshot)
plog.Infof("raft applied incoming snapshot at index %d", rd.Snapshot.Metadata.Index)
}
if err := r.storage.Save(rd.HardState, rd.Entries); err != nil {
plog.Fatalf("raft save state and entries error: %v", err)
}
r.raftStorage.Append(rd.Entries)
r.s.send(rd.Messages)
select {
case <-apply.done:
case <-r.stopped:
return
}
r.Advance()
case <-syncC:
r.s.sync(defaultSyncTimeout)
case <-r.stopped: case <-r.stopped:
return return
} }
if !raft.IsEmptySnap(rd.Snapshot) {
if err := r.storage.SaveSnap(rd.Snapshot); err != nil {
plog.Fatalf("raft save snapshot error: %v", err)
}
r.raftStorage.ApplySnapshot(rd.Snapshot)
plog.Infof("raft applied incoming snapshot at index %d", rd.Snapshot.Metadata.Index)
}
if err := r.storage.Save(rd.HardState, rd.Entries); err != nil {
plog.Fatalf("raft save state and entries error: %v", err)
}
r.raftStorage.Append(rd.Entries)
r.s.send(rd.Messages)
select {
case <-apply.done:
case <-r.stopped:
return
}
r.Advance()
case <-syncC:
r.s.sync(defaultSyncTimeout)
case <-r.stopped:
return
} }
} }()
} }
func (r *raftNode) apply() chan apply { func (r *raftNode) apply() chan apply {
@ -176,6 +186,11 @@ func (r *raftNode) apply() chan apply {
} }
func (r *raftNode) stop() { func (r *raftNode) stop() {
r.stopped <- struct{}{}
<-r.done
}
func (r *raftNode) onStop() {
r.Stop() r.Stop()
r.transport.Stop() r.transport.Stop()
if err := r.storage.Close(); err != nil { if err := r.storage.Close(); err != nil {

View File

@ -148,15 +148,11 @@ func TestStopRaftWhenWaitingForApplyDone(t *testing.T) {
n := newReadyNode() n := newReadyNode()
r := raftNode{ r := raftNode{
Node: n, Node: n,
applyc: make(chan apply),
storage: &storageRecorder{}, storage: &storageRecorder{},
raftStorage: raft.NewMemoryStorage(), raftStorage: raft.NewMemoryStorage(),
transport: &nopTransporter{}, transport: &nopTransporter{},
stopped: make(chan struct{}),
done: make(chan struct{}),
} }
r.s = &EtcdServer{r: r} r.start(&EtcdServer{r: r})
go r.run()
n.readyc <- raft.Ready{} n.readyc <- raft.Ready{}
select { select {
case <-r.applyc: case <-r.applyc:

View File

@ -413,15 +413,9 @@ func (s *EtcdServer) run() {
confState := snap.Metadata.ConfState confState := snap.Metadata.ConfState
snapi := snap.Metadata.Index snapi := snap.Metadata.Index
appliedi := snapi appliedi := snapi
// TODO: get rid of the raft initialization in etcd server s.r.start(s)
s.r.s = s
s.r.applyc = make(chan apply)
s.r.stopped = make(chan struct{})
s.r.done = make(chan struct{})
go s.r.run()
defer func() { defer func() {
s.r.stopped <- struct{}{} s.r.stop()
<-s.r.done
close(s.done) close(s.done)
}() }()