mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00

In rafthttp, when making request to some endpoint, it may receive response with unexpected status code and header. This indicates the endpoint doesn't function correctly. It should mark the endpoint unreachable.
470 lines
11 KiB
Go
470 lines
11 KiB
Go
// Copyright 2015 CoreOS, Inc.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package rafthttp
|
|
|
|
import (
|
|
"fmt"
|
|
"io"
|
|
"io/ioutil"
|
|
"net"
|
|
"net/http"
|
|
"path"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/go-semver/semver"
|
|
"github.com/coreos/etcd/etcdserver/stats"
|
|
"github.com/coreos/etcd/pkg/httputil"
|
|
"github.com/coreos/etcd/pkg/types"
|
|
"github.com/coreos/etcd/raft/raftpb"
|
|
"github.com/coreos/etcd/version"
|
|
)
|
|
|
|
const (
|
|
streamTypeMessage streamType = "message"
|
|
streamTypeMsgAppV2 streamType = "msgappv2"
|
|
|
|
streamBufSize = 4096
|
|
)
|
|
|
|
var (
|
|
errUnsupportedStreamType = fmt.Errorf("unsupported stream type")
|
|
|
|
// the key is in string format "major.minor.patch"
|
|
supportedStream = map[string][]streamType{
|
|
"2.0.0": {},
|
|
"2.1.0": {streamTypeMsgAppV2, streamTypeMessage},
|
|
"2.2.0": {streamTypeMsgAppV2, streamTypeMessage},
|
|
}
|
|
)
|
|
|
|
type streamType string
|
|
|
|
func (t streamType) endpoint() string {
|
|
switch t {
|
|
case streamTypeMsgAppV2:
|
|
return path.Join(RaftStreamPrefix, "msgapp")
|
|
case streamTypeMessage:
|
|
return path.Join(RaftStreamPrefix, "message")
|
|
default:
|
|
plog.Panicf("unhandled stream type %v", t)
|
|
return ""
|
|
}
|
|
}
|
|
|
|
func (t streamType) String() string {
|
|
switch t {
|
|
case streamTypeMsgAppV2:
|
|
return "stream MsgApp v2"
|
|
case streamTypeMessage:
|
|
return "stream Message"
|
|
default:
|
|
return "unknown stream"
|
|
}
|
|
}
|
|
|
|
var (
|
|
// linkHeartbeatMessage is a special message used as heartbeat message in
|
|
// link layer. It never conflicts with messages from raft because raft
|
|
// doesn't send out messages without From and To fields.
|
|
linkHeartbeatMessage = raftpb.Message{Type: raftpb.MsgHeartbeat}
|
|
)
|
|
|
|
func isLinkHeartbeatMessage(m raftpb.Message) bool {
|
|
return m.Type == raftpb.MsgHeartbeat && m.From == 0 && m.To == 0
|
|
}
|
|
|
|
type outgoingConn struct {
|
|
t streamType
|
|
io.Writer
|
|
http.Flusher
|
|
io.Closer
|
|
}
|
|
|
|
// streamWriter is a long-running go-routine that writes messages into the
|
|
// attached outgoingConn.
|
|
type streamWriter struct {
|
|
id types.ID
|
|
status *peerStatus
|
|
fs *stats.FollowerStats
|
|
r Raft
|
|
|
|
mu sync.Mutex // guard field working and closer
|
|
closer io.Closer
|
|
working bool
|
|
|
|
msgc chan raftpb.Message
|
|
connc chan *outgoingConn
|
|
stopc chan struct{}
|
|
done chan struct{}
|
|
}
|
|
|
|
func startStreamWriter(id types.ID, status *peerStatus, fs *stats.FollowerStats, r Raft) *streamWriter {
|
|
w := &streamWriter{
|
|
id: id,
|
|
status: status,
|
|
fs: fs,
|
|
r: r,
|
|
msgc: make(chan raftpb.Message, streamBufSize),
|
|
connc: make(chan *outgoingConn),
|
|
stopc: make(chan struct{}),
|
|
done: make(chan struct{}),
|
|
}
|
|
go w.run()
|
|
return w
|
|
}
|
|
|
|
func (cw *streamWriter) run() {
|
|
var msgc chan raftpb.Message
|
|
var heartbeatc <-chan time.Time
|
|
var t streamType
|
|
var enc encoder
|
|
var flusher http.Flusher
|
|
tickc := time.Tick(ConnReadTimeout / 3)
|
|
|
|
for {
|
|
select {
|
|
case <-heartbeatc:
|
|
start := time.Now()
|
|
if err := enc.encode(linkHeartbeatMessage); err != nil {
|
|
reportSentFailure(string(t), linkHeartbeatMessage)
|
|
|
|
cw.status.deactivate(failureType{source: t.String(), action: "heartbeat"}, err.Error())
|
|
cw.close()
|
|
heartbeatc, msgc = nil, nil
|
|
continue
|
|
}
|
|
flusher.Flush()
|
|
reportSentDuration(string(t), linkHeartbeatMessage, time.Since(start))
|
|
case m := <-msgc:
|
|
start := time.Now()
|
|
if err := enc.encode(m); err != nil {
|
|
reportSentFailure(string(t), m)
|
|
|
|
cw.status.deactivate(failureType{source: t.String(), action: "write"}, err.Error())
|
|
cw.close()
|
|
heartbeatc, msgc = nil, nil
|
|
cw.r.ReportUnreachable(m.To)
|
|
continue
|
|
}
|
|
flusher.Flush()
|
|
reportSentDuration(string(t), m, time.Since(start))
|
|
case conn := <-cw.connc:
|
|
cw.close()
|
|
t = conn.t
|
|
switch conn.t {
|
|
case streamTypeMsgAppV2:
|
|
enc = newMsgAppV2Encoder(conn.Writer, cw.fs)
|
|
case streamTypeMessage:
|
|
enc = &messageEncoder{w: conn.Writer}
|
|
default:
|
|
plog.Panicf("unhandled stream type %s", conn.t)
|
|
}
|
|
flusher = conn.Flusher
|
|
cw.mu.Lock()
|
|
cw.status.activate()
|
|
cw.closer = conn.Closer
|
|
cw.working = true
|
|
cw.mu.Unlock()
|
|
heartbeatc, msgc = tickc, cw.msgc
|
|
case <-cw.stopc:
|
|
cw.close()
|
|
close(cw.done)
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (cw *streamWriter) writec() (chan<- raftpb.Message, bool) {
|
|
cw.mu.Lock()
|
|
defer cw.mu.Unlock()
|
|
return cw.msgc, cw.working
|
|
}
|
|
|
|
func (cw *streamWriter) close() {
|
|
cw.mu.Lock()
|
|
defer cw.mu.Unlock()
|
|
if !cw.working {
|
|
return
|
|
}
|
|
cw.closer.Close()
|
|
if len(cw.msgc) > 0 {
|
|
cw.r.ReportUnreachable(uint64(cw.id))
|
|
}
|
|
cw.msgc = make(chan raftpb.Message, streamBufSize)
|
|
cw.working = false
|
|
}
|
|
|
|
func (cw *streamWriter) attach(conn *outgoingConn) bool {
|
|
select {
|
|
case cw.connc <- conn:
|
|
return true
|
|
case <-cw.done:
|
|
return false
|
|
}
|
|
}
|
|
|
|
func (cw *streamWriter) stop() {
|
|
close(cw.stopc)
|
|
<-cw.done
|
|
}
|
|
|
|
// streamReader is a long-running go-routine that dials to the remote stream
|
|
// endponit and reads messages from the response body returned.
|
|
type streamReader struct {
|
|
tr http.RoundTripper
|
|
picker *urlPicker
|
|
t streamType
|
|
local, remote types.ID
|
|
cid types.ID
|
|
status *peerStatus
|
|
recvc chan<- raftpb.Message
|
|
propc chan<- raftpb.Message
|
|
errorc chan<- error
|
|
|
|
mu sync.Mutex
|
|
cancel func()
|
|
closer io.Closer
|
|
stopc chan struct{}
|
|
done chan struct{}
|
|
}
|
|
|
|
func startStreamReader(tr http.RoundTripper, picker *urlPicker, t streamType, local, remote, cid types.ID, status *peerStatus, recvc chan<- raftpb.Message, propc chan<- raftpb.Message, errorc chan<- error) *streamReader {
|
|
r := &streamReader{
|
|
tr: tr,
|
|
picker: picker,
|
|
t: t,
|
|
local: local,
|
|
remote: remote,
|
|
cid: cid,
|
|
status: status,
|
|
recvc: recvc,
|
|
propc: propc,
|
|
errorc: errorc,
|
|
stopc: make(chan struct{}),
|
|
done: make(chan struct{}),
|
|
}
|
|
go r.run()
|
|
return r
|
|
}
|
|
|
|
func (cr *streamReader) run() {
|
|
for {
|
|
t := cr.t
|
|
rc, err := cr.dial(t)
|
|
if err != nil {
|
|
if err != errUnsupportedStreamType {
|
|
cr.status.deactivate(failureType{source: t.String(), action: "dial"}, err.Error())
|
|
}
|
|
} else {
|
|
cr.status.activate()
|
|
err := cr.decodeLoop(rc, t)
|
|
switch {
|
|
// all data is read out
|
|
case err == io.EOF:
|
|
// connection is closed by the remote
|
|
case isClosedConnectionError(err):
|
|
default:
|
|
cr.status.deactivate(failureType{source: t.String(), action: "read"}, err.Error())
|
|
}
|
|
}
|
|
select {
|
|
// Wait 100ms to create a new stream, so it doesn't bring too much
|
|
// overhead when retry.
|
|
case <-time.After(100 * time.Millisecond):
|
|
case <-cr.stopc:
|
|
close(cr.done)
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (cr *streamReader) decodeLoop(rc io.ReadCloser, t streamType) error {
|
|
var dec decoder
|
|
cr.mu.Lock()
|
|
switch t {
|
|
case streamTypeMsgAppV2:
|
|
dec = newMsgAppV2Decoder(rc, cr.local, cr.remote)
|
|
case streamTypeMessage:
|
|
dec = &messageDecoder{r: rc}
|
|
default:
|
|
plog.Panicf("unhandled stream type %s", t)
|
|
}
|
|
cr.closer = rc
|
|
cr.mu.Unlock()
|
|
|
|
for {
|
|
m, err := dec.decode()
|
|
if err != nil {
|
|
cr.mu.Lock()
|
|
cr.close()
|
|
cr.mu.Unlock()
|
|
return err
|
|
}
|
|
|
|
if isLinkHeartbeatMessage(m) {
|
|
// raft is not interested in link layer
|
|
// heartbeat message, so we should ignore
|
|
// it.
|
|
continue
|
|
}
|
|
|
|
recvc := cr.recvc
|
|
if m.Type == raftpb.MsgProp {
|
|
recvc = cr.propc
|
|
}
|
|
|
|
select {
|
|
case recvc <- m:
|
|
default:
|
|
if cr.status.isActive() {
|
|
plog.Warningf("dropped %s from %s since receiving buffer is full", m.Type, types.ID(m.From))
|
|
} else {
|
|
plog.Debugf("dropped %s from %s since receiving buffer is full", m.Type, types.ID(m.From))
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (cr *streamReader) stop() {
|
|
close(cr.stopc)
|
|
cr.mu.Lock()
|
|
if cr.cancel != nil {
|
|
cr.cancel()
|
|
}
|
|
cr.close()
|
|
cr.mu.Unlock()
|
|
<-cr.done
|
|
}
|
|
|
|
func (cr *streamReader) isWorking() bool {
|
|
cr.mu.Lock()
|
|
defer cr.mu.Unlock()
|
|
return cr.closer != nil
|
|
}
|
|
|
|
func (cr *streamReader) dial(t streamType) (io.ReadCloser, error) {
|
|
u := cr.picker.pick()
|
|
uu := u
|
|
uu.Path = path.Join(t.endpoint(), cr.local.String())
|
|
|
|
req, err := http.NewRequest("GET", uu.String(), nil)
|
|
if err != nil {
|
|
cr.picker.unreachable(u)
|
|
return nil, fmt.Errorf("failed to make http request to %s (%v)", u, err)
|
|
}
|
|
req.Header.Set("X-Server-From", cr.local.String())
|
|
req.Header.Set("X-Server-Version", version.Version)
|
|
req.Header.Set("X-Min-Cluster-Version", version.MinClusterVersion)
|
|
req.Header.Set("X-Etcd-Cluster-ID", cr.cid.String())
|
|
req.Header.Set("X-Raft-To", cr.remote.String())
|
|
|
|
cr.mu.Lock()
|
|
select {
|
|
case <-cr.stopc:
|
|
cr.mu.Unlock()
|
|
return nil, fmt.Errorf("stream reader is stopped")
|
|
default:
|
|
}
|
|
cr.cancel = httputil.RequestCanceler(cr.tr, req)
|
|
cr.mu.Unlock()
|
|
|
|
resp, err := cr.tr.RoundTrip(req)
|
|
if err != nil {
|
|
cr.picker.unreachable(u)
|
|
return nil, err
|
|
}
|
|
|
|
rv := serverVersion(resp.Header)
|
|
lv := semver.Must(semver.NewVersion(version.Version))
|
|
if compareMajorMinorVersion(rv, lv) == -1 && !checkStreamSupport(rv, t) {
|
|
resp.Body.Close()
|
|
cr.picker.unreachable(u)
|
|
return nil, errUnsupportedStreamType
|
|
}
|
|
|
|
switch resp.StatusCode {
|
|
case http.StatusGone:
|
|
resp.Body.Close()
|
|
cr.picker.unreachable(u)
|
|
err := fmt.Errorf("the member has been permanently removed from the cluster")
|
|
select {
|
|
case cr.errorc <- err:
|
|
default:
|
|
}
|
|
return nil, err
|
|
case http.StatusOK:
|
|
return resp.Body, nil
|
|
case http.StatusNotFound:
|
|
resp.Body.Close()
|
|
cr.picker.unreachable(u)
|
|
return nil, fmt.Errorf("remote member %s could not recognize local member", cr.remote)
|
|
case http.StatusPreconditionFailed:
|
|
b, err := ioutil.ReadAll(resp.Body)
|
|
if err != nil {
|
|
cr.picker.unreachable(u)
|
|
return nil, err
|
|
}
|
|
resp.Body.Close()
|
|
cr.picker.unreachable(u)
|
|
|
|
switch strings.TrimSuffix(string(b), "\n") {
|
|
case errIncompatibleVersion.Error():
|
|
plog.Errorf("request sent was ignored by peer %s (server version incompatible)", cr.remote)
|
|
return nil, errIncompatibleVersion
|
|
case errClusterIDMismatch.Error():
|
|
plog.Errorf("request sent was ignored (cluster ID mismatch: remote[%s]=%s, local=%s)",
|
|
cr.remote, resp.Header.Get("X-Etcd-Cluster-ID"), cr.cid)
|
|
return nil, errClusterIDMismatch
|
|
default:
|
|
return nil, fmt.Errorf("unhandled error %q when precondition failed", string(b))
|
|
}
|
|
default:
|
|
resp.Body.Close()
|
|
cr.picker.unreachable(u)
|
|
return nil, fmt.Errorf("unhandled http status %d", resp.StatusCode)
|
|
}
|
|
}
|
|
|
|
func (cr *streamReader) close() {
|
|
if cr.closer != nil {
|
|
cr.closer.Close()
|
|
}
|
|
cr.closer = nil
|
|
}
|
|
|
|
func isClosedConnectionError(err error) bool {
|
|
operr, ok := err.(*net.OpError)
|
|
return ok && operr.Err.Error() == "use of closed network connection"
|
|
}
|
|
|
|
// checkStreamSupport checks whether the stream type is supported in the
|
|
// given version.
|
|
func checkStreamSupport(v *semver.Version, t streamType) bool {
|
|
nv := &semver.Version{Major: v.Major, Minor: v.Minor}
|
|
for _, s := range supportedStream[nv.String()] {
|
|
if s == t {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
func isNetworkTimeoutError(err error) bool {
|
|
nerr, ok := err.(net.Error)
|
|
return ok && nerr.Timeout()
|
|
}
|