From fe53ffd74d97576c49c3a959d19ccf996141675e Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Wed, 31 Dec 2014 21:10:50 -0800 Subject: [PATCH] rafthttp: streamserver -> streamwriter --- rafthttp/http.go | 6 +++--- rafthttp/peer.go | 6 +++--- rafthttp/streamer.go | 49 ++++++++++++++++++++++---------------------- 3 files changed, 30 insertions(+), 31 deletions(-) diff --git a/rafthttp/http.go b/rafthttp/http.go index 213107129..1a65028e1 100644 --- a/rafthttp/http.go +++ b/rafthttp/http.go @@ -159,14 +159,14 @@ func (h *streamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) w.(http.Flusher).Flush() - stream := newStreamServer(w.(WriteFlusher), from, term) - err = p.attachStream(stream) + sw := newStreamWriter(w.(WriteFlusher), from, term) + err = p.attachStream(sw) if err != nil { log.Printf("rafthttp: %v", err) http.Error(w, err.Error(), http.StatusBadRequest) return } - <-stream.stopNotify() + <-sw.stopNotify() } type writerToResponse interface { diff --git a/rafthttp/peer.go b/rafthttp/peer.go index 5493371f9..3146cef82 100644 --- a/rafthttp/peer.go +++ b/rafthttp/peer.go @@ -246,9 +246,9 @@ func (p *peer) post(data []byte) error { } // attachStream attaches a streamSever to the peer. -func (p *peer) attachStream(server *streamServer) error { - server.fs = p.fs - return p.stream.attach(server) +func (p *peer) attachStream(sw *streamWriter) error { + sw.fs = p.fs + return p.stream.attach(sw) } // Pause pauses the peer. The peer will simply drops all incoming diff --git a/rafthttp/streamer.go b/rafthttp/streamer.go index 24866eb95..52ac6b7a5 100644 --- a/rafthttp/streamer.go +++ b/rafthttp/streamer.go @@ -44,7 +44,7 @@ type stream struct { // the server might be attached asynchronously with the owner of the stream // use a mutex to protect it sync.Mutex - server *streamServer + w *streamWriter client *streamClient } @@ -63,38 +63,38 @@ func (s *stream) open(from, to, cid types.ID, term uint64, tr http.RoundTripper, return nil } -func (s *stream) attach(server *streamServer) error { +func (s *stream) attach(sw *streamWriter) error { s.Lock() defer s.Unlock() - if s.server != nil { + if s.w != nil { // ignore lower-term streaming request - if server.term < s.server.term { - return fmt.Errorf("cannot attach out of data stream server [%d / %d]", server.term, s.server.term) + if sw.term < s.w.term { + return fmt.Errorf("cannot attach out of data stream server [%d / %d]", sw.term, s.w.term) } - s.server.stop() + s.w.stop() } - s.server = server + s.w = sw return nil } func (s *stream) write(m raftpb.Message) bool { s.Lock() defer s.Unlock() - if s.server == nil { + if s.w == nil { return false } - if m.Term != s.server.term { - if m.Term > s.server.term { + if m.Term != s.w.term { + if m.Term > s.w.term { panic("expected server to be invalidated when there is a higher term message") } return false } // todo: early unlock? - if err := s.server.send(m.Entries); err != nil { + if err := s.w.send(m.Entries); err != nil { log.Printf("stream: error sending message: %v", err) log.Printf("stream: stopping the stream server...") - s.server.stop() - s.server = nil + s.w.stop() + s.w = nil return false } return true @@ -106,10 +106,10 @@ func (s *stream) invalidate(term uint64) { s.Lock() defer s.Unlock() - if s.server != nil { - if s.server.term < term { - s.server.stop() - s.server = nil + if s.w != nil { + if s.w.term < term { + s.w.stop() + s.w = nil } } if s.client != nil { @@ -136,9 +136,8 @@ type WriteFlusher interface { http.Flusher } -// TODO: rename it to streamWriter. // TODO: replace fs with stream stats -type streamServer struct { +type streamWriter struct { to types.ID term uint64 fs *stats.FollowerStats @@ -148,8 +147,8 @@ type streamServer struct { // newStreamServer starts and returns a new started stream server. // The caller should call stop when finished, to shut it down. -func newStreamServer(w WriteFlusher, to types.ID, term uint64) *streamServer { - s := &streamServer{ +func newStreamWriter(w WriteFlusher, to types.ID, term uint64) *streamWriter { + s := &streamWriter{ to: to, term: term, q: make(chan []raftpb.Entry, streamBufSize), @@ -159,7 +158,7 @@ func newStreamServer(w WriteFlusher, to types.ID, term uint64) *streamServer { return s } -func (s *streamServer) send(ents []raftpb.Entry) error { +func (s *streamWriter) send(ents []raftpb.Entry) error { select { case <-s.done: return fmt.Errorf("stopped") @@ -174,7 +173,7 @@ func (s *streamServer) send(ents []raftpb.Entry) error { } } -func (s *streamServer) handle(w WriteFlusher) { +func (s *streamWriter) handle(w WriteFlusher) { defer func() { close(s.done) log.Printf("rafthttp: server streaming to %s at term %d has been stopped", s.to, s.term) @@ -192,12 +191,12 @@ func (s *streamServer) handle(w WriteFlusher) { } } -func (s *streamServer) stop() { +func (s *streamWriter) stop() { close(s.q) <-s.done } -func (s *streamServer) stopNotify() <-chan struct{} { return s.done } +func (s *streamWriter) stopNotify() <-chan struct{} { return s.done } // TODO: rename it to streamReader. // TODO: move the raft interface out of the reader.