diff --git a/discovery/discovery.go b/discovery/discovery.go index 3aec8503a..aa833acbe 100644 --- a/discovery/discovery.go +++ b/discovery/discovery.go @@ -33,7 +33,7 @@ func init() { defaultDiscoverer = &Discoverer{} } -func (d *Discoverer) Do(discoveryURL string, name string, peer string) (peers []string, err error) { +func (d *Discoverer) Do(discoveryURL string, name string, peer string, closeChan <-chan bool, daemon func(func())) (peers []string, err error) { d.name = name d.peer = peer d.discoveryURL = discoveryURL @@ -68,7 +68,7 @@ func (d *Discoverer) Do(discoveryURL string, name string, peer string) (peers [] // Start the very slow heartbeat to the cluster now in anticipation // that everything is going to go alright now - go d.startHeartbeat() + daemon(func() { d.startHeartbeat(closeChan) }) // Attempt to take the leadership role, if there is no error we are it! resp, err := d.client.Create(path.Join(d.prefix, stateKey), startedState, 0) @@ -120,17 +120,20 @@ func (d *Discoverer) findPeers() (peers []string, err error) { return } -func (d *Discoverer) startHeartbeat() { +func (d *Discoverer) startHeartbeat(closeChan <-chan bool) { // In case of errors we should attempt to heartbeat fairly frequently heartbeatInterval := defaultTTL / 8 - ticker := time.Tick(time.Second * time.Duration(heartbeatInterval)) + ticker := time.NewTicker(time.Second * time.Duration(heartbeatInterval)) + defer ticker.Stop() for { select { - case <-ticker: + case <-ticker.C: err := d.heartbeat() if err != nil { log.Warnf("Discovery heartbeat failed: %v", err) } + case <-closeChan: + return } } } @@ -140,6 +143,6 @@ func (d *Discoverer) heartbeat() error { return err } -func Do(discoveryURL string, name string, peer string) ([]string, error) { - return defaultDiscoverer.Do(discoveryURL, name, peer) +func Do(discoveryURL string, name string, peer string, closeChan <-chan bool, daemon func(func())) ([]string, error) { + return defaultDiscoverer.Do(discoveryURL, name, peer, closeChan, daemon) } diff --git a/server/peer_server.go b/server/peer_server.go index dcf7cc575..b82235b77 100644 --- a/server/peer_server.go +++ b/server/peer_server.go @@ -67,6 +67,7 @@ type PeerServer struct { mode Mode closeChan chan bool + routineGroup sync.WaitGroup timeoutThresholdChan chan interface{} standbyPeerURL string @@ -286,14 +287,14 @@ func (s *PeerServer) Start(snapshot bool, discoverURL string, peers []string) er s.closeChan = make(chan bool) - go s.monitorSync() - go s.monitorTimeoutThreshold(s.closeChan) - go s.monitorActiveSize(s.closeChan) - go s.monitorPeerActivity(s.closeChan) + s.daemon(s.monitorSync) + s.daemon(s.monitorTimeoutThreshold) + s.daemon(s.monitorActiveSize) + s.daemon(s.monitorPeerActivity) // open the snapshot if snapshot { - go s.monitorSnapshot() + s.daemon(s.monitorSnapshot) } return nil @@ -305,9 +306,10 @@ func (s *PeerServer) Stop() { if s.closeChan != nil { close(s.closeChan) - s.closeChan = nil } s.raftServer.Stop() + s.routineGroup.Wait() + s.closeChan = nil } func (s *PeerServer) HTTPHandler() http.Handler { @@ -428,7 +430,7 @@ func (s *PeerServer) Upgradable() error { // Helper function to do discovery and return results in expected format func (s *PeerServer) handleDiscovery(discoverURL string) (peers []string, err error) { - peers, err = discovery.Do(discoverURL, s.Config.Name, s.Config.URL) + peers, err = discovery.Do(discoverURL, s.Config.Name, s.Config.URL, s.closeChan, s.daemon) // Warn about errors coming from discovery, this isn't fatal // since the user might have provided a peer list elsewhere, @@ -670,9 +672,24 @@ func (s *PeerServer) logSnapshot(err error, currentIndex, count uint64) { } } +func (s *PeerServer) daemon(f func()) { + s.routineGroup.Add(1) + go func() { + defer s.routineGroup.Done() + f() + }() +} + func (s *PeerServer) monitorSnapshot() { for { - time.Sleep(s.snapConf.checkingInterval) + timer := time.NewTimer(s.snapConf.checkingInterval) + defer timer.Stop() + select { + case <-s.closeChan: + return + case <-timer.C: + } + currentIndex := s.RaftServer().CommitIndex() count := currentIndex - s.snapConf.lastIndex if uint64(count) > s.snapConf.snapshotThr { @@ -684,10 +701,13 @@ func (s *PeerServer) monitorSnapshot() { } func (s *PeerServer) monitorSync() { - ticker := time.Tick(time.Millisecond * 500) + ticker := time.NewTicker(time.Millisecond * 500) + defer ticker.Stop() for { select { - case now := <-ticker: + case <-s.closeChan: + return + case now := <-ticker.C: if s.raftServer.State() == raft.Leader { s.raftServer.Do(s.store.CommandFactory().CreateSyncCommand(now)) } @@ -697,27 +717,35 @@ func (s *PeerServer) monitorSync() { // monitorTimeoutThreshold groups timeout threshold events together and prints // them as a single log line. -func (s *PeerServer) monitorTimeoutThreshold(closeChan chan bool) { +func (s *PeerServer) monitorTimeoutThreshold() { for { select { + case <-s.closeChan: + return case value := <-s.timeoutThresholdChan: log.Infof("%s: warning: heartbeat near election timeout: %v", s.Config.Name, value) - case <-closeChan: - return } - time.Sleep(ThresholdMonitorTimeout) + timer := time.NewTimer(ThresholdMonitorTimeout) + defer timer.Stop() + select { + case <-s.closeChan: + return + case <-timer.C: + } } } // monitorActiveSize has the leader periodically check the status of cluster // nodes and swaps them out for standbys as needed. -func (s *PeerServer) monitorActiveSize(closeChan chan bool) { +func (s *PeerServer) monitorActiveSize() { for { + timer := time.NewTimer(ActiveMonitorTimeout) + defer timer.Stop() select { - case <-time.After(ActiveMonitorTimeout): - case <-closeChan: + case <-s.closeChan: return + case <-timer.C: } // Ignore while this peer is not a leader. @@ -774,12 +802,14 @@ func (s *PeerServer) monitorActiveSize(closeChan chan bool) { } // monitorPeerActivity has the leader periodically for dead nodes and demotes them. -func (s *PeerServer) monitorPeerActivity(closeChan chan bool) { +func (s *PeerServer) monitorPeerActivity() { for { + timer := time.NewTimer(PeerActivityMonitorTimeout) + defer timer.Stop() select { - case <-time.After(PeerActivityMonitorTimeout): - case <-closeChan: + case <-s.closeChan: return + case <-timer.C: } // Ignore while this peer is not a leader.