From ef98f6051c835e2d08822e0b52ed5d06193c379a Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Fri, 4 Apr 2014 17:10:18 -0700 Subject: [PATCH] bump(goraft/raft): 585c58026c --- .../goraft/raft/http_transporter.go | 16 ++++ third_party/github.com/goraft/raft/peer.go | 7 +- third_party/github.com/goraft/raft/server.go | 74 +++++++++++++------ 3 files changed, 74 insertions(+), 23 deletions(-) diff --git a/third_party/github.com/goraft/raft/http_transporter.go b/third_party/github.com/goraft/raft/http_transporter.go index 183254b23..b3d4bb746 100644 --- a/third_party/github.com/goraft/raft/http_transporter.go +++ b/third_party/github.com/goraft/raft/http_transporter.go @@ -244,6 +244,10 @@ func (t *HTTPTransporter) appendEntriesHandler(server Server) http.HandlerFunc { } resp := server.AppendEntries(req) + if resp == nil { + http.Error(w, "Failed creating response.", http.StatusInternalServerError) + return + } if _, err := resp.Encode(w); err != nil { http.Error(w, "", http.StatusInternalServerError) return @@ -263,6 +267,10 @@ func (t *HTTPTransporter) requestVoteHandler(server Server) http.HandlerFunc { } resp := server.RequestVote(req) + if resp == nil { + http.Error(w, "Failed creating response.", http.StatusInternalServerError) + return + } if _, err := resp.Encode(w); err != nil { http.Error(w, "", http.StatusInternalServerError) return @@ -282,6 +290,10 @@ func (t *HTTPTransporter) snapshotHandler(server Server) http.HandlerFunc { } resp := server.RequestSnapshot(req) + if resp == nil { + http.Error(w, "Failed creating response.", http.StatusInternalServerError) + return + } if _, err := resp.Encode(w); err != nil { http.Error(w, "", http.StatusInternalServerError) return @@ -301,6 +313,10 @@ func (t *HTTPTransporter) snapshotRecoveryHandler(server Server) http.HandlerFun } resp := server.SnapshotRecoveryRequest(req) + if resp == nil { + http.Error(w, "Failed creating response.", http.StatusInternalServerError) + return + } if _, err := resp.Encode(w); err != nil { http.Error(w, "", http.StatusInternalServerError) return diff --git a/third_party/github.com/goraft/raft/peer.go b/third_party/github.com/goraft/raft/peer.go index 28c25c48d..df9e4b0c6 100644 --- a/third_party/github.com/goraft/raft/peer.go +++ b/third_party/github.com/goraft/raft/peer.go @@ -88,7 +88,12 @@ func (p *Peer) setLastActivity(now time.Time) { func (p *Peer) startHeartbeat() { p.stopChan = make(chan bool) c := make(chan bool) - go p.heartbeat(c) + + p.server.routineGroup.Add(1) + go func() { + defer p.server.routineGroup.Done() + p.heartbeat(c) + }() <-c } diff --git a/third_party/github.com/goraft/raft/server.go b/third_party/github.com/goraft/raft/server.go index 5f29010af..d020c9070 100644 --- a/third_party/github.com/goraft/raft/server.go +++ b/third_party/github.com/goraft/raft/server.go @@ -55,6 +55,7 @@ const ElectionTimeoutThresholdPercent = 0.8 var NotLeaderError = errors.New("raft.Server: Not current leader") var DuplicatePeerError = errors.New("raft.Server: Duplicate peer") var CommandTimeoutError = errors.New("raft: Command timeout") +var StopError = errors.New("raft: Has been stopped") //------------------------------------------------------------------------------ // @@ -123,7 +124,7 @@ type server struct { mutex sync.RWMutex syncedPeer map[string]bool - stopped chan chan bool + stopped chan bool c chan *ev electionTimeout time.Duration heartbeatInterval time.Duration @@ -140,6 +141,8 @@ type server struct { maxLogEntriesPerRequest uint64 connectionString string + + routineGroup sync.WaitGroup } // An internal event to be processed by the server's event loop. @@ -177,7 +180,6 @@ func NewServer(name string, path string, transporter Transporter, stateMachine S state: Stopped, peers: make(map[string]*Peer), log: newLog(), - stopped: make(chan chan bool), c: make(chan *ev, 256), electionTimeout: DefaultElectionTimeout, heartbeatInterval: DefaultHeartbeatInterval, @@ -440,6 +442,9 @@ func (s *server) Start() error { return err } + // stopped needs to be allocated each time server starts + // because it is closed at `Stop`. + s.stopped = make(chan bool) s.setState(Follower) // If no log entries exist then @@ -457,7 +462,11 @@ func (s *server) Start() error { debugln(s.GetState()) - go s.loop() + s.routineGroup.Add(1) + go func() { + defer s.routineGroup.Done() + s.loop() + }() return nil } @@ -507,11 +516,11 @@ func (s *server) Stop() { return } - stop := make(chan bool) - s.stopped <- stop + close(s.stopped) + + // make sure all goroutines have stopped before we close the log + s.routineGroup.Wait() - // make sure the server has stopped before we close the log - <-stop s.log.close() s.setState(Stopped) } @@ -605,9 +614,17 @@ func (s *server) loop() { // until the event is actually processed before returning. func (s *server) send(value interface{}) (interface{}, error) { event := &ev{target: value, c: make(chan error, 1)} - s.c <- event - err := <-event.c - return event.returnValue, err + select { + case s.c <- event: + case <-s.stopped: + return nil, StopError + } + select { + case <-s.stopped: + return nil, StopError + case err := <-event.c: + return event.returnValue, err + } } func (s *server) sendAsync(value interface{}) { @@ -621,8 +638,13 @@ func (s *server) sendAsync(value interface{}) { default: } + s.routineGroup.Add(1) go func() { - s.c <- event + defer s.routineGroup.Done() + select { + case s.c <- event: + case <-s.stopped: + } }() } @@ -640,9 +662,8 @@ func (s *server) followerLoop() { var err error update := false select { - case stop := <-s.stopped: + case <-s.stopped: s.setState(Stopped) - stop <- true return case e := <-s.c: @@ -717,7 +738,11 @@ func (s *server) candidateLoop() { // Send RequestVote RPCs to all other servers. respChan = make(chan *RequestVoteResponse, len(s.peers)) for _, peer := range s.peers { - go peer.sendVoteRequest(newRequestVoteRequest(s.currentTerm, s.name, lastLogIndex, lastLogTerm), respChan) + s.routineGroup.Add(1) + go func(peer *Peer) { + defer s.routineGroup.Done() + peer.sendVoteRequest(newRequestVoteRequest(s.currentTerm, s.name, lastLogIndex, lastLogTerm), respChan) + }(peer) } // Wait for either: @@ -740,9 +765,8 @@ func (s *server) candidateLoop() { // Collect votes from peers. select { - case stop := <-s.stopped: + case <-s.stopped: s.setState(Stopped) - stop <- true return case resp := <-respChan: @@ -786,19 +810,22 @@ func (s *server) leaderLoop() { // "Upon election: send initial empty AppendEntries RPCs (heartbeat) to // each server; repeat during idle periods to prevent election timeouts // (ยง5.2)". The heartbeats started above do the "idle" period work. - go s.Do(NOPCommand{}) + s.routineGroup.Add(1) + go func() { + defer s.routineGroup.Done() + s.Do(NOPCommand{}) + }() // Begin to collect response from followers for s.State() == Leader { var err error select { - case stop := <-s.stopped: + case <-s.stopped: // Stop all peers before stop for _, peer := range s.peers { peer.stopHeartbeat(false) } s.setState(Stopped) - stop <- true return case e := <-s.c: @@ -826,9 +853,8 @@ func (s *server) snapshotLoop() { for s.State() == Snapshotting { var err error select { - case stop := <-s.stopped: + case <-s.stopped: s.setState(Stopped) - stop <- true return case e := <-s.c: @@ -1109,7 +1135,11 @@ func (s *server) RemovePeer(name string) error { // So we might be holding log lock and waiting for log lock, // which lead to a deadlock. // TODO(xiangli) refactor log lock - go peer.stopHeartbeat(true) + s.routineGroup.Add(1) + go func() { + defer s.routineGroup.Done() + peer.stopHeartbeat(true) + }() } delete(s.peers, name)