diff --git a/server/peer_server.go b/server/peer_server.go index 2948da2a9..8870eb40b 100644 --- a/server/peer_server.go +++ b/server/peer_server.go @@ -42,6 +42,9 @@ type PeerServer struct { RetryTimes int HeartbeatTimeout time.Duration ElectionTimeout time.Duration + + closeChan chan bool + timeoutThresholdChan chan interface{} } // TODO: find a good policy to do snapshot @@ -82,6 +85,8 @@ func NewPeerServer(name string, path string, url string, bindAddr string, tlsCon }, HeartbeatTimeout: defaultHeartbeatTimeout, ElectionTimeout: defaultElectionTimeout, + + timeoutThresholdChan: make(chan interface{}, 1), } // Create transporter for raft @@ -99,6 +104,8 @@ func NewPeerServer(name string, path string, url string, bindAddr string, tlsCon s.raftServer.AddEventListener(raft.TermChangeEventType, s.raftEventLogger) s.raftServer.AddEventListener(raft.AddPeerEventType, s.raftEventLogger) s.raftServer.AddEventListener(raft.RemovePeerEventType, s.raftEventLogger) + s.raftServer.AddEventListener(raft.HeartbeatTimeoutEventType, s.raftEventLogger) + s.raftServer.AddEventListener(raft.ElectionTimeoutThresholdEventType, s.raftEventLogger) return s } @@ -147,7 +154,10 @@ func (s *PeerServer) ListenAndServe(snapshot bool, cluster []string) error { log.Debugf("%s restart as a follower", s.name) } + s.closeChan = make(chan bool) + go s.monitorSync() + go s.monitorTimeoutThreshold(s.closeChan) // open the snapshot if snapshot { @@ -205,6 +215,10 @@ func (s *PeerServer) listenAndServeTLS(certFile, keyFile string) error { // Stops the server. func (s *PeerServer) Close() { + if s.closeChan != nil { + close(s.closeChan) + s.closeChan = nil + } if s.listener != nil { s.listener.Close() s.listener = nil @@ -449,6 +463,18 @@ func (s *PeerServer) raftEventLogger(event raft.Event) { log.Infof("%s: peer added: '%v'", s.name, value) case raft.RemovePeerEventType: log.Infof("%s: peer removed: '%v'", s.name, value) + case raft.HeartbeatTimeoutEventType: + var name = "" + if peer, ok := value.(*raft.Peer); ok { + name = peer.Name + } + log.Infof("%s: warning: heartbeat timed out: '%v'", s.name, name) + case raft.ElectionTimeoutThresholdEventType: + select { + case s.timeoutThresholdChan <- value: + default: + } + } } @@ -474,3 +500,18 @@ func (s *PeerServer) monitorSync() { } } } + +// monitorTimeoutThreshold groups timeout threshold events together and prints +// them as a single log line. +func (s *PeerServer) monitorTimeoutThreshold(closeChan chan bool) { + for { + select { + case value := <-s.timeoutThresholdChan: + log.Infof("%s: warning: heartbeat near election timeout: %v", s.name, value) + case <-closeChan: + return + } + + time.Sleep(5 * time.Second) + } +}