From c91688315a9abcd8ab84a63ab58185d680a5fabb Mon Sep 17 00:00:00 2001
From: Ben Johnson
Date: Fri, 7 Mar 2014 07:38:40 -0700
Subject: [PATCH] Minor fixes to proxies.

---
 server/join_command.go |  7 ++++---
 server/peer_server.go  | 36 ++++++++++++++++++++++++++----------
 2 files changed, 30 insertions(+), 13 deletions(-)

diff --git a/server/join_command.go b/server/join_command.go
index 871faedf8..44ffc23ac 100644
--- a/server/join_command.go
+++ b/server/join_command.go
@@ -20,6 +20,7 @@ func init() {
 //  8 bytes      | 1 byte
 //  join_index   | join_mode
 //
+// This binary protocol is for backward compatibility.
 type JoinCommand struct {
 	MinVersion int    `json:"minVersion"`
 	MaxVersion int    `json:"maxVersion"`
@@ -57,7 +58,7 @@ func (c *JoinCommand) Apply(context raft.Context) (interface{}, error) {
 
 	// Check if the join command is from a previous peer, who lost all its previous log.
 	if _, ok := ps.registry.ClientURL(c.Name); ok {
-		binary.Write(&buf, binary.BigEndian, uint8(0)) // Mark as peer.
+		binary.Write(&buf, binary.BigEndian, uint8(peerModeFlag)) // Mark as peer.
 		return buf.Bytes(), nil
 	}
 
@@ -65,7 +66,7 @@ func (c *JoinCommand) Apply(context raft.Context) (interface{}, error) {
 	if ps.registry.PeerCount() >= ps.ClusterConfig().ActiveSize {
 		log.Debug("Join as proxy ", c.Name)
 		ps.registry.RegisterProxy(c.Name, c.RaftURL, c.EtcdURL)
-		binary.Write(&buf, binary.BigEndian, uint8(1)) // Mark as proxy.
+		binary.Write(&buf, binary.BigEndian, uint8(proxyModeFlag)) // Mark as proxy.
 		return buf.Bytes(), nil
 	}
 
@@ -86,7 +87,7 @@ func (c *JoinCommand) Apply(context raft.Context) (interface{}, error) {
 		ps.followersStats.Followers[c.Name].Latency.Minimum = 1 << 63
 	}
 
-	binary.Write(&buf, binary.BigEndian, uint8(0)) // Mark as peer.
+	binary.Write(&buf, binary.BigEndian, uint8(peerModeFlag)) // Mark as peer.
 	return buf.Bytes(), err
 }
 
diff --git a/server/peer_server.go b/server/peer_server.go
index 949151c3e..5cff2cdfa 100644
--- a/server/peer_server.go
+++ b/server/peer_server.go
@@ -25,9 +25,25 @@ import (
 	"github.com/coreos/etcd/store"
 )
 
-const ThresholdMonitorTimeout = 5 * time.Second
-const ActiveMonitorTimeout = 1 * time.Second
-const PeerActivityMonitorTimeout = 1 * time.Second
+const (
+	// ThresholdMonitorTimeout is the time between log notifications that the
+	// Raft heartbeat is too close to the election timeout.
+	ThresholdMonitorTimeout = 5 * time.Second
+
+	// ActiveMonitorTimeout is the time between checks on the active size of
+	// the cluster. If the active size is different than the actual size then
+	// etcd attempts to promote/demote to bring it to the correct number.
+	ActiveMonitorTimeout = 1 * time.Second
+
+	// PeerActivityMonitorTimeout is the time between checks for dead nodes in
+	// the cluster.
+	PeerActivityMonitorTimeout = 1 * time.Second
+)
+
+const (
+	peerModeFlag  = 0
+	proxyModeFlag = 1
+)
 
 type PeerServerConfig struct {
 	Name       string
@@ -268,7 +284,7 @@ func (s *PeerServer) Start(snapshot bool, discoverURL string, peers []string) er
 
 	go s.monitorSync()
 	go s.monitorTimeoutThreshold(s.closeChan)
-	go s.monitorActive(s.closeChan)
+	go s.monitorActiveSize(s.closeChan)
 	go s.monitorPeerActivity(s.closeChan)
 
 	// open the snapshot
@@ -453,16 +469,16 @@ func (s *PeerServer) joinByPeer(server raft.Server, peer string, scheme string)
 			// Determine whether the server joined as a proxy or peer.
 			var mode uint64
 			if mode, err = binary.ReadUvarint(r); err == io.EOF {
-				mode = 0
+				mode = peerModeFlag
 			} else if err != nil {
 				log.Debugf("Error reading join mode: %v", err)
 				return err
 			}
 
 			switch mode {
-			case 0:
+			case peerModeFlag:
 				s.setMode(PeerMode)
-			case 1:
+			case proxyModeFlag:
 				s.setMode(ProxyMode)
 				s.proxyClientURL = resp.Header.Get("X-Leader-Client-URL")
 				s.proxyPeerURL = resp.Header.Get("X-Leader-Peer-URL")
@@ -617,9 +633,9 @@ func (s *PeerServer) monitorTimeoutThreshold(closeChan chan bool) {
 	}
 }
 
-// monitorActive has the leader periodically check the status of cluster nodes
-// and swaps them out for proxies as needed.
-func (s *PeerServer) monitorActive(closeChan chan bool) {
+// monitorActiveSize has the leader periodically check the status of cluster
+// nodes and swaps them out for proxies as needed.
+func (s *PeerServer) monitorActiveSize(closeChan chan bool) {
 	for {
 		select {
 		case <-time.After(ActiveMonitorTimeout):
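
Aside from the patch itself, a minimal standalone sketch of the join handshake that the new peerModeFlag/proxyModeFlag constants name, assuming only the response layout described in the JoinCommand comment (an 8-byte join_index field followed by an optional 1-byte join_mode; older leaders omit the mode byte). The decodeJoinMode helper and the fake response body in main are illustrative only, not part of etcd.

package main

import (
	"bufio"
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
)

const (
	peerModeFlag  = 0
	proxyModeFlag = 1
)

// decodeJoinMode mirrors the reader side shown in the patch: the join mode is
// read after the join index, and a missing byte (an older leader) is treated
// as plain peer mode. The function name is illustrative only.
func decodeJoinMode(body io.Reader) (joinIndex uint64, mode uint64, err error) {
	r := bufio.NewReader(body)

	// Fixed 8-byte join_index field; read it as raw bytes here for simplicity.
	idx := make([]byte, 8)
	if _, err = io.ReadFull(r, idx); err != nil {
		return 0, 0, err
	}
	joinIndex, _ = binary.Uvarint(idx)

	// Optional trailing join_mode byte.
	if mode, err = binary.ReadUvarint(r); err == io.EOF {
		mode = peerModeFlag // old leaders send no mode byte
		err = nil
	}
	return joinIndex, mode, err
}

func main() {
	// Fake response body: join_index = 42 padded to 8 bytes, then the proxy flag.
	idx := make([]byte, 8)
	binary.PutUvarint(idx, 42)
	body := bytes.NewBuffer(append(idx, proxyModeFlag))

	joinIndex, mode, err := decodeJoinMode(body)
	fmt.Println(joinIndex, mode == proxyModeFlag, err) // 42 true <nil>
}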