Minor fixes to proxies.

This commit is contained in:
Ben Johnson
2014-03-07 07:38:40 -07:00
parent 3fff1a8dcd
commit c91688315a
2 changed files with 30 additions and 13 deletions

View File

@@ -20,6 +20,7 @@ func init() {
// 8 bytes | 1 byte
// join_index | join_mode
//
// This binary protocol is for backward compatibility.
type JoinCommand struct {
MinVersion int `json:"minVersion"`
MaxVersion int `json:"maxVersion"`
@@ -57,7 +58,7 @@ func (c *JoinCommand) Apply(context raft.Context) (interface{}, error) {
// Check if the join command is from a previous peer, who lost all its previous log.
if _, ok := ps.registry.ClientURL(c.Name); ok {
binary.Write(&buf, binary.BigEndian, uint8(0)) // Mark as peer.
binary.Write(&buf, binary.BigEndian, uint8(peerModeFlag)) // Mark as peer.
return buf.Bytes(), nil
}
@@ -65,7 +66,7 @@ func (c *JoinCommand) Apply(context raft.Context) (interface{}, error) {
if ps.registry.PeerCount() >= ps.ClusterConfig().ActiveSize {
log.Debug("Join as proxy ", c.Name)
ps.registry.RegisterProxy(c.Name, c.RaftURL, c.EtcdURL)
binary.Write(&buf, binary.BigEndian, uint8(1)) // Mark as proxy.
binary.Write(&buf, binary.BigEndian, uint8(proxyModeFlag)) // Mark as proxy.
return buf.Bytes(), nil
}
@@ -86,7 +87,7 @@ func (c *JoinCommand) Apply(context raft.Context) (interface{}, error) {
ps.followersStats.Followers[c.Name].Latency.Minimum = 1 << 63
}
binary.Write(&buf, binary.BigEndian, uint8(0)) // Mark as peer.
binary.Write(&buf, binary.BigEndian, uint8(peerModeFlag)) // Mark as peer.
return buf.Bytes(), err
}

View File

@@ -25,9 +25,25 @@ import (
"github.com/coreos/etcd/store"
)
const ThresholdMonitorTimeout = 5 * time.Second
const ActiveMonitorTimeout = 1 * time.Second
const PeerActivityMonitorTimeout = 1 * time.Second
const (
// ThresholdMonitorTimeout is the time between log notifications that the
// Raft heartbeat is too close to the election timeout.
ThresholdMonitorTimeout = 5 * time.Second
// ActiveMonitorTimeout is the time between checks on the active size of
// the cluster. If the active size is different than the actual size then
// etcd attempts to promote/demote to bring it to the correct number.
ActiveMonitorTimeout = 1 * time.Second
// PeerActivityMonitorTimeout is the time between checks for dead nodes in
// the cluster.
PeerActivityMonitorTimeout = 1 * time.Second
)
const (
peerModeFlag = 0
proxyModeFlag = 1
)
type PeerServerConfig struct {
Name string
@@ -268,7 +284,7 @@ func (s *PeerServer) Start(snapshot bool, discoverURL string, peers []string) er
go s.monitorSync()
go s.monitorTimeoutThreshold(s.closeChan)
go s.monitorActive(s.closeChan)
go s.monitorActiveSize(s.closeChan)
go s.monitorPeerActivity(s.closeChan)
// open the snapshot
@@ -453,16 +469,16 @@ func (s *PeerServer) joinByPeer(server raft.Server, peer string, scheme string)
// Determine whether the server joined as a proxy or peer.
var mode uint64
if mode, err = binary.ReadUvarint(r); err == io.EOF {
mode = 0
mode = peerModeFlag
} else if err != nil {
log.Debugf("Error reading join mode: %v", err)
return err
}
switch mode {
case 0:
case peerModeFlag:
s.setMode(PeerMode)
case 1:
case proxyModeFlag:
s.setMode(ProxyMode)
s.proxyClientURL = resp.Header.Get("X-Leader-Client-URL")
s.proxyPeerURL = resp.Header.Get("X-Leader-Peer-URL")
@@ -617,9 +633,9 @@ func (s *PeerServer) monitorTimeoutThreshold(closeChan chan bool) {
}
}
// monitorActive has the leader periodically check the status of cluster nodes
// and swaps them out for proxies as needed.
func (s *PeerServer) monitorActive(closeChan chan bool) {
// monitorActiveSize has the leader periodically check the status of cluster
// nodes and swaps them out for proxies as needed.
func (s *PeerServer) monitorActiveSize(closeChan chan bool) {
for {
select {
case <-time.After(ActiveMonitorTimeout):