diff --git a/rafthttp/http.go b/rafthttp/http.go index ad2982d95..b6905757b 100644 --- a/rafthttp/http.go +++ b/rafthttp/http.go @@ -153,7 +153,7 @@ func (h *streamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } - sw := newStreamWriter(from, term) + sw := newStreamWriter(w.(WriteFlusher), from, term) err = p.attachStream(sw) if err != nil { log.Printf("rafthttp: %v", err) @@ -163,7 +163,6 @@ func (h *streamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) w.(http.Flusher).Flush() - go sw.handle(w.(WriteFlusher)) <-sw.stopNotify() } diff --git a/rafthttp/peer.go b/rafthttp/peer.go index e2bbd3a60..3ecf0b05f 100644 --- a/rafthttp/peer.go +++ b/rafthttp/peer.go @@ -15,7 +15,8 @@ package rafthttp import ( - "errors" + "fmt" + "log" "net/http" "sync" "time" @@ -51,12 +52,17 @@ type peer struct { pipeline *pipeline stream *stream - paused bool - stopped bool + sendc chan raftpb.Message + updatec chan string + attachc chan *streamWriter + pausec chan struct{} + resumec chan struct{} + stopc chan struct{} + done chan struct{} } func NewPeer(tr http.RoundTripper, u string, id types.ID, cid types.ID, r Raft, fs *stats.FollowerStats, errorc chan error) *peer { - return &peer{ + p := &peer{ id: id, cid: cid, tr: tr, @@ -67,33 +73,111 @@ func NewPeer(tr http.RoundTripper, u string, id types.ID, cid types.ID, r Raft, stream: &stream{}, batcher: NewBatcher(100, appRespBatchMs*time.Millisecond), propBatcher: NewProposalBatcher(100, propBatchMs*time.Millisecond), + + sendc: make(chan raftpb.Message), + updatec: make(chan string), + attachc: make(chan *streamWriter), + pausec: make(chan struct{}), + resumec: make(chan struct{}), + stopc: make(chan struct{}), + done: make(chan struct{}), + } + go p.run() + return p +} + +func (p *peer) run() { + var paused bool + // non-blocking main loop + for { + select { + case m := <-p.sendc: + if paused { + continue + } + p.send(m) + case u := <-p.updatec: + p.u = u + p.pipeline.update(u) + case sw := <-p.attachc: + sw.fs = p.fs + if err := p.stream.attach(sw); err != nil { + sw.stop() + continue + } + go sw.handle() + case <-p.pausec: + paused = true + case <-p.resumec: + paused = false + case <-p.stopc: + p.pipeline.stop() + p.stream.stop() + close(p.done) + return + } + } +} + +func (p *peer) Send(m raftpb.Message) { + select { + case p.sendc <- m: + case <-p.done: + log.Panicf("peer: unexpected stopped") } } func (p *peer) Update(u string) { - p.Lock() - defer p.Unlock() - if p.stopped { - // TODO: not panic here? - panic("peer: update a stopped peer") + select { + case p.updatec <- u: + case <-p.done: + log.Panicf("peer: unexpected stopped") } - p.u = u - p.pipeline.update(u) } -// Send sends the data to the remote node. It is always non-blocking. +// attachStream attaches a streamWriter to the peer. +// If attach succeeds, peer will take charge of the given streamWriter. +func (p *peer) attachStream(sw *streamWriter) error { + select { + case p.attachc <- sw: + return nil + case <-p.done: + return fmt.Errorf("peer: stopped") + } +} + +// Pause pauses the peer. The peer will simply drops all incoming +// messages without retruning an error. +func (p *peer) Pause() { + select { + case p.pausec <- struct{}{}: + case <-p.done: + } +} + +// Resume resumes a paused peer. +func (p *peer) Resume() { + select { + case p.resumec <- struct{}{}: + case <-p.done: + } +} + +// Stop performs any necessary finalization and terminates the peer +// elegantly. +func (p *peer) Stop() { + select { + case p.stopc <- struct{}{}: + case <-p.done: + return + } + <-p.done +} + +// send sends the data to the remote node. It is always non-blocking. // It may be fail to send data if it returns nil error. // TODO (xiangli): reasonable retry logic -func (p *peer) Send(m raftpb.Message) error { - p.Lock() - defer p.Unlock() - if p.stopped { - return errors.New("peer: stopped") - } - if p.paused { - return nil - } - +func (p *peer) send(m raftpb.Message) error { // move all the stream related stuff into stream p.stream.invalidate(m.Term) if shouldInitStream(m) && !p.stream.isOpen() { @@ -132,41 +216,4 @@ func (p *peer) Send(m raftpb.Message) error { return err } -// Stop performs any necessary finalization and terminates the peer -// elegantly. -func (p *peer) Stop() { - p.Lock() - defer p.Unlock() - p.pipeline.stop() - p.stream.stop() - p.stopped = true -} - -// attachStream attaches a streamSever to the peer. -func (p *peer) attachStream(sw *streamWriter) error { - p.Lock() - defer p.Unlock() - if p.stopped { - return errors.New("peer: stopped") - } - - sw.fs = p.fs - return p.stream.attach(sw) -} - -// Pause pauses the peer. The peer will simply drops all incoming -// messages without retruning an error. -func (p *peer) Pause() { - p.Lock() - defer p.Unlock() - p.paused = true -} - -// Resume resumes a paused peer. -func (p *peer) Resume() { - p.Lock() - defer p.Unlock() - p.paused = false -} - func isProposal(m raftpb.Message) bool { return m.Type == raftpb.MsgProp } diff --git a/rafthttp/streamer.go b/rafthttp/streamer.go index a77e50559..20d8adcd9 100644 --- a/rafthttp/streamer.go +++ b/rafthttp/streamer.go @@ -150,6 +150,7 @@ type WriteFlusher interface { // TODO: replace fs with stream stats type streamWriter struct { + w WriteFlusher to types.ID term uint64 fs *stats.FollowerStats @@ -159,8 +160,9 @@ type streamWriter struct { // newStreamWriter starts and returns a new unstarted stream writer. // The caller should call stop when finished, to shut it down. -func newStreamWriter(to types.ID, term uint64) *streamWriter { +func newStreamWriter(w WriteFlusher, to types.ID, term uint64) *streamWriter { s := &streamWriter{ + w: w, to: to, term: term, q: make(chan []raftpb.Entry, streamBufSize), @@ -184,13 +186,13 @@ func (s *streamWriter) send(ents []raftpb.Entry) error { } } -func (s *streamWriter) handle(w WriteFlusher) { +func (s *streamWriter) handle() { defer func() { close(s.done) log.Printf("rafthttp: server streaming to %s at term %d has been stopped", s.to, s.term) }() - ew := newEntryWriter(w, s.to) + ew := newEntryWriter(s.w, s.to) for ents := range s.q { // Considering Commit in MsgApp is not recovered when received, // zero-entry appendEntry messages have no use to raft state machine. @@ -203,7 +205,7 @@ func (s *streamWriter) handle(w WriteFlusher) { log.Printf("rafthttp: encountered error writing to server log stream: %v", err) return } - w.Flush() + s.w.Flush() s.fs.Succ(time.Since(start)) } }