feat: implement standby mode

Change log:
1. PeerServer
- estimate the initial mode from its log via the removedInLog variable
- refactor FindCluster to return the estimation
- refactor Start to call FindCluster explicitly
- move raftServer start and cluster init from FindCluster to Start
- remove stopNotify from PeerServer because it is not used anymore
2. Etcd
- refactor Run logic to fit the specification
3. ClusterConfig
- rename promoteDelay to removeDelay for better naming
- add SyncClusterInterval field to ClusterConfig
- commit a command that sets the default cluster config when the cluster is created
- store the cluster config in the key space for consistency
- reload the cluster config on reboot
4. add StandbyServer
5. Error
- remove unused EcodePromoteError
Author: Yicheng Qin
Date:   2014-05-08 19:47:19 -07:00
parent 5bd08a327d
commit baadf63912
22 changed files with 1186 additions and 384 deletions
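Taken together, the changes let a node oscillate between peer mode and standby mode. Below is a minimal editorial sketch of the intended Run loop, under the assumption that it matches the changelog above; the interface names are stand-ins, not the actual etcd.Run code.

package main

import "log"

// Stand-ins for the exported surface of PeerServer and StandbyServer
// introduced by this commit.
type peerServer interface {
	FindCluster(discoverURL string, peers []string) (toStart bool, possiblePeers []string, err error)
	Start(snapshot bool) error
	RemoveNotify() <-chan bool
}

type standbyServer interface {
	SyncCluster(peers []string) error
	Start()
	RemoveNotify() <-chan bool
}

// run sketches the mode-switching loop: try to join as a peer; if the
// cluster is full, fall back to standby and wait until a join succeeds.
func run(ps peerServer, ss standbyServer, discoverURL string, peers []string) {
	for {
		toStart, possiblePeers, err := ps.FindCluster(discoverURL, peers)
		if err != nil {
			log.Fatal(err)
		}
		if toStart {
			if err := ps.Start(false); err != nil {
				log.Fatal(err)
			}
			<-ps.RemoveNotify() // removed from the cluster: fall back to standby
		} else {
			if err := ss.SyncCluster(possiblePeers); err != nil {
				log.Fatal(err)
			}
			ss.Start()
			<-ss.RemoveNotify() // join accepted: switch back to peer mode
		}
		// The real code refreshes peers from the synced cluster here.
		peers = possiblePeers
	}
}

func main() {} // placeholder so the sketch compiles on its own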


@@ -30,7 +30,7 @@ func NewClient(transport http.RoundTripper) *Client {
return &Client{http.Client{Transport: transport}}
}
// CheckVersion checks whether the version is available.
// CheckVersion returns true when the version check on the server returns 200.
func (c *Client) CheckVersion(url string, version int) (bool, *etcdErr.Error) {
resp, err := c.Get(url + fmt.Sprintf("/version/%d/check", version))
if err != nil {


@@ -11,11 +11,17 @@ const (
// MinActiveSize is the minimum active size allowed.
MinActiveSize = 3
// DefaultPromoteDelay is the default elapsed time before promotion.
DefaultPromoteDelay = int((30 * time.Minute) / time.Second)
// DefaultRemoveDelay is the default elapsed time before removal.
DefaultRemoveDelay = int((30 * time.Minute) / time.Second)
// MinPromoteDelay is the minimum promote delay allowed.
MinPromoteDelay = int((2 * time.Second) / time.Second)
// MinRemoveDelay is the minimum remove delay allowed.
MinRemoveDelay = int((2 * time.Second) / time.Second)
// DefaultSyncClusterInterval is the default interval for cluster sync.
DefaultSyncClusterInterval = int((30 * time.Minute) / time.Second)
// MinSyncClusterInterval is the minimum sync interval allowed.
MinSyncClusterInterval = int((1 * time.Second) / time.Second)
)
// ClusterConfig represents cluster-wide configuration settings.
@@ -25,15 +31,20 @@ type ClusterConfig struct {
// Nodes that join the cluster after the limit is reached are standbys.
ActiveSize int `json:"activeSize"`
// PromoteDelay is the amount of time, in seconds, after a node is
// unreachable that it will be swapped out for a standby node, if available.
PromoteDelay int `json:"promoteDelay"`
// RemoveDelay is the amount of time, in seconds, after a node is
// unreachable that it will be removed and demoted to a standby.
RemoveDelay int `json:"removeDelay"`
// SyncClusterInterval is the amount of time, in seconds, between
// cluster syncs while the server runs in standby mode.
SyncClusterInterval int `json:"syncClusterInterval"`
}
// NewClusterConfig returns a cluster configuration with default settings.
func NewClusterConfig() *ClusterConfig {
return &ClusterConfig{
ActiveSize: DefaultActiveSize,
PromoteDelay: DefaultPromoteDelay,
ActiveSize: DefaultActiveSize,
RemoveDelay: DefaultRemoveDelay,
SyncClusterInterval: DefaultSyncClusterInterval,
}
}
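Since the configuration is now persisted in the key space (under ClusterConfigKey, i.e. /_etcd/config), peers and standbys exchange it as JSON. A small sketch of the stored default value; the ActiveSize default of 9 is an assumption carried over from DefaultActiveSize elsewhere in this tree.

package main

import (
	"encoding/json"
	"fmt"
)

// Mirrors ClusterConfig above; the JSON tags are the wire format.
type ClusterConfig struct {
	ActiveSize          int `json:"activeSize"`
	RemoveDelay         int `json:"removeDelay"`
	SyncClusterInterval int `json:"syncClusterInterval"`
}

func main() {
	c := ClusterConfig{
		ActiveSize:          9,    // DefaultActiveSize (assumed value)
		RemoveDelay:         1800, // DefaultRemoveDelay: 30 minutes in seconds
		SyncClusterInterval: 1800, // DefaultSyncClusterInterval: 30 minutes in seconds
	}
	b, _ := json.Marshal(c)
	fmt.Println(string(b))
	// Prints: {"activeSize":9,"removeDelay":1800,"syncClusterInterval":1800}
}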


@@ -61,6 +61,9 @@ func applyJoin(c *JoinCommand, context raft.Context) (uint64, error) {
return 0, err
}
}
if c.Name == context.Server().Name() {
ps.removedInLog = false
}
return commitIndex, nil
}
@@ -74,7 +77,9 @@ func applyJoin(c *JoinCommand, context raft.Context) (uint64, error) {
}
// Check peer number in the cluster
if ps.registry.Count() >= ps.ClusterConfig().ActiveSize {
count := ps.registry.Count()
// ClusterConfig isn't initialized until the first machine is added
if count > 0 && count >= ps.ClusterConfig().ActiveSize {
log.Debug("Reject join request from ", c.Name)
return 0, etcdErr.NewError(etcdErr.EcodeNoMorePeer, "", context.CommitIndex())
}
@@ -93,6 +98,9 @@ func applyJoin(c *JoinCommand, context raft.Context) (uint64, error) {
ps.followersStats.Followers[c.Name].Latency.Minimum = 1 << 63
}
if c.Name == context.Server().Name() {
ps.removedInLog = false
}
return commitIndex, nil
}
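Two notes on this hunk: the count > 0 guard exists because the bootstrap join of the very first machine is applied before any SetClusterConfigCommand has been committed, so the ActiveSize check must not depend on an uninitialized config; and resetting removedInLog on a self-join records in the log that this node is (again) an active peer, which FindCluster later uses to estimate the startup mode.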


@@ -35,6 +35,9 @@ const (
// PeerActivityMonitorTimeout is the time between checks for dead nodes in
// the cluster.
PeerActivityMonitorTimeout = 1 * time.Second
// The location of cluster config in key space.
ClusterConfigKey = "/_etcd/config"
)
type PeerServerConfig struct {
@@ -49,17 +52,18 @@ type PeerServerConfig struct {
type PeerServer struct {
Config PeerServerConfig
client *Client
clusterConfig *ClusterConfig
raftServer raft.Server
server *Server
joinIndex uint64
followersStats *raftFollowersStats
serverStats *raftServerStats
registry *Registry
store store.Store
snapConf *snapshotConf
stopNotify chan bool
joinIndex uint64
isNewCluster bool
removedInLog bool
removeNotify chan bool
started bool
closeChan chan bool
@@ -87,7 +91,6 @@ func NewPeerServer(psConfig PeerServerConfig, client *Client, registry *Registry
s := &PeerServer{
Config: psConfig,
client: client,
clusterConfig: NewClusterConfig(),
registry: registry,
store: store,
followersStats: followersStats,
@@ -101,7 +104,7 @@ func NewPeerServer(psConfig PeerServerConfig, client *Client, registry *Registry
return s
}
func (s *PeerServer) SetRaftServer(raftServer raft.Server) {
func (s *PeerServer) SetRaftServer(raftServer raft.Server, snapshot bool) {
s.snapConf = &snapshotConf{
checkingInterval: time.Second * 3,
// this is not accurate, we will update raft to provide an api
@@ -120,130 +123,7 @@ func (s *PeerServer) SetRaftServer(raftServer raft.Server) {
raftServer.AddEventListener(raft.HeartbeatEventType, s.recordMetricEvent)
s.raftServer = raftServer
}
// ClusterConfig retrieves the current cluster configuration.
func (s *PeerServer) ClusterConfig() *ClusterConfig {
return s.clusterConfig
}
// SetClusterConfig updates the current cluster configuration.
// Adjusting the active size will cause the PeerServer to demote peers or
// promote standbys to match the new size.
func (s *PeerServer) SetClusterConfig(c *ClusterConfig) {
// Set minimums.
if c.ActiveSize < MinActiveSize {
c.ActiveSize = MinActiveSize
}
if c.PromoteDelay < MinPromoteDelay {
c.PromoteDelay = MinPromoteDelay
}
s.clusterConfig = c
}
// Try all possible ways to find clusters to join
// Include log data in -data-dir, -discovery and -peers
//
// Peer discovery follows this order:
// 1. previous peers in -data-dir
// 2. -discovery
// 3. -peers
//
// TODO(yichengq): RaftServer should be started as late as possible.
// Current implementation to start it is not that good,
// and should be refactored later.
func (s *PeerServer) findCluster(discoverURL string, peers []string) {
name := s.Config.Name
isNewNode := s.raftServer.IsLogEmpty()
// Try its best to find possible peers, and connect with them.
if !isNewNode {
// It is not allowed to join the cluster with existing peer address
// This prevents old node joining with different name by mistake.
if !s.checkPeerAddressNonconflict() {
log.Fatalf("%v is not allowed to join the cluster with existing URL %v", s.Config.Name, s.Config.URL)
}
// Take old nodes into account.
allPeers := s.getKnownPeers()
// Discover registered peers.
// TODO(yichengq): It may mess up discoverURL if this is
// set wrong by mistake. This may need to refactor discovery
// module. Fix it later.
if discoverURL != "" {
discoverPeers, _ := s.handleDiscovery(discoverURL)
allPeers = append(allPeers, discoverPeers...)
}
allPeers = append(allPeers, peers...)
allPeers = s.removeSelfFromList(allPeers)
// If there is possible peer list, use it to find cluster.
if len(allPeers) > 0 {
// TODO(yichengq): joinCluster may fail if there's no leader for
// current cluster. It should wait if the cluster is under
// leader election, or the node with changed IP cannot join
// the cluster then.
if err := s.startAsFollower(allPeers, 1); err == nil {
log.Debugf("%s joins to the previous cluster %v", name, allPeers)
return
}
log.Warnf("%s cannot connect to previous cluster %v", name, allPeers)
}
// TODO(yichengq): Think about the action that should be done
// if it cannot connect any of the previous known node.
s.raftServer.Start()
log.Debugf("%s is restarting the cluster %v", name, allPeers)
return
}
// Attempt cluster discovery
if discoverURL != "" {
discoverPeers, discoverErr := s.handleDiscovery(discoverURL)
// It is registered in discover url
if discoverErr == nil {
// start as a leader in a new cluster
if len(discoverPeers) == 0 {
log.Debugf("%s is starting a new cluster via discover service", name)
s.startAsLeader()
} else {
log.Debugf("%s is joining a cluster %v via discover service", name, discoverPeers)
if err := s.startAsFollower(discoverPeers, s.Config.RetryTimes); err != nil {
log.Fatal(err)
}
}
return
}
log.Warnf("%s failed to connect discovery service[%v]: %v", name, discoverURL, discoverErr)
if len(peers) == 0 {
log.Fatalf("%s, the new leader, must register itself to discovery service as required", name)
}
}
if len(peers) > 0 {
if err := s.startAsFollower(peers, s.Config.RetryTimes); err != nil {
log.Fatalf("%s cannot connect to existing cluster %v", name, peers)
}
return
}
log.Infof("%s is starting a new cluster.", s.Config.Name)
s.startAsLeader()
return
}
// Start starts the raft server.
// The function assumes that join has been accepted successfully.
func (s *PeerServer) Start(snapshot bool, discoverURL string, peers []string) error {
s.Lock()
defer s.Unlock()
if s.started {
return nil
}
s.started = true
s.removedInLog = false
// LoadSnapshot
if snapshot {
@@ -264,13 +144,142 @@ func (s *PeerServer) Start(snapshot bool, discoverURL string, peers []string) er
log.Warnf("Failed setting NOCOW: %v", err)
}
}
}
s.findCluster(discoverURL, peers)
// Try all possible ways to find clusters to join
// Include log data in -data-dir, -discovery and -peers
//
// Peer discovery follows this order:
// 1. previous peers in -data-dir
// 2. -discovery
// 3. -peers
func (s *PeerServer) FindCluster(discoverURL string, peers []string) (toStart bool, possiblePeers []string, err error) {
name := s.Config.Name
isNewNode := s.raftServer.IsLogEmpty()
// Try its best to find possible peers, and connect with them.
if !isNewNode {
// It is not allowed to join the cluster with existing peer address
// This prevents old node joining with different name by mistake.
if !s.checkPeerAddressNonconflict() {
err = fmt.Errorf("%v is not allowed to join the cluster with existing URL %v", s.Config.Name, s.Config.URL)
return
}
// Take old nodes into account.
possiblePeers = s.getKnownPeers()
// Discover registered peers.
// TODO(yichengq): It may mess up discoverURL if this is
// set wrong by mistake. This may need to refactor discovery
// module. Fix it later.
if discoverURL != "" {
discoverPeers, _ := s.handleDiscovery(discoverURL)
possiblePeers = append(possiblePeers, discoverPeers...)
}
possiblePeers = append(possiblePeers, peers...)
possiblePeers = s.removeSelfFromList(possiblePeers)
if s.removedInLog {
return
}
// If there is possible peer list, use it to find cluster.
if len(possiblePeers) > 0 {
// TODO(yichengq): joinCluster may fail if there's no leader for
// current cluster. It should wait if the cluster is under
// leader election, or the node with changed IP cannot join
// the cluster then.
if rejected, ierr := s.startAsFollower(possiblePeers, 1); rejected {
log.Debugf("%s should work as standby for the cluster %v: %v", name, possiblePeers, ierr)
return
} else if ierr != nil {
log.Warnf("%s cannot connect to previous cluster %v: %v", name, possiblePeers, ierr)
} else {
log.Debugf("%s joins to the previous cluster %v", name, possiblePeers)
toStart = true
return
}
}
// TODO(yichengq): Think about the action that should be done
// if it cannot connect any of the previous known node.
log.Debugf("%s is restarting the cluster %v", name, possiblePeers)
toStart = true
return
}
// Attempt cluster discovery
if discoverURL != "" {
discoverPeers, discoverErr := s.handleDiscovery(discoverURL)
// It is registered in discover url
if discoverErr == nil {
// start as a leader in a new cluster
if len(discoverPeers) == 0 {
s.isNewCluster = true
log.Debugf("%s is starting a new cluster via discover service", name)
toStart = true
return
}
log.Debugf("%s is joining a cluster %v via discover service", name, discoverPeers)
if rejected, ierr := s.startAsFollower(discoverPeers, s.Config.RetryTimes); rejected {
log.Debugf("%s should work as standby for the cluster %v: %v", name, discoverPeers, ierr)
possiblePeers = discoverPeers
} else if ierr != nil {
log.Warnf("%s cannot connect to existing cluster %v: %v", name, discoverPeers, ierr)
err = ierr
} else {
toStart = true
}
return
}
log.Warnf("%s failed to connect discovery service[%v]: %v", name, discoverURL, discoverErr)
if len(peers) == 0 {
err = fmt.Errorf("%s, the new instance, must register itself to discovery service as required", name)
return
}
}
if len(peers) > 0 {
log.Debugf("%s is joining peers %v from -peers flag", name, peers)
if rejected, ierr := s.startAsFollower(peers, s.Config.RetryTimes); rejected {
log.Debugf("%s should work as standby for the cluster %v: %v", name, peers, ierr)
possiblePeers = peers
} else if ierr != nil {
log.Warnf("%s cannot connect to existing peers %v: %v", name, peers, ierr)
err = ierr
} else {
toStart = true
}
return
}
s.isNewCluster = true
log.Infof("%s is starting a new cluster.", s.Config.Name)
toStart = true
return
}
// Start starts the raft server.
// The function assumes that join has been accepted successfully.
func (s *PeerServer) Start(snapshot bool) error {
s.Lock()
defer s.Unlock()
if s.started {
return nil
}
s.started = true
s.stopNotify = make(chan bool)
s.removeNotify = make(chan bool)
s.closeChan = make(chan bool)
s.raftServer.Start()
if s.isNewCluster {
s.InitNewCluster()
s.isNewCluster = false
}
s.startRoutine(s.monitorSync)
s.startRoutine(s.monitorTimeoutThreshold)
s.startRoutine(s.monitorActiveSize)
@@ -298,7 +307,6 @@ func (s *PeerServer) Stop() {
// but this functionality has not been implemented.
s.raftServer.Stop()
s.routineGroup.Wait()
close(s.stopNotify)
}
// asyncRemove stops the server in peer mode.
@@ -326,11 +334,6 @@ func (s *PeerServer) asyncRemove() {
}()
}
// StopNotify notifies the server is stopped.
func (s *PeerServer) StopNotify() <-chan bool {
return s.stopNotify
}
// RemoveNotify notifies the server is removed from peer mode due to
// removal from the cluster.
func (s *PeerServer) RemoveNotify() <-chan bool {
@@ -362,6 +365,48 @@ func (s *PeerServer) HTTPHandler() http.Handler {
return router
}
func (s *PeerServer) SetJoinIndex(joinIndex uint64) {
s.joinIndex = joinIndex
}
// ClusterConfig retrieves the current cluster configuration.
func (s *PeerServer) ClusterConfig() *ClusterConfig {
e, err := s.store.Get(ClusterConfigKey, false, false)
// Fall back to defaults for backward compatibility: older versions
// did not store the cluster config in the key space.
if err != nil {
log.Debugf("failed getting cluster config key: %v", err)
return NewClusterConfig()
}
var c ClusterConfig
if err = json.Unmarshal([]byte(*e.Node.Value), &c); err != nil {
log.Debugf("failed unmarshaling cluster config: %v", err)
return NewClusterConfig()
}
return &c
}
// SetClusterConfig updates the current cluster configuration.
// Adjusting the active size will cause the PeerServer to demote peers or
// promote standbys to match the new size.
func (s *PeerServer) SetClusterConfig(c *ClusterConfig) {
// Set minimums.
if c.ActiveSize < MinActiveSize {
c.ActiveSize = MinActiveSize
}
if c.RemoveDelay < MinRemoveDelay {
c.RemoveDelay = MinRemoveDelay
}
if c.SyncClusterInterval < MinSyncClusterInterval {
c.SyncClusterInterval = MinSyncClusterInterval
}
log.Debugf("set cluster config as %v", c)
b, _ := json.Marshal(c)
s.store.Set(ClusterConfigKey, false, string(b), store.Permanent)
}
// Retrieves the underlying Raft server.
func (s *PeerServer) RaftServer() raft.Server {
return s.raftServer
@@ -372,40 +417,48 @@ func (s *PeerServer) SetServer(server *Server) {
s.server = server
}
func (s *PeerServer) startAsLeader() {
s.raftServer.Start()
func (s *PeerServer) InitNewCluster() {
// the leader needs to join itself as a peer
s.doCommand(&JoinCommand{
MinVersion: store.MinVersion(),
MaxVersion: store.MaxVersion(),
Name: s.raftServer.Name(),
RaftURL: s.Config.URL,
EtcdURL: s.server.URL(),
})
log.Debugf("%s start as a leader", s.Config.Name)
s.joinIndex = 1
conf := NewClusterConfig()
s.doCommand(&SetClusterConfigCommand{Config: conf})
log.Debugf("%s sets cluster config as %v", s.Config.Name, conf)
}
func (s *PeerServer) doCommand(cmd raft.Command) {
for {
c := &JoinCommand{
MinVersion: store.MinVersion(),
MaxVersion: store.MaxVersion(),
Name: s.raftServer.Name(),
RaftURL: s.Config.URL,
EtcdURL: s.server.URL(),
}
if _, err := s.raftServer.Do(c); err == nil {
if _, err := s.raftServer.Do(cmd); err == nil {
break
}
}
log.Debugf("%s start as a leader", s.Config.Name)
}
func (s *PeerServer) startAsFollower(cluster []string, retryTimes int) error {
func (s *PeerServer) startAsFollower(cluster []string, retryTimes int) (bool, error) {
// start as a follower in an existing cluster
for i := 0; ; i++ {
ok := s.joinCluster(cluster)
if ok {
break
if rejected, err := s.joinCluster(cluster); rejected {
return true, err
} else if err == nil {
return false, nil
}
if i == retryTimes-1 {
return fmt.Errorf("Cannot join the cluster via given peers after %x retries", s.Config.RetryTimes)
break
}
log.Warnf("%v is unable to join the cluster using any of the peers %v at %dth time. Retrying in %.1f seconds", s.Config.Name, cluster, i, s.Config.RetryInterval)
log.Infof("%v is unable to join the cluster using any of the peers %v at %dth time. Retrying in %.1f seconds", s.Config.Name, cluster, i, s.Config.RetryInterval)
time.Sleep(time.Second * time.Duration(s.Config.RetryInterval))
continue
}
s.raftServer.Start()
return nil
return false, fmt.Errorf("fail joining the cluster via given peers after %x retries", retryTimes)
}
// Upgradable checks whether all peers in a cluster support an upgrade to the next store version.
@@ -483,7 +536,7 @@ func (s *PeerServer) getKnownPeers() []string {
for i := range peers {
u, err := url.Parse(peers[i])
if err != nil {
log.Debug("getPrevPeers cannot parse url %v", peers[i])
log.Debugf("getKnownPeers cannot parse url %v", peers[i])
}
peers[i] = u.Host
}
@@ -495,57 +548,55 @@ func (s *PeerServer) removeSelfFromList(peers []string) []string {
// Remove its own peer address from the peer list to join
u, err := url.Parse(s.Config.URL)
if err != nil {
log.Fatalf("removeSelfFromList cannot parse peer address %v", s.Config.URL)
log.Warnf("failed parsing self peer address %v", s.Config.URL)
u = nil
}
newPeers := make([]string, 0)
for _, v := range peers {
if v != u.Host {
if u == nil || v != u.Host {
newPeers = append(newPeers, v)
}
}
return newPeers
}
func (s *PeerServer) joinCluster(cluster []string) bool {
func (s *PeerServer) joinCluster(cluster []string) (bool, error) {
for _, peer := range cluster {
if len(peer) == 0 {
continue
}
err := s.joinByPeer(s.raftServer, peer, s.Config.Scheme)
if err == nil {
log.Debugf("%s joined the cluster via peer %s", s.Config.Name, peer)
return true
if rejected, err := s.joinByPeer(s.raftServer, peer, s.Config.Scheme); rejected {
return true, fmt.Errorf("rejected by peer %s: %v", peer, err)
} else if err == nil {
log.Infof("%s joined the cluster via peer %s", s.Config.Name, peer)
return false, nil
} else {
log.Infof("%s attempted to join via %s failed: %v", s.Config.Name, peer, err)
}
if _, ok := err.(etcdErr.Error); ok {
log.Fatal(err)
}
log.Warnf("Attempt to join via %s failed: %s", peer, err)
}
return false
return false, fmt.Errorf("unreachable cluster")
}
// Send join requests to peer.
func (s *PeerServer) joinByPeer(server raft.Server, peer string, scheme string) error {
// The first return value reports whether the join was rejected by the cluster outright.
func (s *PeerServer) joinByPeer(server raft.Server, peer string, scheme string) (bool, error) {
u := (&url.URL{Host: peer, Scheme: scheme}).String()
// Our version must match the leader's version
version, err := s.client.GetVersion(u)
if err != nil {
return fmt.Errorf("fail checking join version: %v", err)
return false, fmt.Errorf("fail checking join version: %v", err)
}
if version < store.MinVersion() || version > store.MaxVersion() {
return fmt.Errorf("fail passing version compatibility(%d-%d) using %d", store.MinVersion(), store.MaxVersion(), version)
return true, fmt.Errorf("fail passing version compatibility(%d-%d) using %d", store.MinVersion(), store.MaxVersion(), version)
}
// Fetch current peer list
machines, err := s.client.GetMachines(u)
if err != nil {
return fmt.Errorf("fail getting machine messages: %v", err)
return false, fmt.Errorf("fail getting machine messages: %v", err)
}
exist := false
for _, machine := range machines {
@@ -558,10 +609,10 @@ func (s *PeerServer) joinByPeer(server raft.Server, peer string, scheme string)
// Fetch cluster config to see whether there is room to join.
clusterConfig, err := s.client.GetClusterConfig(u)
if err != nil {
return fmt.Errorf("fail getting cluster config: %v", err)
return false, fmt.Errorf("fail getting cluster config: %v", err)
}
if !exist && clusterConfig.ActiveSize <= len(machines) {
return fmt.Errorf("stop joining because the cluster is full with %d nodes", len(machines))
return true, fmt.Errorf("stop joining because the cluster is full with %d nodes", len(machines))
}
joinIndex, err := s.client.AddMachine(u,
@@ -573,11 +624,11 @@ func (s *PeerServer) joinByPeer(server raft.Server, peer string, scheme string)
EtcdURL: s.server.URL(),
})
if err != nil {
return fmt.Errorf("fail on join request: %v", err)
return err.ErrorCode == etcdErr.EcodeNoMorePeer, fmt.Errorf("fail on join request: %v", err)
}
s.joinIndex = joinIndex
return nil
return false, nil
}
func (s *PeerServer) Stats() []byte {
@@ -748,7 +799,7 @@ func (s *PeerServer) monitorActiveSize() {
// Retrieve target active size and actual active size.
activeSize := s.ClusterConfig().ActiveSize
peers := s.registry.Names()
peerCount := s.registry.Count()
peerCount := len(peers)
if index := sort.SearchStrings(peers, s.Config.Name); index < len(peers) && peers[index] == s.Config.Name {
peers = append(peers[:index], peers[index+1:]...)
}
@@ -783,12 +834,12 @@ func (s *PeerServer) monitorPeerActivity() {
// Check last activity for all peers.
now := time.Now()
promoteDelay := time.Duration(s.ClusterConfig().PromoteDelay) * time.Second
removeDelay := time.Duration(s.ClusterConfig().RemoveDelay) * time.Second
peers := s.raftServer.Peers()
for _, peer := range peers {
// If the last response from the peer is longer than the remove delay
// then automatically remove the peer.
if !peer.LastActivity().IsZero() && now.Sub(peer.LastActivity()) > promoteDelay {
if !peer.LastActivity().IsZero() && now.Sub(peer.LastActivity()) > removeDelay {
log.Infof("%s: removing node: %v; last activity %v ago", s.Config.Name, peer.Name, now.Sub(peer.LastActivity()))
if _, err := s.raftServer.Do(&RemoveCommand{Name: peer.Name}); err != nil {
log.Infof("%s: warning: autodemotion error: %v", s.Config.Name, err)
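The join path now threads a three-way outcome (rejected outright / transient error / success) from joinByPeer up through joinCluster and startAsFollower into FindCluster; an outright rejection is what routes a node into standby mode instead of the old log.Fatal exit. A compressed, illustrative sketch of how a caller branches on that contract (the stub below is not the real joinCluster):

package main

import (
	"errors"
	"fmt"
)

// joinCluster is a stub with the same contract as PeerServer.joinCluster:
// the bool reports whether the cluster rejected the join outright.
func joinCluster(peers []string) (rejected bool, err error) {
	if len(peers) == 0 {
		return false, errors.New("unreachable cluster")
	}
	return true, errors.New("rejected by peer: the cluster is full")
}

func main() {
	if rejected, err := joinCluster([]string{"127.0.0.1:7001"}); rejected {
		fmt.Println("work as standby:", err) // cluster full or version mismatch
	} else if err != nil {
		fmt.Println("retry or surface the error:", err) // transient failure
	} else {
		fmt.Println("joined; start raft in peer mode")
	}
}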


@@ -188,7 +188,7 @@ func (ps *PeerServer) RemoveHttpHandler(w http.ResponseWriter, req *http.Request
// Returns a JSON-encoded cluster configuration.
func (ps *PeerServer) getClusterConfigHttpHandler(w http.ResponseWriter, req *http.Request) {
json.NewEncoder(w).Encode(&ps.clusterConfig)
json.NewEncoder(w).Encode(ps.ClusterConfig())
}
// Updates the cluster configuration.
@@ -201,15 +201,15 @@ func (ps *PeerServer) setClusterConfigHttpHandler(w http.ResponseWriter, req *ht
}
// Copy config and update fields passed in.
config := &ClusterConfig{
ActiveSize: ps.clusterConfig.ActiveSize,
PromoteDelay: ps.clusterConfig.PromoteDelay,
}
config := ps.ClusterConfig()
if activeSize, ok := m["activeSize"].(float64); ok {
config.ActiveSize = int(activeSize)
}
if promoteDelay, ok := m["promoteDelay"].(float64); ok {
config.PromoteDelay = int(promoteDelay)
if removeDelay, ok := m["removeDelay"].(float64); ok {
config.RemoveDelay = int(removeDelay)
}
if syncClusterInterval, ok := m["syncClusterInterval"].(float64); ok {
config.SyncClusterInterval = int(syncClusterInterval)
}
// Issue command to update.
@@ -217,10 +217,11 @@ func (ps *PeerServer) setClusterConfigHttpHandler(w http.ResponseWriter, req *ht
log.Debugf("[recv] Update Cluster Config Request")
ps.server.Dispatch(c, w, req)
json.NewEncoder(w).Encode(&ps.clusterConfig)
json.NewEncoder(w).Encode(ps.ClusterConfig())
}
// Retrieves a list of peers and standbys.
// If a leader exists, it is listed first.
func (ps *PeerServer) getMachinesHttpHandler(w http.ResponseWriter, req *http.Request) {
machines := make([]*machineMessage, 0)
leader := ps.raftServer.Leader()
@@ -229,6 +230,7 @@ func (ps *PeerServer) getMachinesHttpHandler(w http.ResponseWriter, req *http.Re
machines = append(machines, msg)
}
}
json.NewEncoder(w).Encode(&machines)
}
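With the handler now reading removeDelay and syncClusterInterval, the admin endpoint accepts the new fields directly. A hedged example of updating the config over HTTP; the /v2/admin/config route on the peer port is an assumption based on the v0.4 documentation, not shown in this diff.

package main

import (
	"bytes"
	"fmt"
	"io/ioutil"
	"net/http"
)

func main() {
	body := []byte(`{"activeSize":5,"removeDelay":120,"syncClusterInterval":60}`)
	req, err := http.NewRequest("PUT", "http://127.0.0.1:7001/v2/admin/config", bytes.NewReader(body))
	if err != nil {
		panic(err)
	}
	req.Header.Set("Content-Type", "application/json")
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	out, _ := ioutil.ReadAll(resp.Body)
	fmt.Println(string(out)) // the handler echoes the resulting ClusterConfig
}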


@@ -68,6 +68,7 @@ func applyRemove(c *RemoveCommand, context raft.Context) (uint64, error) {
} else {
// else ignore remove
log.Debugf("ignore previous remove command.")
ps.removedInLog = true
}
}
return commitIndex, nil
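This is the counterpart of the applyJoin change above: applyRemove sets removedInLog to true when the log shows this node being removed, and a later self-join resets it to false. After log replay, the flag is the node's estimated mode: if it was removed and never rejoined, FindCluster returns early with toStart == false and the node comes up as a standby.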

server/standby_server.go (new file, 256 lines)

@@ -0,0 +1,256 @@
package server
import (
"fmt"
"net/http"
"net/url"
"sync"
"time"
"github.com/coreos/etcd/third_party/github.com/goraft/raft"
etcdErr "github.com/coreos/etcd/error"
"github.com/coreos/etcd/log"
uhttp "github.com/coreos/etcd/pkg/http"
"github.com/coreos/etcd/store"
)
const UninitedSyncClusterInterval = time.Duration(5) * time.Second
type StandbyServerConfig struct {
Name string
PeerScheme string
PeerURL string
ClientURL string
}
type StandbyServer struct {
Config StandbyServerConfig
client *Client
cluster []*machineMessage
syncClusterInterval time.Duration
joinIndex uint64
removeNotify chan bool
started bool
closeChan chan bool
routineGroup sync.WaitGroup
sync.Mutex
}
func NewStandbyServer(config StandbyServerConfig, client *Client) *StandbyServer {
return &StandbyServer{
Config: config,
client: client,
syncClusterInterval: UninitedSyncClusterInterval,
}
}
func (s *StandbyServer) Start() {
s.Lock()
defer s.Unlock()
if s.started {
return
}
s.started = true
s.removeNotify = make(chan bool)
s.closeChan = make(chan bool)
s.routineGroup.Add(1)
go func() {
defer s.routineGroup.Done()
s.monitorCluster()
}()
}
// Stop stops the server gracefully.
func (s *StandbyServer) Stop() {
s.Lock()
defer s.Unlock()
if !s.started {
return
}
s.started = false
close(s.closeChan)
s.routineGroup.Wait()
}
// RemoveNotify notifies the server is removed from standby mode and ready
// for peer mode. It should have joined the cluster successfully.
func (s *StandbyServer) RemoveNotify() <-chan bool {
return s.removeNotify
}
func (s *StandbyServer) ClientHTTPHandler() http.Handler {
return http.HandlerFunc(s.redirectRequests)
}
func (s *StandbyServer) Cluster() []string {
peerURLs := make([]string, 0)
for _, peer := range s.cluster {
peerURLs = append(peerURLs, peer.PeerURL)
}
return peerURLs
}
func (s *StandbyServer) ClusterSize() int {
return len(s.cluster)
}
func (s *StandbyServer) setCluster(cluster []*machineMessage) {
s.cluster = cluster
}
func (s *StandbyServer) SyncCluster(peers []string) error {
for i, url := range peers {
peers[i] = s.fullPeerURL(url)
}
if err := s.syncCluster(peers); err != nil {
log.Infof("fail syncing cluster(%v): %v", s.Cluster(), err)
return err
}
log.Infof("set cluster(%v) for standby server", s.Cluster())
return nil
}
func (s *StandbyServer) SetSyncClusterInterval(second int) {
s.syncClusterInterval = time.Duration(second) * time.Second
}
func (s *StandbyServer) ClusterLeader() *machineMessage {
for _, machine := range s.cluster {
if machine.State == raft.Leader {
return machine
}
}
return nil
}
func (s *StandbyServer) JoinIndex() uint64 {
return s.joinIndex
}
func (s *StandbyServer) redirectRequests(w http.ResponseWriter, r *http.Request) {
leader := s.ClusterLeader()
if leader == nil {
w.Header().Set("Content-Type", "application/json")
etcdErr.NewError(etcdErr.EcodeStandbyInternal, "", 0).Write(w)
return
}
uhttp.Redirect(leader.ClientURL, w, r)
}
func (s *StandbyServer) monitorCluster() {
for {
timer := time.NewTimer(s.syncClusterInterval)
defer timer.Stop()
select {
case <-s.closeChan:
return
case <-timer.C:
}
if err := s.syncCluster(nil); err != nil {
log.Warnf("fail syncing cluster(%v): %v", s.Cluster(), err)
continue
}
leader := s.ClusterLeader()
if leader == nil {
log.Warnf("fail getting leader from cluster(%v)", s.Cluster())
continue
}
if err := s.join(leader.PeerURL); err != nil {
log.Debugf("fail joining through leader %v: %v", leader, err)
continue
}
log.Infof("join through leader %v", leader.PeerURL)
go func() {
s.Stop()
close(s.removeNotify)
}()
return
}
}
func (s *StandbyServer) syncCluster(peerURLs []string) error {
peerURLs = append(s.Cluster(), peerURLs...)
for _, peerURL := range peerURLs {
// Fetch current peer list
machines, err := s.client.GetMachines(peerURL)
if err != nil {
log.Debugf("fail getting machine messages from %v", peerURL)
continue
}
config, err := s.client.GetClusterConfig(peerURL)
if err != nil {
log.Debugf("fail getting cluster config from %v", peerURL)
continue
}
s.setCluster(machines)
s.SetSyncClusterInterval(config.SyncClusterInterval)
return nil
}
return fmt.Errorf("unreachable cluster")
}
func (s *StandbyServer) join(peer string) error {
// Our version must match the leader's version
version, err := s.client.GetVersion(peer)
if err != nil {
log.Debugf("fail checking join version")
return err
}
if version < store.MinVersion() || version > store.MaxVersion() {
log.Debugf("fail passing version compatibility(%d-%d) using %d", store.MinVersion(), store.MaxVersion(), version)
return fmt.Errorf("incompatible version")
}
// Fetch cluster config to see whether there is room to join.
clusterConfig, err := s.client.GetClusterConfig(peer)
if err != nil {
log.Debugf("fail getting cluster config")
return err
}
if clusterConfig.ActiveSize <= len(s.Cluster()) {
log.Debugf("stop joining because the cluster is full with %d nodes", len(s.Cluster()))
return fmt.Errorf("out of quota")
}
commitIndex, err := s.client.AddMachine(peer,
&JoinCommand{
MinVersion: store.MinVersion(),
MaxVersion: store.MaxVersion(),
Name: s.Config.Name,
RaftURL: s.Config.PeerURL,
EtcdURL: s.Config.ClientURL,
})
if err != nil {
log.Debugf("fail on join request")
return err
}
s.joinIndex = commitIndex
return nil
}
func (s *StandbyServer) fullPeerURL(urlStr string) string {
u, err := url.Parse(urlStr)
if err != nil {
log.Warnf("fail parsing url %v", u)
return urlStr
}
u.Scheme = s.Config.PeerScheme
return u.String()
}
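For reference, a minimal sketch of driving the new StandbyServer from outside, using only the exported API added in this file; the addresses and the import path vintage are illustrative.

package main

import (
	"log"
	"net/http"

	"github.com/coreos/etcd/server"
)

func main() {
	client := server.NewClient(&http.Transport{})
	ss := server.NewStandbyServer(server.StandbyServerConfig{
		Name:       "node3",
		PeerScheme: "http",
		PeerURL:    "http://127.0.0.1:7003", // illustrative addresses
		ClientURL:  "http://127.0.0.1:4003",
	}, client)

	if err := ss.SyncCluster([]string{"127.0.0.1:7001"}); err != nil {
		log.Fatal(err)
	}
	ss.Start()

	// Blocks until the standby manages to join the cluster; the caller is
	// then expected to restart this node in peer mode.
	<-ss.RemoveNotify()
	log.Printf("joined at index %d; switch to peer mode", ss.JoinIndex())
}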


@@ -26,6 +26,6 @@ func TestV1DeleteKey(t *testing.T) {
assert.Equal(t, resp.StatusCode, http.StatusOK)
body := tests.ReadBody(resp)
assert.Nil(t, err, "")
assert.Equal(t, string(body), `{"action":"delete","key":"/foo/bar","prevValue":"XXX","index":3}`, "")
assert.Equal(t, string(body), `{"action":"delete","key":"/foo/bar","prevValue":"XXX","index":4}`, "")
})
}
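The remaining test updates are mechanical: InitNewCluster now commits a SetClusterConfigCommand right after the leader's self-join, so every fresh test cluster consumes one extra index before the first user write, and all expected index values shift up by one.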


@@ -36,7 +36,7 @@ func TestV1GetKey(t *testing.T) {
assert.Equal(t, body["action"], "get", "")
assert.Equal(t, body["key"], "/foo/bar", "")
assert.Equal(t, body["value"], "XXX", "")
assert.Equal(t, body["index"], 2, "")
assert.Equal(t, body["index"], 3, "")
})
}
@@ -117,7 +117,7 @@ func TestV1WatchKey(t *testing.T) {
assert.Equal(t, body["key"], "/foo/bar", "")
assert.Equal(t, body["value"], "XXX", "")
assert.Equal(t, body["index"], 2, "")
assert.Equal(t, body["index"], 3, "")
})
}
@@ -133,7 +133,7 @@ func TestV1WatchKeyWithIndex(t *testing.T) {
c := make(chan bool)
go func() {
v := url.Values{}
v.Set("index", "3")
v.Set("index", "4")
resp, _ := tests.PostForm(fmt.Sprintf("%s%s", s.URL(), "/v1/watch/foo/bar"), v)
body = tests.ReadBodyJSON(resp)
c <- true
@@ -173,7 +173,7 @@ func TestV1WatchKeyWithIndex(t *testing.T) {
assert.Equal(t, body["key"], "/foo/bar", "")
assert.Equal(t, body["value"], "YYY", "")
assert.Equal(t, body["index"], 3, "")
assert.Equal(t, body["index"], 4, "")
})
}


@@ -25,7 +25,7 @@ func TestV1SetKey(t *testing.T) {
body := tests.ReadBody(resp)
assert.Nil(t, err, "")
assert.Equal(t, string(body), `{"action":"set","key":"/foo/bar","value":"XXX","newKey":true,"index":2}`, "")
assert.Equal(t, string(body), `{"action":"set","key":"/foo/bar","value":"XXX","newKey":true,"index":3}`, "")
})
}
@@ -127,7 +127,7 @@ func TestV1SetKeyCASOnValueSuccess(t *testing.T) {
body := tests.ReadBodyJSON(resp)
assert.Equal(t, body["action"], "testAndSet", "")
assert.Equal(t, body["value"], "YYY", "")
assert.Equal(t, body["index"], 3, "")
assert.Equal(t, body["index"], 4, "")
})
}
@@ -152,6 +152,6 @@ func TestV1SetKeyCASOnValueFail(t *testing.T) {
assert.Equal(t, body["errorCode"], 101, "")
assert.Equal(t, body["message"], "Compare failed", "")
assert.Equal(t, body["cause"], "[AAA != XXX]", "")
assert.Equal(t, body["index"], 2, "")
assert.Equal(t, body["index"], 3, "")
})
}


@@ -26,7 +26,7 @@ func TestV2DeleteKey(t *testing.T) {
assert.Equal(t, resp.StatusCode, http.StatusOK)
body := tests.ReadBody(resp)
assert.Nil(t, err, "")
assert.Equal(t, string(body), `{"action":"delete","node":{"key":"/foo/bar","modifiedIndex":3,"createdIndex":2},"prevNode":{"key":"/foo/bar","value":"XXX","modifiedIndex":2,"createdIndex":2}}`, "")
assert.Equal(t, string(body), `{"action":"delete","node":{"key":"/foo/bar","modifiedIndex":4,"createdIndex":3},"prevNode":{"key":"/foo/bar","value":"XXX","modifiedIndex":3,"createdIndex":3}}`, "")
})
}
@@ -48,7 +48,7 @@ func TestV2DeleteEmptyDirectory(t *testing.T) {
assert.Equal(t, resp.StatusCode, http.StatusOK)
body := tests.ReadBody(resp)
assert.Nil(t, err, "")
assert.Equal(t, string(body), `{"action":"delete","node":{"key":"/foo","dir":true,"modifiedIndex":3,"createdIndex":2},"prevNode":{"key":"/foo","dir":true,"modifiedIndex":2,"createdIndex":2}}`, "")
assert.Equal(t, string(body), `{"action":"delete","node":{"key":"/foo","dir":true,"modifiedIndex":4,"createdIndex":3},"prevNode":{"key":"/foo","dir":true,"modifiedIndex":3,"createdIndex":3}}`, "")
})
}
@@ -70,7 +70,7 @@ func TestV2DeleteNonEmptyDirectory(t *testing.T) {
assert.Equal(t, resp.StatusCode, http.StatusOK)
body := tests.ReadBody(resp)
assert.Nil(t, err, "")
assert.Equal(t, string(body), `{"action":"delete","node":{"key":"/foo","dir":true,"modifiedIndex":3,"createdIndex":2},"prevNode":{"key":"/foo","dir":true,"modifiedIndex":2,"createdIndex":2}}`, "")
assert.Equal(t, string(body), `{"action":"delete","node":{"key":"/foo","dir":true,"modifiedIndex":4,"createdIndex":3},"prevNode":{"key":"/foo","dir":true,"modifiedIndex":3,"createdIndex":3}}`, "")
})
}
@@ -87,14 +87,14 @@ func TestV2DeleteDirectoryRecursiveImpliesDir(t *testing.T) {
assert.Equal(t, resp.StatusCode, http.StatusOK)
body := tests.ReadBody(resp)
assert.Nil(t, err, "")
assert.Equal(t, string(body), `{"action":"delete","node":{"key":"/foo","dir":true,"modifiedIndex":3,"createdIndex":2},"prevNode":{"key":"/foo","dir":true,"modifiedIndex":2,"createdIndex":2}}`, "")
assert.Equal(t, string(body), `{"action":"delete","node":{"key":"/foo","dir":true,"modifiedIndex":4,"createdIndex":3},"prevNode":{"key":"/foo","dir":true,"modifiedIndex":3,"createdIndex":3}}`, "")
})
}
// Ensures that a key is deleted if the previous index matches
//
// $ curl -X PUT localhost:4001/v2/keys/foo -d value=XXX
// $ curl -X DELETE localhost:4001/v2/keys/foo?prevIndex=2
// $ curl -X DELETE localhost:4001/v2/keys/foo?prevIndex=3
//
func TestV2DeleteKeyCADOnIndexSuccess(t *testing.T) {
tests.RunServer(func(s *server.Server) {
@@ -102,14 +102,14 @@ func TestV2DeleteKeyCADOnIndexSuccess(t *testing.T) {
v.Set("value", "XXX")
resp, err := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo"), v)
tests.ReadBody(resp)
resp, err = tests.DeleteForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo?prevIndex=2"), url.Values{})
resp, err = tests.DeleteForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo?prevIndex=3"), url.Values{})
assert.Nil(t, err, "")
body := tests.ReadBodyJSON(resp)
assert.Equal(t, body["action"], "compareAndDelete", "")
node := body["node"].(map[string]interface{})
assert.Equal(t, node["key"], "/foo", "")
assert.Equal(t, node["modifiedIndex"], 3, "")
assert.Equal(t, node["modifiedIndex"], 4, "")
})
}
@@ -164,7 +164,7 @@ func TestV2DeleteKeyCADOnValueSuccess(t *testing.T) {
assert.Equal(t, body["action"], "compareAndDelete", "")
node := body["node"].(map[string]interface{})
assert.Equal(t, node["modifiedIndex"], 3, "")
assert.Equal(t, node["modifiedIndex"], 4, "")
})
}


@@ -36,7 +36,7 @@ func TestV2GetKey(t *testing.T) {
node := body["node"].(map[string]interface{})
assert.Equal(t, node["key"], "/foo/bar", "")
assert.Equal(t, node["value"], "XXX", "")
assert.Equal(t, node["modifiedIndex"], 2, "")
assert.Equal(t, node["modifiedIndex"], 3, "")
})
}
@@ -65,7 +65,7 @@ func TestV2GetKeyRecursively(t *testing.T) {
node := body["node"].(map[string]interface{})
assert.Equal(t, node["key"], "/foo", "")
assert.Equal(t, node["dir"], true, "")
assert.Equal(t, node["modifiedIndex"], 2, "")
assert.Equal(t, node["modifiedIndex"], 3, "")
assert.Equal(t, len(node["nodes"].([]interface{})), 2, "")
node0 := node["nodes"].([]interface{})[0].(map[string]interface{})
@@ -130,7 +130,7 @@ func TestV2WatchKey(t *testing.T) {
node := body["node"].(map[string]interface{})
assert.Equal(t, node["key"], "/foo/bar", "")
assert.Equal(t, node["value"], "XXX", "")
assert.Equal(t, node["modifiedIndex"], 2, "")
assert.Equal(t, node["modifiedIndex"], 3, "")
})
}
@@ -145,7 +145,7 @@ func TestV2WatchKeyWithIndex(t *testing.T) {
var body map[string]interface{}
c := make(chan bool)
go func() {
resp, _ := tests.Get(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar?wait=true&waitIndex=3"))
resp, _ := tests.Get(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar?wait=true&waitIndex=4"))
body = tests.ReadBodyJSON(resp)
c <- true
}()
@@ -185,7 +185,7 @@ func TestV2WatchKeyWithIndex(t *testing.T) {
node := body["node"].(map[string]interface{})
assert.Equal(t, node["key"], "/foo/bar", "")
assert.Equal(t, node["value"], "YYY", "")
assert.Equal(t, node["modifiedIndex"], 3, "")
assert.Equal(t, node["modifiedIndex"], 4, "")
})
}


@@ -26,9 +26,9 @@ func TestV2CreateUnique(t *testing.T) {
assert.Equal(t, body["action"], "create", "")
node := body["node"].(map[string]interface{})
assert.Equal(t, node["key"], "/foo/bar/2", "")
assert.Equal(t, node["key"], "/foo/bar/3", "")
assert.Nil(t, node["dir"], "")
assert.Equal(t, node["modifiedIndex"], 2, "")
assert.Equal(t, node["modifiedIndex"], 3, "")
// Second POST should add next index to list.
resp, _ = tests.PostForm(fullURL, nil)
@@ -36,7 +36,7 @@ func TestV2CreateUnique(t *testing.T) {
body = tests.ReadBodyJSON(resp)
node = body["node"].(map[string]interface{})
assert.Equal(t, node["key"], "/foo/bar/3", "")
assert.Equal(t, node["key"], "/foo/bar/4", "")
// POST to a different key should add index to that list.
resp, _ = tests.PostForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/baz"), nil)
@@ -44,6 +44,6 @@ func TestV2CreateUnique(t *testing.T) {
body = tests.ReadBodyJSON(resp)
node = body["node"].(map[string]interface{})
assert.Equal(t, node["key"], "/foo/baz/4", "")
assert.Equal(t, node["key"], "/foo/baz/5", "")
})
}


@@ -24,7 +24,7 @@ func TestV2SetKey(t *testing.T) {
assert.Equal(t, resp.StatusCode, http.StatusCreated)
body := tests.ReadBody(resp)
assert.Nil(t, err, "")
assert.Equal(t, string(body), `{"action":"set","node":{"key":"/foo/bar","value":"XXX","modifiedIndex":2,"createdIndex":2}}`, "")
assert.Equal(t, string(body), `{"action":"set","node":{"key":"/foo/bar","value":"XXX","modifiedIndex":3,"createdIndex":3}}`, "")
})
}
@@ -38,7 +38,7 @@ func TestV2SetDirectory(t *testing.T) {
assert.Equal(t, resp.StatusCode, http.StatusCreated)
body := tests.ReadBody(resp)
assert.Nil(t, err, "")
assert.Equal(t, string(body), `{"action":"set","node":{"key":"/foo","dir":true,"modifiedIndex":2,"createdIndex":2}}`, "")
assert.Equal(t, string(body), `{"action":"set","node":{"key":"/foo","dir":true,"modifiedIndex":3,"createdIndex":3}}`, "")
})
}
@@ -244,14 +244,14 @@ func TestV2SetKeyCASOnIndexSuccess(t *testing.T) {
assert.Equal(t, resp.StatusCode, http.StatusCreated)
tests.ReadBody(resp)
v.Set("value", "YYY")
v.Set("prevIndex", "2")
v.Set("prevIndex", "3")
resp, _ = tests.PutForm(fullURL, v)
assert.Equal(t, resp.StatusCode, http.StatusOK)
body := tests.ReadBodyJSON(resp)
assert.Equal(t, body["action"], "compareAndSwap", "")
node := body["node"].(map[string]interface{})
assert.Equal(t, node["value"], "YYY", "")
assert.Equal(t, node["modifiedIndex"], 3, "")
assert.Equal(t, node["modifiedIndex"], 4, "")
})
}
@@ -275,8 +275,8 @@ func TestV2SetKeyCASOnIndexFail(t *testing.T) {
body := tests.ReadBodyJSON(resp)
assert.Equal(t, body["errorCode"], 101, "")
assert.Equal(t, body["message"], "Compare failed", "")
assert.Equal(t, body["cause"], "[10 != 2]", "")
assert.Equal(t, body["index"], 2, "")
assert.Equal(t, body["cause"], "[10 != 3]", "")
assert.Equal(t, body["index"], 3, "")
})
}
@@ -319,7 +319,7 @@ func TestV2SetKeyCASOnValueSuccess(t *testing.T) {
assert.Equal(t, body["action"], "compareAndSwap", "")
node := body["node"].(map[string]interface{})
assert.Equal(t, node["value"], "YYY", "")
assert.Equal(t, node["modifiedIndex"], 3, "")
assert.Equal(t, node["modifiedIndex"], 4, "")
})
}
@@ -344,7 +344,7 @@ func TestV2SetKeyCASOnValueFail(t *testing.T) {
assert.Equal(t, body["errorCode"], 101, "")
assert.Equal(t, body["message"], "Compare failed", "")
assert.Equal(t, body["cause"], "[AAA != XXX]", "")
assert.Equal(t, body["index"], 2, "")
assert.Equal(t, body["index"], 3, "")
})
}
@@ -369,7 +369,7 @@ func TestV2SetKeyCASWithMissingValueFails(t *testing.T) {
// Ensures that a key is not set if both previous value and index do not match.
//
// $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value=XXX
// $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value=YYY -d prevValue=AAA -d prevIndex=3
// $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value=YYY -d prevValue=AAA -d prevIndex=4
//
func TestV2SetKeyCASOnValueAndIndexFail(t *testing.T) {
tests.RunServer(func(s *server.Server) {
@@ -381,21 +381,21 @@ func TestV2SetKeyCASOnValueAndIndexFail(t *testing.T) {
tests.ReadBody(resp)
v.Set("value", "YYY")
v.Set("prevValue", "AAA")
v.Set("prevIndex", "3")
v.Set("prevIndex", "4")
resp, _ = tests.PutForm(fullURL, v)
assert.Equal(t, resp.StatusCode, http.StatusPreconditionFailed)
body := tests.ReadBodyJSON(resp)
assert.Equal(t, body["errorCode"], 101, "")
assert.Equal(t, body["message"], "Compare failed", "")
assert.Equal(t, body["cause"], "[AAA != XXX] [3 != 2]", "")
assert.Equal(t, body["index"], 2, "")
assert.Equal(t, body["cause"], "[AAA != XXX] [4 != 3]", "")
assert.Equal(t, body["index"], 3, "")
})
}
// Ensures that a key is not set if previous value match but index does not.
//
// $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value=XXX
// $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value=YYY -d prevValue=XXX -d prevIndex=3
// $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value=YYY -d prevValue=XXX -d prevIndex=4
//
func TestV2SetKeyCASOnValueMatchAndIndexFail(t *testing.T) {
tests.RunServer(func(s *server.Server) {
@@ -407,21 +407,21 @@ func TestV2SetKeyCASOnValueMatchAndIndexFail(t *testing.T) {
tests.ReadBody(resp)
v.Set("value", "YYY")
v.Set("prevValue", "XXX")
v.Set("prevIndex", "3")
v.Set("prevIndex", "4")
resp, _ = tests.PutForm(fullURL, v)
assert.Equal(t, resp.StatusCode, http.StatusPreconditionFailed)
body := tests.ReadBodyJSON(resp)
assert.Equal(t, body["errorCode"], 101, "")
assert.Equal(t, body["message"], "Compare failed", "")
assert.Equal(t, body["cause"], "[3 != 2]", "")
assert.Equal(t, body["index"], 2, "")
assert.Equal(t, body["cause"], "[4 != 3]", "")
assert.Equal(t, body["index"], 3, "")
})
}
// Ensures that a key is not set if previous index matches but value does not.
//
// $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value=XXX
// $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value=YYY -d prevValue=AAA -d prevIndex=2
// $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value=YYY -d prevValue=AAA -d prevIndex=3
//
func TestV2SetKeyCASOnIndexMatchAndValueFail(t *testing.T) {
tests.RunServer(func(s *server.Server) {
@@ -433,14 +433,14 @@ func TestV2SetKeyCASOnIndexMatchAndValueFail(t *testing.T) {
tests.ReadBody(resp)
v.Set("value", "YYY")
v.Set("prevValue", "AAA")
v.Set("prevIndex", "2")
v.Set("prevIndex", "3")
resp, _ = tests.PutForm(fullURL, v)
assert.Equal(t, resp.StatusCode, http.StatusPreconditionFailed)
body := tests.ReadBodyJSON(resp)
assert.Equal(t, body["errorCode"], 101, "")
assert.Equal(t, body["message"], "Compare failed", "")
assert.Equal(t, body["cause"], "[AAA != XXX]", "")
assert.Equal(t, body["index"], 2, "")
assert.Equal(t, body["index"], 3, "")
})
}
@@ -455,6 +455,6 @@ func TestV2SetKeyCASWithEmptyValueSuccess(t *testing.T) {
resp, _ := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v)
assert.Equal(t, resp.StatusCode, http.StatusCreated)
body := tests.ReadBody(resp)
assert.Equal(t, string(body), `{"action":"set","node":{"key":"/foo/bar","value":"","modifiedIndex":2,"createdIndex":2}}`)
assert.Equal(t, string(body), `{"action":"set","node":{"key":"/foo/bar","value":"","modifiedIndex":3,"createdIndex":3}}`)
})
}