rafthttp: drop messages in channel when disconnection

The messages in channel are outdated, and there is no need to send
them in the future. It also reports unreachable if there are messages
in the channel.
This commit is contained in:
Yicheng Qin 2015-03-10 23:55:37 -07:00
parent 62a7e2f41f
commit e41cbeda5d
4 changed files with 47 additions and 33 deletions

View File

@ -122,10 +122,10 @@ func newServerStats() *stats.ServerStats {
func waitStreamWorking(p *peer) bool { func waitStreamWorking(p *peer) bool {
for i := 0; i < 1000; i++ { for i := 0; i < 1000; i++ {
time.Sleep(time.Millisecond) time.Sleep(time.Millisecond)
if !p.msgAppWriter.isWorking() { if _, ok := p.msgAppWriter.writec(); !ok {
continue continue
} }
if !p.writer.isWorking() { if _, ok := p.writer.writec(); !ok {
continue continue
} }
return true return true

View File

@ -115,8 +115,8 @@ func startPeer(tr http.RoundTripper, urls types.URLs, local, to, cid types.ID, r
p := &peer{ p := &peer{
id: to, id: to,
r: r, r: r,
msgAppWriter: startStreamWriter(fs, r), msgAppWriter: startStreamWriter(to, fs, r),
writer: startStreamWriter(fs, r), writer: startStreamWriter(to, fs, r),
pipeline: newPipeline(tr, picker, to, cid, fs, r, errorc), pipeline: newPipeline(tr, picker, to, cid, fs, r, errorc),
sendc: make(chan raftpb.Message), sendc: make(chan raftpb.Message),
recvc: make(chan raftpb.Message, recvBufSize), recvc: make(chan raftpb.Message, recvBufSize),
@ -244,20 +244,18 @@ func (p *peer) Stop() {
// pick picks a chan for sending the given message. The picked chan and the picked chan // pick picks a chan for sending the given message. The picked chan and the picked chan
// string name are returned. // string name are returned.
func (p *peer) pick(m raftpb.Message) (writec chan raftpb.Message, picked string) { func (p *peer) pick(m raftpb.Message) (writec chan<- raftpb.Message, picked string) {
switch { var ok bool
// Considering MsgSnap may have a big size, e.g., 1G, and will block // Considering MsgSnap may have a big size, e.g., 1G, and will block
// stream for a long time, only use one of the N pipelines to send MsgSnap. // stream for a long time, only use one of the N pipelines to send MsgSnap.
case isMsgSnap(m): if isMsgSnap(m) {
return p.pipeline.msgc, pipelineMsg
case p.msgAppWriter.isWorking() && canUseMsgAppStream(m):
return p.msgAppWriter.msgc, streamApp
case p.writer.isWorking():
return p.writer.msgc, streamMsg
default:
return p.pipeline.msgc, pipelineMsg return p.pipeline.msgc, pipelineMsg
} else if writec, ok = p.msgAppWriter.writec(); ok && canUseMsgAppStream(m) {
return writec, streamApp
} else if writec, ok = p.writer.writec(); ok {
return writec, streamMsg
} }
return return p.pipeline.msgc, pipelineMsg
} }
func isMsgSnap(m raftpb.Message) bool { return m.Type == raftpb.MsgSnap } func isMsgSnap(m raftpb.Message) bool { return m.Type == raftpb.MsgSnap }

View File

@ -61,6 +61,7 @@ type outgoingConn struct {
// streamWriter is a long-running go-routine that writes messages into the // streamWriter is a long-running go-routine that writes messages into the
// attached outgoingConn. // attached outgoingConn.
type streamWriter struct { type streamWriter struct {
id types.ID
fs *stats.FollowerStats fs *stats.FollowerStats
r Raft r Raft
@ -74,8 +75,9 @@ type streamWriter struct {
done chan struct{} done chan struct{}
} }
func startStreamWriter(fs *stats.FollowerStats, r Raft) *streamWriter { func startStreamWriter(id types.ID, fs *stats.FollowerStats, r Raft) *streamWriter {
w := &streamWriter{ w := &streamWriter{
id: id,
fs: fs, fs: fs,
r: r, r: r,
msgc: make(chan raftpb.Message, streamBufSize), msgc: make(chan raftpb.Message, streamBufSize),
@ -163,18 +165,23 @@ func (cw *streamWriter) run() {
} }
} }
func (cw *streamWriter) isWorking() bool { func (cw *streamWriter) writec() (chan<- raftpb.Message, bool) {
cw.mu.Lock() cw.mu.Lock()
defer cw.mu.Unlock() defer cw.mu.Unlock()
return cw.working return cw.msgc, cw.working
} }
func (cw *streamWriter) resetCloser() { func (cw *streamWriter) resetCloser() {
cw.mu.Lock() cw.mu.Lock()
defer cw.mu.Unlock() defer cw.mu.Unlock()
if cw.working { if !cw.working {
cw.closer.Close() return
} }
cw.closer.Close()
if len(cw.msgc) > 0 {
cw.r.ReportUnreachable(uint64(cw.id))
}
cw.msgc = make(chan raftpb.Message, streamBufSize)
cw.working = false cw.working = false
} }

View File

@ -18,10 +18,10 @@ import (
// to streamWriter. After that, streamWriter can use it to send messages // to streamWriter. After that, streamWriter can use it to send messages
// continuously, and closes it when stopped. // continuously, and closes it when stopped.
func TestStreamWriterAttachOutgoingConn(t *testing.T) { func TestStreamWriterAttachOutgoingConn(t *testing.T) {
sw := startStreamWriter(&stats.FollowerStats{}, &fakeRaft{}) sw := startStreamWriter(types.ID(1), &stats.FollowerStats{}, &fakeRaft{})
// the expected initial state of streamWrite is not working // the expected initial state of streamWrite is not working
if g := sw.isWorking(); g != false { if _, ok := sw.writec(); ok != false {
t.Errorf("initial working status = %v, want false", g) t.Errorf("initial working status = %v, want false", ok)
} }
// repeatitive tests to ensure it can use latest connection // repeatitive tests to ensure it can use latest connection
@ -36,15 +36,15 @@ func TestStreamWriterAttachOutgoingConn(t *testing.T) {
t.Errorf("#%d: close of previous connection = %v, want true", i, prevwfc.closed) t.Errorf("#%d: close of previous connection = %v, want true", i, prevwfc.closed)
} }
// starts working // starts working
if g := sw.isWorking(); g != true { if _, ok := sw.writec(); ok != true {
t.Errorf("#%d: working status = %v, want true", i, g) t.Errorf("#%d: working status = %v, want true", i, ok)
} }
sw.msgc <- raftpb.Message{} sw.msgc <- raftpb.Message{}
testutil.ForceGosched() testutil.ForceGosched()
// still working // still working
if g := sw.isWorking(); g != true { if _, ok := sw.writec(); ok != true {
t.Errorf("#%d: working status = %v, want true", i, g) t.Errorf("#%d: working status = %v, want true", i, ok)
} }
if wfc.written == 0 { if wfc.written == 0 {
t.Errorf("#%d: failed to write to the underlying connection", i) t.Errorf("#%d: failed to write to the underlying connection", i)
@ -53,8 +53,8 @@ func TestStreamWriterAttachOutgoingConn(t *testing.T) {
sw.stop() sw.stop()
// no longer in working status now // no longer in working status now
if g := sw.isWorking(); g != false { if _, ok := sw.writec(); ok != false {
t.Errorf("working status after stop = %v, want false", g) t.Errorf("working status after stop = %v, want false", ok)
} }
if wfc.closed != true { if wfc.closed != true {
t.Errorf("failed to close the underlying connection") t.Errorf("failed to close the underlying connection")
@ -64,7 +64,7 @@ func TestStreamWriterAttachOutgoingConn(t *testing.T) {
// TestStreamWriterAttachBadOutgoingConn tests that streamWriter with bad // TestStreamWriterAttachBadOutgoingConn tests that streamWriter with bad
// outgoingConn will close the outgoingConn and fall back to non-working status. // outgoingConn will close the outgoingConn and fall back to non-working status.
func TestStreamWriterAttachBadOutgoingConn(t *testing.T) { func TestStreamWriterAttachBadOutgoingConn(t *testing.T) {
sw := startStreamWriter(&stats.FollowerStats{}, &fakeRaft{}) sw := startStreamWriter(types.ID(1), &stats.FollowerStats{}, &fakeRaft{})
defer sw.stop() defer sw.stop()
wfc := &fakeWriteFlushCloser{err: errors.New("blah")} wfc := &fakeWriteFlushCloser{err: errors.New("blah")}
sw.attach(&outgoingConn{t: streamTypeMessage, Writer: wfc, Flusher: wfc, Closer: wfc}) sw.attach(&outgoingConn{t: streamTypeMessage, Writer: wfc, Flusher: wfc, Closer: wfc})
@ -72,8 +72,8 @@ func TestStreamWriterAttachBadOutgoingConn(t *testing.T) {
sw.msgc <- raftpb.Message{} sw.msgc <- raftpb.Message{}
testutil.ForceGosched() testutil.ForceGosched()
// no longer working // no longer working
if g := sw.isWorking(); g != false { if _, ok := sw.writec(); ok != false {
t.Errorf("working = %v, want false", g) t.Errorf("working = %v, want false", ok)
} }
if wfc.closed != true { if wfc.closed != true {
t.Errorf("failed to close the underlying connection") t.Errorf("failed to close the underlying connection")
@ -197,7 +197,7 @@ func TestStream(t *testing.T) {
srv := httptest.NewServer(h) srv := httptest.NewServer(h)
defer srv.Close() defer srv.Close()
sw := startStreamWriter(&stats.FollowerStats{}, &fakeRaft{}) sw := startStreamWriter(types.ID(1), &stats.FollowerStats{}, &fakeRaft{})
defer sw.stop() defer sw.stop()
h.sw = sw h.sw = sw
@ -207,8 +207,17 @@ func TestStream(t *testing.T) {
if tt.t == streamTypeMsgApp { if tt.t == streamTypeMsgApp {
sr.updateMsgAppTerm(tt.term) sr.updateMsgAppTerm(tt.term)
} }
// wait for stream to work
var writec chan<- raftpb.Message
for {
var ok bool
if writec, ok = sw.writec(); ok {
break
}
time.Sleep(time.Millisecond)
}
sw.msgc <- tt.m writec <- tt.m
var m raftpb.Message var m raftpb.Message
select { select {
case m = <-tt.wc: case m = <-tt.wc: