Merge pull request #771 from unihorn/80

refactor(peer_server): remove standby mode in peer server
This commit is contained in:
Yicheng Qin
2014-05-07 09:57:02 -07:00
10 changed files with 140 additions and 627 deletions

View File

@@ -2,6 +2,7 @@ package server
import (
"bytes"
"encoding/binary"
"encoding/json"
"fmt"
"io/ioutil"
@@ -31,8 +32,8 @@ const (
ThresholdMonitorTimeout = 5 * time.Second
// ActiveMonitorTimeout is the time between checks on the active size of
// the cluster. If the active size is different than the actual size then
// etcd attempts to promote/demote to bring it to the correct number.
// the cluster. If the actual number of peers is bigger than the active size
// then etcd attempts to remove a peer to bring it to the correct number.
ActiveMonitorTimeout = 1 * time.Second
// PeerActivityMonitorTimeout is the time between checks for dead nodes in
@@ -40,11 +41,6 @@ const (
PeerActivityMonitorTimeout = 1 * time.Second
)
// Numeric mode flags: peerModeFlag is 0 and standbyModeFlag is 1.
// NOTE(review): presumably these distinguish peer mode from standby mode;
// where they are consumed is not visible in this excerpt.
const (
	peerModeFlag = iota
	standbyModeFlag
)
type PeerServerConfig struct {
Name string
Scheme string
@@ -65,15 +61,11 @@ type PeerServer struct {
registry *Registry
store store.Store
snapConf *snapshotConf
mode Mode
closeChan chan bool
routineGroup sync.WaitGroup
timeoutThresholdChan chan interface{}
standbyPeerURL string
standbyClientURL string
metrics *metrics.Bucket
sync.Mutex
}
@@ -129,29 +121,6 @@ func (s *PeerServer) SetRaftServer(raftServer raft.Server) {
s.raftServer = raftServer
}
// Mode retrieves the current mode of the server.
// It returns the mode field as-is and does not acquire the server's mutex.
func (s *PeerServer) Mode() Mode {
return s.mode
}
// setMode records mode as the server's current mode and brings the Raft
// server in line with it: entering PeerMode starts the Raft server if it is
// not already running, and entering StandbyMode stops it if it is running.
// Any other mode value is only recorded.
func (s *PeerServer) setMode(mode Mode) {
	s.mode = mode

	if mode == PeerMode {
		// Only start a Raft server that is currently stopped.
		if !s.raftServer.Running() {
			s.raftServer.Start()
		}
		return
	}

	// Only stop a Raft server that is currently running.
	if mode == StandbyMode && s.raftServer.Running() {
		s.raftServer.Stop()
	}
}
// ClusterConfig retrieves the current cluster configuration.
func (s *PeerServer) ClusterConfig() *ClusterConfig {
return s.clusterConfig
@@ -328,7 +297,6 @@ func (s *PeerServer) HTTPHandler() http.Handler {
router.HandleFunc("/version/{version:[0-9]+}/check", s.VersionCheckHttpHandler)
router.HandleFunc("/upgrade", s.UpgradeHttpHandler)
router.HandleFunc("/join", s.JoinHttpHandler)
router.HandleFunc("/promote", s.PromoteHttpHandler).Methods("POST")
router.HandleFunc("/remove/{name:.+}", s.RemoveHttpHandler)
router.HandleFunc("/vote", s.VoteHttpHandler)
router.HandleFunc("/log", s.GetLogHttpHandler)
@@ -341,8 +309,6 @@ func (s *PeerServer) HTTPHandler() http.Handler {
router.HandleFunc("/v2/admin/config", s.setClusterConfigHttpHandler).Methods("PUT")
router.HandleFunc("/v2/admin/machines", s.getMachinesHttpHandler).Methods("GET")
router.HandleFunc("/v2/admin/machines/{name}", s.getMachineHttpHandler).Methods("GET")
router.HandleFunc("/v2/admin/machines/{name}", s.addMachineHttpHandler).Methods("PUT")
router.HandleFunc("/v2/admin/machines/{name}", s.removeMachineHttpHandler).Methods("DELETE")
return router
}
@@ -361,15 +327,14 @@ func (s *PeerServer) startAsLeader() {
s.raftServer.Start()
// The leader needs to join itself as a peer.
for {
c := &JoinCommandV1{
c := &JoinCommand{
MinVersion: store.MinVersion(),
MaxVersion: store.MaxVersion(),
Name: s.raftServer.Name(),
RaftURL: s.Config.URL,
EtcdURL: s.server.URL(),
}
_, err := s.raftServer.Do(c)
if err == nil {
if _, err := s.raftServer.Do(c); err == nil {
break
}
}
@@ -550,16 +515,16 @@ func (s *PeerServer) joinByPeer(server raft.Server, peer string, scheme string)
}
var b bytes.Buffer
c := &JoinCommandV2{
c := &JoinCommand{
MinVersion: store.MinVersion(),
MaxVersion: store.MaxVersion(),
Name: server.Name(),
PeerURL: s.Config.URL,
ClientURL: s.server.URL(),
RaftURL: s.Config.URL,
EtcdURL: s.server.URL(),
}
json.NewEncoder(&b).Encode(c)
joinURL := url.URL{Host: peer, Scheme: scheme, Path: "/v2/admin/machines/" + server.Name()}
joinURL := url.URL{Host: peer, Scheme: scheme, Path: "/join"}
log.Infof("Send Join Request to %s", joinURL.String())
req, _ := http.NewRequest("PUT", joinURL.String(), &b)
@@ -574,30 +539,19 @@ func (s *PeerServer) joinByPeer(server raft.Server, peer string, scheme string)
log.Infof("»»»» %d", resp.StatusCode)
if resp.StatusCode == http.StatusOK {
var msg joinMessageV2
if err := json.NewDecoder(resp.Body).Decode(&msg); err != nil {
log.Debugf("Error reading join response: %v", err)
return err
}
s.joinIndex = msg.CommitIndex
s.setMode(msg.Mode)
if msg.Mode == StandbyMode {
s.standbyClientURL = resp.Header.Get("X-Leader-Client-URL")
s.standbyPeerURL = resp.Header.Get("X-Leader-Peer-URL")
}
b, _ := ioutil.ReadAll(resp.Body)
s.joinIndex, _ = binary.Uvarint(b)
return nil
}
if resp.StatusCode == http.StatusTemporaryRedirect {
address := resp.Header.Get("Location")
log.Debugf("Send Join Request to %s", address)
c := &JoinCommandV2{
c := &JoinCommand{
MinVersion: store.MinVersion(),
MaxVersion: store.MaxVersion(),
Name: server.Name(),
PeerURL: s.Config.URL,
ClientURL: s.server.URL(),
RaftURL: s.Config.URL,
EtcdURL: s.server.URL(),
}
json.NewEncoder(&b).Encode(c)
resp, _, err = t.Put(address, &b)
@@ -783,49 +737,21 @@ func (s *PeerServer) monitorActiveSize() {
// Retrieve target active size and actual active size.
activeSize := s.ClusterConfig().ActiveSize
peerCount := s.registry.PeerCount()
standbys := s.registry.Standbys()
peers := s.registry.Peers()
peerCount := s.registry.Count()
peers := s.registry.Names()
if index := sort.SearchStrings(peers, s.Config.Name); index < len(peers) && peers[index] == s.Config.Name {
peers = append(peers[:index], peers[index+1:]...)
}
// If we have more active nodes than we should then demote.
// If we have more active nodes than we should then remove.
if peerCount > activeSize {
peer := peers[rand.Intn(len(peers))]
log.Infof("%s: demoting: %v", s.Config.Name, peer)
if _, err := s.raftServer.Do(&DemoteCommand{Name: peer}); err != nil {
log.Infof("%s: warning: demotion error: %v", s.Config.Name, err)
log.Infof("%s: removing: %v", s.Config.Name, peer)
if _, err := s.raftServer.Do(&RemoveCommand{Name: peer}); err != nil {
log.Infof("%s: warning: remove error: %v", s.Config.Name, err)
}
continue
}
// If we don't have enough active nodes then try to promote a standby.
if peerCount < activeSize && len(standbys) > 0 {
loop:
for _, i := range rand.Perm(len(standbys)) {
standby := standbys[i]
standbyPeerURL, _ := s.registry.StandbyPeerURL(standby)
log.Infof("%s: attempting to promote: %v (%s)", s.Config.Name, standby, standbyPeerURL)
// Notify standby to promote itself.
client := &http.Client{
Transport: &http.Transport{
DisableKeepAlives: false,
ResponseHeaderTimeout: ActiveMonitorTimeout,
},
}
resp, err := client.Post(fmt.Sprintf("%s/promote", standbyPeerURL), "application/json", nil)
if err != nil {
log.Infof("%s: warning: promotion error: %v", s.Config.Name, err)
continue
} else if resp.StatusCode != http.StatusOK {
log.Infof("%s: warning: promotion failure: %v", s.Config.Name, resp.StatusCode)
continue
}
break loop
}
}
}
}
@@ -853,8 +779,8 @@ func (s *PeerServer) monitorPeerActivity() {
// If more time than the promote delay has passed since the peer's last
// activity, then automatically demote the peer.
if !peer.LastActivity().IsZero() && now.Sub(peer.LastActivity()) > promoteDelay {
log.Infof("%s: demoting node: %v; last activity %v ago", s.Config.Name, peer.Name, now.Sub(peer.LastActivity()))
if _, err := s.raftServer.Do(&DemoteCommand{Name: peer.Name}); err != nil {
log.Infof("%s: removing node: %v; last activity %v ago", s.Config.Name, peer.Name, now.Sub(peer.LastActivity()))
if _, err := s.raftServer.Do(&RemoveCommand{Name: peer.Name}); err != nil {
log.Infof("%s: warning: autodemotion error: %v", s.Config.Name, err)
}
continue
@@ -862,15 +788,3 @@ func (s *PeerServer) monitorPeerActivity() {
}
}
}
// Mode identifies the role a server currently plays: an active peer or a
// standby.
type Mode string

// The recognized server modes.
const (
	// PeerMode marks a server that is an active node in Raft.
	PeerMode Mode = "peer"

	// StandbyMode marks a server that is an inactive, request-forwarding node.
	StandbyMode Mode = "standby"
)