From 7696dd32806c3bdf6f801826fe40a7b9264f6988 Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Fri, 10 Jul 2015 16:31:22 -0700 Subject: [PATCH] etcdserver: clean up start and stop logic of raft kill TODO and make it more readable. --- etcdserver/raft.go | 119 ++++++++++++++++++++++------------------ etcdserver/raft_test.go | 6 +- etcdserver/server.go | 10 +--- 3 files changed, 70 insertions(+), 65 deletions(-) diff --git a/etcdserver/raft.go b/etcdserver/raft.go index f6d2da30d..162d9657c 100644 --- a/etcdserver/raft.go +++ b/etcdserver/raft.go @@ -108,67 +108,77 @@ type raftNode struct { done chan struct{} } -func (r *raftNode) run() { - var syncC <-chan time.Time +// start prepares and starts raftNode in a new goroutine. It is no longer safe +// to modify the fields after it has been started. +// TODO: Ideally raftNode should get rid of the passed in server structure. +func (r *raftNode) start(s *EtcdServer) { + r.s = s + r.applyc = make(chan apply) + r.stopped = make(chan struct{}) + r.done = make(chan struct{}) - defer r.stop() - for { - select { - case <-r.ticker: - r.Tick() - case rd := <-r.Ready(): - if rd.SoftState != nil { - atomic.StoreUint64(&r.lead, rd.SoftState.Lead) - if rd.RaftState == raft.StateLeader { - syncC = r.s.SyncTicker - // TODO: remove the nil checking - // current test utility does not provide the stats - if r.s.stats != nil { - r.s.stats.BecomeLeader() + go func() { + var syncC <-chan time.Time + + defer r.onStop() + for { + select { + case <-r.ticker: + r.Tick() + case rd := <-r.Ready(): + if rd.SoftState != nil { + atomic.StoreUint64(&r.lead, rd.SoftState.Lead) + if rd.RaftState == raft.StateLeader { + syncC = r.s.SyncTicker + // TODO: remove the nil checking + // current test utility does not provide the stats + if r.s.stats != nil { + r.s.stats.BecomeLeader() + } + } else { + syncC = nil } - } else { - syncC = nil } - } - apply := apply{ - entries: rd.CommittedEntries, - snapshot: rd.Snapshot, - done: make(chan struct{}), - } + apply := apply{ + entries: rd.CommittedEntries, + snapshot: rd.Snapshot, + done: make(chan struct{}), + } - select { - case r.applyc <- apply: + select { + case r.applyc <- apply: + case <-r.stopped: + return + } + + if !raft.IsEmptySnap(rd.Snapshot) { + if err := r.storage.SaveSnap(rd.Snapshot); err != nil { + plog.Fatalf("raft save snapshot error: %v", err) + } + r.raftStorage.ApplySnapshot(rd.Snapshot) + plog.Infof("raft applied incoming snapshot at index %d", rd.Snapshot.Metadata.Index) + } + if err := r.storage.Save(rd.HardState, rd.Entries); err != nil { + plog.Fatalf("raft save state and entries error: %v", err) + } + r.raftStorage.Append(rd.Entries) + + r.s.send(rd.Messages) + + select { + case <-apply.done: + case <-r.stopped: + return + } + r.Advance() + case <-syncC: + r.s.sync(defaultSyncTimeout) case <-r.stopped: return } - - if !raft.IsEmptySnap(rd.Snapshot) { - if err := r.storage.SaveSnap(rd.Snapshot); err != nil { - plog.Fatalf("raft save snapshot error: %v", err) - } - r.raftStorage.ApplySnapshot(rd.Snapshot) - plog.Infof("raft applied incoming snapshot at index %d", rd.Snapshot.Metadata.Index) - } - if err := r.storage.Save(rd.HardState, rd.Entries); err != nil { - plog.Fatalf("raft save state and entries error: %v", err) - } - r.raftStorage.Append(rd.Entries) - - r.s.send(rd.Messages) - - select { - case <-apply.done: - case <-r.stopped: - return - } - r.Advance() - case <-syncC: - r.s.sync(defaultSyncTimeout) - case <-r.stopped: - return } - } + }() } func (r *raftNode) apply() chan apply { @@ -176,6 +186,11 @@ func (r *raftNode) apply() chan apply { } func (r *raftNode) stop() { + r.stopped <- struct{}{} + <-r.done +} + +func (r *raftNode) onStop() { r.Stop() r.transport.Stop() if err := r.storage.Close(); err != nil { diff --git a/etcdserver/raft_test.go b/etcdserver/raft_test.go index 989db37bf..6d9b45d3d 100644 --- a/etcdserver/raft_test.go +++ b/etcdserver/raft_test.go @@ -148,15 +148,11 @@ func TestStopRaftWhenWaitingForApplyDone(t *testing.T) { n := newReadyNode() r := raftNode{ Node: n, - applyc: make(chan apply), storage: &storageRecorder{}, raftStorage: raft.NewMemoryStorage(), transport: &nopTransporter{}, - stopped: make(chan struct{}), - done: make(chan struct{}), } - r.s = &EtcdServer{r: r} - go r.run() + r.start(&EtcdServer{r: r}) n.readyc <- raft.Ready{} select { case <-r.applyc: diff --git a/etcdserver/server.go b/etcdserver/server.go index 149f65724..eb4c622f3 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -413,15 +413,9 @@ func (s *EtcdServer) run() { confState := snap.Metadata.ConfState snapi := snap.Metadata.Index appliedi := snapi - // TODO: get rid of the raft initialization in etcd server - s.r.s = s - s.r.applyc = make(chan apply) - s.r.stopped = make(chan struct{}) - s.r.done = make(chan struct{}) - go s.r.run() + s.r.start(s) defer func() { - s.r.stopped <- struct{}{} - <-s.r.done + s.r.stop() close(s.done) }()