mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
rafthttp: fix wrong return in pipeline.handle
pipeline.handle is a long-living one, and should continue to receive next message to send out when current message fails to send. So it should `continue` instead of `return` here.
This commit is contained in:
parent
ae62a77de6
commit
4076dda101
@ -106,7 +106,7 @@ func (p *pipeline) handle() {
|
|||||||
if isMsgSnap(m) {
|
if isMsgSnap(m) {
|
||||||
p.r.ReportSnapshot(m.To, raft.SnapshotFailure)
|
p.r.ReportSnapshot(m.To, raft.SnapshotFailure)
|
||||||
}
|
}
|
||||||
return
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
p.status.activate()
|
p.status.activate()
|
||||||
|
@ -16,6 +16,7 @@ package rafthttp
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"net/http"
|
"net/http"
|
||||||
@ -52,6 +53,28 @@ func TestPipelineSend(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TestPipelineKeepSendingWhenPostError tests that pipeline can keep
|
||||||
|
// sending messages if previous messages meet post error.
|
||||||
|
func TestPipelineKeepSendingWhenPostError(t *testing.T) {
|
||||||
|
tr := &respRoundTripper{err: fmt.Errorf("roundtrip error")}
|
||||||
|
picker := mustNewURLPicker(t, []string{"http://localhost:2380"})
|
||||||
|
fs := &stats.FollowerStats{}
|
||||||
|
p := newPipeline(tr, picker, types.ID(2), types.ID(1), types.ID(1), newPeerStatus(types.ID(1)), fs, &fakeRaft{}, nil)
|
||||||
|
|
||||||
|
for i := 0; i < 50; i++ {
|
||||||
|
p.msgc <- raftpb.Message{Type: raftpb.MsgApp}
|
||||||
|
}
|
||||||
|
testutil.WaitSchedule()
|
||||||
|
p.stop()
|
||||||
|
|
||||||
|
// check it send out 50 requests
|
||||||
|
tr.mu.Lock()
|
||||||
|
defer tr.mu.Unlock()
|
||||||
|
if tr.reqCount != 50 {
|
||||||
|
t.Errorf("request count = %d, want 50", tr.reqCount)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestPipelineExceedMaximumServing(t *testing.T) {
|
func TestPipelineExceedMaximumServing(t *testing.T) {
|
||||||
tr := newRoundTripperBlocker()
|
tr := newRoundTripperBlocker()
|
||||||
picker := mustNewURLPicker(t, []string{"http://localhost:2380"})
|
picker := mustNewURLPicker(t, []string{"http://localhost:2380"})
|
||||||
@ -236,6 +259,9 @@ func (t *roundTripperBlocker) CancelRequest(req *http.Request) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type respRoundTripper struct {
|
type respRoundTripper struct {
|
||||||
|
mu sync.Mutex
|
||||||
|
reqCount int
|
||||||
|
|
||||||
code int
|
code int
|
||||||
header http.Header
|
header http.Header
|
||||||
err error
|
err error
|
||||||
@ -245,6 +271,9 @@ func newRespRoundTripper(code int, err error) *respRoundTripper {
|
|||||||
return &respRoundTripper{code: code, err: err}
|
return &respRoundTripper{code: code, err: err}
|
||||||
}
|
}
|
||||||
func (t *respRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
|
func (t *respRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
|
||||||
|
t.mu.Lock()
|
||||||
|
defer t.mu.Unlock()
|
||||||
|
t.reqCount++
|
||||||
return &http.Response{StatusCode: t.code, Header: t.header, Body: &nopReadCloser{}}, t.err
|
return &http.Response{StatusCode: t.code, Header: t.header, Body: &nopReadCloser{}}, t.err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user