diff --git a/etcd/etcd.go b/etcd/etcd.go index f5da2116e..1f4570ae9 100644 --- a/etcd/etcd.go +++ b/etcd/etcd.go @@ -232,6 +232,7 @@ func (e *Etcd) Run() { DataDir: e.Config.DataDir, } e.StandbyServer = server.NewStandbyServer(ssConfig, client) + e.StandbyServer.SetRaftServer(raftServer) // Generating config could be slow. // Put it here to make listen happen immediately after peer-server starting. @@ -347,6 +348,7 @@ func (e *Etcd) runServer() { raftServer.SetElectionTimeout(electionTimeout) raftServer.SetHeartbeatInterval(heartbeatInterval) e.PeerServer.SetRaftServer(raftServer, e.Config.Snapshot) + e.StandbyServer.SetRaftServer(raftServer) e.PeerServer.SetJoinIndex(e.StandbyServer.JoinIndex()) e.setMode(PeerMode) diff --git a/server/peer_server.go b/server/peer_server.go index d7a111400..04fd3bbe1 100644 --- a/server/peer_server.go +++ b/server/peer_server.go @@ -214,6 +214,7 @@ func (s *PeerServer) FindCluster(discoverURL string, peers []string) (toStart bo // TODO(yichengq): Think about the action that should be done // if it cannot connect any of the previous known node. log.Debugf("%s is restarting the cluster %v", name, possiblePeers) + s.SetJoinIndex(s.raftServer.CommitIndex()) toStart = true return } diff --git a/server/standby_server.go b/server/standby_server.go index 551f88747..ce9e93a9f 100644 --- a/server/standby_server.go +++ b/server/standby_server.go @@ -36,8 +36,9 @@ type standbyInfo struct { } type StandbyServer struct { - Config StandbyServerConfig - client *Client + Config StandbyServerConfig + client *Client + raftServer raft.Server standbyInfo joinIndex uint64 @@ -62,6 +63,10 @@ func NewStandbyServer(config StandbyServerConfig, client *Client) *StandbyServer return s } +func (s *StandbyServer) SetRaftServer(raftServer raft.Server) { + s.raftServer = raftServer +} + func (s *StandbyServer) Start() { s.Lock() defer s.Unlock() @@ -237,7 +242,7 @@ func (s *StandbyServer) syncCluster(peerURLs []string) error { func (s *StandbyServer) join(peer string) error { for _, url := range s.ClusterURLs() { if s.Config.PeerURL == url { - s.joinIndex = 0 + s.joinIndex = s.raftServer.CommitIndex() return nil } } diff --git a/tests/functional/multi_node_kill_all_and_recovery_test.go b/tests/functional/multi_node_kill_all_and_recovery_test.go index 42b54b461..404cf972f 100644 --- a/tests/functional/multi_node_kill_all_and_recovery_test.go +++ b/tests/functional/multi_node_kill_all_and_recovery_test.go @@ -4,6 +4,7 @@ import ( "bytes" "os" "strconv" + "strings" "testing" "time" @@ -100,6 +101,8 @@ func TestTLSMultiNodeKillAllAndRecovery(t *testing.T) { t.Fatal("cannot create cluster") } + time.Sleep(time.Second) + c := etcd.NewClient(nil) go Monitor(clusterSize, clusterSize, leaderChan, all, stop) @@ -239,3 +242,68 @@ func TestMultiNodeKillAllAndRecoveryWithStandbys(t *testing.T) { assert.NoError(t, err) assert.Equal(t, len(result.Node.Nodes), 7) } + +// Create a five nodes +// Kill all the nodes and restart, then remove the leader +func TestMultiNodeKillAllAndRecoveryAndRemoveLeader(t *testing.T) { + procAttr := new(os.ProcAttr) + procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr} + + stop := make(chan bool) + leaderChan := make(chan string, 1) + all := make(chan bool, 1) + + clusterSize := 5 + argGroup, etcds, err := CreateCluster(clusterSize, procAttr, false) + defer DestroyCluster(etcds) + + if err != nil { + t.Fatal("cannot create cluster") + } + + c := etcd.NewClient(nil) + + go Monitor(clusterSize, clusterSize, leaderChan, all, stop) + <-all + <-leaderChan + stop <- true + + c.SyncCluster() + + // kill all + DestroyCluster(etcds) + + time.Sleep(time.Second) + + stop = make(chan bool) + leaderChan = make(chan string, 1) + all = make(chan bool, 1) + + time.Sleep(time.Second) + + for i := 0; i < clusterSize; i++ { + etcds[i], err = os.StartProcess(EtcdBinPath, argGroup[i], procAttr) + } + + go Monitor(clusterSize, 1, leaderChan, all, stop) + + <-all + leader := <-leaderChan + + _, err = c.Set("foo", "bar", 0) + if err != nil { + t.Fatalf("Recovery error: %s", err) + } + + port, _ := strconv.Atoi(strings.Split(leader, ":")[2]) + num := port - 7000 + resp, _ := tests.Delete(leader+"/v2/admin/machines/node"+strconv.Itoa(num), "application/json", nil) + if !assert.Equal(t, resp.StatusCode, 200) { + t.FailNow() + } + + // check the old leader is in standby mode now + time.Sleep(time.Second) + resp, _ = tests.Get(leader + "/name") + assert.Equal(t, resp.StatusCode, 404) +} diff --git a/tests/functional/remove_node_test.go b/tests/functional/remove_node_test.go index 117f6d1bc..8199c9891 100644 --- a/tests/functional/remove_node_test.go +++ b/tests/functional/remove_node_test.go @@ -169,7 +169,8 @@ func TestRemovePausedNode(t *testing.T) { if !assert.Equal(t, r.StatusCode, 200) { t.FailNow() } - time.Sleep(2 * time.Second) + // Wait for standby instances to update its cluster config + time.Sleep(6 * time.Second) resp, err := c.Get("_etcd/machines", false, false) if err != nil { diff --git a/tests/functional/simple_snapshot_test.go b/tests/functional/simple_snapshot_test.go index 5980f1eb7..c238d8691 100644 --- a/tests/functional/simple_snapshot_test.go +++ b/tests/functional/simple_snapshot_test.go @@ -89,7 +89,7 @@ func TestSnapshot(t *testing.T) { index, _ = strconv.Atoi(snapshots[0].Name()[2:6]) - if index < 1010 || index > 1025 { + if index < 1010 || index > 1029 { t.Fatal("wrong name of snapshot :", snapshots[0].Name()) } }