diff --git a/rafthttp/stream.go b/rafthttp/stream.go index 219d34e6c..60201bfd0 100644 --- a/rafthttp/stream.go +++ b/rafthttp/stream.go @@ -317,14 +317,18 @@ func (cr *streamReader) decodeLoop(rc io.ReadCloser) error { } } +// updateMsgAppTerm updates the term for MsgApp stream, and closes +// the existing MsgApp stream if term is updated. func (cr *streamReader) updateMsgAppTerm(term uint64) { cr.mu.Lock() defer cr.mu.Unlock() - if cr.msgAppTerm == term { + if cr.msgAppTerm >= term { return } cr.msgAppTerm = term - cr.close() + if cr.t == streamTypeMsgApp { + cr.close() + } } // TODO: always cancel in-flight dial and decode diff --git a/rafthttp/stream_test.go b/rafthttp/stream_test.go index ea44506a0..1bcfa23a1 100644 --- a/rafthttp/stream_test.go +++ b/rafthttp/stream_test.go @@ -153,6 +153,41 @@ func TestStreamReaderDialResult(t *testing.T) { } } +func TestStreamReaderUpdateMsgAppTerm(t *testing.T) { + term := uint64(2) + tests := []struct { + term uint64 + typ streamType + wterm uint64 + wclose bool + }{ + // lower term + {1, streamTypeMsgApp, 2, false}, + // unchanged term + {2, streamTypeMsgApp, 2, false}, + // higher term + {3, streamTypeMessage, 3, false}, + {3, streamTypeMsgAppV2, 3, false}, + // higher term, reset closer + {3, streamTypeMsgApp, 3, true}, + } + for i, tt := range tests { + closer := &fakeWriteFlushCloser{} + cr := &streamReader{ + msgAppTerm: term, + t: tt.typ, + closer: closer, + } + cr.updateMsgAppTerm(tt.term) + if cr.msgAppTerm != tt.wterm { + t.Errorf("#%d: term = %d, want %d", i, cr.msgAppTerm, tt.wterm) + } + if closer.closed != tt.wclose { + t.Errorf("#%d: closed = %v, want %v", i, closer.closed, tt.wclose) + } + } +} + // TestStream tests that streamReader and streamWriter can build stream to // send messages between each other. func TestStream(t *testing.T) {