From 4c324fe3a455ca502f2f3e485aaac0d95a442b93 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Fri, 18 Jul 2014 08:38:49 -0700 Subject: [PATCH] etcd: cleanup etcd.go --- etcd/etcd.go | 57 ++++++++++++++++----------------------------- etcd/participant.go | 6 ++--- etcd/standby.go | 6 ++--- 3 files changed, 26 insertions(+), 43 deletions(-) diff --git a/etcd/etcd.go b/etcd/etcd.go index 14f543b59..6a6f29980 100644 --- a/etcd/etcd.go +++ b/etcd/etcd.go @@ -93,17 +93,22 @@ func (s *Server) Stop() { if s.mode.Get() == stopMode { return } - s.stopc <- struct{}{} + m := s.mode.Get() + s.mode.Set(stopMode) + switch m { + case participantMode: + s.p.stop() + case standbyMode: + s.s.stop() + } <-s.stopc } func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { switch s.mode.Get() { - case participantMode: + case participantMode, standbyMode: s.p.ServeHTTP(w, r) - case standbyMode: - s.s.ServeHTTP(w, r) - case stopMode: + default: http.NotFound(w, r) } } @@ -116,56 +121,34 @@ func (s *Server) ServeRaftHTTP(w http.ResponseWriter, r *http.Request) { switch s.mode.Get() { case participantMode: s.p.raftHandler().ServeHTTP(w, r) - case standbyMode: - http.NotFound(w, r) - case stopMode: + default: http.NotFound(w, r) } } func (s *Server) Run() { - runc := make(chan struct{}) next := participantMode for { switch next { case participantMode: s.p = newParticipant(s.id, s.pubAddr, s.raftPubAddr, s.nodes, s.client, s.peerHub, s.tickDuration) s.mode.Set(participantMode) - // TODO: it may block here. remove modeC later. s.modeC <- s.mode.Get() - next = standbyMode - go func() { - s.p.run() - runc <- struct{}{} - }() + // TODO: it may block here. move modeC later. + next = s.p.run() case standbyMode: s.s = newStandby(s.id, s.pubAddr, s.raftPubAddr, s.nodes, s.client, s.peerHub) s.mode.Set(standbyMode) s.modeC <- s.mode.Get() - next = participantMode - go func() { - s.s.run() - runc <- struct{}{} - }() + next = s.s.run() + case stopMode: + s.client.CloseConnections() + s.peerHub.stop() + s.modeC <- s.mode.Get() + s.stopc <- struct{}{} + return default: panic("unsupport mode") } - select { - case <-runc: - case <-s.stopc: - switch s.mode.Get() { - case participantMode: - s.p.stop() - case standbyMode: - s.s.stop() - } - <-runc - s.mode.Set(stopMode) - s.modeC <- s.mode.Get() - s.client.CloseConnections() - s.peerHub.stop() - s.stopc <- struct{}{} - return - } } } diff --git a/etcd/participant.go b/etcd/participant.go index d71ded93a..dbda754b7 100644 --- a/etcd/participant.go +++ b/etcd/participant.go @@ -98,7 +98,7 @@ func newParticipant(id int64, pubAddr string, raftPubAddr string, seeds map[stri return p } -func (p *participant) run() { +func (p *participant) run() int64 { if len(p.seeds) == 0 { log.Println("starting a bootstrap node") p.node.Campaign() @@ -146,13 +146,13 @@ func (p *participant) run() { node.Sync() case <-p.stopc: log.Printf("Participant %d stopped\n", p.id) - return + return stopMode } p.apply(node.Next()) p.send(node.Msgs()) if node.IsRemoved() { log.Printf("Participant %d return\n", p.id) - return + return standbyMode } } } diff --git a/etcd/standby.go b/etcd/standby.go index 579adb3ec..130dc6089 100644 --- a/etcd/standby.go +++ b/etcd/standby.go @@ -53,14 +53,14 @@ func newStandby(id int64, pubAddr string, raftPubAddr string, nodes map[string]b return s } -func (s *standby) run() { +func (s *standby) run() int64 { var syncDuration time.Duration for { select { case <-time.After(syncDuration): case <-s.stopc: log.Printf("Standby %d stopped\n", s.id) - return + return stopMode } if err := s.syncCluster(); err != nil { @@ -75,7 +75,7 @@ func (s *standby) run() { log.Println("standby join:", err) continue } - return + return participantMode } }