mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
Merge pull request #2973 from yichengq/fix-recv-log
rafthttp: fix the misformat logging line, and rename internal var for more clarity
This commit is contained in:
commit
dccec11bb4
@ -248,15 +248,15 @@ func (cw *streamWriter) stop() {
|
|||||||
// streamReader is a long-running go-routine that dials to the remote stream
|
// streamReader is a long-running go-routine that dials to the remote stream
|
||||||
// endponit and reads messages from the response body returned.
|
// endponit and reads messages from the response body returned.
|
||||||
type streamReader struct {
|
type streamReader struct {
|
||||||
tr http.RoundTripper
|
tr http.RoundTripper
|
||||||
picker *urlPicker
|
picker *urlPicker
|
||||||
t streamType
|
t streamType
|
||||||
from, to types.ID
|
local, remote types.ID
|
||||||
cid types.ID
|
cid types.ID
|
||||||
status *peerStatus
|
status *peerStatus
|
||||||
recvc chan<- raftpb.Message
|
recvc chan<- raftpb.Message
|
||||||
propc chan<- raftpb.Message
|
propc chan<- raftpb.Message
|
||||||
errorc chan<- error
|
errorc chan<- error
|
||||||
|
|
||||||
mu sync.Mutex
|
mu sync.Mutex
|
||||||
msgAppTerm uint64
|
msgAppTerm uint64
|
||||||
@ -266,13 +266,13 @@ type streamReader struct {
|
|||||||
done chan struct{}
|
done chan struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
func startStreamReader(tr http.RoundTripper, picker *urlPicker, t streamType, from, to, cid types.ID, status *peerStatus, recvc chan<- raftpb.Message, propc chan<- raftpb.Message, errorc chan<- error) *streamReader {
|
func startStreamReader(tr http.RoundTripper, picker *urlPicker, t streamType, local, remote, cid types.ID, status *peerStatus, recvc chan<- raftpb.Message, propc chan<- raftpb.Message, errorc chan<- error) *streamReader {
|
||||||
r := &streamReader{
|
r := &streamReader{
|
||||||
tr: tr,
|
tr: tr,
|
||||||
picker: picker,
|
picker: picker,
|
||||||
t: t,
|
t: t,
|
||||||
from: from,
|
local: local,
|
||||||
to: to,
|
remote: remote,
|
||||||
cid: cid,
|
cid: cid,
|
||||||
status: status,
|
status: status,
|
||||||
recvc: recvc,
|
recvc: recvc,
|
||||||
@ -330,9 +330,9 @@ func (cr *streamReader) decodeLoop(rc io.ReadCloser, t streamType) error {
|
|||||||
cr.mu.Lock()
|
cr.mu.Lock()
|
||||||
switch t {
|
switch t {
|
||||||
case streamTypeMsgApp:
|
case streamTypeMsgApp:
|
||||||
dec = &msgAppDecoder{r: rc, local: cr.from, remote: cr.to, term: cr.msgAppTerm}
|
dec = &msgAppDecoder{r: rc, local: cr.local, remote: cr.remote, term: cr.msgAppTerm}
|
||||||
case streamTypeMsgAppV2:
|
case streamTypeMsgAppV2:
|
||||||
dec = newMsgAppV2Decoder(rc, cr.from, cr.to)
|
dec = newMsgAppV2Decoder(rc, cr.local, cr.remote)
|
||||||
case streamTypeMessage:
|
case streamTypeMessage:
|
||||||
dec = &messageDecoder{r: rc}
|
dec = &messageDecoder{r: rc}
|
||||||
default:
|
default:
|
||||||
@ -360,9 +360,9 @@ func (cr *streamReader) decodeLoop(rc io.ReadCloser, t streamType) error {
|
|||||||
case recvc <- m:
|
case recvc <- m:
|
||||||
default:
|
default:
|
||||||
if cr.status.isActive() {
|
if cr.status.isActive() {
|
||||||
plog.Warningf("dropped %s from %s since receiving buffer is full", m.Type, m.From)
|
plog.Warningf("dropped %s from %s since receiving buffer is full", m.Type, types.ID(m.From))
|
||||||
} else {
|
} else {
|
||||||
plog.Debugf("dropped %s from %s since receiving buffer is full", m.Type, m.From)
|
plog.Debugf("dropped %s from %s since receiving buffer is full", m.Type, types.ID(m.From))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -406,18 +406,18 @@ func (cr *streamReader) dial(t streamType) (io.ReadCloser, error) {
|
|||||||
cr.mu.Unlock()
|
cr.mu.Unlock()
|
||||||
|
|
||||||
uu := u
|
uu := u
|
||||||
uu.Path = path.Join(t.endpoint(), cr.from.String())
|
uu.Path = path.Join(t.endpoint(), cr.local.String())
|
||||||
|
|
||||||
req, err := http.NewRequest("GET", uu.String(), nil)
|
req, err := http.NewRequest("GET", uu.String(), nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
cr.picker.unreachable(u)
|
cr.picker.unreachable(u)
|
||||||
return nil, fmt.Errorf("failed to make http request to %s (%v)", u, err)
|
return nil, fmt.Errorf("failed to make http request to %s (%v)", u, err)
|
||||||
}
|
}
|
||||||
req.Header.Set("X-Server-From", cr.from.String())
|
req.Header.Set("X-Server-From", cr.local.String())
|
||||||
req.Header.Set("X-Server-Version", version.Version)
|
req.Header.Set("X-Server-Version", version.Version)
|
||||||
req.Header.Set("X-Min-Cluster-Version", version.MinClusterVersion)
|
req.Header.Set("X-Min-Cluster-Version", version.MinClusterVersion)
|
||||||
req.Header.Set("X-Etcd-Cluster-ID", cr.cid.String())
|
req.Header.Set("X-Etcd-Cluster-ID", cr.cid.String())
|
||||||
req.Header.Set("X-Raft-To", cr.to.String())
|
req.Header.Set("X-Raft-To", cr.remote.String())
|
||||||
if t == streamTypeMsgApp {
|
if t == streamTypeMsgApp {
|
||||||
req.Header.Set("X-Raft-Term", strconv.FormatUint(term, 10))
|
req.Header.Set("X-Raft-Term", strconv.FormatUint(term, 10))
|
||||||
}
|
}
|
||||||
@ -452,7 +452,7 @@ func (cr *streamReader) dial(t streamType) (io.ReadCloser, error) {
|
|||||||
return resp.Body, nil
|
return resp.Body, nil
|
||||||
case http.StatusNotFound:
|
case http.StatusNotFound:
|
||||||
resp.Body.Close()
|
resp.Body.Close()
|
||||||
return nil, fmt.Errorf("remote member %s could not recognize local member", cr.to)
|
return nil, fmt.Errorf("remote member %s could not recognize local member", cr.remote)
|
||||||
case http.StatusPreconditionFailed:
|
case http.StatusPreconditionFailed:
|
||||||
b, err := ioutil.ReadAll(resp.Body)
|
b, err := ioutil.ReadAll(resp.Body)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -463,11 +463,11 @@ func (cr *streamReader) dial(t streamType) (io.ReadCloser, error) {
|
|||||||
|
|
||||||
switch strings.TrimSuffix(string(b), "\n") {
|
switch strings.TrimSuffix(string(b), "\n") {
|
||||||
case errIncompatibleVersion.Error():
|
case errIncompatibleVersion.Error():
|
||||||
plog.Errorf("request sent was ignored by peer %s (server version incompatible)", cr.to)
|
plog.Errorf("request sent was ignored by peer %s (server version incompatible)", cr.remote)
|
||||||
return nil, errIncompatibleVersion
|
return nil, errIncompatibleVersion
|
||||||
case errClusterIDMismatch.Error():
|
case errClusterIDMismatch.Error():
|
||||||
plog.Errorf("request sent was ignored (cluster ID mismatch: remote[%s]=%s, local=%s)",
|
plog.Errorf("request sent was ignored (cluster ID mismatch: remote[%s]=%s, local=%s)",
|
||||||
cr.to, resp.Header.Get("X-Etcd-Cluster-ID"), cr.cid)
|
cr.remote, resp.Header.Get("X-Etcd-Cluster-ID"), cr.cid)
|
||||||
return nil, errClusterIDMismatch
|
return nil, errClusterIDMismatch
|
||||||
default:
|
default:
|
||||||
return nil, fmt.Errorf("unhandled error %q when precondition failed", string(b))
|
return nil, fmt.Errorf("unhandled error %q when precondition failed", string(b))
|
||||||
|
@ -89,8 +89,8 @@ func TestStreamReaderDialRequest(t *testing.T) {
|
|||||||
sr := &streamReader{
|
sr := &streamReader{
|
||||||
tr: tr,
|
tr: tr,
|
||||||
picker: mustNewURLPicker(t, []string{"http://localhost:2380"}),
|
picker: mustNewURLPicker(t, []string{"http://localhost:2380"}),
|
||||||
from: types.ID(1),
|
local: types.ID(1),
|
||||||
to: types.ID(2),
|
remote: types.ID(2),
|
||||||
cid: types.ID(1),
|
cid: types.ID(1),
|
||||||
msgAppTerm: 1,
|
msgAppTerm: 1,
|
||||||
}
|
}
|
||||||
@ -143,8 +143,8 @@ func TestStreamReaderDialResult(t *testing.T) {
|
|||||||
sr := &streamReader{
|
sr := &streamReader{
|
||||||
tr: tr,
|
tr: tr,
|
||||||
picker: mustNewURLPicker(t, []string{"http://localhost:2380"}),
|
picker: mustNewURLPicker(t, []string{"http://localhost:2380"}),
|
||||||
from: types.ID(1),
|
local: types.ID(1),
|
||||||
to: types.ID(2),
|
remote: types.ID(2),
|
||||||
cid: types.ID(1),
|
cid: types.ID(1),
|
||||||
errorc: make(chan error, 1),
|
errorc: make(chan error, 1),
|
||||||
}
|
}
|
||||||
@ -206,8 +206,8 @@ func TestStreamReaderDialDetectUnsupport(t *testing.T) {
|
|||||||
sr := &streamReader{
|
sr := &streamReader{
|
||||||
tr: tr,
|
tr: tr,
|
||||||
picker: mustNewURLPicker(t, []string{"http://localhost:2380"}),
|
picker: mustNewURLPicker(t, []string{"http://localhost:2380"}),
|
||||||
from: types.ID(1),
|
local: types.ID(1),
|
||||||
to: types.ID(2),
|
remote: types.ID(2),
|
||||||
cid: types.ID(1),
|
cid: types.ID(1),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user