diff --git a/etcd/etcd.go b/etcd/etcd.go index 4455bd741..5da3a3473 100644 --- a/etcd/etcd.go +++ b/etcd/etcd.go @@ -39,10 +39,9 @@ type Server struct { raftPubAddr string tickDuration time.Duration - mode atomicInt - nodes map[string]bool - p *participant - s *standby + mode atomicInt + p *participant + s *standby client *v2client peerHub *peerHub @@ -79,17 +78,13 @@ func New(c *config.Config, id int64) *Server { raftPubAddr: c.Peer.Addr, tickDuration: defaultTickDuration, - mode: atomicInt(stopMode), - nodes: make(map[string]bool), + mode: atomicInt(stopMode), client: newClient(tc), peerHub: newPeerHub(c.Peers, client), stopc: make(chan struct{}), } - for _, seed := range c.Peers { - s.nodes[seed] = true - } return s } @@ -155,7 +150,7 @@ func (s *Server) Run() { s.mu.Unlock() next = s.p.run() case standbyMode: - s.s = newStandby(s.id, s.pubAddr, s.raftPubAddr, s.nodes, s.client, s.peerHub) + s.s = newStandby(s.id, s.pubAddr, s.raftPubAddr, s.client, s.peerHub) s.mode.Set(standbyMode) s.mu.Unlock() next = s.s.run() diff --git a/etcd/standby.go b/etcd/standby.go index 21d877f4a..09bd23917 100644 --- a/etcd/standby.go +++ b/etcd/standby.go @@ -40,8 +40,6 @@ type standby struct { client *v2client peerHub *peerHub - nodes map[string]bool - leader int64 leaderAddr string mu sync.RWMutex @@ -52,7 +50,7 @@ type standby struct { *http.ServeMux } -func newStandby(id int64, pubAddr string, raftPubAddr string, nodes map[string]bool, client *v2client, peerHub *peerHub) *standby { +func newStandby(id int64, pubAddr string, raftPubAddr string, client *v2client, peerHub *peerHub) *standby { s := &standby{ id: id, pubAddr: pubAddr, @@ -61,8 +59,6 @@ func newStandby(id int64, pubAddr string, raftPubAddr string, nodes map[string]b client: client, peerHub: peerHub, - nodes: nodes, - leader: noneId, leaderAddr: "", clusterConf: config.NewClusterConfig(), @@ -77,6 +73,7 @@ func newStandby(id int64, pubAddr string, raftPubAddr string, nodes map[string]b func (s *standby) run() int64 { var syncDuration time.Duration + nodes := s.peerHub.getSeeds() for { select { case <-time.After(syncDuration): @@ -85,12 +82,14 @@ func (s *standby) run() int64 { return stopMode } - if err := s.syncCluster(); err != nil { + if update, err := s.syncCluster(nodes); err != nil { log.Println("standby sync:", err) continue + } else { + nodes = update } syncDuration = time.Duration(s.clusterConf.SyncInterval * float64(time.Second)) - if s.clusterConf.ActiveSize <= len(s.nodes) { + if s.clusterConf.ActiveSize <= len(nodes) { continue } if err := s.joinByAddr(s.leaderAddr); err != nil { @@ -130,8 +129,8 @@ func (s *standby) serveRedirect(w http.ResponseWriter, r *http.Request) error { return nil } -func (s *standby) syncCluster() error { - for node := range s.nodes { +func (s *standby) syncCluster(nodes map[string]bool) (map[string]bool, error) { + for node := range nodes { machines, err := s.client.GetMachines(node) if err != nil { continue @@ -140,21 +139,21 @@ func (s *standby) syncCluster() error { if err != nil { continue } - s.nodes = make(map[string]bool) + nn := make(map[string]bool) for _, machine := range machines { - s.nodes[machine.PeerURL] = true + nn[machine.PeerURL] = true if machine.State == stateLeader { id, err := strconv.ParseInt(machine.Name, 0, 64) if err != nil { - return err + return nil, err } s.setLeaderInfo(id, machine.PeerURL) } } s.clusterConf = config - return nil + return nn, nil } - return fmt.Errorf("unreachable cluster") + return nil, fmt.Errorf("unreachable cluster") } func (s *standby) joinByAddr(addr string) error {