From b7d9fdbd395c1c5e5d4c6ba879b7db5b80953139 Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Thu, 15 May 2014 02:38:22 -0400 Subject: [PATCH 1/7] feat(standby_server): write cluster info to disk For better fault tolerance and availability. --- Documentation/design/standbys.md | 10 +- etcd/etcd.go | 23 +++-- server/standby_server.go | 97 +++++++++++++++++-- .../multi_node_kill_all_and_recovery_test.go | 22 ++--- tests/functional/remove_node_test.go | 38 ++++++-- tests/functional/util.go | 4 +- 6 files changed, 151 insertions(+), 43 deletions(-) diff --git a/Documentation/design/standbys.md b/Documentation/design/standbys.md index a771183c1..e3ae7d5ad 100644 --- a/Documentation/design/standbys.md +++ b/Documentation/design/standbys.md @@ -33,6 +33,9 @@ After each interval, standbys synchronize information with cluster. #### Main logic ``` +If find existing standby cluster info: + Goto standby loop + Find cluster as required If determine to start peer server: Goto peer loop @@ -74,7 +77,6 @@ return true **Note** 1. [TODO] The running mode cannot be determined by log, because the log may be outdated. But the log could be used to estimate its state. 2. Even if sync cluster fails, it will restart still for recovery from full outage. -3. [BUG] Possible peers from discover URL, peers flag and data dir could be outdated because standby machine doesn't record log. This could make reconnect fail if the whole cluster migrates to new address. #### Peer mode start logic @@ -100,11 +102,12 @@ When removed from the cluster: Loop: Sleep for some time - Sync cluster + Sync cluster, and write cluster info into disk If peer count < active size: Send join request If succeed: + Clear cluster info from disk Return ``` @@ -192,9 +195,6 @@ Machines in peer mode recover heartbeat between each other. Machines in standby mode always sync the cluster. If sync fails, it uses the first address from data log as redirect target. -**Note** -1. [TODO] Machine which runs in standby mode and has no log data cannot be recovered. But it could join the cluster finally if it is restarted always. - ### Kill one peer machine diff --git a/etcd/etcd.go b/etcd/etcd.go index b493f3f47..1112bd3c4 100644 --- a/etcd/etcd.go +++ b/etcd/etcd.go @@ -229,22 +229,29 @@ func (e *Etcd) Run() { PeerScheme: e.Config.PeerTLSInfo().Scheme(), PeerURL: e.Config.Peer.Addr, ClientURL: e.Config.Addr, + DataDir: e.Config.DataDir, + } + if e.StandbyServer, err = server.NewStandbyServer(ssConfig, client); err != nil { + log.Fatal("error new standby server:", err) } - e.StandbyServer = server.NewStandbyServer(ssConfig, client) // Generating config could be slow. // Put it here to make listen happen immediately after peer-server starting. peerTLSConfig := server.TLSServerConfig(e.Config.PeerTLSInfo()) etcdTLSConfig := server.TLSServerConfig(e.Config.EtcdTLSInfo()) - startPeerServer, possiblePeers, err := e.PeerServer.FindCluster(e.Config.Discovery, e.Config.Peers) - if err != nil { - log.Fatal(err) - } - if startPeerServer { - e.setMode(PeerMode) + if !e.StandbyServer.ClusterRecorded() { + startPeerServer, possiblePeers, err := e.PeerServer.FindCluster(e.Config.Discovery, e.Config.Peers) + if err != nil { + log.Fatal(err) + } + if startPeerServer { + e.setMode(PeerMode) + } else { + e.StandbyServer.SyncCluster(possiblePeers) + e.setMode(StandbyMode) + } } else { - e.StandbyServer.SyncCluster(possiblePeers) e.setMode(StandbyMode) } diff --git a/server/standby_server.go b/server/standby_server.go index a1c6c95dd..8f18755f4 100644 --- a/server/standby_server.go +++ b/server/standby_server.go @@ -1,9 +1,12 @@ package server import ( + "encoding/json" "fmt" "net/http" "net/url" + "os" + "path/filepath" "sync" "time" @@ -15,11 +18,14 @@ import ( "github.com/coreos/etcd/store" ) +const clusterInfoName = "cluster_info" + type StandbyServerConfig struct { Name string PeerScheme string PeerURL string ClientURL string + DataDir string } type StandbyServer struct { @@ -30,6 +36,9 @@ type StandbyServer struct { syncInterval float64 joinIndex uint64 + file *os.File + recorded bool + removeNotify chan bool started bool closeChan chan bool @@ -38,12 +47,19 @@ type StandbyServer struct { sync.Mutex } -func NewStandbyServer(config StandbyServerConfig, client *Client) *StandbyServer { - return &StandbyServer{ +func NewStandbyServer(config StandbyServerConfig, client *Client) (*StandbyServer, error) { + s := &StandbyServer{ Config: config, client: client, syncInterval: DefaultSyncInterval, } + if err := s.openClusterInfo(); err != nil { + return nil, fmt.Errorf("error open/create cluster info file: %v", err) + } + if clusterInfo, err := s.loadClusterInfo(); err == nil { + s.setCluster(clusterInfo) + } + return s, nil } func (s *StandbyServer) Start() { @@ -75,6 +91,10 @@ func (s *StandbyServer) Stop() { close(s.closeChan) s.routineGroup.Wait() + + if err := s.clearClusterInfo(); err != nil { + log.Warnf("error clearing cluster info for standby") + } } // RemoveNotify notifies the server is removed from standby mode and ready @@ -87,6 +107,10 @@ func (s *StandbyServer) ClientHTTPHandler() http.Handler { return http.HandlerFunc(s.redirectRequests) } +func (s *StandbyServer) ClusterRecorded() bool { + return s.recorded +} + func (s *StandbyServer) Cluster() []string { peerURLs := make([]string, 0) for _, peer := range s.cluster { @@ -145,14 +169,18 @@ func (s *StandbyServer) redirectRequests(w http.ResponseWriter, r *http.Request) } func (s *StandbyServer) monitorCluster() { + first := true for { - timer := time.NewTimer(time.Duration(int64(s.syncInterval * float64(time.Second)))) - defer timer.Stop() - select { - case <-s.closeChan: - return - case <-timer.C: + if !first { + timer := time.NewTimer(time.Duration(int64(s.syncInterval * float64(time.Second)))) + defer timer.Stop() + select { + case <-s.closeChan: + return + case <-timer.C: + } } + first = false if err := s.syncCluster(nil); err != nil { log.Warnf("fail syncing cluster(%v): %v", s.Cluster(), err) @@ -198,6 +226,9 @@ func (s *StandbyServer) syncCluster(peerURLs []string) error { s.setCluster(machines) s.SetSyncInterval(config.SyncInterval) + if err := s.saveClusterInfo(); err != nil { + log.Warnf("fail saving cluster info into disk: %v", err) + } return nil } return fmt.Errorf("unreachable cluster") @@ -252,3 +283,53 @@ func (s *StandbyServer) fullPeerURL(urlStr string) string { u.Scheme = s.Config.PeerScheme return u.String() } + +func (s *StandbyServer) openClusterInfo() error { + var err error + path := filepath.Join(s.Config.DataDir, clusterInfoName) + s.file, err = os.OpenFile(path, os.O_RDWR, 0600) + if err != nil { + if os.IsNotExist(err) { + s.file, err = os.OpenFile(path, os.O_WRONLY|os.O_CREATE, 0600) + } + return err + } + return nil +} + +func (s *StandbyServer) loadClusterInfo() ([]*machineMessage, error) { + clusterInfo := make([]*machineMessage, 0) + if _, err := s.file.Seek(0, os.SEEK_SET); err != nil { + return nil, err + } + if err := json.NewDecoder(s.file).Decode(&clusterInfo); err != nil { + return nil, err + } + s.recorded = true + return clusterInfo, nil +} + +func (s *StandbyServer) saveClusterInfo() error { + if err := s.clearClusterInfo(); err != nil { + return nil + } + if err := json.NewEncoder(s.file).Encode(s.cluster); err != nil { + return err + } + if err := s.file.Sync(); err != nil { + return err + } + s.recorded = true + return nil +} + +func (s *StandbyServer) clearClusterInfo() error { + if _, err := s.file.Seek(0, os.SEEK_SET); err != nil { + return err + } + if err := s.file.Truncate(0); err != nil { + return err + } + s.recorded = false + return nil +} diff --git a/tests/functional/multi_node_kill_all_and_recovery_test.go b/tests/functional/multi_node_kill_all_and_recovery_test.go index ed149d16d..42b54b461 100644 --- a/tests/functional/multi_node_kill_all_and_recovery_test.go +++ b/tests/functional/multi_node_kill_all_and_recovery_test.go @@ -167,7 +167,7 @@ func TestMultiNodeKillAllAndRecoveryWithStandbys(t *testing.T) { leaderChan := make(chan string, 1) all := make(chan bool, 1) - clusterSize := 5 + clusterSize := 15 argGroup, etcds, err := CreateCluster(clusterSize, procAttr, false) defer DestroyCluster(etcds) @@ -184,8 +184,8 @@ func TestMultiNodeKillAllAndRecoveryWithStandbys(t *testing.T) { c.SyncCluster() - // Reconfigure with smaller active size (3 nodes) and wait for demotion. - resp, _ := tests.Put("http://localhost:7001/v2/admin/config", "application/json", bytes.NewBufferString(`{"activeSize":3}`)) + // Reconfigure with smaller active size (7 nodes) and wait for remove. + resp, _ := tests.Put("http://localhost:7001/v2/admin/config", "application/json", bytes.NewBufferString(`{"activeSize":7}`)) if !assert.Equal(t, resp.StatusCode, 200) { t.FailNow() } @@ -195,10 +195,10 @@ func TestMultiNodeKillAllAndRecoveryWithStandbys(t *testing.T) { // Verify that there is three machines in peer mode. result, err := c.Get("_etcd/machines", false, true) assert.NoError(t, err) - assert.Equal(t, len(result.Node.Nodes), 3) + assert.Equal(t, len(result.Node.Nodes), 7) - // send 10 commands - for i := 0; i < 10; i++ { + // send set commands + for i := 0; i < 2*clusterSize; i++ { // Test Set _, err := c.Set("foo", "bar", 0) if err != nil { @@ -220,13 +220,13 @@ func TestMultiNodeKillAllAndRecoveryWithStandbys(t *testing.T) { time.Sleep(time.Second) for i := 0; i < clusterSize; i++ { - etcds[i], err = os.StartProcess(EtcdBinPath, argGroup[i], procAttr) + etcds[i], err = os.StartProcess(EtcdBinPath, append(argGroup[i], "-peers="), procAttr) } time.Sleep(2 * time.Second) - // send 10 commands - for i := 0; i < 10; i++ { + // send set commands + for i := 0; i < 2*clusterSize; i++ { // Test Set _, err := c.Set("foo", "bar", 0) if err != nil { @@ -234,8 +234,8 @@ func TestMultiNodeKillAllAndRecoveryWithStandbys(t *testing.T) { } } - // Verify that we have three machines. + // Verify that we have seven machines. result, err = c.Get("_etcd/machines", false, true) assert.NoError(t, err) - assert.Equal(t, len(result.Node.Nodes), 3) + assert.Equal(t, len(result.Node.Nodes), 7) } diff --git a/tests/functional/remove_node_test.go b/tests/functional/remove_node_test.go index 853546bcb..954700a15 100644 --- a/tests/functional/remove_node_test.go +++ b/tests/functional/remove_node_test.go @@ -19,7 +19,7 @@ func TestRemoveNode(t *testing.T) { procAttr := new(os.ProcAttr) procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr} - clusterSize := 3 + clusterSize := 4 argGroup, etcds, _ := CreateCluster(clusterSize, procAttr, false) defer DestroyCluster(etcds) @@ -29,7 +29,7 @@ func TestRemoveNode(t *testing.T) { c.SyncCluster() - resp, _ := tests.Put("http://localhost:7001/v2/admin/config", "application/json", bytes.NewBufferString(`{"syncInterval":1}`)) + resp, _ := tests.Put("http://localhost:7001/v2/admin/config", "application/json", bytes.NewBufferString(`{"activeSize":4, "syncInterval":1}`)) if !assert.Equal(t, resp.StatusCode, 200) { t.FailNow() } @@ -39,6 +39,11 @@ func TestRemoveNode(t *testing.T) { client := &http.Client{} for i := 0; i < 2; i++ { for i := 0; i < 2; i++ { + r, _ := tests.Put("http://localhost:7001/v2/admin/config", "application/json", bytes.NewBufferString(`{"activeSize":3}`)) + if !assert.Equal(t, r.StatusCode, 200) { + t.FailNow() + } + client.Do(rmReq) fmt.Println("send remove to node3 and wait for its exiting") @@ -50,7 +55,7 @@ func TestRemoveNode(t *testing.T) { panic(err) } - if len(resp.Node.Nodes) != 2 { + if len(resp.Node.Nodes) != 3 { t.Fatal("cannot remove peer") } @@ -69,6 +74,11 @@ func TestRemoveNode(t *testing.T) { panic(err) } + r, _ = tests.Put("http://localhost:7001/v2/admin/config", "application/json", bytes.NewBufferString(`{"activeSize":4}`)) + if !assert.Equal(t, r.StatusCode, 200) { + t.FailNow() + } + time.Sleep(time.Second + time.Second) resp, err = c.Get("_etcd/machines", false, false) @@ -77,13 +87,18 @@ func TestRemoveNode(t *testing.T) { panic(err) } - if len(resp.Node.Nodes) != 3 { - t.Fatalf("add peer fails #1 (%d != 3)", len(resp.Node.Nodes)) + if len(resp.Node.Nodes) != 4 { + t.Fatalf("add peer fails #1 (%d != 4)", len(resp.Node.Nodes)) } } // first kill the node, then remove it, then add it back for i := 0; i < 2; i++ { + r, _ := tests.Put("http://localhost:7001/v2/admin/config", "application/json", bytes.NewBufferString(`{"activeSize":3}`)) + if !assert.Equal(t, r.StatusCode, 200) { + t.FailNow() + } + etcds[2].Kill() fmt.Println("kill node3 and wait for its exiting") etcds[2].Wait() @@ -96,7 +111,7 @@ func TestRemoveNode(t *testing.T) { panic(err) } - if len(resp.Node.Nodes) != 2 { + if len(resp.Node.Nodes) != 3 { t.Fatal("cannot remove peer") } @@ -112,7 +127,12 @@ func TestRemoveNode(t *testing.T) { panic(err) } - time.Sleep(time.Second) + r, _ = tests.Put("http://localhost:7001/v2/admin/config", "application/json", bytes.NewBufferString(`{"activeSize":4}`)) + if !assert.Equal(t, r.StatusCode, 200) { + t.FailNow() + } + + time.Sleep(time.Second + time.Second) resp, err = c.Get("_etcd/machines", false, false) @@ -120,8 +140,8 @@ func TestRemoveNode(t *testing.T) { panic(err) } - if len(resp.Node.Nodes) != 3 { - t.Fatalf("add peer fails #2 (%d != 3)", len(resp.Node.Nodes)) + if len(resp.Node.Nodes) != 4 { + t.Fatalf("add peer fails #2 (%d != 4)", len(resp.Node.Nodes)) } } } diff --git a/tests/functional/util.go b/tests/functional/util.go index 94b4e2833..135ee14e2 100644 --- a/tests/functional/util.go +++ b/tests/functional/util.go @@ -169,7 +169,7 @@ func DestroyCluster(etcds []*os.Process) error { // func Monitor(size int, allowDeadNum int, leaderChan chan string, all chan bool, stop chan bool) { leaderMap := make(map[int]string) - baseAddrFormat := "http://0.0.0.0:400%d" + baseAddrFormat := "http://0.0.0.0:%d" for { knownLeader := "unknown" @@ -177,7 +177,7 @@ func Monitor(size int, allowDeadNum int, leaderChan chan string, all chan bool, var i int for i = 0; i < size; i++ { - leader, err := getLeader(fmt.Sprintf(baseAddrFormat, i+1)) + leader, err := getLeader(fmt.Sprintf(baseAddrFormat, i+4001)) if err == nil { leaderMap[i] = leader From e5ce4fca2e35b3cfde2a60e93b292e84471ce148 Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Thu, 15 May 2014 22:47:12 -0400 Subject: [PATCH 2/7] docs(standbys): clarify pseudocode --- Documentation/design/standbys.md | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/Documentation/design/standbys.md b/Documentation/design/standbys.md index e3ae7d5ad..de0b85aa8 100644 --- a/Documentation/design/standbys.md +++ b/Documentation/design/standbys.md @@ -104,11 +104,10 @@ Loop: Sync cluster, and write cluster info into disk - If peer count < active size: - Send join request - If succeed: - Clear cluster info from disk - Return + Check active size and send join request if needed + If succeed: + Clear cluster info from disk + Return ``` From 716496ec42940478875230d045813a7076692f10 Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Thu, 15 May 2014 23:18:59 -0400 Subject: [PATCH 3/7] chore(standby_server): still sleep for the first time --- server/standby_server.go | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/server/standby_server.go b/server/standby_server.go index 8f18755f4..b9c7f29d8 100644 --- a/server/standby_server.go +++ b/server/standby_server.go @@ -168,19 +168,17 @@ func (s *StandbyServer) redirectRequests(w http.ResponseWriter, r *http.Request) uhttp.Redirect(leader.ClientURL, w, r) } +// monitorCluster assumes that the machine has tried to join the cluster and +// failed, so it waits for the interval at the beginning. func (s *StandbyServer) monitorCluster() { - first := true for { - if !first { - timer := time.NewTimer(time.Duration(int64(s.syncInterval * float64(time.Second)))) - defer timer.Stop() - select { - case <-s.closeChan: - return - case <-timer.C: - } + timer := time.NewTimer(time.Duration(int64(s.syncInterval * float64(time.Second)))) + defer timer.Stop() + select { + case <-s.closeChan: + return + case <-timer.C: } - first = false if err := s.syncCluster(nil); err != nil { log.Warnf("fail syncing cluster(%v): %v", s.Cluster(), err) From 35cc81e22fadd1f8736f74827dc22c4fdc164c63 Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Thu, 15 May 2014 23:57:58 -0400 Subject: [PATCH 4/7] feat(standby_server): save/load syncInterval to disk --- config/config.go | 3 ++ server/standby_server.go | 75 ++++++++++++++++++++-------------------- 2 files changed, 41 insertions(+), 37 deletions(-) diff --git a/config/config.go b/config/config.go index c2809312a..98302e349 100644 --- a/config/config.go +++ b/config/config.go @@ -366,6 +366,9 @@ func (c *Config) Reset() error { if err := os.RemoveAll(filepath.Join(c.DataDir, "snapshot")); err != nil { return err } + if err := os.RemoveAll(filepath.Join(c.DataDir, "standby_info")); err != nil { + return err + } return nil } diff --git a/server/standby_server.go b/server/standby_server.go index b9c7f29d8..5224e19d8 100644 --- a/server/standby_server.go +++ b/server/standby_server.go @@ -18,7 +18,7 @@ import ( "github.com/coreos/etcd/store" ) -const clusterInfoName = "cluster_info" +const standbyInfoName = "standby_info" type StandbyServerConfig struct { Name string @@ -28,13 +28,17 @@ type StandbyServerConfig struct { DataDir string } +type standbyInfo struct { + Cluster []*machineMessage + SyncInterval float64 +} + type StandbyServer struct { Config StandbyServerConfig client *Client - cluster []*machineMessage - syncInterval float64 - joinIndex uint64 + standbyInfo + joinIndex uint64 file *os.File recorded bool @@ -49,16 +53,14 @@ type StandbyServer struct { func NewStandbyServer(config StandbyServerConfig, client *Client) (*StandbyServer, error) { s := &StandbyServer{ - Config: config, - client: client, - syncInterval: DefaultSyncInterval, + Config: config, + client: client, + standbyInfo: standbyInfo{SyncInterval: DefaultSyncInterval}, } - if err := s.openClusterInfo(); err != nil { + if err := s.openStandbyInfo(); err != nil { return nil, fmt.Errorf("error open/create cluster info file: %v", err) } - if clusterInfo, err := s.loadClusterInfo(); err == nil { - s.setCluster(clusterInfo) - } + s.loadStandbyInfo() return s, nil } @@ -92,7 +94,7 @@ func (s *StandbyServer) Stop() { close(s.closeChan) s.routineGroup.Wait() - if err := s.clearClusterInfo(); err != nil { + if err := s.clearStandbyInfo(); err != nil { log.Warnf("error clearing cluster info for standby") } } @@ -111,20 +113,20 @@ func (s *StandbyServer) ClusterRecorded() bool { return s.recorded } -func (s *StandbyServer) Cluster() []string { +func (s *StandbyServer) ClusterURLs() []string { peerURLs := make([]string, 0) - for _, peer := range s.cluster { + for _, peer := range s.Cluster { peerURLs = append(peerURLs, peer.PeerURL) } return peerURLs } func (s *StandbyServer) ClusterSize() int { - return len(s.cluster) + return len(s.Cluster) } func (s *StandbyServer) setCluster(cluster []*machineMessage) { - s.cluster = cluster + s.Cluster = cluster } func (s *StandbyServer) SyncCluster(peers []string) error { @@ -133,20 +135,20 @@ func (s *StandbyServer) SyncCluster(peers []string) error { } if err := s.syncCluster(peers); err != nil { - log.Infof("fail syncing cluster(%v): %v", s.Cluster(), err) + log.Infof("fail syncing cluster(%v): %v", s.ClusterURLs(), err) return err } - log.Infof("set cluster(%v) for standby server", s.Cluster()) + log.Infof("set cluster(%v) for standby server", s.ClusterURLs()) return nil } func (s *StandbyServer) SetSyncInterval(second float64) { - s.syncInterval = second + s.SyncInterval = second } func (s *StandbyServer) ClusterLeader() *machineMessage { - for _, machine := range s.cluster { + for _, machine := range s.Cluster { if machine.State == raft.Leader { return machine } @@ -172,7 +174,7 @@ func (s *StandbyServer) redirectRequests(w http.ResponseWriter, r *http.Request) // failed, so it waits for the interval at the beginning. func (s *StandbyServer) monitorCluster() { for { - timer := time.NewTimer(time.Duration(int64(s.syncInterval * float64(time.Second)))) + timer := time.NewTimer(time.Duration(int64(s.SyncInterval * float64(time.Second)))) defer timer.Stop() select { case <-s.closeChan: @@ -181,13 +183,13 @@ func (s *StandbyServer) monitorCluster() { } if err := s.syncCluster(nil); err != nil { - log.Warnf("fail syncing cluster(%v): %v", s.Cluster(), err) + log.Warnf("fail syncing cluster(%v): %v", s.ClusterURLs(), err) continue } leader := s.ClusterLeader() if leader == nil { - log.Warnf("fail getting leader from cluster(%v)", s.Cluster()) + log.Warnf("fail getting leader from cluster(%v)", s.ClusterURLs()) continue } @@ -206,7 +208,7 @@ func (s *StandbyServer) monitorCluster() { } func (s *StandbyServer) syncCluster(peerURLs []string) error { - peerURLs = append(s.Cluster(), peerURLs...) + peerURLs = append(s.ClusterURLs(), peerURLs...) for _, peerURL := range peerURLs { // Fetch current peer list @@ -224,7 +226,7 @@ func (s *StandbyServer) syncCluster(peerURLs []string) error { s.setCluster(machines) s.SetSyncInterval(config.SyncInterval) - if err := s.saveClusterInfo(); err != nil { + if err := s.saveStandbyInfo(); err != nil { log.Warnf("fail saving cluster info into disk: %v", err) } return nil @@ -250,8 +252,8 @@ func (s *StandbyServer) join(peer string) error { log.Debugf("error getting cluster config") return err } - if clusterConfig.ActiveSize <= len(s.Cluster()) { - log.Debugf("stop joining because the cluster is full with %d nodes", len(s.Cluster())) + if clusterConfig.ActiveSize <= len(s.Cluster) { + log.Debugf("stop joining because the cluster is full with %d nodes", len(s.Cluster)) return fmt.Errorf("out of quota") } @@ -282,9 +284,9 @@ func (s *StandbyServer) fullPeerURL(urlStr string) string { return u.String() } -func (s *StandbyServer) openClusterInfo() error { +func (s *StandbyServer) openStandbyInfo() error { var err error - path := filepath.Join(s.Config.DataDir, clusterInfoName) + path := filepath.Join(s.Config.DataDir, standbyInfoName) s.file, err = os.OpenFile(path, os.O_RDWR, 0600) if err != nil { if os.IsNotExist(err) { @@ -295,23 +297,22 @@ func (s *StandbyServer) openClusterInfo() error { return nil } -func (s *StandbyServer) loadClusterInfo() ([]*machineMessage, error) { - clusterInfo := make([]*machineMessage, 0) +func (s *StandbyServer) loadStandbyInfo() ([]*machineMessage, error) { if _, err := s.file.Seek(0, os.SEEK_SET); err != nil { return nil, err } - if err := json.NewDecoder(s.file).Decode(&clusterInfo); err != nil { + if err := json.NewDecoder(s.file).Decode(&s.standbyInfo); err != nil { return nil, err } s.recorded = true - return clusterInfo, nil + return s.standbyInfo.Cluster, nil } -func (s *StandbyServer) saveClusterInfo() error { - if err := s.clearClusterInfo(); err != nil { +func (s *StandbyServer) saveStandbyInfo() error { + if err := s.clearStandbyInfo(); err != nil { return nil } - if err := json.NewEncoder(s.file).Encode(s.cluster); err != nil { + if err := json.NewEncoder(s.file).Encode(s.standbyInfo); err != nil { return err } if err := s.file.Sync(); err != nil { @@ -321,7 +322,7 @@ func (s *StandbyServer) saveClusterInfo() error { return nil } -func (s *StandbyServer) clearClusterInfo() error { +func (s *StandbyServer) clearStandbyInfo() error { if _, err := s.file.Seek(0, os.SEEK_SET); err != nil { return err } From a824be4c144e304ff9212b60eecac745e760775e Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Fri, 16 May 2014 00:10:15 -0400 Subject: [PATCH 5/7] feat(standby_server): save/load Running into disk --- etcd/etcd.go | 2 +- server/standby_server.go | 13 ++++++------- 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/etcd/etcd.go b/etcd/etcd.go index 1112bd3c4..7c708de10 100644 --- a/etcd/etcd.go +++ b/etcd/etcd.go @@ -240,7 +240,7 @@ func (e *Etcd) Run() { peerTLSConfig := server.TLSServerConfig(e.Config.PeerTLSInfo()) etcdTLSConfig := server.TLSServerConfig(e.Config.EtcdTLSInfo()) - if !e.StandbyServer.ClusterRecorded() { + if !e.StandbyServer.IsRunning() { startPeerServer, possiblePeers, err := e.PeerServer.FindCluster(e.Config.Discovery, e.Config.Peers) if err != nil { log.Fatal(err) diff --git a/server/standby_server.go b/server/standby_server.go index 5224e19d8..4ff13d111 100644 --- a/server/standby_server.go +++ b/server/standby_server.go @@ -29,6 +29,7 @@ type StandbyServerConfig struct { } type standbyInfo struct { + Running bool Cluster []*machineMessage SyncInterval float64 } @@ -40,8 +41,7 @@ type StandbyServer struct { standbyInfo joinIndex uint64 - file *os.File - recorded bool + file *os.File removeNotify chan bool started bool @@ -80,6 +80,7 @@ func (s *StandbyServer) Start() { defer s.routineGroup.Done() s.monitorCluster() }() + s.Running = true } // Stop stops the server gracefully. @@ -97,6 +98,7 @@ func (s *StandbyServer) Stop() { if err := s.clearStandbyInfo(); err != nil { log.Warnf("error clearing cluster info for standby") } + s.Running = false } // RemoveNotify notifies the server is removed from standby mode and ready @@ -109,8 +111,8 @@ func (s *StandbyServer) ClientHTTPHandler() http.Handler { return http.HandlerFunc(s.redirectRequests) } -func (s *StandbyServer) ClusterRecorded() bool { - return s.recorded +func (s *StandbyServer) IsRunning() bool { + return s.Running } func (s *StandbyServer) ClusterURLs() []string { @@ -304,7 +306,6 @@ func (s *StandbyServer) loadStandbyInfo() ([]*machineMessage, error) { if err := json.NewDecoder(s.file).Decode(&s.standbyInfo); err != nil { return nil, err } - s.recorded = true return s.standbyInfo.Cluster, nil } @@ -318,7 +319,6 @@ func (s *StandbyServer) saveStandbyInfo() error { if err := s.file.Sync(); err != nil { return err } - s.recorded = true return nil } @@ -329,6 +329,5 @@ func (s *StandbyServer) clearStandbyInfo() error { if err := s.file.Truncate(0); err != nil { return err } - s.recorded = false return nil } From 71679bcf56f213c327a2e4c591380efd17d3c945 Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Fri, 16 May 2014 01:00:07 -0400 Subject: [PATCH 6/7] feat(standby_server): make atomic move for file to avoid the risk of writing out a corrupted file. --- server/standby_server.go | 66 ++++++++++++---------------- tests/functional/remove_node_test.go | 2 + 2 files changed, 30 insertions(+), 38 deletions(-) diff --git a/server/standby_server.go b/server/standby_server.go index 4ff13d111..958a5f759 100644 --- a/server/standby_server.go +++ b/server/standby_server.go @@ -3,6 +3,7 @@ package server import ( "encoding/json" "fmt" + "io/ioutil" "net/http" "net/url" "os" @@ -41,8 +42,6 @@ type StandbyServer struct { standbyInfo joinIndex uint64 - file *os.File - removeNotify chan bool started bool closeChan chan bool @@ -57,10 +56,9 @@ func NewStandbyServer(config StandbyServerConfig, client *Client) (*StandbyServe client: client, standbyInfo: standbyInfo{SyncInterval: DefaultSyncInterval}, } - if err := s.openStandbyInfo(); err != nil { - return nil, fmt.Errorf("error open/create cluster info file: %v", err) + if err := s.loadInfo(); err != nil { + return nil, fmt.Errorf("error load standby info file: %v", err) } - s.loadStandbyInfo() return s, nil } @@ -95,8 +93,8 @@ func (s *StandbyServer) Stop() { close(s.closeChan) s.routineGroup.Wait() - if err := s.clearStandbyInfo(); err != nil { - log.Warnf("error clearing cluster info for standby") + if err := s.saveInfo(); err != nil { + log.Warnf("error saving cluster info for standby") } s.Running = false } @@ -228,7 +226,7 @@ func (s *StandbyServer) syncCluster(peerURLs []string) error { s.setCluster(machines) s.SetSyncInterval(config.SyncInterval) - if err := s.saveStandbyInfo(); err != nil { + if err := s.saveInfo(); err != nil { log.Warnf("fail saving cluster info into disk: %v", err) } return nil @@ -286,47 +284,39 @@ func (s *StandbyServer) fullPeerURL(urlStr string) string { return u.String() } -func (s *StandbyServer) openStandbyInfo() error { - var err error +func (s *StandbyServer) loadInfo() error { + var info standbyInfo + path := filepath.Join(s.Config.DataDir, standbyInfoName) - s.file, err = os.OpenFile(path, os.O_RDWR, 0600) + file, err := os.OpenFile(path, os.O_RDONLY, 0600) if err != nil { if os.IsNotExist(err) { - s.file, err = os.OpenFile(path, os.O_WRONLY|os.O_CREATE, 0600) + return nil } return err } + defer file.Close() + if err = json.NewDecoder(file).Decode(&info); err != nil { + return err + } + s.standbyInfo = info return nil } -func (s *StandbyServer) loadStandbyInfo() ([]*machineMessage, error) { - if _, err := s.file.Seek(0, os.SEEK_SET); err != nil { - return nil, err - } - if err := json.NewDecoder(s.file).Decode(&s.standbyInfo); err != nil { - return nil, err - } - return s.standbyInfo.Cluster, nil -} - -func (s *StandbyServer) saveStandbyInfo() error { - if err := s.clearStandbyInfo(); err != nil { - return nil - } - if err := json.NewEncoder(s.file).Encode(s.standbyInfo); err != nil { +func (s *StandbyServer) saveInfo() error { + tmpFile, err := ioutil.TempFile(s.Config.DataDir, standbyInfoName) + if err != nil { return err } - if err := s.file.Sync(); err != nil { - return err - } - return nil -} - -func (s *StandbyServer) clearStandbyInfo() error { - if _, err := s.file.Seek(0, os.SEEK_SET); err != nil { - return err - } - if err := s.file.Truncate(0); err != nil { + if err = json.NewEncoder(tmpFile).Encode(s.standbyInfo); err != nil { + tmpFile.Close() + os.Remove(tmpFile.Name()) + return err + } + tmpFile.Close() + + path := filepath.Join(s.Config.DataDir, standbyInfoName) + if err = os.Rename(tmpFile.Name(), path); err != nil { return err } return nil diff --git a/tests/functional/remove_node_test.go b/tests/functional/remove_node_test.go index 954700a15..67e8e455b 100644 --- a/tests/functional/remove_node_test.go +++ b/tests/functional/remove_node_test.go @@ -105,6 +105,8 @@ func TestRemoveNode(t *testing.T) { client.Do(rmReq) + time.Sleep(100 * time.Millisecond) + resp, err := c.Get("_etcd/machines", false, false) if err != nil { From 84f71b6c8715490b18cd1849090e170861038dbd Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Fri, 16 May 2014 18:07:49 -0400 Subject: [PATCH 7/7] chore(standby_server): remove error return because standby server should be started in best efforts. --- etcd/etcd.go | 4 +--- server/standby_server.go | 6 +++--- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/etcd/etcd.go b/etcd/etcd.go index 7c708de10..3e6ce7ed0 100644 --- a/etcd/etcd.go +++ b/etcd/etcd.go @@ -231,9 +231,7 @@ func (e *Etcd) Run() { ClientURL: e.Config.Addr, DataDir: e.Config.DataDir, } - if e.StandbyServer, err = server.NewStandbyServer(ssConfig, client); err != nil { - log.Fatal("error new standby server:", err) - } + e.StandbyServer = server.NewStandbyServer(ssConfig, client) // Generating config could be slow. // Put it here to make listen happen immediately after peer-server starting. diff --git a/server/standby_server.go b/server/standby_server.go index 958a5f759..401755e37 100644 --- a/server/standby_server.go +++ b/server/standby_server.go @@ -50,16 +50,16 @@ type StandbyServer struct { sync.Mutex } -func NewStandbyServer(config StandbyServerConfig, client *Client) (*StandbyServer, error) { +func NewStandbyServer(config StandbyServerConfig, client *Client) *StandbyServer { s := &StandbyServer{ Config: config, client: client, standbyInfo: standbyInfo{SyncInterval: DefaultSyncInterval}, } if err := s.loadInfo(); err != nil { - return nil, fmt.Errorf("error load standby info file: %v", err) + log.Warnf("error load standby info file: %v", err) } - return s, nil + return s } func (s *StandbyServer) Start() {