mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
etcdserver: specify request timeout error due to connection lost
It specifies request timeout error possibly caused by connection lost, and print out better log for user to understand. It handles two cases: 1. the leader cannot connect to majority of cluster. 2. the connection between follower and leader is down for a while, and it losts proposals. log format: ``` 20:04:19 etcd3 | 2015-08-25 20:04:19.368126 E | etcdhttp: etcdserver: request timed out, possibly due to connection lost 20:04:19 etcd3 | 2015-08-25 20:04:19.368227 E | etcdhttp: etcdserver: request timed out, possibly due to connection lost ```
This commit is contained in:
parent
32ab3f6931
commit
8f6bf029f8
@ -21,15 +21,16 @@ import (
|
||||
)
|
||||
|
||||
var (
|
||||
ErrUnknownMethod = errors.New("etcdserver: unknown method")
|
||||
ErrStopped = errors.New("etcdserver: server stopped")
|
||||
ErrIDRemoved = errors.New("etcdserver: ID removed")
|
||||
ErrIDExists = errors.New("etcdserver: ID exists")
|
||||
ErrIDNotFound = errors.New("etcdserver: ID not found")
|
||||
ErrPeerURLexists = errors.New("etcdserver: peerURL exists")
|
||||
ErrCanceled = errors.New("etcdserver: request cancelled")
|
||||
ErrTimeout = errors.New("etcdserver: request timed out")
|
||||
ErrTimeoutDueToLeaderFail = errors.New("etcdserver: request timed out, possibly due to previous leader failure")
|
||||
ErrUnknownMethod = errors.New("etcdserver: unknown method")
|
||||
ErrStopped = errors.New("etcdserver: server stopped")
|
||||
ErrIDRemoved = errors.New("etcdserver: ID removed")
|
||||
ErrIDExists = errors.New("etcdserver: ID exists")
|
||||
ErrIDNotFound = errors.New("etcdserver: ID not found")
|
||||
ErrPeerURLexists = errors.New("etcdserver: peerURL exists")
|
||||
ErrCanceled = errors.New("etcdserver: request cancelled")
|
||||
ErrTimeout = errors.New("etcdserver: request timed out")
|
||||
ErrTimeoutDueToLeaderFail = errors.New("etcdserver: request timed out, possibly due to previous leader failure")
|
||||
ErrTimeoutDueToConnectionLost = errors.New("etcdserver: request timed out, possibly due to connection lost")
|
||||
)
|
||||
|
||||
func isKeyNotFound(err error) bool {
|
||||
|
@ -593,9 +593,10 @@ func writeKeyError(w http.ResponseWriter, err error) {
|
||||
case *etcdErr.Error:
|
||||
e.WriteTo(w)
|
||||
default:
|
||||
if err == etcdserver.ErrTimeoutDueToLeaderFail {
|
||||
switch err {
|
||||
case etcdserver.ErrTimeoutDueToLeaderFail, etcdserver.ErrTimeoutDueToConnectionLost:
|
||||
plog.Error(err)
|
||||
} else {
|
||||
default:
|
||||
plog.Errorf("got unexpected response error (%v)", err)
|
||||
}
|
||||
ee := etcdErr.NewError(etcdErr.EcodeRaftInternal, err.Error(), 0)
|
||||
|
@ -54,9 +54,10 @@ func writeError(w http.ResponseWriter, err error) {
|
||||
herr := httptypes.NewHTTPError(e.HTTPStatus(), e.Error())
|
||||
herr.WriteTo(w)
|
||||
default:
|
||||
if err == etcdserver.ErrTimeoutDueToLeaderFail {
|
||||
switch err {
|
||||
case etcdserver.ErrTimeoutDueToLeaderFail, etcdserver.ErrTimeoutDueToConnectionLost:
|
||||
plog.Error(err)
|
||||
} else {
|
||||
default:
|
||||
plog.Errorf("got unexpected response error (%v)", err)
|
||||
}
|
||||
herr := httptypes.NewHTTPError(http.StatusInternalServerError, "Internal Server Error")
|
||||
|
@ -1024,8 +1024,42 @@ func (s *EtcdServer) parseProposeCtxErr(err error, start time.Time) error {
|
||||
if start.After(prevLeadLost) && start.Before(curLeadElected) {
|
||||
return ErrTimeoutDueToLeaderFail
|
||||
}
|
||||
|
||||
lead := types.ID(atomic.LoadUint64(&s.r.lead))
|
||||
switch lead {
|
||||
case types.ID(raft.None):
|
||||
// TODO: return error to specify it happens because the cluster does not have leader now
|
||||
case s.ID():
|
||||
if !isConnectedToQuorumSince(s.r.transport, start, s.ID(), s.cluster.Members()) {
|
||||
return ErrTimeoutDueToConnectionLost
|
||||
}
|
||||
default:
|
||||
if !isConnectedSince(s.r.transport, start, lead) {
|
||||
return ErrTimeoutDueToConnectionLost
|
||||
}
|
||||
}
|
||||
|
||||
return ErrTimeout
|
||||
default:
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// isConnectedToQuorumSince checks whether the local member is connected to the
|
||||
// quorum of the cluster since the given time.
|
||||
func isConnectedToQuorumSince(transport rafthttp.Transporter, since time.Time, self types.ID, members []*Member) bool {
|
||||
var connectedNum int
|
||||
for _, m := range members {
|
||||
if m.ID == self || isConnectedSince(transport, since, m.ID) {
|
||||
connectedNum++
|
||||
}
|
||||
}
|
||||
return connectedNum >= (len(members)+1)/2
|
||||
}
|
||||
|
||||
// isConnectedSince checks whether the local member is connected to the
|
||||
// remote member since the given time.
|
||||
func isConnectedSince(transport rafthttp.Transporter, since time.Time, remote types.ID) bool {
|
||||
t := transport.ActiveSince(remote)
|
||||
return !t.IsZero() && t.Before(since)
|
||||
}
|
||||
|
@ -1417,6 +1417,7 @@ func (s *nopTransporter) AddPeer(id types.ID, us []string) {}
|
||||
func (s *nopTransporter) RemovePeer(id types.ID) {}
|
||||
func (s *nopTransporter) RemoveAllPeers() {}
|
||||
func (s *nopTransporter) UpdatePeer(id types.ID, us []string) {}
|
||||
func (s *nopTransporter) ActiveSince(id types.ID) time.Time { return time.Time{} }
|
||||
func (s *nopTransporter) Stop() {}
|
||||
func (s *nopTransporter) Pause() {}
|
||||
func (s *nopTransporter) Resume() {}
|
||||
|
@ -366,4 +366,5 @@ func (pr *fakePeer) Send(m raftpb.Message) { pr.msgs = append(pr
|
||||
func (pr *fakePeer) Update(urls types.URLs) { pr.urls = urls }
|
||||
func (pr *fakePeer) setTerm(term uint64) { pr.term = term }
|
||||
func (pr *fakePeer) attachOutgoingConn(conn *outgoingConn) { pr.connc <- conn }
|
||||
func (pr *fakePeer) activeSince() time.Time { return time.Time{} }
|
||||
func (pr *fakePeer) Stop() {}
|
||||
|
@ -66,6 +66,9 @@ type Peer interface {
|
||||
// connection hands over to the peer. The peer will close the connection
|
||||
// when it is no longer used.
|
||||
attachOutgoingConn(conn *outgoingConn)
|
||||
// activeSince returns the time that the connection with the
|
||||
// peer becomes active.
|
||||
activeSince() time.Time
|
||||
// Stop performs any necessary finalization and terminates the peer
|
||||
// elegantly.
|
||||
Stop()
|
||||
@ -87,6 +90,8 @@ type peer struct {
|
||||
id types.ID
|
||||
r Raft
|
||||
|
||||
status *peerStatus
|
||||
|
||||
msgAppWriter *streamWriter
|
||||
writer *streamWriter
|
||||
pipeline *pipeline
|
||||
@ -112,6 +117,7 @@ func startPeer(tr http.RoundTripper, urls types.URLs, local, to, cid types.ID, r
|
||||
p := &peer{
|
||||
id: to,
|
||||
r: r,
|
||||
status: status,
|
||||
msgAppWriter: startStreamWriter(to, status, fs, r),
|
||||
writer: startStreamWriter(to, status, fs, r),
|
||||
pipeline: newPipeline(tr, picker, local, to, cid, status, fs, r, errorc),
|
||||
@ -223,6 +229,8 @@ func (p *peer) attachOutgoingConn(conn *outgoingConn) {
|
||||
}
|
||||
}
|
||||
|
||||
func (p *peer) activeSince() time.Time { return p.status.activeSince }
|
||||
|
||||
// Pause pauses the peer. The peer will simply drops all incoming
|
||||
// messages without retruning an error.
|
||||
func (p *peer) Pause() {
|
||||
|
@ -17,6 +17,7 @@ package rafthttp
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/coreos/etcd/pkg/types"
|
||||
)
|
||||
@ -27,10 +28,11 @@ type failureType struct {
|
||||
}
|
||||
|
||||
type peerStatus struct {
|
||||
id types.ID
|
||||
mu sync.Mutex // protect active and failureMap
|
||||
active bool
|
||||
failureMap map[failureType]string
|
||||
id types.ID
|
||||
mu sync.Mutex // protect variables below
|
||||
active bool
|
||||
failureMap map[failureType]string
|
||||
activeSince time.Time
|
||||
}
|
||||
|
||||
func newPeerStatus(id types.ID) *peerStatus {
|
||||
@ -46,6 +48,7 @@ func (s *peerStatus) activate() {
|
||||
if !s.active {
|
||||
plog.Infof("the connection with %s became active", s.id)
|
||||
s.active = true
|
||||
s.activeSince = time.Now()
|
||||
s.failureMap = make(map[failureType]string)
|
||||
}
|
||||
}
|
||||
@ -56,6 +59,7 @@ func (s *peerStatus) deactivate(failure failureType, reason string) {
|
||||
if s.active {
|
||||
plog.Infof("the connection with %s became inactive", s.id)
|
||||
s.active = false
|
||||
s.activeSince = time.Time{}
|
||||
}
|
||||
logline := fmt.Sprintf("failed to %s %s on %s (%s)", failure.action, s.id, failure.source, reason)
|
||||
if r, ok := s.failureMap[failure]; ok && r == reason {
|
||||
|
@ -17,6 +17,7 @@ package rafthttp
|
||||
import (
|
||||
"net/http"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/pkg/capnslog"
|
||||
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/xiang90/probing"
|
||||
@ -68,6 +69,11 @@ type Transporter interface {
|
||||
// It is the caller's responsibility to ensure the urls are all valid,
|
||||
// or it panics.
|
||||
UpdatePeer(id types.ID, urls []string)
|
||||
// ActiveSince returns the time that the connection with the peer
|
||||
// of the given id becomes active.
|
||||
// If the connection is active since peer was added, it returns the adding time.
|
||||
// If the connection is currently inactive, it returns zero time.
|
||||
ActiveSince(id types.ID) time.Time
|
||||
// Stop closes the connections and stops the transporter.
|
||||
Stop()
|
||||
}
|
||||
@ -248,6 +254,15 @@ func (t *transport) UpdatePeer(id types.ID, us []string) {
|
||||
addPeerToProber(t.prober, id.String(), us)
|
||||
}
|
||||
|
||||
func (t *transport) ActiveSince(id types.ID) time.Time {
|
||||
t.mu.Lock()
|
||||
defer t.mu.Unlock()
|
||||
if p, ok := t.peers[id]; ok {
|
||||
return p.activeSince()
|
||||
}
|
||||
return time.Time{}
|
||||
}
|
||||
|
||||
type Pausable interface {
|
||||
Pause()
|
||||
Resume()
|
||||
|
Loading…
x
Reference in New Issue
Block a user