diff --git a/rafthttp/streamer.go b/rafthttp/streamer.go index 875002dad..e3e8c02df 100644 --- a/rafthttp/streamer.go +++ b/rafthttp/streamer.go @@ -76,8 +76,11 @@ func (s *stream) attach(sw *streamWriter) error { // ignore lower-term streaming request if sw.term < s.w.term { return fmt.Errorf("cannot attach out of data stream server [%d / %d]", sw.term, s.w.term) + } else if sw.term == s.w.term { + s.w.stopWithoutLog() + } else { + s.w.stop() } - s.w.stop() } s.w = sw return nil @@ -151,21 +154,23 @@ type WriteFlusher interface { // TODO: replace fs with stream stats type streamWriter struct { - to types.ID - term uint64 - fs *stats.FollowerStats - q chan []raftpb.Entry - done chan struct{} + to types.ID + term uint64 + fs *stats.FollowerStats + q chan []raftpb.Entry + done chan struct{} + printLog bool } // newStreamWriter starts and returns a new unstarted stream writer. // The caller should call stop when finished, to shut it down. func newStreamWriter(to types.ID, term uint64) *streamWriter { s := &streamWriter{ - to: to, - term: term, - q: make(chan []raftpb.Entry, streamBufSize), - done: make(chan struct{}), + to: to, + term: term, + q: make(chan []raftpb.Entry, streamBufSize), + done: make(chan struct{}), + printLog: true, } return s } @@ -188,7 +193,9 @@ func (s *streamWriter) send(ents []raftpb.Entry) error { func (s *streamWriter) handle(w WriteFlusher) { defer func() { close(s.done) - log.Printf("rafthttp: server streaming to %s at term %d has been stopped", s.to, s.term) + if s.printLog { + log.Printf("rafthttp: server streaming to %s at term %d has been stopped", s.to, s.term) + } }() ew := newEntryWriter(w, s.to) @@ -215,6 +222,11 @@ func (s *streamWriter) stop() { <-s.done } +func (s *streamWriter) stopWithoutLog() { + s.printLog = false + s.stop() +} + func (s *streamWriter) stopNotify() <-chan struct{} { return s.done } // TODO: move the raft interface out of the reader.