server: simplify mode transition

This commit is contained in:
Yicheng Qin 2014-07-16 17:32:00 -07:00
parent 18001dd779
commit 5f9a5e6a5d

View File

@ -63,10 +63,11 @@ type Server struct {
nodes map[string]bool nodes map[string]bool
client *v2client client *v2client
t *transporter t *transporter
node *v2Raft
store.Store
// participant mode vars // participant mode vars
proposal chan v2Proposal proposal chan v2Proposal
node *v2Raft
addNodeC chan raft.Config addNodeC chan raft.Config
removeNodeC chan raft.Config removeNodeC chan raft.Config
@ -75,8 +76,6 @@ type Server struct {
leaderAddr string leaderAddr string
clusterConf *config.ClusterConfig clusterConf *config.ClusterConfig
store.Store
modeC chan int modeC chan int
stop chan struct{} stop chan struct{}
@ -105,21 +104,20 @@ func New(c *config.Config, id int64) *Server {
id: id, id: id,
pubAddr: c.Addr, pubAddr: c.Addr,
raftPubAddr: c.Peer.Addr, raftPubAddr: c.Peer.Addr,
tickDuration: defaultTickDuration,
nodes: make(map[string]bool), nodes: make(map[string]bool),
client: newClient(tc), client: newClient(tc),
t: newTransporter(tc), t: newTransporter(tc),
tickDuration: defaultTickDuration, node: &v2Raft{
Node: raft.New(id, defaultHeartbeat, defaultElection),
result: make(map[wait]chan interface{}),
},
Store: store.New(), Store: store.New(),
modeC: make(chan int, 10), modeC: make(chan int, 10),
stop: make(chan struct{}), stop: make(chan struct{}),
} }
node := &v2Raft{
Node: raft.New(id, defaultHeartbeat, defaultElection),
result: make(map[wait]chan interface{}),
}
s.initParticipant(node)
for _, seed := range c.Peers { for _, seed := range c.Peers {
s.nodes[seed] = true s.nodes[seed] = true
@ -306,18 +304,17 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
} }
} }
func (s *Server) initParticipant(node *v2Raft) { func (s *Server) initParticipant() {
s.proposal = make(chan v2Proposal, maxBufferedProposal) s.proposal = make(chan v2Proposal, maxBufferedProposal)
s.node = node
s.addNodeC = make(chan raft.Config, 1) s.addNodeC = make(chan raft.Config, 1)
s.removeNodeC = make(chan raft.Config, 1) s.removeNodeC = make(chan raft.Config, 1)
s.t.start() s.t.start()
} }
func (s *Server) initStandby(leader int64, leaderAddr string, conf *config.ClusterConfig) { func (s *Server) initStandby() {
s.leader = leader s.leader = noneId
s.leaderAddr = leaderAddr s.leaderAddr = ""
s.clusterConf = conf s.clusterConf = config.NewClusterConfig()
} }
func (s *Server) run() { func (s *Server) run() {
@ -329,8 +326,10 @@ func (s *Server) run() {
switch s.mode { switch s.mode {
case participant: case participant:
s.initParticipant()
s.runParticipant() s.runParticipant()
case standby: case standby:
s.initStandby()
s.runStandby() s.runStandby()
case stop: case stop:
return return
@ -387,20 +386,15 @@ func (s *Server) runParticipant() {
} }
log.Printf("Node: %d removed to standby mode\n", s.id) log.Printf("Node: %d removed to standby mode\n", s.id)
leader := noneId
leaderAddr := ""
if s.node.HasLeader() && !s.node.IsLeader() {
leader = s.node.Leader()
leaderAddr = s.fetchAddrFromStore(s.leader)
}
conf := s.ClusterConfig()
s.initStandby(leader, leaderAddr, conf)
s.mode = standby s.mode = standby
return return
} }
func (s *Server) runStandby() { func (s *Server) runStandby() {
syncDuration := time.Duration(int64(s.clusterConf.SyncInterval * float64(time.Second))) syncDuration := time.Duration(int64(s.clusterConf.SyncInterval * float64(time.Second)))
if err := s.syncCluster(); err != nil {
log.Println("standby sync:", err)
}
for { for {
select { select {
case <-time.After(syncDuration): case <-time.After(syncDuration):
@ -410,9 +404,14 @@ func (s *Server) runStandby() {
} }
if err := s.syncCluster(); err != nil { if err := s.syncCluster(); err != nil {
log.Println("standby sync:", err)
continue continue
} }
if err := s.standbyJoin(s.leaderAddr); err != nil { if s.clusterConf.ActiveSize <= len(s.nodes) {
continue
}
if err := s.joinByPeer(s.leaderAddr); err != nil {
log.Println("standby join:", err)
continue continue
} }
break break
@ -422,12 +421,11 @@ func (s *Server) runStandby() {
// TODO(yichengq): use old v2Raft // TODO(yichengq): use old v2Raft
// 1. reject proposal in leader state when sm is removed // 1. reject proposal in leader state when sm is removed
// 2. record removeIndex in node to ignore msgDenial and old removal // 2. record removeIndex in node to ignore msgDenial and old removal
s.Store = store.New() s.node = &v2Raft{
node := &v2Raft{
Node: raft.New(s.id, defaultHeartbeat, defaultElection), Node: raft.New(s.id, defaultHeartbeat, defaultElection),
result: make(map[wait]chan interface{}), result: make(map[wait]chan interface{}),
} }
s.initParticipant(node) s.Store = store.New()
s.mode = participant s.mode = participant
return return
} }
@ -530,10 +528,7 @@ func (s *Server) fetchAddrFromStore(nodeId int64) string {
return "" return ""
} }
func (s *Server) standbyJoin(addr string) error { func (s *Server) joinByPeer(addr string) error {
if s.clusterConf.ActiveSize <= len(s.nodes) {
return fmt.Errorf("full cluster")
}
info := &context{ info := &context{
MinVersion: store.MinVersion(), MinVersion: store.MinVersion(),
MaxVersion: store.MaxVersion(), MaxVersion: store.MaxVersion(),