Merge pull request #733 from unihorn/66

chore: rename proxy mode to standby mode

commit 93d1293b9d
@@ -1,24 +1,24 @@
-## Proxies
+## Standbys

 Adding peers in an etcd cluster adds network, CPU, and disk overhead to the leader since each one requires replication.
 Peers primarily provide resiliency in the event of a leader failure but the benefit of more failover nodes decreases as the cluster size increases.
-A lightweight alternative is the proxy.
+A lightweight alternative is the standby.

-Proxies are a way for an etcd node to forward requests along to the cluster but the proxies are not part of the Raft cluster themselves.
+Standbys are a way for an etcd node to forward requests along to the cluster but the standbys are not part of the Raft cluster themselves.
 This provides an easier API for local applications while reducing the overhead required by a regular peer node.
-Proxies also act as standby nodes in the event that a peer node in the cluster has not recovered after a long duration.
+Standbys also act as standby nodes in the event that a peer node in the cluster has not recovered after a long duration.


 ## Configuration Parameters

-Proxies require two additional configuration parameters: active size & promotion delay.
+Standbys require two additional configuration parameters: active size & promotion delay.
 The active size specifies a target size for the number of peers in the cluster.
-If there are not enough peers to meet the active size then proxies are promoted to peers until the peer count is equal to the active size.
-If there are more peers than the target active size then peers are demoted to proxies.
+If there are not enough peers to meet the active size then standbys are promoted to peers until the peer count is equal to the active size.
+If there are more peers than the target active size then peers are demoted to standbys.

-The promotion delay specifies how long the cluster should wait before removing a dead peer and promoting a proxy.
+The promotion delay specifies how long the cluster should wait before removing a dead peer and promoting a standby.
 By default this is 30 minutes.
-If a peer is inactive for 30 minutes then the peer is removed and a live proxy is found to take its place.
+If a peer is inactive for 30 minutes then the peer is removed and a live standby is found to take its place.


 ## Logical Workflow
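The two parameters described above map directly onto the `ClusterConfig` struct changed later in this commit (`activeSize` and `promoteDelay`, the latter in seconds). A minimal sketch of the JSON shape, assuming a target of 9 peers and the default 30-minute delay (1800 seconds):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// ClusterConfig mirrors the struct updated in this commit.
type ClusterConfig struct {
	ActiveSize   int `json:"activeSize"`   // target number of Raft peers
	PromoteDelay int `json:"promoteDelay"` // seconds a peer may be dead before a standby replaces it
}

func main() {
	cfg := ClusterConfig{ActiveSize: 9, PromoteDelay: 1800} // 1800s = the 30-minute default
	b, _ := json.Marshal(cfg)
	fmt.Println(string(b)) // {"activeSize":9,"promoteDelay":1800}
}
```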
@@ -27,12 +27,12 @@ Start a etcd machine and join the cluster:

 ```
 If peer count less than active size:
-	If machine already exists as a proxy:
-		Remove machine from proxy list
+	If machine already exists as a standby:
+		Remove machine from standby list
 	Join as peer

 If peer count greater than or equal to active size:
-	Join as proxy
+	Join as standby
 ```

 Remove an existing etcd machine from the cluster:
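The join branch above is what `JoinCommandV2.Apply` implements further down in this diff. A condensed sketch of the decision; `joinMode` and its parameters are invented here for illustration:

```go
package main

import "fmt"

// joinMode condenses the workflow above: below the active size a machine
// joins as a peer (leaving the standby list first if it is on it);
// at or above the active size it joins as a standby.
func joinMode(peerCount, activeSize int, isStandby bool) string {
	if peerCount < activeSize {
		if isStandby {
			fmt.Println("removing machine from standby list")
		}
		return "peer"
	}
	return "standby"
}

func main() {
	fmt.Println(joinMode(8, 9, false)) // peer
	fmt.Println(joinMode(9, 9, true))  // standby
}
```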
@@ -41,8 +41,8 @@ Remove an existing etcd machine from the cluster:
 ```
 If machine exists in peer list:
 	Remove from peer list

-If machine exists in proxy list:
-	Remove from proxy list
+If machine exists in standby list:
+	Remove from standby list
 ```

 Leader's active size monitor:
@@ -52,10 +52,10 @@ Loop:
 	Sleep 5 seconds

 	If peer count less than active size:
-		If proxy count greater than zero:
-			Request a random proxy to rejoin
+		If standby count greater than zero:
+			Request a random standby to rejoin
 		Goto Loop

 	If peer count greater than active size:
 		Demote randomly selected peer
 		Goto Loop
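The leader-side loop above is implemented by `monitorActiveSize` later in this diff. A condensed sketch of the per-tick decision; `reconcile` and its signature are illustrative, not etcd's API:

```go
package main

import (
	"fmt"
	"math/rand"
)

// reconcile mirrors the monitor loop above: promote a random standby when
// the cluster is below the active size, demote a random peer when above it.
func reconcile(peers, standbys []string, activeSize int) string {
	switch {
	case len(peers) < activeSize && len(standbys) > 0:
		return "promote " + standbys[rand.Intn(len(standbys))]
	case len(peers) > activeSize:
		return "demote " + peers[rand.Intn(len(peers))]
	}
	return "no-op"
}

func main() {
	fmt.Println(reconcile([]string{"n1", "n2"}, []string{"n3"}, 3)) // promote n3
}
```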
@@ -53,10 +53,10 @@ var errors = map[int]string{
 	// etcd related errors
 	EcodeWatcherCleared:      "watcher is cleared due to etcd recovery",
 	EcodeEventIndexCleared:   "The event in requested index is outdated and cleared",
-	EcodeProxyInternal:       "Proxy Internal Error",
+	EcodeStandbyInternal:     "Standby Internal Error",
 	EcodeInvalidActiveSize:   "Invalid active size",
-	EcodeInvalidPromoteDelay: "Proxy promote delay",
-	EcodePromoteError:        "Proxy promotion error",
+	EcodeInvalidPromoteDelay: "Standby promote delay",
+	EcodePromoteError:        "Standby promotion error",
 }

 const (
@@ -86,7 +86,7 @@ const (

 	EcodeWatcherCleared      = 400
 	EcodeEventIndexCleared   = 401
-	EcodeProxyInternal       = 402
+	EcodeStandbyInternal     = 402
 	EcodeInvalidActiveSize   = 403
 	EcodeInvalidPromoteDelay = 404
 	EcodePromoteError        = 405
@@ -22,11 +22,11 @@ const (

 // These settings can only be changed through Raft.
 type ClusterConfig struct {
 	// ActiveSize is the maximum number of node that can join as Raft followers.
-	// Nodes that join the cluster after the limit is reached are proxies.
+	// Nodes that join the cluster after the limit is reached are standbys.
 	ActiveSize int `json:"activeSize"`

 	// PromoteDelay is the amount of time, in seconds, after a node is
-	// unreachable that it will be swapped out for a proxy node, if available.
+	// unreachable that it will be swapped out for a standby node, if available.
 	PromoteDelay int `json:"promoteDelay"`
 }
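`SetClusterConfig` (changed further down) clamps incoming values to minimums before applying them. A small sketch of that guard; the value of `MinActiveSize` is assumed here for illustration:

```go
package main

import "fmt"

// ClusterConfig as defined in this diff.
type ClusterConfig struct {
	ActiveSize   int `json:"activeSize"`
	PromoteDelay int `json:"promoteDelay"`
}

// MinActiveSize is assumed for illustration; the real constant lives in the
// server package next to SetClusterConfig.
const MinActiveSize = 3

// clamp sketches the "Set minimums" step visible in the SetClusterConfig hunk.
func clamp(c *ClusterConfig) {
	if c.ActiveSize < MinActiveSize {
		c.ActiveSize = MinActiveSize
	}
}

func main() {
	c := &ClusterConfig{ActiveSize: 1, PromoteDelay: 1800}
	clamp(c)
	fmt.Println(c.ActiveSize) // 3
}
```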
@@ -11,7 +11,7 @@ func init() {
 	raft.RegisterCommand(&DemoteCommand{})
 }

-// DemoteCommand represents a command to change a peer to a proxy.
+// DemoteCommand represents a command to change a peer to a standby.
 type DemoteCommand struct {
 	Name string `json:"name"`
 }
@@ -51,14 +51,14 @@ func (c *DemoteCommand) Apply(context raft.Context) (interface{}, error) {
 		return nil, err
 	}

-	// Register node as a proxy.
-	ps.registry.RegisterProxy(c.Name, peerURL, clientURL)
+	// Register node as a standby.
+	ps.registry.RegisterStandby(c.Name, peerURL, clientURL)

 	// Update mode if this change applies to this server.
 	if c.Name == ps.Config.Name {
-		log.Infof("Demote peer %s: Set mode to proxy with %s", c.Name, ps.server.Leader())
-		ps.proxyPeerURL, _ = ps.registry.PeerURL(ps.server.Leader())
-		go ps.setMode(ProxyMode)
+		log.Infof("Demote peer %s: Set mode to standby with %s", c.Name, ps.server.Leader())
+		ps.standbyPeerURL, _ = ps.registry.PeerURL(ps.server.Leader())
+		go ps.setMode(StandbyMode)
 	}

 	return nil, nil
@@ -101,15 +101,15 @@ func (c *JoinCommandV2) Apply(context raft.Context) (interface{}, error) {

 	// Check peer number in the cluster.
 	if ps.registry.PeerCount() >= ps.ClusterConfig().ActiveSize {
-		log.Debug("Join as proxy ", c.Name)
-		ps.registry.RegisterProxy(c.Name, c.PeerURL, c.ClientURL)
-		msg.Mode = ProxyMode
+		log.Debug("Join as standby ", c.Name)
+		ps.registry.RegisterStandby(c.Name, c.PeerURL, c.ClientURL)
+		msg.Mode = StandbyMode
 		return json.Marshal(msg)
 	}

-	// Remove it as a proxy if it is one.
-	if ps.registry.ProxyExists(c.Name) {
-		ps.registry.UnregisterProxy(c.Name)
+	// Remove it as a standby if it is one.
+	if ps.registry.StandbyExists(c.Name) {
+		ps.registry.UnregisterStandby(c.Name)
 	}

 	// Add to shared peer registry.
@@ -41,7 +41,7 @@ const (

 const (
 	peerModeFlag    = 0
-	proxyModeFlag   = 1
+	standbyModeFlag = 1
 )

 type PeerServerConfig struct {
@@ -69,8 +69,8 @@ type PeerServer struct {
 	closeChan            chan bool
 	timeoutThresholdChan chan interface{}

-	proxyPeerURL   string
-	proxyClientURL string
+	standbyPeerURL   string
+	standbyClientURL string

 	metrics *metrics.Bucket
 	sync.Mutex
@@ -134,7 +134,7 @@ func (s *PeerServer) Mode() Mode {

 // SetMode updates the current mode of the server.
 // Switching to a peer mode will start the Raft server.
-// Switching to a proxy mode will stop the Raft server.
+// Switching to a standby mode will stop the Raft server.
 func (s *PeerServer) setMode(mode Mode) {
 	s.mode = mode
@@ -143,7 +143,7 @@ func (s *PeerServer) setMode(mode Mode) {
 		if !s.raftServer.Running() {
 			s.raftServer.Start()
 		}
-	case ProxyMode:
+	case StandbyMode:
 		if s.raftServer.Running() {
 			s.raftServer.Stop()
 		}
@@ -157,7 +157,7 @@ func (s *PeerServer) ClusterConfig() *ClusterConfig {

 // SetClusterConfig updates the current cluster configuration.
 // Adjusting the active size will cause the PeerServer to demote peers or
-// promote proxies to match the new size.
+// promote standbys to match the new size.
 func (s *PeerServer) SetClusterConfig(c *ClusterConfig) {
 	// Set minimums.
 	if c.ActiveSize < MinActiveSize {
@@ -552,9 +552,9 @@ func (s *PeerServer) joinByPeer(server raft.Server, peer string, scheme string)
 	s.joinIndex = msg.CommitIndex
 	s.setMode(msg.Mode)

-	if msg.Mode == ProxyMode {
-		s.proxyClientURL = resp.Header.Get("X-Leader-Client-URL")
-		s.proxyPeerURL = resp.Header.Get("X-Leader-Peer-URL")
+	if msg.Mode == StandbyMode {
+		s.standbyClientURL = resp.Header.Get("X-Leader-Client-URL")
+		s.standbyPeerURL = resp.Header.Get("X-Leader-Peer-URL")
 	}

 	return nil
@@ -711,7 +711,7 @@ func (s *PeerServer) monitorTimeoutThreshold(closeChan chan bool) {
 }

 // monitorActiveSize has the leader periodically check the status of cluster
-// nodes and swaps them out for proxies as needed.
+// nodes and swaps them out for standbys as needed.
 func (s *PeerServer) monitorActiveSize(closeChan chan bool) {
 	for {
 		select {
@@ -728,7 +728,7 @@ func (s *PeerServer) monitorActiveSize(closeChan chan bool) {
 		// Retrieve target active size and actual active size.
 		activeSize := s.ClusterConfig().ActiveSize
 		peerCount := s.registry.PeerCount()
-		proxies := s.registry.Proxies()
+		standbys := s.registry.Standbys()
 		peers := s.registry.Peers()
 		if index := sort.SearchStrings(peers, s.Config.Name); index < len(peers) && peers[index] == s.Config.Name {
 			peers = append(peers[:index], peers[index+1:]...)
@@ -744,22 +744,22 @@ func (s *PeerServer) monitorActiveSize(closeChan chan bool) {
 			continue
 		}

-		// If we don't have enough active nodes then try to promote a proxy.
-		if peerCount < activeSize && len(proxies) > 0 {
+		// If we don't have enough active nodes then try to promote a standby.
+		if peerCount < activeSize && len(standbys) > 0 {
 		loop:
-			for _, i := range rand.Perm(len(proxies)) {
-				proxy := proxies[i]
-				proxyPeerURL, _ := s.registry.ProxyPeerURL(proxy)
-				log.Infof("%s: attempting to promote: %v (%s)", s.Config.Name, proxy, proxyPeerURL)
+			for _, i := range rand.Perm(len(standbys)) {
+				standby := standbys[i]
+				standbyPeerURL, _ := s.registry.StandbyPeerURL(standby)
+				log.Infof("%s: attempting to promote: %v (%s)", s.Config.Name, standby, standbyPeerURL)

-				// Notify proxy to promote itself.
+				// Notify standby to promote itself.
 				client := &http.Client{
 					Transport: &http.Transport{
 						DisableKeepAlives:     false,
 						ResponseHeaderTimeout: ActiveMonitorTimeout,
 					},
 				}
-				resp, err := client.Post(fmt.Sprintf("%s/promote", proxyPeerURL), "application/json", nil)
+				resp, err := client.Post(fmt.Sprintf("%s/promote", standbyPeerURL), "application/json", nil)
 				if err != nil {
 					log.Infof("%s: warning: promotion error: %v", s.Config.Name, err)
 					continue
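As the loop shows, promotion is a plain HTTP POST to the standby's `/promote` endpoint on its peer address; an operator could trigger the same rejoin by hand. A sketch, with the host and peer port assumed:

```go
package main

import (
	"fmt"
	"net/http"
)

func main() {
	// The /promote path is served by PromoteHttpHandler (see below);
	// 127.0.0.1:7001 is an assumed peer address for illustration.
	resp, err := http.Post("http://127.0.0.1:7001/promote", "application/json", nil)
	if err != nil {
		fmt.Println("promotion error:", err)
		return
	}
	defer resp.Body.Close()
	fmt.Println("status:", resp.Status)
}
```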
@@ -806,13 +806,13 @@ func (s *PeerServer) monitorPeerActivity(closeChan chan bool) {
 }

 // Mode represents whether the server is an active peer or if the server is
-// simply acting as a proxy.
+// simply acting as a standby.
 type Mode string

 const (
 	// PeerMode is when the server is an active node in Raft.
 	PeerMode = Mode("peer")

-	// ProxyMode is when the server is an inactive, request-forwarding node.
-	ProxyMode = Mode("proxy")
+	// StandbyMode is when the server is an inactive, request-forwarding node.
+	StandbyMode = Mode("standby")
 )
@@ -172,8 +172,8 @@ func (ps *PeerServer) JoinHttpHandler(w http.ResponseWriter, req *http.Request)

 // Attempt to rejoin the cluster as a peer.
 func (ps *PeerServer) PromoteHttpHandler(w http.ResponseWriter, req *http.Request) {
-	log.Infof("%s attempting to promote in cluster: %s", ps.Config.Name, ps.proxyPeerURL)
-	url, err := url.Parse(ps.proxyPeerURL)
+	log.Infof("%s attempting to promote in cluster: %s", ps.Config.Name, ps.standbyPeerURL)
+	url, err := url.Parse(ps.standbyPeerURL)
 	if err != nil {
 		w.WriteHeader(http.StatusInternalServerError)
 		return
@@ -240,19 +240,19 @@ func (ps *PeerServer) setClusterConfigHttpHandler(w http.ResponseWriter, req *ht
 	json.NewEncoder(w).Encode(&ps.clusterConfig)
 }

-// Retrieves a list of peers and proxies.
+// Retrieves a list of peers and standbys.
 func (ps *PeerServer) getMachinesHttpHandler(w http.ResponseWriter, req *http.Request) {
 	machines := make([]*machineMessage, 0)
 	for _, name := range ps.registry.Peers() {
 		machines = append(machines, ps.getMachineMessage(name))
 	}
-	for _, name := range ps.registry.Proxies() {
+	for _, name := range ps.registry.Standbys() {
 		machines = append(machines, ps.getMachineMessage(name))
 	}
 	json.NewEncoder(w).Encode(&machines)
 }

-// Retrieve single peer or proxy.
+// Retrieve single peer or standby.
 func (ps *PeerServer) getMachineHttpHandler(w http.ResponseWriter, req *http.Request) {
 	vars := mux.Vars(req)
 	json.NewEncoder(w).Encode(ps.getMachineMessage(vars["name"]))
@@ -270,12 +270,12 @@ func (ps *PeerServer) getMachineMessage(name string) *machineMessage {
 		}
 	}

-	if ps.registry.ProxyExists(name) {
-		clientURL, _ := ps.registry.ProxyClientURL(name)
-		peerURL, _ := ps.registry.ProxyPeerURL(name)
+	if ps.registry.StandbyExists(name) {
+		clientURL, _ := ps.registry.StandbyClientURL(name)
+		peerURL, _ := ps.registry.StandbyPeerURL(name)
 		return &machineMessage{
 			Name:      name,
-			Mode:      ProxyMode,
+			Mode:      StandbyMode,
 			ClientURL: clientURL,
 			PeerURL:   peerURL,
 		}
@@ -357,7 +357,7 @@ func (ps *PeerServer) UpgradeHttpHandler(w http.ResponseWriter, req *http.Reques
 	w.WriteHeader(http.StatusOK)
 }

-// machineMessage represents information about a peer or proxy in the registry.
+// machineMessage represents information about a peer or standby in the registry.
 type machineMessage struct {
 	Name string `json:"name"`
 	Mode Mode   `json:"mode"`
@@ -16,15 +16,15 @@ import (
 // The location of the peer URL data.
 const RegistryPeerKey = "/_etcd/machines"

-// The location of the proxy URL data.
-const RegistryProxyKey = "/_etcd/proxies"
+// The location of the standby URL data.
+const RegistryStandbyKey = "/_etcd/standbys"

 // The Registry stores URL information for nodes.
 type Registry struct {
 	sync.Mutex
 	store    store.Store
 	peers    map[string]*node
-	proxies  map[string]*node
+	standbys map[string]*node
 }

 // The internal storage format of the registry.
@@ -39,7 +39,7 @@ func NewRegistry(s store.Store) *Registry {
 	return &Registry{
 		store:    s,
 		peers:    make(map[string]*node),
-		proxies:  make(map[string]*node),
+		standbys: make(map[string]*node),
 	}
 }
@@ -56,13 +56,13 @@ func (r *Registry) Peers() []string {
 	return names
 }

-// Proxies returns a list of cached proxy names.
-func (r *Registry) Proxies() []string {
+// Standbys returns a list of cached standby names.
+func (r *Registry) Standbys() []string {
 	r.Lock()
 	defer r.Unlock()

-	names := make([]string, 0, len(r.proxies))
-	for name := range r.proxies {
+	names := make([]string, 0, len(r.standbys))
+	for name := range r.standbys {
 		names = append(names, name)
 	}
 	sort.Sort(sort.StringSlice(names))
@@ -81,15 +81,15 @@ func (r *Registry) RegisterPeer(name string, peerURL string, machURL string) err
 	return nil
 }

-// RegisterProxy adds a proxy to the registry.
-func (r *Registry) RegisterProxy(name string, peerURL string, machURL string) error {
-	if err := r.register(RegistryProxyKey, name, peerURL, machURL); err != nil {
+// RegisterStandby adds a standby to the registry.
+func (r *Registry) RegisterStandby(name string, peerURL string, machURL string) error {
+	if err := r.register(RegistryStandbyKey, name, peerURL, machURL); err != nil {
 		return err
 	}

 	r.Lock()
 	defer r.Unlock()
-	r.proxies[name] = r.load(RegistryProxyKey, name)
+	r.standbys[name] = r.load(RegistryStandbyKey, name)
 	return nil
 }
@@ -108,9 +108,9 @@ func (r *Registry) UnregisterPeer(name string) error {
 	return r.unregister(RegistryPeerKey, name)
 }

-// UnregisterProxy removes a proxy from the registry.
-func (r *Registry) UnregisterProxy(name string) error {
-	return r.unregister(RegistryProxyKey, name)
+// UnregisterStandby removes a standby from the registry.
+func (r *Registry) UnregisterStandby(name string) error {
+	return r.unregister(RegistryStandbyKey, name)
 }

 func (r *Registry) unregister(key, name string) error {
@@ -125,9 +125,9 @@ func (r *Registry) PeerCount() int {
	return r.count(RegistryPeerKey)
 }

-// ProxyCount returns the number of proxies in the cluster.
-func (r *Registry) ProxyCount() int {
-	return r.count(RegistryProxyKey)
+// StandbyCount returns the number of standbys in the cluster.
+func (r *Registry) StandbyCount() int {
+	return r.count(RegistryStandbyKey)
 }

 // Returns the number of nodes in the cluster.
@@ -144,9 +144,9 @@ func (r *Registry) PeerExists(name string) bool {
 	return r.exists(RegistryPeerKey, name)
 }

-// ProxyExists checks if a proxy with the given name exists.
-func (r *Registry) ProxyExists(name string) bool {
-	return r.exists(RegistryProxyKey, name)
+// StandbyExists checks if a standby with the given name exists.
+func (r *Registry) StandbyExists(name string) bool {
+	return r.exists(RegistryStandbyKey, name)
 }

 func (r *Registry) exists(key, name string) bool {
@@ -211,39 +211,39 @@ func (r *Registry) peerURL(key, name string) (string, bool) {
 	return "", false
 }

-// Retrieves the client URL for a given proxy by name.
-func (r *Registry) ProxyClientURL(name string) (string, bool) {
+// Retrieves the client URL for a given standby by name.
+func (r *Registry) StandbyClientURL(name string) (string, bool) {
 	r.Lock()
 	defer r.Unlock()
-	return r.proxyClientURL(RegistryProxyKey, name)
+	return r.standbyClientURL(RegistryStandbyKey, name)
 }

-func (r *Registry) proxyClientURL(key, name string) (string, bool) {
-	if r.proxies[name] == nil {
+func (r *Registry) standbyClientURL(key, name string) (string, bool) {
+	if r.standbys[name] == nil {
 		if node := r.load(key, name); node != nil {
-			r.proxies[name] = node
+			r.standbys[name] = node
 		}
 	}
-	if node := r.proxies[name]; node != nil {
+	if node := r.standbys[name]; node != nil {
 		return node.url, true
 	}
 	return "", false
 }

-// Retrieves the peer URL for a given proxy by name.
-func (r *Registry) ProxyPeerURL(name string) (string, bool) {
+// Retrieves the peer URL for a given standby by name.
+func (r *Registry) StandbyPeerURL(name string) (string, bool) {
 	r.Lock()
 	defer r.Unlock()
-	return r.proxyPeerURL(RegistryProxyKey, name)
+	return r.standbyPeerURL(RegistryStandbyKey, name)
 }

-func (r *Registry) proxyPeerURL(key, name string) (string, bool) {
-	if r.proxies[name] == nil {
+func (r *Registry) standbyPeerURL(key, name string) (string, bool) {
+	if r.standbys[name] == nil {
 		if node := r.load(key, name); node != nil {
-			r.proxies[name] = node
+			r.standbys[name] = node
 		}
 	}
-	if node := r.proxies[name]; node != nil {
+	if node := r.standbys[name]; node != nil {
 		return node.peerURL, true
 	}
 	return "", false
@@ -292,7 +292,7 @@ func (r *Registry) Invalidate(name string) {
 	defer r.Unlock()

 	delete(r.peers, name)
-	delete(r.proxies, name)
+	delete(r.standbys, name)
 }

 // Loads the given node by name from the store into the cache.
@@ -28,9 +28,9 @@ func (c *RemoveCommandV1) CommandName() string {
 func (c *RemoveCommandV1) Apply(context raft.Context) (interface{}, error) {
 	ps, _ := context.Server().Context().(*PeerServer)

-	// If this is a proxy then remove it and exit.
-	if ps.registry.ProxyExists(c.Name) {
-		return []byte{0}, ps.registry.UnregisterProxy(c.Name)
+	// If this is a standby then remove it and exit.
+	if ps.registry.StandbyExists(c.Name) {
+		return []byte{0}, ps.registry.UnregisterStandby(c.Name)
 	}

 	// Remove node from the shared registry.
@@ -88,9 +88,9 @@ func (c *RemoveCommandV2) Apply(context raft.Context) (interface{}, error) {
 	ps, _ := context.Server().Context().(*PeerServer)
 	ret, _ := json.Marshal(removeMessageV2{CommitIndex: context.CommitIndex()})

-	// If this is a proxy then remove it and exit.
-	if ps.registry.ProxyExists(c.Name) {
-		if err := ps.registry.UnregisterProxy(c.Name); err != nil {
+	// If this is a standby then remove it and exit.
+	if ps.registry.StandbyExists(c.Name) {
+		if err := ps.registry.UnregisterStandby(c.Name); err != nil {
 			return nil, err
 		}
 		return ret, nil
@@ -176,14 +176,14 @@ func (s *Server) handleFunc(r *mux.Router, path string, f func(http.ResponseWrit
 		// Log request.
 		log.Debugf("[recv] %s %s %s [%s]", req.Method, s.URL(), req.URL.Path, req.RemoteAddr)

-		// Forward request along if the server is a proxy.
-		if s.peerServer.Mode() == ProxyMode {
-			if s.peerServer.proxyClientURL == "" {
+		// Forward request along if the server is a standby.
+		if s.peerServer.Mode() == StandbyMode {
+			if s.peerServer.standbyClientURL == "" {
 				w.Header().Set("Content-Type", "application/json")
 				etcdErr.NewError(402, "", 0).Write(w)
 				return
 			}
-			uhttp.Redirect(s.peerServer.proxyClientURL, w, req)
+			uhttp.Redirect(s.peerServer.standbyClientURL, w, req)
 			return
 		}
@@ -13,8 +13,8 @@ import (
 	"github.com/coreos/etcd/third_party/github.com/stretchr/testify/assert"
 )

-// Create a full cluster and then add extra an extra proxy node.
-func TestProxy(t *testing.T) {
+// Create a full cluster and then add extra an extra standby node.
+func TestStandby(t *testing.T) {
 	clusterSize := 10 // DefaultActiveSize + 1
 	_, etcds, err := CreateCluster(clusterSize, &os.ProcAttr{Files: []*os.File{nil, os.Stdout, os.Stderr}}, false)
 	assert.NoError(t, err)
@@ -34,7 +34,7 @@ func TestProxy(t *testing.T) {
 	}
 	time.Sleep(time.Second)

-	// Check that all peers and proxies have the value.
+	// Check that all peers and standbys have the value.
 	for i := range etcds {
 		resp, err := tests.Get(fmt.Sprintf("http://localhost:%d/v2/keys/foo", 4000+(i+1)))
 		if assert.NoError(t, err) {
@@ -45,8 +45,8 @@ func TestProxy(t *testing.T) {
 		}
 	}

-	// Verify that we have one proxy.
-	result, err := c.Get("_etcd/proxies", false, true)
+	// Verify that we have one standby.
+	result, err := c.Get("_etcd/standbys", false, true)
 	assert.NoError(t, err)
 	assert.Equal(t, len(result.Node.Nodes), 1)
@@ -58,8 +58,8 @@ func TestProxy(t *testing.T) {

 	time.Sleep(server.ActiveMonitorTimeout + (1 * time.Second))

-	// Verify that the proxy node is now a peer.
-	result, err = c.Get("_etcd/proxies", false, true)
+	// Verify that the standby node is now a peer.
+	result, err = c.Get("_etcd/standbys", false, true)
 	assert.NoError(t, err)
 	assert.Equal(t, len(result.Node.Nodes), 0)
@@ -77,14 +77,14 @@ func TestProxy(t *testing.T) {
 	assert.NoError(t, err)
 	assert.Equal(t, len(result.Node.Nodes), 8)

-	// Verify that we now have two proxies.
-	result, err = c.Get("_etcd/proxies", false, true)
+	// Verify that we now have two standbys.
+	result, err = c.Get("_etcd/standbys", false, true)
 	assert.NoError(t, err)
 	assert.Equal(t, len(result.Node.Nodes), 2)
 }

 // Create a full cluster, disconnect a peer, wait for autodemotion, wait for autopromotion.
-func TestProxyAutoPromote(t *testing.T) {
+func TestStandbyAutoPromote(t *testing.T) {
 	clusterSize := 10 // DefaultActiveSize + 1
 	_, etcds, err := CreateCluster(clusterSize, &os.ProcAttr{Files: []*os.File{nil, os.Stdout, os.Stderr}}, false)
 	if err != nil {
@@ -101,8 +101,8 @@ func TestProxyAutoPromote(t *testing.T) {

 	time.Sleep(1 * time.Second)

-	// Verify that we have one proxy.
-	result, err := c.Get("_etcd/proxies", false, true)
+	// Verify that we have one standby.
+	result, err := c.Get("_etcd/standbys", false, true)
 	assert.NoError(t, err)
 	assert.Equal(t, len(result.Node.Nodes), 1)
@@ -123,7 +123,7 @@ func TestProxyAutoPromote(t *testing.T) {
 	// Wait for it to get dropped.
 	time.Sleep(server.PeerActivityMonitorTimeout + (2 * time.Second))

-	// Wait for the proxy to be promoted.
+	// Wait for the standby to be promoted.
 	time.Sleep(server.ActiveMonitorTimeout + (2 * time.Second))

 	// Verify that we have 9 peers.
@@ -135,10 +135,10 @@ func TestProxyAutoPromote(t *testing.T) {
 	result, err = c.Get("_etcd/machines/node10", false, false)
 	assert.NoError(t, err)

-	// Verify that there are no more proxies.
-	result, err = c.Get("_etcd/proxies", false, true)
+	// Verify that there are no more standbys.
+	result, err = c.Get("_etcd/standbys", false, true)
 	assert.NoError(t, err)
 	if assert.Equal(t, len(result.Node.Nodes), 1) {
-		assert.Equal(t, result.Node.Nodes[0].Key, "/_etcd/proxies/node2")
+		assert.Equal(t, result.Node.Nodes[0].Key, "/_etcd/standbys/node2")
 	}
 }