refactor(peer_server): remove standby mode in peer server

This commit is contained in:
Yicheng Qin 2014-04-30 14:22:05 -07:00
parent 43ad623965
commit 17e299995c
10 changed files with 140 additions and 627 deletions

View File

@ -1,70 +0,0 @@
package server
import (
"fmt"
"github.com/coreos/etcd/log"
"github.com/coreos/etcd/third_party/github.com/goraft/raft"
)
// init registers DemoteCommand with the raft package via raft.RegisterCommand.
func init() {
raft.RegisterCommand(&DemoteCommand{})
}
// DemoteCommand represents a command to change a peer to a standby.
type DemoteCommand struct {
// Name identifies the peer to demote; Apply uses it for every registry
// lookup and for the raft peer removal.
Name string `json:"name"`
}
// CommandName reports the identifier of the demote command.
func (c *DemoteCommand) CommandName() string {
	const commandName = "etcd:demote"
	return commandName
}
// Apply executes the command: it unregisters the named peer from the shared
// registry, drops its follower stats, removes it from raft and then records
// it in the registry as a standby.
//
// The returned value is always nil. An error is returned when the raft
// context does not hold a *PeerServer, when the peer is not registered, or
// when unregistering it or removing it from raft fails.
func (c *DemoteCommand) Apply(context raft.Context) (interface{}, error) {
	// The assertion result used to be discarded, so a mismatching context
	// caused a nil-pointer dereference on the first registry access.
	ps, ok := context.Server().Context().(*PeerServer)
	if !ok || ps == nil {
		return nil, fmt.Errorf("demote peer %s: raft context is not a peer server", c.Name)
	}

	// Ignore this command if there is no peer.
	if !ps.registry.PeerExists(c.Name) {
		return nil, fmt.Errorf("peer does not exist: %s", c.Name)
	}

	// Save URLs before the registry entry disappears. The lookups are
	// best-effort: the "found" flags are intentionally not checked.
	clientURL, _ := ps.registry.ClientURL(c.Name)
	peerURL, _ := ps.registry.PeerURL(c.Name)

	// Remove node from the shared registry.
	if err := ps.registry.UnregisterPeer(c.Name); err != nil {
		log.Debugf("Demote peer %s: Error while unregistering (%v)", c.Name, err)
		return nil, err
	}

	// Delete from stats
	delete(ps.followersStats.Followers, c.Name)

	// Remove peer in raft
	if err := context.Server().RemovePeer(c.Name); err != nil {
		log.Debugf("Demote peer %s: (%v)", c.Name, err)
		return nil, err
	}

	// Register node as a standby. The peer has already been removed above,
	// so a failure here is reported in the log instead of being silently
	// dropped, and the rest of the command still runs as before.
	if err := ps.registry.RegisterStandby(c.Name, peerURL, clientURL); err != nil {
		log.Debugf("Demote peer %s: Error while registering standby (%v)", c.Name, err)
	}

	// Update mode if this change applies to this server.
	if c.Name == ps.Config.Name {
		leader := ps.server.Leader()
		log.Infof("Demote peer %s: Set mode to standby with %s", c.Name, leader)
		ps.standbyPeerURL, _ = ps.registry.PeerURL(leader)
		// NOTE(review): presumably run in a goroutine because setMode may
		// stop the raft server while this command is still being applied —
		// confirm.
		go ps.setMode(StandbyMode)
	}

	return nil, nil
}
// NodeName returns the name of the affected node, i.e. the Name field of the
// command.
func (c *DemoteCommand) NodeName() string {
return c.Name
}

View File

