From e55512f60bca19b204ae69847e5ff0cf1df9b507 Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Wed, 2 Apr 2014 10:46:31 -0700 Subject: [PATCH 1/2] fix(peer_server): graceful stop for peer server run Peer server will be started and stopped repeatedly in the design. This step ensures its stop doesn't affect the next start. The patch includes goroutine stop and timer trigger remove. --- discovery/discovery.go | 17 +++++----- server/peer_server.go | 70 ++++++++++++++++++++++++++++++------------ 2 files changed, 60 insertions(+), 27 deletions(-) diff --git a/discovery/discovery.go b/discovery/discovery.go index 3aec8503a..aa833acbe 100644 --- a/discovery/discovery.go +++ b/discovery/discovery.go @@ -33,7 +33,7 @@ func init() { defaultDiscoverer = &Discoverer{} } -func (d *Discoverer) Do(discoveryURL string, name string, peer string) (peers []string, err error) { +func (d *Discoverer) Do(discoveryURL string, name string, peer string, closeChan <-chan bool, daemon func(func())) (peers []string, err error) { d.name = name d.peer = peer d.discoveryURL = discoveryURL @@ -68,7 +68,7 @@ func (d *Discoverer) Do(discoveryURL string, name string, peer string) (peers [] // Start the very slow heartbeat to the cluster now in anticipation // that everything is going to go alright now - go d.startHeartbeat() + daemon(func() { d.startHeartbeat(closeChan) }) // Attempt to take the leadership role, if there is no error we are it! resp, err := d.client.Create(path.Join(d.prefix, stateKey), startedState, 0) @@ -120,17 +120,20 @@ func (d *Discoverer) findPeers() (peers []string, err error) { return } -func (d *Discoverer) startHeartbeat() { +func (d *Discoverer) startHeartbeat(closeChan <-chan bool) { // In case of errors we should attempt to heartbeat fairly frequently heartbeatInterval := defaultTTL / 8 - ticker := time.Tick(time.Second * time.Duration(heartbeatInterval)) + ticker := time.NewTicker(time.Second * time.Duration(heartbeatInterval)) + defer ticker.Stop() for { select { - case <-ticker: + case <-ticker.C: err := d.heartbeat() if err != nil { log.Warnf("Discovery heartbeat failed: %v", err) } + case <-closeChan: + return } } } @@ -140,6 +143,6 @@ func (d *Discoverer) heartbeat() error { return err } -func Do(discoveryURL string, name string, peer string) ([]string, error) { - return defaultDiscoverer.Do(discoveryURL, name, peer) +func Do(discoveryURL string, name string, peer string, closeChan <-chan bool, daemon func(func())) ([]string, error) { + return defaultDiscoverer.Do(discoveryURL, name, peer, closeChan, daemon) } diff --git a/server/peer_server.go b/server/peer_server.go index dcf7cc575..b82235b77 100644 --- a/server/peer_server.go +++ b/server/peer_server.go @@ -67,6 +67,7 @@ type PeerServer struct { mode Mode closeChan chan bool + routineGroup sync.WaitGroup timeoutThresholdChan chan interface{} standbyPeerURL string @@ -286,14 +287,14 @@ func (s *PeerServer) Start(snapshot bool, discoverURL string, peers []string) er s.closeChan = make(chan bool) - go s.monitorSync() - go s.monitorTimeoutThreshold(s.closeChan) - go s.monitorActiveSize(s.closeChan) - go s.monitorPeerActivity(s.closeChan) + s.daemon(s.monitorSync) + s.daemon(s.monitorTimeoutThreshold) + s.daemon(s.monitorActiveSize) + s.daemon(s.monitorPeerActivity) // open the snapshot if snapshot { - go s.monitorSnapshot() + s.daemon(s.monitorSnapshot) } return nil @@ -305,9 +306,10 @@ func (s *PeerServer) Stop() { if s.closeChan != nil { close(s.closeChan) - s.closeChan = nil } s.raftServer.Stop() + s.routineGroup.Wait() + s.closeChan = nil } func (s *PeerServer) HTTPHandler() http.Handler { @@ -428,7 +430,7 @@ func (s *PeerServer) Upgradable() error { // Helper function to do discovery and return results in expected format func (s *PeerServer) handleDiscovery(discoverURL string) (peers []string, err error) { - peers, err = discovery.Do(discoverURL, s.Config.Name, s.Config.URL) + peers, err = discovery.Do(discoverURL, s.Config.Name, s.Config.URL, s.closeChan, s.daemon) // Warn about errors coming from discovery, this isn't fatal // since the user might have provided a peer list elsewhere, @@ -670,9 +672,24 @@ func (s *PeerServer) logSnapshot(err error, currentIndex, count uint64) { } } +func (s *PeerServer) daemon(f func()) { + s.routineGroup.Add(1) + go func() { + defer s.routineGroup.Done() + f() + }() +} + func (s *PeerServer) monitorSnapshot() { for { - time.Sleep(s.snapConf.checkingInterval) + timer := time.NewTimer(s.snapConf.checkingInterval) + defer timer.Stop() + select { + case <-s.closeChan: + return + case <-timer.C: + } + currentIndex := s.RaftServer().CommitIndex() count := currentIndex - s.snapConf.lastIndex if uint64(count) > s.snapConf.snapshotThr { @@ -684,10 +701,13 @@ func (s *PeerServer) monitorSnapshot() { } func (s *PeerServer) monitorSync() { - ticker := time.Tick(time.Millisecond * 500) + ticker := time.NewTicker(time.Millisecond * 500) + defer ticker.Stop() for { select { - case now := <-ticker: + case <-s.closeChan: + return + case now := <-ticker.C: if s.raftServer.State() == raft.Leader { s.raftServer.Do(s.store.CommandFactory().CreateSyncCommand(now)) } @@ -697,27 +717,35 @@ func (s *PeerServer) monitorSync() { // monitorTimeoutThreshold groups timeout threshold events together and prints // them as a single log line. -func (s *PeerServer) monitorTimeoutThreshold(closeChan chan bool) { +func (s *PeerServer) monitorTimeoutThreshold() { for { select { + case <-s.closeChan: + return case value := <-s.timeoutThresholdChan: log.Infof("%s: warning: heartbeat near election timeout: %v", s.Config.Name, value) - case <-closeChan: - return } - time.Sleep(ThresholdMonitorTimeout) + timer := time.NewTimer(ThresholdMonitorTimeout) + defer timer.Stop() + select { + case <-s.closeChan: + return + case <-timer.C: + } } } // monitorActiveSize has the leader periodically check the status of cluster // nodes and swaps them out for standbys as needed. -func (s *PeerServer) monitorActiveSize(closeChan chan bool) { +func (s *PeerServer) monitorActiveSize() { for { + timer := time.NewTimer(ActiveMonitorTimeout) + defer timer.Stop() select { - case <-time.After(ActiveMonitorTimeout): - case <-closeChan: + case <-s.closeChan: return + case <-timer.C: } // Ignore while this peer is not a leader. @@ -774,12 +802,14 @@ func (s *PeerServer) monitorActiveSize(closeChan chan bool) { } // monitorPeerActivity has the leader periodically for dead nodes and demotes them. -func (s *PeerServer) monitorPeerActivity(closeChan chan bool) { +func (s *PeerServer) monitorPeerActivity() { for { + timer := time.NewTimer(PeerActivityMonitorTimeout) + defer timer.Stop() select { - case <-time.After(PeerActivityMonitorTimeout): - case <-closeChan: + case <-s.closeChan: return + case <-timer.C: } // Ignore while this peer is not a leader. From 6516cf854c2a64a08016eb4c87a31d0b61e11d2a Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Wed, 7 May 2014 07:51:44 -0700 Subject: [PATCH 2/2] chore(server): rename daemon to startRoutine For better understanding. --- discovery/discovery.go | 8 ++++---- server/peer_server.go | 14 +++++++------- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/discovery/discovery.go b/discovery/discovery.go index aa833acbe..19ced3195 100644 --- a/discovery/discovery.go +++ b/discovery/discovery.go @@ -33,7 +33,7 @@ func init() { defaultDiscoverer = &Discoverer{} } -func (d *Discoverer) Do(discoveryURL string, name string, peer string, closeChan <-chan bool, daemon func(func())) (peers []string, err error) { +func (d *Discoverer) Do(discoveryURL string, name string, peer string, closeChan <-chan bool, startRoutine func(func())) (peers []string, err error) { d.name = name d.peer = peer d.discoveryURL = discoveryURL @@ -68,7 +68,7 @@ func (d *Discoverer) Do(discoveryURL string, name string, peer string, closeChan // Start the very slow heartbeat to the cluster now in anticipation // that everything is going to go alright now - daemon(func() { d.startHeartbeat(closeChan) }) + startRoutine(func() { d.startHeartbeat(closeChan) }) // Attempt to take the leadership role, if there is no error we are it! resp, err := d.client.Create(path.Join(d.prefix, stateKey), startedState, 0) @@ -143,6 +143,6 @@ func (d *Discoverer) heartbeat() error { return err } -func Do(discoveryURL string, name string, peer string, closeChan <-chan bool, daemon func(func())) ([]string, error) { - return defaultDiscoverer.Do(discoveryURL, name, peer, closeChan, daemon) +func Do(discoveryURL string, name string, peer string, closeChan <-chan bool, startRoutine func(func())) ([]string, error) { + return defaultDiscoverer.Do(discoveryURL, name, peer, closeChan, startRoutine) } diff --git a/server/peer_server.go b/server/peer_server.go index b82235b77..fe738c244 100644 --- a/server/peer_server.go +++ b/server/peer_server.go @@ -287,14 +287,14 @@ func (s *PeerServer) Start(snapshot bool, discoverURL string, peers []string) er s.closeChan = make(chan bool) - s.daemon(s.monitorSync) - s.daemon(s.monitorTimeoutThreshold) - s.daemon(s.monitorActiveSize) - s.daemon(s.monitorPeerActivity) + s.startRoutine(s.monitorSync) + s.startRoutine(s.monitorTimeoutThreshold) + s.startRoutine(s.monitorActiveSize) + s.startRoutine(s.monitorPeerActivity) // open the snapshot if snapshot { - s.daemon(s.monitorSnapshot) + s.startRoutine(s.monitorSnapshot) } return nil @@ -430,7 +430,7 @@ func (s *PeerServer) Upgradable() error { // Helper function to do discovery and return results in expected format func (s *PeerServer) handleDiscovery(discoverURL string) (peers []string, err error) { - peers, err = discovery.Do(discoverURL, s.Config.Name, s.Config.URL, s.closeChan, s.daemon) + peers, err = discovery.Do(discoverURL, s.Config.Name, s.Config.URL, s.closeChan, s.startRoutine) // Warn about errors coming from discovery, this isn't fatal // since the user might have provided a peer list elsewhere, @@ -672,7 +672,7 @@ func (s *PeerServer) logSnapshot(err error, currentIndex, count uint64) { } } -func (s *PeerServer) daemon(f func()) { +func (s *PeerServer) startRoutine(f func()) { s.routineGroup.Add(1) go func() { defer s.routineGroup.Done()