etcd: remove nodes in standby

This commit is contained in:
Xiang Li
2014-07-19 09:32:18 -07:00
committed by Yicheng Qin
parent b4cf146a52
commit daa49023cf
2 changed files with 18 additions and 24 deletions

View File

@@ -39,10 +39,9 @@ type Server struct {
raftPubAddr string
tickDuration time.Duration
mode atomicInt
nodes map[string]bool
p *participant
s *standby
mode atomicInt
p *participant
s *standby
client *v2client
peerHub *peerHub
@@ -79,17 +78,13 @@ func New(c *config.Config, id int64) *Server {
raftPubAddr: c.Peer.Addr,
tickDuration: defaultTickDuration,
mode: atomicInt(stopMode),
nodes: make(map[string]bool),
mode: atomicInt(stopMode),
client: newClient(tc),
peerHub: newPeerHub(c.Peers, client),
stopc: make(chan struct{}),
}
for _, seed := range c.Peers {
s.nodes[seed] = true
}
return s
}
@@ -155,7 +150,7 @@ func (s *Server) Run() {
s.mu.Unlock()
next = s.p.run()
case standbyMode:
s.s = newStandby(s.id, s.pubAddr, s.raftPubAddr, s.nodes, s.client, s.peerHub)
s.s = newStandby(s.id, s.pubAddr, s.raftPubAddr, s.client, s.peerHub)
s.mode.Set(standbyMode)
s.mu.Unlock()
next = s.s.run()

View File

@@ -40,8 +40,6 @@ type standby struct {
client *v2client
peerHub *peerHub
nodes map[string]bool
leader int64
leaderAddr string
mu sync.RWMutex
@@ -52,7 +50,7 @@ type standby struct {
*http.ServeMux
}
func newStandby(id int64, pubAddr string, raftPubAddr string, nodes map[string]bool, client *v2client, peerHub *peerHub) *standby {
func newStandby(id int64, pubAddr string, raftPubAddr string, client *v2client, peerHub *peerHub) *standby {
s := &standby{
id: id,
pubAddr: pubAddr,
@@ -61,8 +59,6 @@ func newStandby(id int64, pubAddr string, raftPubAddr string, nodes map[string]b
client: client,
peerHub: peerHub,
nodes: nodes,
leader: noneId,
leaderAddr: "",
clusterConf: config.NewClusterConfig(),
@@ -77,6 +73,7 @@ func newStandby(id int64, pubAddr string, raftPubAddr string, nodes map[string]b
func (s *standby) run() int64 {
var syncDuration time.Duration
nodes := s.peerHub.getSeeds()
for {
select {
case <-time.After(syncDuration):
@@ -85,12 +82,14 @@ func (s *standby) run() int64 {
return stopMode
}
if err := s.syncCluster(); err != nil {
if update, err := s.syncCluster(nodes); err != nil {
log.Println("standby sync:", err)
continue
} else {
nodes = update
}
syncDuration = time.Duration(s.clusterConf.SyncInterval * float64(time.Second))
if s.clusterConf.ActiveSize <= len(s.nodes) {
if s.clusterConf.ActiveSize <= len(nodes) {
continue
}
if err := s.joinByAddr(s.leaderAddr); err != nil {
@@ -130,8 +129,8 @@ func (s *standby) serveRedirect(w http.ResponseWriter, r *http.Request) error {
return nil
}
func (s *standby) syncCluster() error {
for node := range s.nodes {
func (s *standby) syncCluster(nodes map[string]bool) (map[string]bool, error) {
for node := range nodes {
machines, err := s.client.GetMachines(node)
if err != nil {
continue
@@ -140,21 +139,21 @@ func (s *standby) syncCluster() error {
if err != nil {
continue
}
s.nodes = make(map[string]bool)
nn := make(map[string]bool)
for _, machine := range machines {
s.nodes[machine.PeerURL] = true
nn[machine.PeerURL] = true
if machine.State == stateLeader {
id, err := strconv.ParseInt(machine.Name, 0, 64)
if err != nil {
return err
return nil, err
}
s.setLeaderInfo(id, machine.PeerURL)
}
}
s.clusterConf = config
return nil
return nn, nil
}
return fmt.Errorf("unreachable cluster")
return nil, fmt.Errorf("unreachable cluster")
}
func (s *standby) joinByAddr(addr string) error {