@ -2,7 +2,6 @@ package server
import (
"encoding/binary"
"encoding/json"
etcdErr "github.com/coreos/etcd/error"
"github.com/coreos/etcd/log"
@ -10,13 +9,12 @@ import (
)
func init() {
raft.RegisterCommand(&JoinCommandV1{})
raft.RegisterCommand(&JoinCommandV2{})
raft.RegisterCommand(&JoinCommand{})
}
// JoinCommandV1 represents a request to join the cluster.
// JoinCommand represents a request to join the cluster.
// The command returns the join_index (Uvarint).
type JoinCommandV1 struct {
type JoinCommand struct {
MinVersion int `json:"minVersion"`
MaxVersion int `json:"maxVersion"`
Name string `json:"name"`
@ -25,27 +23,30 @@ type JoinCommandV1 struct {
}
// The name of the join command in the log
func (c *JoinCommandV1) CommandName() string {
func (c *JoinCommand) CommandName() string {
return "etcd:join"
}
func (c *JoinCommandV1) updatePeerURL(ps *PeerServer) error {
log.Debugf("Update peer URL of %v to %v", c.Name, c.RaftURL)
if err := ps.registry.UpdatePeerURL(c.Name, c.RaftURL); err != nil {
log.Debugf("Error while updating in registry: %s (%v)", c.Name, err)
return err
// Apply attempts to join a machine to the cluster.
func (c *JoinCommand) Apply(context raft.Context) (interface{}, error) {
index, err := applyJoin(c, context)
if err != nil {
return nil, err
}
// Flush commit index, so raft will replay to here when restarted
ps.raftServer.FlushCommitIndex()
return nil
}
// Join a server to the cluster
func (c *JoinCommandV1) Apply(context raft.Context) (interface{}, error) {
ps, _ := context.Server().Context().(*PeerServer)
b := make([]byte, 8)
binary.PutUvarint(b, context.CommitIndex())
binary.PutUvarint(b, index)
return b, nil
}
// NodeName returns the name of the machine this join command refers to.
func (c *JoinCommand) NodeName() string {
return c.Name
}
// applyJoin attempts to join a machine to the cluster.
func applyJoin(c *JoinCommand, context raft.Context) (uint64, error) {
ps, _ := context.Server().Context().(*PeerServer)
commitIndex := context.CommitIndex()
// Make sure we're not getting a cached value from the registry.
ps.registry.Invalidate(c.Name)
@ -56,11 +57,11 @@ func (c *JoinCommandV1) Apply(context raft.Context) (interface{}, error) {
// update its information.
if peerURL != c.RaftURL {
log.Infof("Rejoin with %v instead of %v from %v", c.RaftURL, peerURL, c.Name)
if err := c.updatePeerURL(ps); err != nil {
return []byte{0}, err
if err := updatePeerURL(c, ps); err != nil {
return 0, err
}
}
return b, nil
return commitIndex, nil
}
// Check if the join command adds an instance that collides with existing one on peer URL.
@ -68,21 +69,23 @@ func (c *JoinCommandV1) Apply(context raft.Context) (interface{}, error) {
for _, peerURL := range peerURLs {
if peerURL == c.RaftURL {
log.Warnf("%v tries to join the cluster with existing URL %v", c.Name, c.EtcdURL)
return []byte{0}, etcdErr.NewError(etcdErr.EcodeExistingPeerAddr, c.EtcdURL, context.CommitIndex())
return 0, etcdErr.NewError(etcdErr.EcodeExistingPeerAddr, c.EtcdURL, context.CommitIndex())
}
}
// Check peer number in the cluster
if ps.registry.PeerCount() >= ps.ClusterConfig().ActiveSize {
if ps.registry.Count() >= ps.ClusterConfig().ActiveSize {
log.Debug("Reject join request from ", c.Name)
return []byte{0}, etcdErr.NewError(etcdErr.EcodeNoMorePeer, "", context.CommitIndex())
return 0, etcdErr.NewError(etcdErr.EcodeNoMorePeer, "", context.CommitIndex())
}
// Add to shared peer registry.
ps.registry.RegisterPeer(c.Name, c.RaftURL, c.EtcdURL)
ps.registry.Register(c.Name, c.RaftURL, c.EtcdURL)
// Add peer in raft
err := context.Server().AddPeer(c.Name, "")
if err := context.Server().AddPeer(c.Name, ""); err != nil {
return 0, err
}
// Add peer stats
if c.Name != ps.RaftServer().Name() {
@ -90,30 +93,12 @@ func (c *JoinCommandV1) Apply(context raft.Context) (interface{}, error) {
ps.followersStats.Followers[c.Name].Latency.Minimum = 1 << 63
}
return b, err
return commitIndex, nil
}
// NodeName returns the name of the machine this join command refers to.
func (c *JoinCommandV1) NodeName() string {
return c.Name
}
// JoinCommandV2 represents a request to join the cluster.
type JoinCommandV2 struct {
// MinVersion and MaxVersion are filled from store.MinVersion() and
// store.MaxVersion() by the sender (see joinByPeer).
MinVersion int `json:"minVersion"`
MaxVersion int `json:"maxVersion"`
// Name is the name of the joining machine.
Name string `json:"name"`
// PeerURL is the joining machine's peer URL; Apply compares it against
// the peer URLs already in the registry.
PeerURL string `json:"peerURL"`
// ClientURL is stored in the registry together with PeerURL.
ClientURL string `json:"clientURL"`
}
// CommandName reports the identifier of the v2 join command in the Raft log.
func (c *JoinCommandV2) CommandName() string {
	const commandName = "etcd:v2:join"
	return commandName
}
func (c *JoinCommandV2) updatePeerURL(ps *PeerServer) error {
log.Debugf("Update peer URL of %v to %v", c.Name, c.PeerURL)
if err := ps.registry.UpdatePeerURL(c.Name, c.PeerURL); err != nil {
func updatePeerURL(c *JoinCommand, ps *PeerServer) error {
log.Debugf("Update peer URL of %v to %v", c.Name, c.RaftURL)
if err := ps.registry.UpdatePeerURL(c.Name, c.RaftURL); err != nil {
log.Debugf("Error while updating in registry: %s (%v)", c.Name, err)
return err
}
@ -121,76 +106,3 @@ func (c *JoinCommandV2) updatePeerURL(ps *PeerServer) error {
ps.raftServer.FlushCommitIndex()
return nil
}
// Apply attempts to join a machine to the cluster.
//
// On success the result is a JSON-encoded joinMessageV2 carrying the commit
// index and the mode the machine was admitted in: PeerMode when it became a
// raft peer (or already was one), StandbyMode when the cluster already has
// ActiveSize peers. On failure a non-nil error is returned; the accompanying
// result value must not be interpreted.
func (c *JoinCommandV2) Apply(context raft.Context) (interface{}, error) {
	// The assertion result used to be discarded, so a mismatching context
	// caused a nil-pointer dereference on the first registry access.
	// NOTE(review): 300 is the code the server uses for its
	// "Empty result from raft" error; confirm it is the right code here.
	ps, ok := context.Server().Context().(*PeerServer)
	if !ok || ps == nil {
		return []byte{0}, etcdErr.NewError(300, "raft context is not a peer server", context.CommitIndex())
	}

	msg := joinMessageV2{
		Mode:        PeerMode,
		CommitIndex: context.CommitIndex(),
	}

	// Make sure we're not getting a cached value from the registry.
	ps.registry.Invalidate(c.Name)

	// Check if the join command is from a previous peer, who lost all its previous log.
	if peerURL, ok := ps.registry.PeerURL(c.Name); ok {
		// If previous node restarts with different peer URL,
		// update its information.
		if peerURL != c.PeerURL {
			log.Infof("Rejoin with %v instead of %v from %v", c.PeerURL, peerURL, c.Name)
			if err := c.updatePeerURL(ps); err != nil {
				return []byte{0}, err
			}
		}
		return json.Marshal(msg)
	}

	// Check if the join command adds an instance that collides with existing one on peer URL.
	for _, peerURL := range ps.registry.PeerURLs(ps.raftServer.Leader(), c.Name) {
		if peerURL == c.PeerURL {
			log.Warnf("%v tries to join the cluster with existing URL %v", c.Name, c.PeerURL)
			return []byte{0}, etcdErr.NewError(etcdErr.EcodeExistingPeerAddr, c.PeerURL, context.CommitIndex())
		}
	}

	// Check peer number in the cluster.
	if ps.registry.PeerCount() >= ps.ClusterConfig().ActiveSize {
		log.Debug("Join as standby ", c.Name)
		// Registry failures were silently dropped before; they are now
		// logged while the control flow stays the same.
		if err := ps.registry.RegisterStandby(c.Name, c.PeerURL, c.ClientURL); err != nil {
			log.Debugf("Error while registering standby: %s (%v)", c.Name, err)
		}
		msg.Mode = StandbyMode
		return json.Marshal(msg)
	}

	// Remove it as a standby if it is one.
	if ps.registry.StandbyExists(c.Name) {
		if err := ps.registry.UnregisterStandby(c.Name); err != nil {
			log.Debugf("Error while unregistering standby: %s (%v)", c.Name, err)
		}
	}

	// Add to shared peer registry.
	if err := ps.registry.RegisterPeer(c.Name, c.PeerURL, c.ClientURL); err != nil {
		log.Debugf("Error while registering peer: %s (%v)", c.Name, err)
	}

	// Add peer in raft
	if err := context.Server().AddPeer(c.Name, ""); err != nil {
		b, _ := json.Marshal(msg)
		return b, err
	}

	// Add peer stats
	if c.Name != ps.RaftServer().Name() {
		ps.followersStats.Followers[c.Name] = &raftFollowerStats{}
		// Start the minimum at a huge value so the first sample replaces it.
		ps.followersStats.Followers[c.Name].Latency.Minimum = 1 << 63
	}

	return json.Marshal(msg)
}
// NodeName returns the name of the machine this join command refers to.
func (c *JoinCommandV2) NodeName() string {
return c.Name
}
// joinMessageV2 is the JSON payload JoinCommandV2.Apply returns to a joining
// machine.
type joinMessageV2 struct {
// CommitIndex is the raft commit index at the time the join was applied;
// joinByPeer stores it as the server's joinIndex.
CommitIndex uint64 `json:"commitIndex"`
// Mode is the mode the machine was admitted in; joinByPeer passes it to
// setMode.
Mode Mode `json:"mode"`
}

View File

@ -2,6 +2,7 @@ package server
import (
"bytes"
"encoding/binary"
"encoding/json"
"fmt"
"io/ioutil"
@ -31,8 +32,8 @@ const (
ThresholdMonitorTimeout = 5 * time.Second
// ActiveMonitorTimeout is the time between checks on the active size of
// the cluster. If the active size is different than the actual size then
// etcd attempts to promote/demote to bring it to the correct number.
// the cluster. If the actual size is bigger than the active size then
// etcd attempts to remove a peer to bring it to the correct number.
ActiveMonitorTimeout = 1 * time.Second
// PeerActivityMonitorTimeout is the time between checks for dead nodes in
@ -40,11 +41,6 @@ const (
PeerActivityMonitorTimeout = 1 * time.Second
)
// Numeric flags for the two server modes.
// NOTE(review): no use of these constants is visible in this view — confirm
// where they are consumed.
const (
peerModeFlag = 0
standbyModeFlag = 1
)
type PeerServerConfig struct {
Name string
Scheme string
@ -65,14 +61,10 @@ type PeerServer struct {
registry *Registry
store store.Store
snapConf *snapshotConf
mode Mode
closeChan chan bool
timeoutThresholdChan chan interface{}
standbyPeerURL string
standbyClientURL string
metrics *metrics.Bucket
sync.Mutex
}
@ -128,29 +120,6 @@ func (s *PeerServer) SetRaftServer(raftServer raft.Server) {
s.raftServer = raftServer
}
// Mode retrieves the current mode of the server.
// NOTE(review): the field is read without taking the mutex embedded in
// PeerServer — confirm whether callers can race with setMode.
func (s *PeerServer) Mode() Mode {
return s.mode
}
// setMode records the new mode and brings the Raft server in line with it:
// entering peer mode starts the Raft server if it is not running, entering
// standby mode stops it if it is running. Any other mode value is only
// recorded.
func (s *PeerServer) setMode(mode Mode) {
	s.mode = mode

	if mode == PeerMode {
		if !s.raftServer.Running() {
			s.raftServer.Start()
		}
		return
	}

	if mode == StandbyMode {
		if s.raftServer.Running() {
			s.raftServer.Stop()
		}
	}
}
// ClusterConfig retrieves the current cluster configuration.
func (s *PeerServer) ClusterConfig() *ClusterConfig {
return s.clusterConfig
@ -326,7 +295,6 @@ func (s *PeerServer) HTTPHandler() http.Handler {
router.HandleFunc("/version/{version:[0-9]+}/check", s.VersionCheckHttpHandler)
router.HandleFunc("/upgrade", s.UpgradeHttpHandler)
router.HandleFunc("/join", s.JoinHttpHandler)
router.HandleFunc("/promote", s.PromoteHttpHandler).Methods("POST")
router.HandleFunc("/remove/{name:.+}", s.RemoveHttpHandler)
router.HandleFunc("/vote", s.VoteHttpHandler)
router.HandleFunc("/log", s.GetLogHttpHandler)
@ -339,8 +307,6 @@ func (s *PeerServer) HTTPHandler() http.Handler {
router.HandleFunc("/v2/admin/config", s.setClusterConfigHttpHandler).Methods("PUT")
router.HandleFunc("/v2/admin/machines", s.getMachinesHttpHandler).Methods("GET")
router.HandleFunc("/v2/admin/machines/{name}", s.getMachineHttpHandler).Methods("GET")
router.HandleFunc("/v2/admin/machines/{name}", s.addMachineHttpHandler).Methods("PUT")
router.HandleFunc("/v2/admin/machines/{name}", s.removeMachineHttpHandler).Methods("DELETE")
return router
}
@ -359,15 +325,14 @@ func (s *PeerServer) startAsLeader() {
s.raftServer.Start()
// leader need to join self as a peer
for {
c := &JoinCommandV1{
c := &JoinCommand{
MinVersion: store.MinVersion(),
MaxVersion: store.MaxVersion(),
Name: s.raftServer.Name(),
RaftURL: s.Config.URL,
EtcdURL: s.server.URL(),
}
_, err := s.raftServer.Do(c)
if err == nil {
if _, err := s.raftServer.Do(c); err == nil {
break
}
}
@ -548,16 +513,16 @@ func (s *PeerServer) joinByPeer(server raft.Server, peer string, scheme string)
}
var b bytes.Buffer
c := &JoinCommandV2{
c := &JoinCommand{
MinVersion: store.MinVersion(),
MaxVersion: store.MaxVersion(),
Name: server.Name(),
PeerURL: s.Config.URL,
ClientURL: s.server.URL(),
RaftURL: s.Config.URL,
EtcdURL: s.server.URL(),
}
json.NewEncoder(&b).Encode(c)
joinURL := url.URL{Host: peer, Scheme: scheme, Path: "/v2/admin/machines/" + server.Name()}
joinURL := url.URL{Host: peer, Scheme: scheme, Path: "/join"}
log.Infof("Send Join Request to %s", joinURL.String())
req, _ := http.NewRequest("PUT", joinURL.String(), &b)
@ -572,30 +537,19 @@ func (s *PeerServer) joinByPeer(server raft.Server, peer string, scheme string)
log.Infof("»»»» %d", resp.StatusCode)
if resp.StatusCode == http.StatusOK {
var msg joinMessageV2
if err := json.NewDecoder(resp.Body).Decode(&msg); err != nil {
log.Debugf("Error reading join response: %v", err)
return err
}
s.joinIndex = msg.CommitIndex
s.setMode(msg.Mode)
if msg.Mode == StandbyMode {
s.standbyClientURL = resp.Header.Get("X-Leader-Client-URL")
s.standbyPeerURL = resp.Header.Get("X-Leader-Peer-URL")
}
b, _ := ioutil.ReadAll(resp.Body)
s.joinIndex, _ = binary.Uvarint(b)
return nil
}
if resp.StatusCode == http.StatusTemporaryRedirect {
address := resp.Header.Get("Location")
log.Debugf("Send Join Request to %s", address)
c := &JoinCommandV2{
c := &JoinCommand{
MinVersion: store.MinVersion(),
MaxVersion: store.MaxVersion(),
Name: server.Name(),
PeerURL: s.Config.URL,
ClientURL: s.server.URL(),
RaftURL: s.Config.URL,
EtcdURL: s.server.URL(),
}
json.NewEncoder(&b).Encode(c)
resp, _, err = t.Put(address, &b)
@ -755,49 +709,21 @@ func (s *PeerServer) monitorActiveSize(closeChan chan bool) {
// Retrieve target active size and actual active size.
activeSize := s.ClusterConfig().ActiveSize
peerCount := s.registry.PeerCount()
standbys := s.registry.Standbys()
peers := s.registry.Peers()
peerCount := s.registry.Count()
peers := s.registry.Names()
if index := sort.SearchStrings(peers, s.Config.Name); index < len(peers) && peers[index] == s.Config.Name {
peers = append(peers[:index], peers[index+1:]...)
}
// If we have more active nodes than we should then demote.
// If we have more active nodes than we should then remove.
if peerCount > activeSize {
peer := peers[rand.Intn(len(peers))]
log.Infof("%s: demoting: %v", s.Config.Name, peer)
if _, err := s.raftServer.Do(&DemoteCommand{Name: peer}); err != nil {
log.Infof("%s: warning: demotion error: %v", s.Config.Name, err)
log.Infof("%s: removing: %v", s.Config.Name, peer)
if _, err := s.raftServer.Do(&RemoveCommand{Name: peer}); err != nil {
log.Infof("%s: warning: remove error: %v", s.Config.Name, err)
}
continue
}
// If we don't have enough active nodes then try to promote a standby.
if peerCount < activeSize && len(standbys) > 0 {
loop:
for _, i := range rand.Perm(len(standbys)) {
standby := standbys[i]
standbyPeerURL, _ := s.registry.StandbyPeerURL(standby)
log.Infof("%s: attempting to promote: %v (%s)", s.Config.Name, standby, standbyPeerURL)
// Notify standby to promote itself.
client := &http.Client{
Transport: &http.Transport{
DisableKeepAlives: false,
ResponseHeaderTimeout: ActiveMonitorTimeout,
},
}
resp, err := client.Post(fmt.Sprintf("%s/promote", standbyPeerURL), "application/json", nil)
if err != nil {
log.Infof("%s: warning: promotion error: %v", s.Config.Name, err)
continue
} else if resp.StatusCode != http.StatusOK {
log.Infof("%s: warning: promotion failure: %v", s.Config.Name, resp.StatusCode)
continue
}
break loop
}
}
}
}
@ -823,8 +749,8 @@ func (s *PeerServer) monitorPeerActivity(closeChan chan bool) {
// If the time since the last response from the peer is longer than the
// promote delay then automatically remove the peer.
if !peer.LastActivity().IsZero() && now.Sub(peer.LastActivity()) > promoteDelay {
log.Infof("%s: demoting node: %v; last activity %v ago", s.Config.Name, peer.Name, now.Sub(peer.LastActivity()))
if _, err := s.raftServer.Do(&DemoteCommand{Name: peer.Name}); err != nil {
log.Infof("%s: removing node: %v; last activity %v ago", s.Config.Name, peer.Name, now.Sub(peer.LastActivity()))
if _, err := s.raftServer.Do(&RemoveCommand{Name: peer.Name}); err != nil {
log.Infof("%s: warning: autodemotion error: %v", s.Config.Name, err)
}
continue
@ -832,15 +758,3 @@ func (s *PeerServer) monitorPeerActivity(closeChan chan bool) {
}
}
}
// Mode represents whether the server is an active peer or if the server is
// simply acting as a standby. It is a string type and appears under the
// "mode" JSON key of joinMessageV2 and machineMessage.
type Mode string
const (
// PeerMode is when the server is an active node in Raft.
PeerMode = Mode("peer")
// StandbyMode is when the server is an inactive, request-forwarding node.
StandbyMode = Mode("standby")
)

View File

@ -3,7 +3,6 @@ package server
import (
"encoding/json"
"net/http"
"net/url"
"strconv"
"time"
@ -150,7 +149,7 @@ func (ps *PeerServer) EtcdURLHttpHandler(w http.ResponseWriter, req *http.Reques
// Response to the join request
func (ps *PeerServer) JoinHttpHandler(w http.ResponseWriter, req *http.Request) {
command := &JoinCommandV1{}
command := &JoinCommand{}
if err := uhttp.DecodeJsonRequest(req, command); err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
@ -170,25 +169,6 @@ func (ps *PeerServer) JoinHttpHandler(w http.ResponseWriter, req *http.Request)
}
}
// PromoteHttpHandler attempts to rejoin the cluster as a peer.
//
// It parses the stored standby peer URL and calls joinByPeer against that
// URL's host. The response carries only a status code: 200 when the join
// succeeds, 500 when the stored URL cannot be parsed or the join fails.
func (ps *PeerServer) PromoteHttpHandler(w http.ResponseWriter, req *http.Request) {
	log.Infof("%s attempting to promote in cluster: %s", ps.Config.Name, ps.standbyPeerURL)

	// Named target rather than url so the imported net/url package is not
	// shadowed for the rest of the function.
	target, err := url.Parse(ps.standbyPeerURL)
	if err != nil {
		w.WriteHeader(http.StatusInternalServerError)
		return
	}

	if err := ps.joinByPeer(ps.raftServer, target.Host, ps.Config.Scheme); err != nil {
		log.Infof("%s error while promoting: %v", ps.Config.Name, err)
		w.WriteHeader(http.StatusInternalServerError)
		return
	}

	log.Infof("%s promoted in the cluster", ps.Config.Name)
	w.WriteHeader(http.StatusOK)
}
// Response to remove request
func (ps *PeerServer) RemoveHttpHandler(w http.ResponseWriter, req *http.Request) {
if req.Method != "DELETE" {
@ -197,7 +177,7 @@ func (ps *PeerServer) RemoveHttpHandler(w http.ResponseWriter, req *http.Request
}
vars := mux.Vars(req)
command := &RemoveCommandV1{
command := &RemoveCommand{
Name: vars["name"],
}
@ -243,10 +223,7 @@ func (ps *PeerServer) setClusterConfigHttpHandler(w http.ResponseWriter, req *ht
// Retrieves a list of peers and standbys.
func (ps *PeerServer) getMachinesHttpHandler(w http.ResponseWriter, req *http.Request) {
machines := make([]*machineMessage, 0)
for _, name := range ps.registry.Peers() {
machines = append(machines, ps.getMachineMessage(name))
}
for _, name := range ps.registry.Standbys() {
for _, name := range ps.registry.Names() {
machines = append(machines, ps.getMachineMessage(name))
}
json.NewEncoder(w).Encode(&machines)
@ -259,56 +236,17 @@ func (ps *PeerServer) getMachineHttpHandler(w http.ResponseWriter, req *http.Req
}
func (ps *PeerServer) getMachineMessage(name string) *machineMessage {
if ps.registry.PeerExists(name) {
clientURL, _ := ps.registry.ClientURL(name)
peerURL, _ := ps.registry.PeerURL(name)
return &machineMessage{
Name: name,
Mode: PeerMode,
ClientURL: clientURL,
PeerURL: peerURL,
}
if !ps.registry.Exists(name) {
return nil
}
if ps.registry.StandbyExists(name) {
clientURL, _ := ps.registry.StandbyClientURL(name)
peerURL, _ := ps.registry.StandbyPeerURL(name)
return &machineMessage{
Name: name,
Mode: StandbyMode,
ClientURL: clientURL,
PeerURL: peerURL,
}
clientURL, _ := ps.registry.ClientURL(name)
peerURL, _ := ps.registry.PeerURL(name)
return &machineMessage{
Name: name,
ClientURL: clientURL,
PeerURL: peerURL,
}
return nil
}
// addMachineHttpHandler adds a machine to the cluster. The request body is
// decoded into a JoinCommandV2 and dispatched; a body that cannot be decoded
// yields a bare 500, and a dispatch failure is written to the client either
// as an etcd error or as a plain 500 with the error text.
func (ps *PeerServer) addMachineHttpHandler(w http.ResponseWriter, req *http.Request) {
	joinCmd := &JoinCommandV2{}
	if decodeErr := uhttp.DecodeJsonRequest(req, joinCmd); decodeErr != nil {
		w.WriteHeader(http.StatusInternalServerError)
		return
	}

	log.Debugf("Receive Join Request (v2) from %s", joinCmd.Name)

	dispatchErr := ps.server.Dispatch(joinCmd, w, req)
	if dispatchErr == nil {
		return
	}

	switch e := dispatchErr.(type) {
	case *etcdErr.Error:
		log.Debug("Return error: ", (*e).Error())
		e.Write(w)
	default:
		http.Error(w, dispatchErr.Error(), http.StatusInternalServerError)
	}
}
// removeMachineHttpHandler removes a machine from the cluster.
//
// The machine name is taken from the {name} route variable and dispatched as
// a RemoveCommandV2. A dispatch failure is written to the client the same
// way addMachineHttpHandler reports it (etcd error, or a plain 500 with the
// error text) instead of being dropped.
func (ps *PeerServer) removeMachineHttpHandler(w http.ResponseWriter, req *http.Request) {
	vars := mux.Vars(req)
	c := &RemoveCommandV2{Name: vars["name"]}
	log.Debugf("[recv] Remove Request [%s]", c.Name)

	err := ps.server.Dispatch(c, w, req)
	if err == nil {
		return
	}

	if e, ok := err.(*etcdErr.Error); ok {
		log.Debug("Return error: ", e.Error())
		e.Write(w)
		return
	}
	http.Error(w, err.Error(), http.StatusInternalServerError)
}
// Response to the name request
@ -360,7 +298,6 @@ func (ps *PeerServer) UpgradeHttpHandler(w http.ResponseWriter, req *http.Reques
// machineMessage represents information about a peer or standby in the registry.
type machineMessage struct {
Name string `json:"name"`
Mode Mode `json:"mode"`
ClientURL string `json:"clientURL"`
PeerURL string `json:"peerURL"`
}

View File

@ -14,17 +14,13 @@ import (
)
// The location of the peer URL data.
const RegistryPeerKey = "/_etcd/machines"
// The location of the standby URL data.
const RegistryStandbyKey = "/_etcd/standbys"
const RegistryKey = "/_etcd/machines"
// The Registry stores URL information for nodes.
type Registry struct {
sync.Mutex
store store.Store
peers map[string]*node
standbys map[string]*node
store store.Store
peers map[string]*node
}
// The internal storage format of the registry.
@ -37,14 +33,13 @@ type node struct {
// Creates a new Registry.
func NewRegistry(s store.Store) *Registry {
return &Registry{
store: s,
peers: make(map[string]*node),
standbys: make(map[string]*node),
store: s,
peers: make(map[string]*node),
}
}
// Peers returns a list of cached peer names.
func (r *Registry) Peers() []string {
// Names returns a list of cached peer names.
func (r *Registry) Names() []string {
r.Lock()
defer r.Unlock()
@ -56,120 +51,43 @@ func (r *Registry) Peers() []string {
return names
}
// Standbys returns the names of all cached standbys in ascending order.
func (r *Registry) Standbys() []string {
	r.Lock()
	defer r.Unlock()

	result := make([]string, 0, len(r.standbys))
	for standby := range r.standbys {
		result = append(result, standby)
	}
	sort.Strings(result)
	return result
}
// RegisterPeer writes a peer entry under RegistryPeerKey and, once the write
// has succeeded, reloads that entry into the peer cache while holding the
// registry lock.
func (r *Registry) RegisterPeer(name string, peerURL string, machURL string) error {
	err := r.register(RegistryPeerKey, name, peerURL, machURL)
	if err != nil {
		return err
	}

	r.Lock()
	defer r.Unlock()
	entry := r.load(RegistryPeerKey, name)
	r.peers[name] = entry
	return nil
}
// RegisterStandby writes a standby entry under RegistryStandbyKey and, once
// the write has succeeded, reloads that entry into the standby cache while
// holding the registry lock.
func (r *Registry) RegisterStandby(name string, peerURL string, machURL string) error {
	err := r.register(RegistryStandbyKey, name, peerURL, machURL)
	if err != nil {
		return err
	}

	r.Lock()
	defer r.Unlock()
	entry := r.load(RegistryStandbyKey, name)
	r.standbys[name] = entry
	return nil
}
func (r *Registry) register(key, name string, peerURL string, machURL string) error {
// Register adds a peer to the registry.
func (r *Registry) Register(name string, peerURL string, machURL string) error {
// Write data to store.
v := url.Values{}
v.Set("raft", peerURL)
v.Set("etcd", machURL)
_, err := r.store.Create(path.Join(key, name), false, v.Encode(), false, store.Permanent)
log.Debugf("Register: %s", name)
return err
}
if _, err := r.store.Create(path.Join(RegistryKey, name), false, v.Encode(), false, store.Permanent); err != nil {
return err
}
// UpdatePeerURL updates peer URL in registry
func (r *Registry) UpdatePeerURL(name string, peerURL string) error {
r.Lock()
defer r.Unlock()
machURL, _ := r.clientURL(RegistryPeerKey, name)
// Write data to store.
key := path.Join(RegistryPeerKey, name)
v := url.Values{}
v.Set("raft", peerURL)
v.Set("etcd", machURL)
_, err := r.store.Update(key, v.Encode(), store.Permanent)
// Invalidate outdated cache.
r.invalidate(name)
log.Debugf("Update PeerURL: %s", name)
return err
r.peers[name] = r.load(RegistryKey, name)
return nil
}
// UnregisterPeer removes a peer from the registry by delegating to
// unregister with RegistryPeerKey.
func (r *Registry) UnregisterPeer(name string) error {
return r.unregister(RegistryPeerKey, name)
}
// UnregisterStandby removes a standby from the registry by delegating to
// unregister with RegistryStandbyKey.
func (r *Registry) UnregisterStandby(name string) error {
return r.unregister(RegistryStandbyKey, name)
}
func (r *Registry) unregister(key, name string) error {
// Unregister removes a peer from the registry.
func (r *Registry) Unregister(name string) error {
// Remove the key from the store.
_, err := r.store.Delete(path.Join(key, name), false, false)
log.Debugf("Unregister: %s", name)
_, err := r.store.Delete(path.Join(RegistryKey, name), false, false)
return err
}
// PeerCount returns the number of peers in the cluster, i.e. the count of
// entries under RegistryPeerKey.
func (r *Registry) PeerCount() int {
return r.count(RegistryPeerKey)
}
// StandbyCount returns the number of standbys in the cluster, i.e. the count
// of entries under RegistryStandbyKey.
func (r *Registry) StandbyCount() int {
return r.count(RegistryStandbyKey)
}
// Returns the number of nodes in the cluster.
func (r *Registry) count(key string) int {
e, err := r.store.Get(key, false, false)
// Count returns the number of peers in the cluster.
func (r *Registry) Count() int {
e, err := r.store.Get(RegistryKey, false, false)
if err != nil {
return 0
}
return len(e.Node.Nodes)
}
// PeerExists checks if a peer with the given name exists under
// RegistryPeerKey.
func (r *Registry) PeerExists(name string) bool {
return r.exists(RegistryPeerKey, name)
}
// StandbyExists checks if a standby with the given name exists under
// RegistryStandbyKey.
func (r *Registry) StandbyExists(name string) bool {
return r.exists(RegistryStandbyKey, name)
}
func (r *Registry) exists(key, name string) bool {
e, err := r.store.Get(path.Join(key, name), false, false)
// Exists checks if a peer with the given name exists.
func (r *Registry) Exists(name string) bool {
e, err := r.store.Get(path.Join(RegistryKey, name), false, false)
if err != nil {
return false
}
@ -180,18 +98,18 @@ func (r *Registry) exists(key, name string) bool {
func (r *Registry) ClientURL(name string) (string, bool) {
r.Lock()
defer r.Unlock()
return r.clientURL(RegistryPeerKey, name)
return r.clientURL(RegistryKey, name)
}
func (r *Registry) clientURL(key, name string) (string, bool) {
if r.peers[name] == nil {
if node := r.load(key, name); node != nil {
r.peers[name] = node
if peer := r.load(key, name); peer != nil {
r.peers[name] = peer
}
}
if node := r.peers[name]; node != nil {
return node.url, true
if peer := r.peers[name]; peer != nil {
return peer.url, true
}
return "", false
@ -213,69 +131,50 @@ func (r *Registry) PeerHost(name string) (string, bool) {
func (r *Registry) PeerURL(name string) (string, bool) {
r.Lock()
defer r.Unlock()
return r.peerURL(RegistryPeerKey, name)
return r.peerURL(RegistryKey, name)
}
func (r *Registry) peerURL(key, name string) (string, bool) {
if r.peers[name] == nil {
if node := r.load(key, name); node != nil {
r.peers[name] = node
if peer := r.load(key, name); peer != nil {
r.peers[name] = peer
}
}
if node := r.peers[name]; node != nil {
return node.peerURL, true
if peer := r.peers[name]; peer != nil {
return peer.peerURL, true
}
return "", false
}
// Retrieves the client URL for a given standby by name.
func (r *Registry) StandbyClientURL(name string) (string, bool) {
// UpdatePeerURL updates peer URL in registry
func (r *Registry) UpdatePeerURL(name string, peerURL string) error {
machURL, _ := r.clientURL(RegistryKey, name)
// Write data to store.
v := url.Values{}
v.Set("raft", peerURL)
v.Set("etcd", machURL)
log.Debugf("Update PeerURL: %s", name)
if _, err := r.store.Update(path.Join(RegistryKey, name), v.Encode(), store.Permanent); err != nil {
return err
}
r.Lock()
defer r.Unlock()
return r.standbyClientURL(RegistryStandbyKey, name)
}
// standbyClientURL returns the client URL of the named standby. A cache miss
// triggers a load from the given key; a successful load is cached. The
// second result is false when the standby is unknown. The function itself
// takes no lock.
func (r *Registry) standbyClientURL(key, name string) (string, bool) {
	entry := r.standbys[name]
	if entry == nil {
		entry = r.load(key, name)
		if entry != nil {
			r.standbys[name] = entry
		}
	}

	if entry == nil {
		return "", false
	}
	return entry.url, true
}
// StandbyPeerURL retrieves the peer URL for a given standby by name. It
// holds the registry lock while delegating to standbyPeerURL with
// RegistryStandbyKey; the boolean result is whatever that helper reports.
func (r *Registry) StandbyPeerURL(name string) (string, bool) {
r.Lock()
defer r.Unlock()
return r.standbyPeerURL(RegistryStandbyKey, name)
}
func (r *Registry) standbyPeerURL(key, name string) (string, bool) {
if r.standbys[name] == nil {
if node := r.load(key, name); node != nil {
r.standbys[name] = node
}
}
if node := r.standbys[name]; node != nil {
return node.peerURL, true
}
return "", false
// Invalidate outdated cache.
r.invalidate(name)
return nil
}
// Retrieves the Client URLs for all nodes.
func (r *Registry) ClientURLs(leaderName, selfName string) []string {
return r.urls(RegistryPeerKey, leaderName, selfName, r.clientURL)
return r.urls(RegistryKey, leaderName, selfName, r.clientURL)
}
// Retrieves the Peer URLs for all nodes.
func (r *Registry) PeerURLs(leaderName, selfName string) []string {
return r.urls(RegistryPeerKey, leaderName, selfName, r.peerURL)
return r.urls(RegistryKey, leaderName, selfName, r.peerURL)
}
// Retrieves the URLs for all nodes using url function.
@ -313,7 +212,6 @@ func (r *Registry) Invalidate(name string) {
func (r *Registry) invalidate(name string) {
delete(r.peers, name)
delete(r.standbys, name)
}
// Loads the given node by name from the store into the cache.

View File

@ -2,7 +2,6 @@ package server
import (
"encoding/binary"
"encoding/json"
"os"
"github.com/coreos/etcd/log"
@ -10,107 +9,51 @@ import (
)
func init() {
raft.RegisterCommand(&RemoveCommandV1{})
raft.RegisterCommand(&RemoveCommandV2{})
raft.RegisterCommand(&RemoveCommand{})
}
// The RemoveCommandV1 removes a server from the cluster.
type RemoveCommandV1 struct {
// The RemoveCommand removes a server from the cluster.
type RemoveCommand struct {
Name string `json:"name"`
}
// The name of the remove command in the log
func (c *RemoveCommandV1) CommandName() string {
func (c *RemoveCommand) CommandName() string {
return "etcd:remove"
}
// Remove a server from the cluster
func (c *RemoveCommandV1) Apply(context raft.Context) (interface{}, error) {
ps, _ := context.Server().Context().(*PeerServer)
// If this is a standby then remove it and exit.
if ps.registry.StandbyExists(c.Name) {
return []byte{0}, ps.registry.UnregisterStandby(c.Name)
}
// Remove node from the shared registry.
err := ps.registry.UnregisterPeer(c.Name)
// Delete from stats
delete(ps.followersStats.Followers, c.Name)
func (c *RemoveCommand) Apply(context raft.Context) (interface{}, error) {
index, err := applyRemove(c, context)
if err != nil {
log.Debugf("Error while unregistering: %s (%v)", c.Name, err)
return []byte{0}, err
}
// Remove peer in raft
err = context.Server().RemovePeer(c.Name)
if err != nil {
log.Debugf("Unable to remove peer: %s (%v)", c.Name, err)
return []byte{0}, err
}
if c.Name == context.Server().Name() {
// the removed node is this node
// if the node is not replaying the previous logs
// and the node has sent out a join request in this
// start. It is sure that this node received a new remove
// command and need to be removed
if context.CommitIndex() > ps.joinIndex && ps.joinIndex != 0 {
log.Debugf("server [%s] is removed", context.Server().Name())
os.Exit(0)
} else {
// else ignore remove
log.Debugf("ignore previous remove command.")
}
return nil, err
}
b := make([]byte, 8)
binary.PutUvarint(b, context.CommitIndex())
return b, err
binary.PutUvarint(b, index)
return b, nil
}
// RemoveCommandV2 represents a command to remove a machine from the server.
type RemoveCommandV2 struct {
// Name identifies the machine to remove.
Name string `json:"name"`
}
// CommandName reports the identifier of the v2 remove command.
func (c *RemoveCommandV2) CommandName() string {
	const commandName = "etcd:v2:remove"
	return commandName
}
// Apply removes the given machine from the cluster.
func (c *RemoveCommandV2) Apply(context raft.Context) (interface{}, error) {
// applyRemove removes the given machine from the cluster.
func applyRemove(c *RemoveCommand, context raft.Context) (uint64, error) {
ps, _ := context.Server().Context().(*PeerServer)
ret, _ := json.Marshal(removeMessageV2{CommitIndex: context.CommitIndex()})
// If this is a standby then remove it and exit.
if ps.registry.StandbyExists(c.Name) {
if err := ps.registry.UnregisterStandby(c.Name); err != nil {
return nil, err
}
return ret, nil
}
commitIndex := context.CommitIndex()
// Remove node from the shared registry.
err := ps.registry.UnregisterPeer(c.Name)
err := ps.registry.Unregister(c.Name)
// Delete from stats
delete(ps.followersStats.Followers, c.Name)
if err != nil {
log.Debugf("Error while unregistering: %s (%v)", c.Name, err)
return nil, err
return 0, err
}
// Remove peer in raft
if err := context.Server().RemovePeer(c.Name); err != nil {
log.Debugf("Unable to remove peer: %s (%v)", c.Name, err)
return nil, err
return 0, err
}
if c.Name == context.Server().Name() {
@ -128,9 +71,5 @@ func (c *RemoveCommandV2) Apply(context raft.Context) (interface{}, error) {
log.Debugf("ignore previous remove command.")
}
}
return ret, nil
}
type removeMessageV2 struct {
CommitIndex uint64 `json:"commitIndex"`
return commitIndex, nil
}

View File

@ -176,17 +176,6 @@ func (s *Server) handleFunc(r *mux.Router, path string, f func(http.ResponseWrit
// Log request.
log.Debugf("[recv] %s %s %s [%s]", req.Method, s.URL(), req.URL.Path, req.RemoteAddr)
// Forward request along if the server is a standby.
if s.peerServer.Mode() == StandbyMode {
if s.peerServer.standbyClientURL == "" {
w.Header().Set("Content-Type", "application/json")
etcdErr.NewError(402, "", 0).Write(w)
return
}
uhttp.Redirect(s.peerServer.standbyClientURL, w, req)
return
}
// Execute handler function and return error if necessary.
if err := f(w, req); err != nil {
if etcdErr, ok := err.(*etcdErr.Error); ok {
@ -231,9 +220,6 @@ func (s *Server) Dispatch(c raft.Command, w http.ResponseWriter, req *http.Reque
return etcdErr.NewError(300, "Empty result from raft", s.Store().Index())
}
w.Header().Set("X-Leader-Client-URL", s.url)
w.Header().Set("X-Leader-Peer-URL", ps.Config.URL)
// response for raft related commands[join/remove]
if b, ok := result.([]byte); ok {
w.WriteHeader(http.StatusOK)
@ -276,8 +262,7 @@ func (s *Server) Dispatch(c raft.Command, w http.ResponseWriter, req *http.Reque
var url string
switch c.(type) {
case *JoinCommandV1, *RemoveCommandV1,
*JoinCommandV2, *RemoveCommandV2,
case *JoinCommand, *RemoveCommand,
*SetClusterConfigCommand:
url, _ = ps.registry.PeerURL(leader)
default:

View File

@ -132,11 +132,5 @@ func writeHeaders(w http.ResponseWriter, s Server) {
w.Header().Add("X-Etcd-Index", fmt.Sprint(s.Store().Index()))
w.Header().Add("X-Raft-Index", fmt.Sprint(s.CommitIndex()))
w.Header().Add("X-Raft-Term", fmt.Sprint(s.Term()))
if url, ok := s.ClientURL(s.Leader()); ok {
w.Header().Set("X-Leader-Client-URL", url)
}
if url, ok := s.PeerURL(s.Leader()); ok {
w.Header().Set("X-Leader-Peer-URL", url)
}
w.WriteHeader(http.StatusOK)
}

View File

@ -25,7 +25,7 @@ func TestRemoveNode(t *testing.T) {
c.SyncCluster()
rmReq, _ := http.NewRequest("DELETE", "http://127.0.0.1:7001/v2/admin/machines/node3", nil)
rmReq, _ := http.NewRequest("DELETE", "http://127.0.0.1:7001/remove/node3", nil)
client := &http.Client{}
for i := 0; i < 2; i++ {

View File

@ -15,6 +15,8 @@ import (
// Create a full cluster and then add an extra standby node.
func TestStandby(t *testing.T) {
t.Skip("functionality unimplemented")
clusterSize := 10 // DefaultActiveSize + 1
_, etcds, err := CreateCluster(clusterSize, &os.ProcAttr{Files: []*os.File{nil, os.Stdout, os.Stderr}}, false)
assert.NoError(t, err)
@ -85,6 +87,8 @@ func TestStandby(t *testing.T) {
// Create a full cluster, disconnect a peer, wait for autodemotion, wait for autopromotion.
func TestStandbyAutoPromote(t *testing.T) {
t.Skip("functionality unimplemented")
clusterSize := 10 // DefaultActiveSize + 1
_, etcds, err := CreateCluster(clusterSize, &os.ProcAttr{Files: []*os.File{nil, os.Stdout, os.Stderr}}, false)
if err != nil {