chore: rename proxy mode to standby mode
It makes the name more reasonable.
commit 67600603c5
parent 65b872c8b5
@@ -1,24 +1,24 @@
-## Proxies
+## Standbys

 Adding peers in an etcd cluster adds network, CPU, and disk overhead to the leader since each one requires replication.
 Peers primarily provide resiliency in the event of a leader failure but the benefit of more failover nodes decreases as the cluster size increases.
-A lightweight alternative is the proxy.
+A lightweight alternative is the standby.

-Proxies are a way for an etcd node to forward requests along to the cluster but the proxies are not part of the Raft cluster themselves.
+Standbys are a way for an etcd node to forward requests along to the cluster but the standbys are not part of the Raft cluster themselves.
 This provides an easier API for local applications while reducing the overhead required by a regular peer node.
-Proxies also act as standby nodes in the event that a peer node in the cluster has not recovered after a long duration.
+Standbys also act as standby nodes in the event that a peer node in the cluster has not recovered after a long duration.


 ## Configuration Parameters

-Proxies require two additional configuration parameters: active size & promotion delay.
+Standbys require two additional configuration parameters: active size & promotion delay.
 The active size specifies a target size for the number of peers in the cluster.
-If there are not enough peers to meet the active size then proxies are promoted to peers until the peer count is equal to the active size.
-If there are more peers than the target active size then peers are demoted to proxies.
+If there are not enough peers to meet the active size then standbys are promoted to peers until the peer count is equal to the active size.
+If there are more peers than the target active size then peers are demoted to standbys.

-The promotion delay specifies how long the cluster should wait before removing a dead peer and promoting a proxy.
+The promotion delay specifies how long the cluster should wait before removing a dead peer and promoting a standby.
 By default this is 30 minutes.
-If a peer is inactive for 30 minutes then the peer is removed and a live proxy is found to take its place.
+If a peer is inactive for 30 minutes then the peer is removed and a live standby is found to take its place.


 ## Logical Workflow
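The two parameters above are the same pair that the `ClusterConfig` struct later in this diff serializes as JSON. A config targeting nine active peers with the default 30-minute (1800-second) promotion delay would look roughly like this; the values are illustrative:

```json
{
  "activeSize": 9,
  "promoteDelay": 1800
}
```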
@@ -27,12 +27,12 @@ Start a etcd machine and join the cluster:

 ```
 If peer count less than active size:
-  If machine already exists as a proxy:
-    Remove machine from proxy list
+  If machine already exists as a standby:
+    Remove machine from standby list
   Join as peer

 If peer count greater than or equal to active size:
-  Join as proxy
+  Join as standby
 ```

 Remove an existing etcd machine from the cluster:
@@ -41,8 +41,8 @@ Remove an existing etcd machine from the cluster:
 If machine exists in peer list:
   Remove from peer list

-If machine exists in proxy list:
-  Remove from proxy list
+If machine exists in standby list:
+  Remove from standby list
 ```

 Leader's active size monitor:
@@ -52,10 +52,10 @@ Loop:
   Sleep 5 seconds

   If peer count less than active size:
-    If proxy count greater than zero:
-      Request a random proxy to rejoin
+    If standby count greater than zero:
+      Request a random standby to rejoin
     Goto Loop

   If peer count greater than active size:
     Demote randomly selected peer
     Goto Loop
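This pseudocode corresponds to the leader-side `monitorActiveSize` loop further down in this diff. As a minimal, self-contained Go sketch of just the per-tick decision, with plain counts standing in for the registry and cluster-config lookups:

```go
package main

import "fmt"

// decideActiveSize mirrors one tick of the leader's monitor loop above:
// promote when under the target and a standby is available, demote when over.
func decideActiveSize(peerCount, standbyCount, activeSize int) string {
	if peerCount < activeSize && standbyCount > 0 {
		return "promote a random standby"
	}
	if peerCount > activeSize {
		return "demote a random peer"
	}
	return "no change"
}

func main() {
	fmt.Println(decideActiveSize(8, 1, 9))  // promote a random standby
	fmt.Println(decideActiveSize(10, 0, 9)) // demote a random peer
}
```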
@@ -53,10 +53,10 @@ var errors = map[int]string{
     // etcd related errors
     EcodeWatcherCleared: "watcher is cleared due to etcd recovery",
     EcodeEventIndexCleared: "The event in requested index is outdated and cleared",
-    EcodeProxyInternal: "Proxy Internal Error",
+    EcodeStandbyInternal: "Standby Internal Error",
     EcodeInvalidActiveSize: "Invalid active size",
-    EcodeInvalidPromoteDelay: "Proxy promote delay",
-    EcodePromoteError: "Proxy promotion error",
+    EcodeInvalidPromoteDelay: "Standby promote delay",
+    EcodePromoteError: "Standby promotion error",
 }

 const (
@@ -86,7 +86,7 @@ const (

     EcodeWatcherCleared = 400
     EcodeEventIndexCleared = 401
-    EcodeProxyInternal = 402
+    EcodeStandbyInternal = 402
     EcodeInvalidActiveSize = 403
     EcodeInvalidPromoteDelay = 404
     EcodePromoteError = 405
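After the rename, code 402 maps to the standby message. Assuming the standard etcd error envelope (its exact shape is not part of this diff), a 402 response body would look roughly like:

```json
{"errorCode": 402, "message": "Standby Internal Error", "cause": "", "index": 0}
```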
@@ -22,11 +22,11 @@ const (
 // These settings can only be changed through Raft.
 type ClusterConfig struct {
     // ActiveSize is the maximum number of node that can join as Raft followers.
-    // Nodes that join the cluster after the limit is reached are proxies.
+    // Nodes that join the cluster after the limit is reached are standbys.
     ActiveSize int `json:"activeSize"`

     // PromoteDelay is the amount of time, in seconds, after a node is
-    // unreachable that it will be swapped out for a proxy node, if available.
+    // unreachable that it will be swapped out for a standby node, if available.
     PromoteDelay int `json:"promoteDelay"`
 }

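A quick check that the struct above produces the JSON shown earlier; the type is redeclared locally so the sketch compiles on its own, and the values are illustrative:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// ClusterConfig is redeclared from the diff so this example is self-contained.
type ClusterConfig struct {
	ActiveSize   int `json:"activeSize"`
	PromoteDelay int `json:"promoteDelay"`
}

func main() {
	b, _ := json.Marshal(ClusterConfig{ActiveSize: 9, PromoteDelay: 1800})
	fmt.Println(string(b)) // {"activeSize":9,"promoteDelay":1800}
}
```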
@@ -11,7 +11,7 @@ func init() {
     raft.RegisterCommand(&DemoteCommand{})
 }

-// DemoteCommand represents a command to change a peer to a proxy.
+// DemoteCommand represents a command to change a peer to a standby.
 type DemoteCommand struct {
     Name string `json:"name"`
 }
@@ -51,14 +51,14 @@ func (c *DemoteCommand) Apply(context raft.Context) (interface{}, error) {
         return nil, err
     }

-    // Register node as a proxy.
-    ps.registry.RegisterProxy(c.Name, peerURL, clientURL)
+    // Register node as a standby.
+    ps.registry.RegisterStandby(c.Name, peerURL, clientURL)

     // Update mode if this change applies to this server.
     if c.Name == ps.Config.Name {
-        log.Infof("Demote peer %s: Set mode to proxy with %s", c.Name, ps.server.Leader())
-        ps.proxyPeerURL, _ = ps.registry.PeerURL(ps.server.Leader())
-        go ps.setMode(ProxyMode)
+        log.Infof("Demote peer %s: Set mode to standby with %s", c.Name, ps.server.Leader())
+        ps.standbyPeerURL, _ = ps.registry.PeerURL(ps.server.Leader())
+        go ps.setMode(StandbyMode)
     }

     return nil, nil
@@ -101,15 +101,15 @@ func (c *JoinCommandV2) Apply(context raft.Context) (interface{}, error) {

     // Check peer number in the cluster.
     if ps.registry.PeerCount() >= ps.ClusterConfig().ActiveSize {
-        log.Debug("Join as proxy ", c.Name)
-        ps.registry.RegisterProxy(c.Name, c.PeerURL, c.ClientURL)
-        msg.Mode = ProxyMode
+        log.Debug("Join as standby ", c.Name)
+        ps.registry.RegisterStandby(c.Name, c.PeerURL, c.ClientURL)
+        msg.Mode = StandbyMode
         return json.Marshal(msg)
     }

-    // Remove it as a proxy if it is one.
-    if ps.registry.ProxyExists(c.Name) {
-        ps.registry.UnregisterProxy(c.Name)
+    // Remove it as a standby if it is one.
+    if ps.registry.StandbyExists(c.Name) {
+        ps.registry.UnregisterStandby(c.Name)
     }

     // Add to shared peer registry.
@@ -41,7 +41,7 @@ const (

 const (
     peerModeFlag = 0
-    proxyModeFlag = 1
+    standbyModeFlag = 1
 )

 type PeerServerConfig struct {
@@ -69,8 +69,8 @@ type PeerServer struct {
     closeChan chan bool
     timeoutThresholdChan chan interface{}

-    proxyPeerURL string
-    proxyClientURL string
+    standbyPeerURL string
+    standbyClientURL string

     metrics *metrics.Bucket
     sync.Mutex
@@ -134,7 +134,7 @@ func (s *PeerServer) Mode() Mode {

 // SetMode updates the current mode of the server.
 // Switching to a peer mode will start the Raft server.
-// Switching to a proxy mode will stop the Raft server.
+// Switching to a standby mode will stop the Raft server.
 func (s *PeerServer) setMode(mode Mode) {
     s.mode = mode

@@ -143,7 +143,7 @@ func (s *PeerServer) setMode(mode Mode) {
         if !s.raftServer.Running() {
             s.raftServer.Start()
         }
-    case ProxyMode:
+    case StandbyMode:
         if s.raftServer.Running() {
             s.raftServer.Stop()
         }
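Pieced together from the context lines of the two hunks above, the renamed mode switch has roughly this shape; a reconstruction from the visible fragments, not the complete function:

```go
// setMode updates the server's mode: peers run the Raft server,
// standbys stop it and only forward requests.
func (s *PeerServer) setMode(mode Mode) {
	s.mode = mode

	switch mode {
	case PeerMode:
		if !s.raftServer.Running() {
			s.raftServer.Start() // peers participate in Raft
		}
	case StandbyMode:
		if s.raftServer.Running() {
			s.raftServer.Stop() // standbys only forward requests
		}
	}
}
```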
@@ -157,7 +157,7 @@ func (s *PeerServer) ClusterConfig() *ClusterConfig {

 // SetClusterConfig updates the current cluster configuration.
 // Adjusting the active size will cause the PeerServer to demote peers or
-// promote proxies to match the new size.
+// promote standbys to match the new size.
 func (s *PeerServer) SetClusterConfig(c *ClusterConfig) {
     // Set minimums.
     if c.ActiveSize < MinActiveSize {
@@ -552,9 +552,9 @@ func (s *PeerServer) joinByPeer(server raft.Server, peer string, scheme string)
     s.joinIndex = msg.CommitIndex
     s.setMode(msg.Mode)

-    if msg.Mode == ProxyMode {
-        s.proxyClientURL = resp.Header.Get("X-Leader-Client-URL")
-        s.proxyPeerURL = resp.Header.Get("X-Leader-Peer-URL")
+    if msg.Mode == StandbyMode {
+        s.standbyClientURL = resp.Header.Get("X-Leader-Client-URL")
+        s.standbyPeerURL = resp.Header.Get("X-Leader-Peer-URL")
     }

     return nil
@@ -711,7 +711,7 @@ func (s *PeerServer) monitorTimeoutThreshold(closeChan chan bool) {
 }

 // monitorActiveSize has the leader periodically check the status of cluster
-// nodes and swaps them out for proxies as needed.
+// nodes and swaps them out for standbys as needed.
 func (s *PeerServer) monitorActiveSize(closeChan chan bool) {
     for {
         select {
@@ -728,7 +728,7 @@ func (s *PeerServer) monitorActiveSize(closeChan chan bool) {
         // Retrieve target active size and actual active size.
         activeSize := s.ClusterConfig().ActiveSize
         peerCount := s.registry.PeerCount()
-        proxies := s.registry.Proxies()
+        standbys := s.registry.Standbys()
         peers := s.registry.Peers()
         if index := sort.SearchStrings(peers, s.Config.Name); index < len(peers) && peers[index] == s.Config.Name {
             peers = append(peers[:index], peers[index+1:]...)
@@ -744,22 +744,22 @@ func (s *PeerServer) monitorActiveSize(closeChan chan bool) {
             continue
         }

-        // If we don't have enough active nodes then try to promote a proxy.
-        if peerCount < activeSize && len(proxies) > 0 {
+        // If we don't have enough active nodes then try to promote a standby.
+        if peerCount < activeSize && len(standbys) > 0 {
         loop:
-            for _, i := range rand.Perm(len(proxies)) {
-                proxy := proxies[i]
-                proxyPeerURL, _ := s.registry.ProxyPeerURL(proxy)
-                log.Infof("%s: attempting to promote: %v (%s)", s.Config.Name, proxy, proxyPeerURL)
+            for _, i := range rand.Perm(len(standbys)) {
+                standby := standbys[i]
+                standbyPeerURL, _ := s.registry.StandbyPeerURL(standby)
+                log.Infof("%s: attempting to promote: %v (%s)", s.Config.Name, standby, standbyPeerURL)

-                // Notify proxy to promote itself.
+                // Notify standby to promote itself.
                 client := &http.Client{
                     Transport: &http.Transport{
                         DisableKeepAlives: false,
                         ResponseHeaderTimeout: ActiveMonitorTimeout,
                     },
                 }
-                resp, err := client.Post(fmt.Sprintf("%s/promote", proxyPeerURL), "application/json", nil)
+                resp, err := client.Post(fmt.Sprintf("%s/promote", standbyPeerURL), "application/json", nil)
                 if err != nil {
                     log.Infof("%s: warning: promotion error: %v", s.Config.Name, err)
                     continue
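The promotion request above is a plain POST to the standby's `/promote` endpoint, handled by `PromoteHttpHandler` later in this diff. A self-contained sketch of the same call, with an illustrative URL and a simple timeout in place of the monitor's transport settings:

```go
package main

import (
	"fmt"
	"net/http"
	"time"
)

// promoteStandby asks a standby, addressed by its peer URL, to rejoin the
// cluster as a peer, which is the same POST the leader's monitor issues above.
func promoteStandby(peerURL string) error {
	client := &http.Client{Timeout: 5 * time.Second}
	resp, err := client.Post(fmt.Sprintf("%s/promote", peerURL), "application/json", nil)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("promotion refused: %s", resp.Status)
	}
	return nil
}

func main() {
	// The address is illustrative; use a real standby's peer URL.
	if err := promoteStandby("http://127.0.0.1:7003"); err != nil {
		fmt.Println("promote failed:", err)
	}
}
```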
@@ -806,13 +806,13 @@ func (s *PeerServer) monitorPeerActivity(closeChan chan bool) {
 }

 // Mode represents whether the server is an active peer or if the server is
-// simply acting as a proxy.
+// simply acting as a standby.
 type Mode string

 const (
     // PeerMode is when the server is an active node in Raft.
     PeerMode = Mode("peer")

-    // ProxyMode is when the server is an inactive, request-forwarding node.
-    ProxyMode = Mode("proxy")
+    // StandbyMode is when the server is an inactive, request-forwarding node.
+    StandbyMode = Mode("standby")
 )
@@ -172,8 +172,8 @@ func (ps *PeerServer) JoinHttpHandler(w http.ResponseWriter, req *http.Request)

 // Attempt to rejoin the cluster as a peer.
 func (ps *PeerServer) PromoteHttpHandler(w http.ResponseWriter, req *http.Request) {
-    log.Infof("%s attempting to promote in cluster: %s", ps.Config.Name, ps.proxyPeerURL)
-    url, err := url.Parse(ps.proxyPeerURL)
+    log.Infof("%s attempting to promote in cluster: %s", ps.Config.Name, ps.standbyPeerURL)
+    url, err := url.Parse(ps.standbyPeerURL)
     if err != nil {
         w.WriteHeader(http.StatusInternalServerError)
         return
@@ -240,19 +240,19 @@ func (ps *PeerServer) setClusterConfigHttpHandler(w http.ResponseWriter, req *ht
     json.NewEncoder(w).Encode(&ps.clusterConfig)
 }

-// Retrieves a list of peers and proxies.
+// Retrieves a list of peers and standbys.
 func (ps *PeerServer) getMachinesHttpHandler(w http.ResponseWriter, req *http.Request) {
     machines := make([]*machineMessage, 0)
     for _, name := range ps.registry.Peers() {
         machines = append(machines, ps.getMachineMessage(name))
     }
-    for _, name := range ps.registry.Proxies() {
+    for _, name := range ps.registry.Standbys() {
         machines = append(machines, ps.getMachineMessage(name))
     }
     json.NewEncoder(w).Encode(&machines)
 }

-// Retrieve single peer or proxy.
+// Retrieve single peer or standby.
 func (ps *PeerServer) getMachineHttpHandler(w http.ResponseWriter, req *http.Request) {
     vars := mux.Vars(req)
     json.NewEncoder(w).Encode(ps.getMachineMessage(vars["name"]))
@@ -270,12 +270,12 @@ func (ps *PeerServer) getMachineMessage(name string) *machineMessage {
         }
     }

-    if ps.registry.ProxyExists(name) {
-        clientURL, _ := ps.registry.ProxyClientURL(name)
-        peerURL, _ := ps.registry.ProxyPeerURL(name)
+    if ps.registry.StandbyExists(name) {
+        clientURL, _ := ps.registry.StandbyClientURL(name)
+        peerURL, _ := ps.registry.StandbyPeerURL(name)
         return &machineMessage{
             Name: name,
-            Mode: ProxyMode,
+            Mode: StandbyMode,
             ClientURL: clientURL,
             PeerURL: peerURL,
         }
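For a standby, the machineMessage built above would serialize along these lines, assuming the ClientURL and PeerURL fields carry matching lowercase JSON tags (those tags sit outside this hunk) and with illustrative URLs:

```json
{"name": "node10", "mode": "standby", "clientURL": "http://127.0.0.1:4010", "peerURL": "http://127.0.0.1:7010"}
```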
@@ -357,7 +357,7 @@ func (ps *PeerServer) UpgradeHttpHandler(w http.ResponseWriter, req *http.Reques
     w.WriteHeader(http.StatusOK)
 }

-// machineMessage represents information about a peer or proxy in the registry.
+// machineMessage represents information about a peer or standby in the registry.
 type machineMessage struct {
     Name string `json:"name"`
     Mode Mode `json:"mode"`
@@ -16,15 +16,15 @@ import (
 // The location of the peer URL data.
 const RegistryPeerKey = "/_etcd/machines"

-// The location of the proxy URL data.
-const RegistryProxyKey = "/_etcd/proxies"
+// The location of the standby URL data.
+const RegistryStandbyKey = "/_etcd/standbys"

 // The Registry stores URL information for nodes.
 type Registry struct {
     sync.Mutex
     store store.Store
     peers map[string]*node
-    proxies map[string]*node
+    standbys map[string]*node
 }

 // The internal storage format of the registry.
@@ -39,7 +39,7 @@ func NewRegistry(s store.Store) *Registry {
     return &Registry{
         store: s,
         peers: make(map[string]*node),
-        proxies: make(map[string]*node),
+        standbys: make(map[string]*node),
     }
 }

@@ -56,13 +56,13 @@ func (r *Registry) Peers() []string {
     return names
 }

-// Proxies returns a list of cached proxy names.
-func (r *Registry) Proxies() []string {
+// Standbys returns a list of cached standby names.
+func (r *Registry) Standbys() []string {
     r.Lock()
     defer r.Unlock()

-    names := make([]string, 0, len(r.proxies))
-    for name := range r.proxies {
+    names := make([]string, 0, len(r.standbys))
+    for name := range r.standbys {
         names = append(names, name)
     }
     sort.Sort(sort.StringSlice(names))
@@ -81,15 +81,15 @@ func (r *Registry) RegisterPeer(name string, peerURL string, machURL string) err
     return nil
 }

-// RegisterProxy adds a proxy to the registry.
-func (r *Registry) RegisterProxy(name string, peerURL string, machURL string) error {
-    if err := r.register(RegistryProxyKey, name, peerURL, machURL); err != nil {
+// RegisterStandby adds a standby to the registry.
+func (r *Registry) RegisterStandby(name string, peerURL string, machURL string) error {
+    if err := r.register(RegistryStandbyKey, name, peerURL, machURL); err != nil {
         return err
     }

     r.Lock()
     defer r.Unlock()
-    r.proxies[name] = r.load(RegistryProxyKey, name)
+    r.standbys[name] = r.load(RegistryStandbyKey, name)
     return nil
 }

@@ -108,9 +108,9 @@ func (r *Registry) UnregisterPeer(name string) error {
     return r.unregister(RegistryPeerKey, name)
 }

-// UnregisterProxy removes a proxy from the registry.
-func (r *Registry) UnregisterProxy(name string) error {
-    return r.unregister(RegistryProxyKey, name)
+// UnregisterStandby removes a standby from the registry.
+func (r *Registry) UnregisterStandby(name string) error {
+    return r.unregister(RegistryStandbyKey, name)
 }

 func (r *Registry) unregister(key, name string) error {
@@ -125,9 +125,9 @@ func (r *Registry) PeerCount() int {
     return r.count(RegistryPeerKey)
 }

-// ProxyCount returns the number of proxies in the cluster.
-func (r *Registry) ProxyCount() int {
-    return r.count(RegistryProxyKey)
+// StandbyCount returns the number of standbys in the cluster.
+func (r *Registry) StandbyCount() int {
+    return r.count(RegistryStandbyKey)
 }

 // Returns the number of nodes in the cluster.
@@ -144,9 +144,9 @@ func (r *Registry) PeerExists(name string) bool {
     return r.exists(RegistryPeerKey, name)
 }

-// ProxyExists checks if a proxy with the given name exists.
-func (r *Registry) ProxyExists(name string) bool {
-    return r.exists(RegistryProxyKey, name)
+// StandbyExists checks if a standby with the given name exists.
+func (r *Registry) StandbyExists(name string) bool {
+    return r.exists(RegistryStandbyKey, name)
 }

 func (r *Registry) exists(key, name string) bool {
@@ -211,39 +211,39 @@ func (r *Registry) peerURL(key, name string) (string, bool) {
     return "", false
 }

-// Retrieves the client URL for a given proxy by name.
-func (r *Registry) ProxyClientURL(name string) (string, bool) {
+// Retrieves the client URL for a given standby by name.
+func (r *Registry) StandbyClientURL(name string) (string, bool) {
     r.Lock()
     defer r.Unlock()
-    return r.proxyClientURL(RegistryProxyKey, name)
+    return r.standbyClientURL(RegistryStandbyKey, name)
 }

-func (r *Registry) proxyClientURL(key, name string) (string, bool) {
-    if r.proxies[name] == nil {
+func (r *Registry) standbyClientURL(key, name string) (string, bool) {
+    if r.standbys[name] == nil {
         if node := r.load(key, name); node != nil {
-            r.proxies[name] = node
+            r.standbys[name] = node
         }
     }
-    if node := r.proxies[name]; node != nil {
+    if node := r.standbys[name]; node != nil {
         return node.url, true
     }
     return "", false
 }

-// Retrieves the peer URL for a given proxy by name.
-func (r *Registry) ProxyPeerURL(name string) (string, bool) {
+// Retrieves the peer URL for a given standby by name.
+func (r *Registry) StandbyPeerURL(name string) (string, bool) {
     r.Lock()
     defer r.Unlock()
-    return r.proxyPeerURL(RegistryProxyKey, name)
+    return r.standbyPeerURL(RegistryStandbyKey, name)
 }

-func (r *Registry) proxyPeerURL(key, name string) (string, bool) {
-    if r.proxies[name] == nil {
+func (r *Registry) standbyPeerURL(key, name string) (string, bool) {
+    if r.standbys[name] == nil {
         if node := r.load(key, name); node != nil {
-            r.proxies[name] = node
+            r.standbys[name] = node
         }
     }
-    if node := r.proxies[name]; node != nil {
+    if node := r.standbys[name]; node != nil {
         return node.peerURL, true
     }
     return "", false
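Taken together, the renamed registry API can be exercised as below; a usage sketch with an illustrative node name and URLs, and error handling elided:

```go
// r is a *Registry from NewRegistry(store); the name and URLs are illustrative.
// Register a standby, look it up, then remove it again.
r.RegisterStandby("node10", "http://127.0.0.1:7010", "http://127.0.0.1:4010")

if r.StandbyExists("node10") {
	peerURL, _ := r.StandbyPeerURL("node10")     // "http://127.0.0.1:7010"
	clientURL, _ := r.StandbyClientURL("node10") // "http://127.0.0.1:4010"
	fmt.Printf("standby peer=%s client=%s\n", peerURL, clientURL)
}

_ = r.UnregisterStandby("node10")
```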
@@ -292,7 +292,7 @@ func (r *Registry) Invalidate(name string) {
     defer r.Unlock()

     delete(r.peers, name)
-    delete(r.proxies, name)
+    delete(r.standbys, name)
 }

 // Loads the given node by name from the store into the cache.
@@ -28,9 +28,9 @@ func (c *RemoveCommandV1) CommandName() string {
 func (c *RemoveCommandV1) Apply(context raft.Context) (interface{}, error) {
     ps, _ := context.Server().Context().(*PeerServer)

-    // If this is a proxy then remove it and exit.
-    if ps.registry.ProxyExists(c.Name) {
-        return []byte{0}, ps.registry.UnregisterProxy(c.Name)
+    // If this is a standby then remove it and exit.
+    if ps.registry.StandbyExists(c.Name) {
+        return []byte{0}, ps.registry.UnregisterStandby(c.Name)
     }

     // Remove node from the shared registry.
@@ -88,9 +88,9 @@ func (c *RemoveCommandV2) Apply(context raft.Context) (interface{}, error) {
     ps, _ := context.Server().Context().(*PeerServer)
     ret, _ := json.Marshal(removeMessageV2{CommitIndex: context.CommitIndex()})

-    // If this is a proxy then remove it and exit.
-    if ps.registry.ProxyExists(c.Name) {
-        if err := ps.registry.UnregisterProxy(c.Name); err != nil {
+    // If this is a standby then remove it and exit.
+    if ps.registry.StandbyExists(c.Name) {
+        if err := ps.registry.UnregisterStandby(c.Name); err != nil {
             return nil, err
         }
         return ret, nil
@@ -176,14 +176,14 @@ func (s *Server) handleFunc(r *mux.Router, path string, f func(http.ResponseWrit
         // Log request.
         log.Debugf("[recv] %s %s %s [%s]", req.Method, s.URL(), req.URL.Path, req.RemoteAddr)

-        // Forward request along if the server is a proxy.
-        if s.peerServer.Mode() == ProxyMode {
-            if s.peerServer.proxyClientURL == "" {
+        // Forward request along if the server is a standby.
+        if s.peerServer.Mode() == StandbyMode {
+            if s.peerServer.standbyClientURL == "" {
                 w.Header().Set("Content-Type", "application/json")
                 etcdErr.NewError(402, "", 0).Write(w)
                 return
             }
-            uhttp.Redirect(s.peerServer.proxyClientURL, w, req)
+            uhttp.Redirect(s.peerServer.standbyClientURL, w, req)
             return
         }

@@ -13,8 +13,8 @@ import (
     "github.com/coreos/etcd/third_party/github.com/stretchr/testify/assert"
 )

-// Create a full cluster and then add extra an extra proxy node.
-func TestProxy(t *testing.T) {
+// Create a full cluster and then add extra an extra standby node.
+func TestStandby(t *testing.T) {
     clusterSize := 10 // DefaultActiveSize + 1
     _, etcds, err := CreateCluster(clusterSize, &os.ProcAttr{Files: []*os.File{nil, os.Stdout, os.Stderr}}, false)
     assert.NoError(t, err)
@@ -34,7 +34,7 @@ func TestProxy(t *testing.T) {
     }
     time.Sleep(time.Second)

-    // Check that all peers and proxies have the value.
+    // Check that all peers and standbys have the value.
     for i := range etcds {
         resp, err := tests.Get(fmt.Sprintf("http://localhost:%d/v2/keys/foo", 4000+(i+1)))
         if assert.NoError(t, err) {
@@ -45,8 +45,8 @@ func TestProxy(t *testing.T) {
         }
     }

-    // Verify that we have one proxy.
-    result, err := c.Get("_etcd/proxies", false, true)
+    // Verify that we have one standby.
+    result, err := c.Get("_etcd/standbys", false, true)
     assert.NoError(t, err)
     assert.Equal(t, len(result.Node.Nodes), 1)

@@ -58,8 +58,8 @@ func TestProxy(t *testing.T) {

     time.Sleep(server.ActiveMonitorTimeout + (1 * time.Second))

-    // Verify that the proxy node is now a peer.
-    result, err = c.Get("_etcd/proxies", false, true)
+    // Verify that the standby node is now a peer.
+    result, err = c.Get("_etcd/standbys", false, true)
     assert.NoError(t, err)
     assert.Equal(t, len(result.Node.Nodes), 0)

@@ -77,14 +77,14 @@ func TestProxy(t *testing.T) {
     assert.NoError(t, err)
     assert.Equal(t, len(result.Node.Nodes), 8)

-    // Verify that we now have two proxies.
-    result, err = c.Get("_etcd/proxies", false, true)
+    // Verify that we now have two standbys.
+    result, err = c.Get("_etcd/standbys", false, true)
     assert.NoError(t, err)
     assert.Equal(t, len(result.Node.Nodes), 2)
 }

 // Create a full cluster, disconnect a peer, wait for autodemotion, wait for autopromotion.
-func TestProxyAutoPromote(t *testing.T) {
+func TestStandbyAutoPromote(t *testing.T) {
     clusterSize := 10 // DefaultActiveSize + 1
     _, etcds, err := CreateCluster(clusterSize, &os.ProcAttr{Files: []*os.File{nil, os.Stdout, os.Stderr}}, false)
     if err != nil {
@@ -101,8 +101,8 @@ func TestProxyAutoPromote(t *testing.T) {

     time.Sleep(1 * time.Second)

-    // Verify that we have one proxy.
-    result, err := c.Get("_etcd/proxies", false, true)
+    // Verify that we have one standby.
+    result, err := c.Get("_etcd/standbys", false, true)
     assert.NoError(t, err)
     assert.Equal(t, len(result.Node.Nodes), 1)

@@ -123,7 +123,7 @@ func TestProxyAutoPromote(t *testing.T) {
     // Wait for it to get dropped.
     time.Sleep(server.PeerActivityMonitorTimeout + (2 * time.Second))

-    // Wait for the proxy to be promoted.
+    // Wait for the standby to be promoted.
     time.Sleep(server.ActiveMonitorTimeout + (2 * time.Second))

     // Verify that we have 9 peers.
@@ -135,10 +135,10 @@ func TestProxyAutoPromote(t *testing.T) {
     result, err = c.Get("_etcd/machines/node10", false, false)
     assert.NoError(t, err)

-    // Verify that there are no more proxies.
-    result, err = c.Get("_etcd/proxies", false, true)
+    // Verify that there are no more standbys.
+    result, err = c.Get("_etcd/standbys", false, true)
     assert.NoError(t, err)
     if assert.Equal(t, len(result.Node.Nodes), 1) {
-        assert.Equal(t, result.Node.Nodes[0].Key, "/_etcd/proxies/node2")
+        assert.Equal(t, result.Node.Nodes[0].Key, "/_etcd/standbys/node2")
     }
 }