diff --git a/rafthttp/http.go b/rafthttp/http.go index 1a65028e1..60afe83a8 100644 --- a/rafthttp/http.go +++ b/rafthttp/http.go @@ -156,16 +156,17 @@ func (h *streamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } - w.WriteHeader(http.StatusOK) - w.(http.Flusher).Flush() - - sw := newStreamWriter(w.(WriteFlusher), from, term) + sw := newStreamWriter(from, term) err = p.attachStream(sw) if err != nil { log.Printf("rafthttp: %v", err) http.Error(w, err.Error(), http.StatusBadRequest) return } + + w.WriteHeader(http.StatusOK) + w.(http.Flusher).Flush() + go sw.handle(w.(WriteFlusher)) <-sw.stopNotify() } diff --git a/rafthttp/streamer.go b/rafthttp/streamer.go index 942e94521..be37af224 100644 --- a/rafthttp/streamer.go +++ b/rafthttp/streamer.go @@ -162,14 +162,13 @@ type streamWriter struct { // newStreamServer starts and returns a new started stream server. // The caller should call stop when finished, to shut it down. -func newStreamWriter(w WriteFlusher, to types.ID, term uint64) *streamWriter { +func newStreamWriter(to types.ID, term uint64) *streamWriter { s := &streamWriter{ to: to, term: term, q: make(chan []raftpb.Entry, streamBufSize), done: make(chan struct{}), } - go s.handle(w) return s }