diff --git a/rafthttp/pipeline_test.go b/rafthttp/pipeline_test.go index 69f49db04..88deb9529 100644 --- a/rafthttp/pipeline_test.go +++ b/rafthttp/pipeline_test.go @@ -57,23 +57,20 @@ func TestPipelineSend(t *testing.T) { // TestPipelineKeepSendingWhenPostError tests that pipeline can keep // sending messages if previous messages meet post error. func TestPipelineKeepSendingWhenPostError(t *testing.T) { - tr := &respRoundTripper{err: fmt.Errorf("roundtrip error")} + tr := &respRoundTripper{rec: testutil.NewRecorderStream(), err: fmt.Errorf("roundtrip error")} picker := mustNewURLPicker(t, []string{"http://localhost:2380"}) fs := &stats.FollowerStats{} tp := &Transport{pipelineRt: tr} p := newPipeline(tp, picker, types.ID(2), types.ID(1), types.ID(1), newPeerStatus(types.ID(1)), fs, &fakeRaft{}, nil) + defer p.stop() for i := 0; i < 50; i++ { p.msgc <- raftpb.Message{Type: raftpb.MsgApp} } - testutil.WaitSchedule() - p.stop() - // check it send out 50 requests - tr.mu.Lock() - defer tr.mu.Unlock() - if tr.reqCount != 50 { - t.Errorf("request count = %d, want 50", tr.reqCount) + _, err := tr.rec.Wait(50) + if err != nil { + t.Errorf("unexpected wait error %v", err) } } @@ -269,8 +266,8 @@ func (t *roundTripperBlocker) CancelRequest(req *http.Request) { } type respRoundTripper struct { - mu sync.Mutex - reqCount int + mu sync.Mutex + rec testutil.Recorder code int header http.Header @@ -283,7 +280,9 @@ func newRespRoundTripper(code int, err error) *respRoundTripper { func (t *respRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) { t.mu.Lock() defer t.mu.Unlock() - t.reqCount++ + if t.rec != nil { + t.rec.Record(testutil.Action{Name: "req", Params: []interface{}{req}}) + } return &http.Response{StatusCode: t.code, Header: t.header, Body: &nopReadCloser{}}, t.err }