diff --git a/etcd/etcd.go b/etcd/etcd.go index ef8f02489..374af45b1 100644 --- a/etcd/etcd.go +++ b/etcd/etcd.go @@ -63,10 +63,11 @@ type Server struct { nodes map[string]bool client *v2client t *transporter + node *v2Raft + store.Store // participant mode vars proposal chan v2Proposal - node *v2Raft addNodeC chan raft.Config removeNodeC chan raft.Config @@ -75,8 +76,6 @@ type Server struct { leaderAddr string clusterConf *config.ClusterConfig - store.Store - modeC chan int stop chan struct{} @@ -105,21 +104,20 @@ func New(c *config.Config, id int64) *Server { id: id, pubAddr: c.Addr, raftPubAddr: c.Peer.Addr, - nodes: make(map[string]bool), - client: newClient(tc), - t: newTransporter(tc), tickDuration: defaultTickDuration, + nodes: make(map[string]bool), + client: newClient(tc), + t: newTransporter(tc), + node: &v2Raft{ + Node: raft.New(id, defaultHeartbeat, defaultElection), + result: make(map[wait]chan interface{}), + }, Store: store.New(), modeC: make(chan int, 10), stop: make(chan struct{}), } - node := &v2Raft{ - Node: raft.New(id, defaultHeartbeat, defaultElection), - result: make(map[wait]chan interface{}), - } - s.initParticipant(node) for _, seed := range c.Peers { s.nodes[seed] = true @@ -306,18 +304,17 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { } } -func (s *Server) initParticipant(node *v2Raft) { +func (s *Server) initParticipant() { s.proposal = make(chan v2Proposal, maxBufferedProposal) - s.node = node s.addNodeC = make(chan raft.Config, 1) s.removeNodeC = make(chan raft.Config, 1) s.t.start() } -func (s *Server) initStandby(leader int64, leaderAddr string, conf *config.ClusterConfig) { - s.leader = leader - s.leaderAddr = leaderAddr - s.clusterConf = conf +func (s *Server) initStandby() { + s.leader = noneId + s.leaderAddr = "" + s.clusterConf = config.NewClusterConfig() } func (s *Server) run() { @@ -329,8 +326,10 @@ func (s *Server) run() { switch s.mode { case participant: + s.initParticipant() s.runParticipant() case standby: + s.initStandby() s.runStandby() case stop: return @@ -387,20 +386,15 @@ func (s *Server) runParticipant() { } log.Printf("Node: %d removed to standby mode\n", s.id) - leader := noneId - leaderAddr := "" - if s.node.HasLeader() && !s.node.IsLeader() { - leader = s.node.Leader() - leaderAddr = s.fetchAddrFromStore(s.leader) - } - conf := s.ClusterConfig() - s.initStandby(leader, leaderAddr, conf) s.mode = standby return } func (s *Server) runStandby() { syncDuration := time.Duration(int64(s.clusterConf.SyncInterval * float64(time.Second))) + if err := s.syncCluster(); err != nil { + log.Println("standby sync:", err) + } for { select { case <-time.After(syncDuration): @@ -410,9 +404,14 @@ func (s *Server) runStandby() { } if err := s.syncCluster(); err != nil { + log.Println("standby sync:", err) continue } - if err := s.standbyJoin(s.leaderAddr); err != nil { + if s.clusterConf.ActiveSize <= len(s.nodes) { + continue + } + if err := s.joinByPeer(s.leaderAddr); err != nil { + log.Println("standby join:", err) continue } break @@ -422,12 +421,11 @@ func (s *Server) runStandby() { // TODO(yichengq): use old v2Raft // 1. reject proposal in leader state when sm is removed // 2. record removeIndex in node to ignore msgDenial and old removal - s.Store = store.New() - node := &v2Raft{ + s.node = &v2Raft{ Node: raft.New(s.id, defaultHeartbeat, defaultElection), result: make(map[wait]chan interface{}), } - s.initParticipant(node) + s.Store = store.New() s.mode = participant return } @@ -530,10 +528,7 @@ func (s *Server) fetchAddrFromStore(nodeId int64) string { return "" } -func (s *Server) standbyJoin(addr string) error { - if s.clusterConf.ActiveSize <= len(s.nodes) { - return fmt.Errorf("full cluster") - } +func (s *Server) joinByPeer(addr string) error { info := &context{ MinVersion: store.MinVersion(), MaxVersion: store.MaxVersion(),