From 2193b70fb3cbba1f816afd4646097b459c8731bc Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Tue, 30 Dec 2014 13:45:11 -0800 Subject: [PATCH] rafthttp: add stream --- rafthttp/http.go | 10 +-- rafthttp/peer.go | 163 ++++++++++++++----------------------------- rafthttp/streamer.go | 139 ++++++++++++++++++++++++++++++------ 3 files changed, 173 insertions(+), 139 deletions(-) diff --git a/rafthttp/http.go b/rafthttp/http.go index 78fa03bf4..213107129 100644 --- a/rafthttp/http.go +++ b/rafthttp/http.go @@ -159,14 +159,14 @@ func (h *streamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) w.(http.Flusher).Flush() - done, err := p.StartStreaming(w.(WriteFlusher), from, term) + stream := newStreamServer(w.(WriteFlusher), from, term) + err = p.attachStream(stream) if err != nil { - log.Printf("rafthttp: streaming request ignored due to start streaming error: %v", err) - // TODO: consider http status and info here - http.Error(w, "error enable streaming", http.StatusInternalServerError) + log.Printf("rafthttp: %v", err) + http.Error(w, err.Error(), http.StatusBadRequest) return } - <-done + <-stream.stopNotify() } type writerToResponse interface { diff --git a/rafthttp/peer.go b/rafthttp/peer.go index ccfe862e3..55b377159 100644 --- a/rafthttp/peer.go +++ b/rafthttp/peer.go @@ -45,27 +45,6 @@ const ( ConnWriteTimeout = 5 * time.Second ) -func NewPeer(tr http.RoundTripper, u string, id types.ID, cid types.ID, r Raft, fs *stats.FollowerStats, shouldstop chan struct{}) *peer { - p := &peer{ - id: id, - active: true, - tr: tr, - u: u, - cid: cid, - r: r, - fs: fs, - shouldstop: shouldstop, - batcher: NewBatcher(100, appRespBatchMs*time.Millisecond), - propBatcher: NewProposalBatcher(100, propBatchMs*time.Millisecond), - q: make(chan *raftpb.Message, senderBufSize), - } - p.wg.Add(connPerSender) - for i := 0; i < connPerSender; i++ { - go p.handle() - } - return p -} - type peer struct { id types.ID cid types.ID @@ -75,13 +54,11 @@ type peer struct { fs *stats.FollowerStats shouldstop chan struct{} - strmCln *streamClient batcher *Batcher propBatcher *ProposalBatcher q chan *raftpb.Message - strmSrvMu sync.Mutex - strmSrv *streamServer + stream *stream // wait for the handling routines wg sync.WaitGroup @@ -95,22 +72,26 @@ type peer struct { paused bool } -// StartStreaming enables streaming in the peer using the given writer, -// which provides a fast and efficient way to send appendEntry messages. -func (p *peer) StartStreaming(w WriteFlusher, to types.ID, term uint64) (<-chan struct{}, error) { - p.strmSrvMu.Lock() - defer p.strmSrvMu.Unlock() - if p.strmSrv != nil { - // ignore lower-term streaming request - if term < p.strmSrv.term { - return nil, fmt.Errorf("out of data streaming request: term %d, request term %d", term, p.strmSrv.term) - } - // stop the existing one - p.strmSrv.stop() - p.strmSrv = nil +func NewPeer(tr http.RoundTripper, u string, id types.ID, cid types.ID, r Raft, fs *stats.FollowerStats, shouldstop chan struct{}) *peer { + p := &peer{ + id: id, + active: true, + tr: tr, + u: u, + cid: cid, + r: r, + fs: fs, + stream: &stream{}, + shouldstop: shouldstop, + batcher: NewBatcher(100, appRespBatchMs*time.Millisecond), + propBatcher: NewProposalBatcher(100, propBatchMs*time.Millisecond), + q: make(chan *raftpb.Message, senderBufSize), } - p.strmSrv = startStreamServer(w, to, term, p.fs) - return p.strmSrv.stopNotify(), nil + p.wg.Add(connPerSender) + for i := 0; i < connPerSender; i++ { + go p.handle() + } + return p } func (p *peer) Update(u string) { @@ -130,9 +111,13 @@ func (p *peer) Send(m raftpb.Message) error { return nil } - p.maybeStopStream(m.Term) - if shouldInitStream(m) && !p.hasStreamClient() { - p.initStream(types.ID(m.From), types.ID(m.To), m.Term) + // move all the stream related stuff into stream + p.stream.invalidate(m.Term) + if shouldInitStream(m) && !p.stream.isOpen() { + p.mu.Lock() + u := p.u + p.mu.Unlock() + p.stream.open(p.id, types.ID(m.To), p.cid, m.Term, p.tr, u, p.r) p.batcher.Reset(time.Now()) } @@ -140,12 +125,12 @@ func (p *peer) Send(m raftpb.Message) error { switch { case isProposal(m): p.propBatcher.Batch(m) - case canBatch(m) && p.hasStreamClient(): + case canBatch(m) && p.stream.isOpen(): if !p.batcher.ShouldBatch(time.Now()) { err = p.send(m) } case canUseStream(m): - if ok := p.tryStream(m); !ok { + if ok := p.stream.write(m); !ok { err = p.send(m) } default: @@ -183,74 +168,7 @@ func (p *peer) send(m raftpb.Message) error { func (p *peer) Stop() { close(p.q) p.wg.Wait() - p.strmSrvMu.Lock() - if p.strmSrv != nil { - p.strmSrv.stop() - p.strmSrv = nil - } - p.strmSrvMu.Unlock() - if p.strmCln != nil { - p.strmCln.stop() - } -} - -// Pause pauses the peer. The peer will simply drops all incoming -// messages without retruning an error. -func (p *peer) Pause() { - p.mu.Lock() - defer p.mu.Unlock() - p.paused = true -} - -// Resume resumes a paused peer. -func (p *peer) Resume() { - p.mu.Lock() - defer p.mu.Unlock() - p.paused = false -} - -func (p *peer) maybeStopStream(term uint64) { - if p.strmCln != nil && term > p.strmCln.term { - p.strmCln.stop() - p.strmCln = nil - } - p.strmSrvMu.Lock() - defer p.strmSrvMu.Unlock() - if p.strmSrv != nil && term > p.strmSrv.term { - p.strmSrv.stop() - p.strmSrv = nil - } -} - -func (p *peer) hasStreamClient() bool { - return p.strmCln != nil && !p.strmCln.isStopped() -} - -func (p *peer) initStream(from, to types.ID, term uint64) { - strmCln := newStreamClient(from, to, term, p.r) - p.mu.Lock() - u := p.u - p.mu.Unlock() - if err := strmCln.start(p.tr, u, p.cid); err != nil { - log.Printf("rafthttp: start stream client error: %v", err) - return - } - p.strmCln = strmCln -} - -func (p *peer) tryStream(m raftpb.Message) bool { - p.strmSrvMu.Lock() - defer p.strmSrvMu.Unlock() - if p.strmSrv == nil || m.Term != p.strmSrv.term { - return false - } - if err := p.strmSrv.send(m.Entries); err != nil { - log.Printf("rafthttp: send stream message error: %v", err) - p.strmSrv.stop() - p.strmSrv = nil - return false - } - return true + p.stream.stop() } func (p *peer) handle() { @@ -327,4 +245,25 @@ func (p *peer) post(data []byte) error { } } +// attachStream attaches a streamSever to the peer. +func (p *peer) attachStream(server *streamServer) error { + server.fs = p.fs + return p.stream.attach(server) +} + +// Pause pauses the peer. The peer will simply drops all incoming +// messages without retruning an error. +func (p *peer) Pause() { + p.mu.Lock() + defer p.mu.Unlock() + p.paused = true +} + +// Resume resumes a paused peer. +func (p *peer) Resume() { + p.mu.Lock() + defer p.mu.Unlock() + p.paused = false +} + func isProposal(m raftpb.Message) bool { return m.Type == raftpb.MsgProp } diff --git a/rafthttp/streamer.go b/rafthttp/streamer.go index 6267f60aa..f34dd289c 100644 --- a/rafthttp/streamer.go +++ b/rafthttp/streamer.go @@ -20,10 +20,12 @@ import ( "fmt" "io" "log" + "math" "net/http" "net/url" "path" "strconv" + "sync" "time" "github.com/coreos/etcd/etcdserver/stats" @@ -37,11 +39,105 @@ const ( streamBufSize = 4096 ) +// TODO: a stream might hava one stream server or one stream client, but not both. +type stream struct { + // the server might be attached asynchronously with the owner of the stream + // use a mutex to protect it + sync.Mutex + server *streamServer + + client *streamClient +} + +func (s *stream) open(id, to, cid types.ID, term uint64, tr http.RoundTripper, u string, r Raft) error { + if s.client != nil { + panic("open: stream is open") + } + + c, err := newStreamClient(id, to, cid, term, tr, u, r) + if err != nil { + log.Printf("stream: error opening stream: %v", err) + return err + } + s.client = c + return nil +} + +func (s *stream) attach(server *streamServer) error { + s.Lock() + defer s.Unlock() + if s.server != nil { + // ignore lower-term streaming request + if server.term < s.server.term { + return fmt.Errorf("cannot attach out of data stream server [%d / %d]", server.term, s.server.term) + } + s.server.stop() + } + s.server = server + return nil +} + +func (s *stream) write(m raftpb.Message) bool { + s.Lock() + defer s.Unlock() + if s.server == nil { + return false + } + if m.Term != s.server.term { + if m.Term > s.server.term { + panic("expected server to be invalidated when there is a higher term message") + } + return false + } + // todo: early unlock? + if err := s.server.send(m.Entries); err != nil { + log.Printf("stream: error sending message: %v", err) + log.Printf("stream: stopping the stream server...") + s.server.stop() + s.server = nil + return false + } + return true +} + +// invalidate stops the sever/client that is running at +// a term lower than the given term. +func (s *stream) invalidate(term uint64) { + s.Lock() + defer s.Unlock() + + if s.server != nil { + if s.server.term < term { + s.server.stop() + s.server = nil + } + } + if s.client != nil { + if s.client.term < term { + s.client.stop() + s.client = nil + } + } +} + +func (s *stream) stop() { + s.invalidate(math.MaxUint64) +} + +func (s *stream) isOpen() bool { + if s.client != nil && s.client.isStopped() { + s.client = nil + } + return s.client != nil +} + type WriteFlusher interface { io.Writer http.Flusher } +// TODO: rename it to streamWriter. +// TODO: replace fs with stream stats type streamServer struct { to types.ID term uint64 @@ -50,16 +146,16 @@ type streamServer struct { done chan struct{} } -func startStreamServer(w WriteFlusher, to types.ID, term uint64, fs *stats.FollowerStats) *streamServer { +// newStreamServer starts and returns a new started stream server. +// The caller should call stop when finished, to shut it down. +func newStreamServer(w WriteFlusher, to types.ID, term uint64) *streamServer { s := &streamServer{ to: to, term: term, - fs: fs, q: make(chan []raftpb.Entry, streamBufSize), done: make(chan struct{}), } go s.handle(w) - log.Printf("rafthttp: starting server stream to %s at term %d", to, term) return s } @@ -78,13 +174,6 @@ func (s *streamServer) send(ents []raftpb.Entry) error { } } -func (s *streamServer) stop() { - close(s.q) - <-s.done -} - -func (s *streamServer) stopNotify() <-chan struct{} { return s.done } - func (s *streamServer) handle(w WriteFlusher) { defer func() { close(s.done) @@ -103,6 +192,15 @@ func (s *streamServer) handle(w WriteFlusher) { } } +func (s *streamServer) stop() { + close(s.q) + <-s.done +} + +func (s *streamServer) stopNotify() <-chan struct{} { return s.done } + +// TODO: rename it to streamReader. +// TODO: move the raft interface out of the reader. type streamClient struct { id types.ID to types.ID @@ -113,44 +211,41 @@ type streamClient struct { done chan struct{} } -func newStreamClient(id, to types.ID, term uint64, r Raft) *streamClient { - return &streamClient{ +// newStreamClient starts and returns a new started stream client. +// The caller should call stop when finished, to shut it down. +func newStreamClient(id, to, cid types.ID, term uint64, tr http.RoundTripper, u string, r Raft) (*streamClient, error) { + s := &streamClient{ id: id, to: to, term: term, r: r, done: make(chan struct{}), } -} -// Dial dials to the remote url, and sends streaming request. If it succeeds, -// it returns nil error, and the caller should call Handle function to keep -// receiving appendEntry messages. -func (s *streamClient) start(tr http.RoundTripper, u string, cid types.ID) error { uu, err := url.Parse(u) if err != nil { - return fmt.Errorf("parse url %s error: %v", u, err) + return nil, fmt.Errorf("parse url %s error: %v", u, err) } uu.Path = path.Join(RaftStreamPrefix, s.id.String()) req, err := http.NewRequest("GET", uu.String(), nil) if err != nil { - return fmt.Errorf("new request to %s error: %v", u, err) + return nil, fmt.Errorf("new request to %s error: %v", u, err) } req.Header.Set("X-Etcd-Cluster-ID", cid.String()) req.Header.Set("X-Raft-To", s.to.String()) req.Header.Set("X-Raft-Term", strconv.FormatUint(s.term, 10)) resp, err := tr.RoundTrip(req) if err != nil { - return fmt.Errorf("error posting to %q: %v", u, err) + return nil, fmt.Errorf("error posting to %q: %v", u, err) } if resp.StatusCode != http.StatusOK { resp.Body.Close() - return fmt.Errorf("unhandled http status %d", resp.StatusCode) + return nil, fmt.Errorf("unhandled http status %d", resp.StatusCode) } s.closer = resp.Body go s.handle(resp.Body) log.Printf("rafthttp: starting client stream to %s at term %d", s.to, s.term) - return nil + return s, nil } func (s *streamClient) stop() {