diff --git a/rafthttp/stream.go b/rafthttp/stream.go index f9410dd32..470605308 100644 --- a/rafthttp/stream.go +++ b/rafthttp/stream.go @@ -248,15 +248,15 @@ func (cw *streamWriter) stop() { // streamReader is a long-running go-routine that dials to the remote stream // endponit and reads messages from the response body returned. type streamReader struct { - tr http.RoundTripper - picker *urlPicker - t streamType - from, to types.ID - cid types.ID - status *peerStatus - recvc chan<- raftpb.Message - propc chan<- raftpb.Message - errorc chan<- error + tr http.RoundTripper + picker *urlPicker + t streamType + local, remote types.ID + cid types.ID + status *peerStatus + recvc chan<- raftpb.Message + propc chan<- raftpb.Message + errorc chan<- error mu sync.Mutex msgAppTerm uint64 @@ -266,13 +266,13 @@ type streamReader struct { done chan struct{} } -func startStreamReader(tr http.RoundTripper, picker *urlPicker, t streamType, from, to, cid types.ID, status *peerStatus, recvc chan<- raftpb.Message, propc chan<- raftpb.Message, errorc chan<- error) *streamReader { +func startStreamReader(tr http.RoundTripper, picker *urlPicker, t streamType, local, remote, cid types.ID, status *peerStatus, recvc chan<- raftpb.Message, propc chan<- raftpb.Message, errorc chan<- error) *streamReader { r := &streamReader{ tr: tr, picker: picker, t: t, - from: from, - to: to, + local: local, + remote: remote, cid: cid, status: status, recvc: recvc, @@ -330,9 +330,9 @@ func (cr *streamReader) decodeLoop(rc io.ReadCloser, t streamType) error { cr.mu.Lock() switch t { case streamTypeMsgApp: - dec = &msgAppDecoder{r: rc, local: cr.from, remote: cr.to, term: cr.msgAppTerm} + dec = &msgAppDecoder{r: rc, local: cr.local, remote: cr.remote, term: cr.msgAppTerm} case streamTypeMsgAppV2: - dec = newMsgAppV2Decoder(rc, cr.from, cr.to) + dec = newMsgAppV2Decoder(rc, cr.local, cr.remote) case streamTypeMessage: dec = &messageDecoder{r: rc} default: @@ -360,9 +360,9 @@ func (cr *streamReader) decodeLoop(rc io.ReadCloser, t streamType) error { case recvc <- m: default: if cr.status.isActive() { - plog.Warningf("dropped %s from %s since receiving buffer is full", m.Type, m.From) + plog.Warningf("dropped %s from %s since receiving buffer is full", m.Type, types.ID(m.From)) } else { - plog.Debugf("dropped %s from %s since receiving buffer is full", m.Type, m.From) + plog.Debugf("dropped %s from %s since receiving buffer is full", m.Type, types.ID(m.From)) } } } @@ -406,18 +406,18 @@ func (cr *streamReader) dial(t streamType) (io.ReadCloser, error) { cr.mu.Unlock() uu := u - uu.Path = path.Join(t.endpoint(), cr.from.String()) + uu.Path = path.Join(t.endpoint(), cr.local.String()) req, err := http.NewRequest("GET", uu.String(), nil) if err != nil { cr.picker.unreachable(u) return nil, fmt.Errorf("failed to make http request to %s (%v)", u, err) } - req.Header.Set("X-Server-From", cr.from.String()) + req.Header.Set("X-Server-From", cr.local.String()) req.Header.Set("X-Server-Version", version.Version) req.Header.Set("X-Min-Cluster-Version", version.MinClusterVersion) req.Header.Set("X-Etcd-Cluster-ID", cr.cid.String()) - req.Header.Set("X-Raft-To", cr.to.String()) + req.Header.Set("X-Raft-To", cr.remote.String()) if t == streamTypeMsgApp { req.Header.Set("X-Raft-Term", strconv.FormatUint(term, 10)) } @@ -452,7 +452,7 @@ func (cr *streamReader) dial(t streamType) (io.ReadCloser, error) { return resp.Body, nil case http.StatusNotFound: resp.Body.Close() - return nil, fmt.Errorf("remote member %s could not recognize local member", cr.to) + return nil, fmt.Errorf("remote member %s could not recognize local member", cr.remote) case http.StatusPreconditionFailed: b, err := ioutil.ReadAll(resp.Body) if err != nil { @@ -463,11 +463,11 @@ func (cr *streamReader) dial(t streamType) (io.ReadCloser, error) { switch strings.TrimSuffix(string(b), "\n") { case errIncompatibleVersion.Error(): - plog.Errorf("request sent was ignored by peer %s (server version incompatible)", cr.to) + plog.Errorf("request sent was ignored by peer %s (server version incompatible)", cr.remote) return nil, errIncompatibleVersion case errClusterIDMismatch.Error(): plog.Errorf("request sent was ignored (cluster ID mismatch: remote[%s]=%s, local=%s)", - cr.to, resp.Header.Get("X-Etcd-Cluster-ID"), cr.cid) + cr.remote, resp.Header.Get("X-Etcd-Cluster-ID"), cr.cid) return nil, errClusterIDMismatch default: return nil, fmt.Errorf("unhandled error %q when precondition failed", string(b)) diff --git a/rafthttp/stream_test.go b/rafthttp/stream_test.go index e42326482..caabe64e3 100644 --- a/rafthttp/stream_test.go +++ b/rafthttp/stream_test.go @@ -89,8 +89,8 @@ func TestStreamReaderDialRequest(t *testing.T) { sr := &streamReader{ tr: tr, picker: mustNewURLPicker(t, []string{"http://localhost:2380"}), - from: types.ID(1), - to: types.ID(2), + local: types.ID(1), + remote: types.ID(2), cid: types.ID(1), msgAppTerm: 1, } @@ -143,8 +143,8 @@ func TestStreamReaderDialResult(t *testing.T) { sr := &streamReader{ tr: tr, picker: mustNewURLPicker(t, []string{"http://localhost:2380"}), - from: types.ID(1), - to: types.ID(2), + local: types.ID(1), + remote: types.ID(2), cid: types.ID(1), errorc: make(chan error, 1), } @@ -206,8 +206,8 @@ func TestStreamReaderDialDetectUnsupport(t *testing.T) { sr := &streamReader{ tr: tr, picker: mustNewURLPicker(t, []string{"http://localhost:2380"}), - from: types.ID(1), - to: types.ID(2), + local: types.ID(1), + remote: types.ID(2), cid: types.ID(1), }