From 8f6bf029f8d92100642f7512a5b0e7e64c5546c2 Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Tue, 25 Aug 2015 15:14:41 -0700 Subject: [PATCH] etcdserver: specify request timeout error due to connection lost It specifies request timeout error possibly caused by connection lost, and print out better log for user to understand. It handles two cases: 1. the leader cannot connect to majority of cluster. 2. the connection between follower and leader is down for a while, and it losts proposals. log format: ``` 20:04:19 etcd3 | 2015-08-25 20:04:19.368126 E | etcdhttp: etcdserver: request timed out, possibly due to connection lost 20:04:19 etcd3 | 2015-08-25 20:04:19.368227 E | etcdhttp: etcdserver: request timed out, possibly due to connection lost ``` --- etcdserver/errors.go | 19 ++++++++++--------- etcdserver/etcdhttp/client.go | 5 +++-- etcdserver/etcdhttp/http.go | 5 +++-- etcdserver/server.go | 34 ++++++++++++++++++++++++++++++++++ etcdserver/server_test.go | 1 + rafthttp/http_test.go | 1 + rafthttp/peer.go | 8 ++++++++ rafthttp/peer_status.go | 12 ++++++++---- rafthttp/transport.go | 15 +++++++++++++++ 9 files changed, 83 insertions(+), 17 deletions(-) diff --git a/etcdserver/errors.go b/etcdserver/errors.go index f570837cc..22b443aba 100644 --- a/etcdserver/errors.go +++ b/etcdserver/errors.go @@ -21,15 +21,16 @@ import ( ) var ( - ErrUnknownMethod = errors.New("etcdserver: unknown method") - ErrStopped = errors.New("etcdserver: server stopped") - ErrIDRemoved = errors.New("etcdserver: ID removed") - ErrIDExists = errors.New("etcdserver: ID exists") - ErrIDNotFound = errors.New("etcdserver: ID not found") - ErrPeerURLexists = errors.New("etcdserver: peerURL exists") - ErrCanceled = errors.New("etcdserver: request cancelled") - ErrTimeout = errors.New("etcdserver: request timed out") - ErrTimeoutDueToLeaderFail = errors.New("etcdserver: request timed out, possibly due to previous leader failure") + ErrUnknownMethod = errors.New("etcdserver: unknown method") + ErrStopped = errors.New("etcdserver: server stopped") + ErrIDRemoved = errors.New("etcdserver: ID removed") + ErrIDExists = errors.New("etcdserver: ID exists") + ErrIDNotFound = errors.New("etcdserver: ID not found") + ErrPeerURLexists = errors.New("etcdserver: peerURL exists") + ErrCanceled = errors.New("etcdserver: request cancelled") + ErrTimeout = errors.New("etcdserver: request timed out") + ErrTimeoutDueToLeaderFail = errors.New("etcdserver: request timed out, possibly due to previous leader failure") + ErrTimeoutDueToConnectionLost = errors.New("etcdserver: request timed out, possibly due to connection lost") ) func isKeyNotFound(err error) bool { diff --git a/etcdserver/etcdhttp/client.go b/etcdserver/etcdhttp/client.go index cd3ecb1be..64963798f 100644 --- a/etcdserver/etcdhttp/client.go +++ b/etcdserver/etcdhttp/client.go @@ -593,9 +593,10 @@ func writeKeyError(w http.ResponseWriter, err error) { case *etcdErr.Error: e.WriteTo(w) default: - if err == etcdserver.ErrTimeoutDueToLeaderFail { + switch err { + case etcdserver.ErrTimeoutDueToLeaderFail, etcdserver.ErrTimeoutDueToConnectionLost: plog.Error(err) - } else { + default: plog.Errorf("got unexpected response error (%v)", err) } ee := etcdErr.NewError(etcdErr.EcodeRaftInternal, err.Error(), 0) diff --git a/etcdserver/etcdhttp/http.go b/etcdserver/etcdhttp/http.go index 062621095..4350a8eba 100644 --- a/etcdserver/etcdhttp/http.go +++ b/etcdserver/etcdhttp/http.go @@ -54,9 +54,10 @@ func writeError(w http.ResponseWriter, err error) { herr := httptypes.NewHTTPError(e.HTTPStatus(), e.Error()) herr.WriteTo(w) default: - if err == etcdserver.ErrTimeoutDueToLeaderFail { + switch err { + case etcdserver.ErrTimeoutDueToLeaderFail, etcdserver.ErrTimeoutDueToConnectionLost: plog.Error(err) - } else { + default: plog.Errorf("got unexpected response error (%v)", err) } herr := httptypes.NewHTTPError(http.StatusInternalServerError, "Internal Server Error") diff --git a/etcdserver/server.go b/etcdserver/server.go index e3d0f20f6..8e591e596 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -1024,8 +1024,42 @@ func (s *EtcdServer) parseProposeCtxErr(err error, start time.Time) error { if start.After(prevLeadLost) && start.Before(curLeadElected) { return ErrTimeoutDueToLeaderFail } + + lead := types.ID(atomic.LoadUint64(&s.r.lead)) + switch lead { + case types.ID(raft.None): + // TODO: return error to specify it happens because the cluster does not have leader now + case s.ID(): + if !isConnectedToQuorumSince(s.r.transport, start, s.ID(), s.cluster.Members()) { + return ErrTimeoutDueToConnectionLost + } + default: + if !isConnectedSince(s.r.transport, start, lead) { + return ErrTimeoutDueToConnectionLost + } + } + return ErrTimeout default: return err } } + +// isConnectedToQuorumSince checks whether the local member is connected to the +// quorum of the cluster since the given time. +func isConnectedToQuorumSince(transport rafthttp.Transporter, since time.Time, self types.ID, members []*Member) bool { + var connectedNum int + for _, m := range members { + if m.ID == self || isConnectedSince(transport, since, m.ID) { + connectedNum++ + } + } + return connectedNum >= (len(members)+1)/2 +} + +// isConnectedSince checks whether the local member is connected to the +// remote member since the given time. +func isConnectedSince(transport rafthttp.Transporter, since time.Time, remote types.ID) bool { + t := transport.ActiveSince(remote) + return !t.IsZero() && t.Before(since) +} diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index cba869f7f..f88a7c8c8 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -1417,6 +1417,7 @@ func (s *nopTransporter) AddPeer(id types.ID, us []string) {} func (s *nopTransporter) RemovePeer(id types.ID) {} func (s *nopTransporter) RemoveAllPeers() {} func (s *nopTransporter) UpdatePeer(id types.ID, us []string) {} +func (s *nopTransporter) ActiveSince(id types.ID) time.Time { return time.Time{} } func (s *nopTransporter) Stop() {} func (s *nopTransporter) Pause() {} func (s *nopTransporter) Resume() {} diff --git a/rafthttp/http_test.go b/rafthttp/http_test.go index a3aedbabc..dbeb5cb54 100644 --- a/rafthttp/http_test.go +++ b/rafthttp/http_test.go @@ -366,4 +366,5 @@ func (pr *fakePeer) Send(m raftpb.Message) { pr.msgs = append(pr func (pr *fakePeer) Update(urls types.URLs) { pr.urls = urls } func (pr *fakePeer) setTerm(term uint64) { pr.term = term } func (pr *fakePeer) attachOutgoingConn(conn *outgoingConn) { pr.connc <- conn } +func (pr *fakePeer) activeSince() time.Time { return time.Time{} } func (pr *fakePeer) Stop() {} diff --git a/rafthttp/peer.go b/rafthttp/peer.go index 78af0e3f5..2be3fbeea 100644 --- a/rafthttp/peer.go +++ b/rafthttp/peer.go @@ -66,6 +66,9 @@ type Peer interface { // connection hands over to the peer. The peer will close the connection // when it is no longer used. attachOutgoingConn(conn *outgoingConn) + // activeSince returns the time that the connection with the + // peer becomes active. + activeSince() time.Time // Stop performs any necessary finalization and terminates the peer // elegantly. Stop() @@ -87,6 +90,8 @@ type peer struct { id types.ID r Raft + status *peerStatus + msgAppWriter *streamWriter writer *streamWriter pipeline *pipeline @@ -112,6 +117,7 @@ func startPeer(tr http.RoundTripper, urls types.URLs, local, to, cid types.ID, r p := &peer{ id: to, r: r, + status: status, msgAppWriter: startStreamWriter(to, status, fs, r), writer: startStreamWriter(to, status, fs, r), pipeline: newPipeline(tr, picker, local, to, cid, status, fs, r, errorc), @@ -223,6 +229,8 @@ func (p *peer) attachOutgoingConn(conn *outgoingConn) { } } +func (p *peer) activeSince() time.Time { return p.status.activeSince } + // Pause pauses the peer. The peer will simply drops all incoming // messages without retruning an error. func (p *peer) Pause() { diff --git a/rafthttp/peer_status.go b/rafthttp/peer_status.go index c7b8e108f..97893fc92 100644 --- a/rafthttp/peer_status.go +++ b/rafthttp/peer_status.go @@ -17,6 +17,7 @@ package rafthttp import ( "fmt" "sync" + "time" "github.com/coreos/etcd/pkg/types" ) @@ -27,10 +28,11 @@ type failureType struct { } type peerStatus struct { - id types.ID - mu sync.Mutex // protect active and failureMap - active bool - failureMap map[failureType]string + id types.ID + mu sync.Mutex // protect variables below + active bool + failureMap map[failureType]string + activeSince time.Time } func newPeerStatus(id types.ID) *peerStatus { @@ -46,6 +48,7 @@ func (s *peerStatus) activate() { if !s.active { plog.Infof("the connection with %s became active", s.id) s.active = true + s.activeSince = time.Now() s.failureMap = make(map[failureType]string) } } @@ -56,6 +59,7 @@ func (s *peerStatus) deactivate(failure failureType, reason string) { if s.active { plog.Infof("the connection with %s became inactive", s.id) s.active = false + s.activeSince = time.Time{} } logline := fmt.Sprintf("failed to %s %s on %s (%s)", failure.action, s.id, failure.source, reason) if r, ok := s.failureMap[failure]; ok && r == reason { diff --git a/rafthttp/transport.go b/rafthttp/transport.go index cafe7525e..32ac5f764 100644 --- a/rafthttp/transport.go +++ b/rafthttp/transport.go @@ -17,6 +17,7 @@ package rafthttp import ( "net/http" "sync" + "time" "github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/pkg/capnslog" "github.com/coreos/etcd/Godeps/_workspace/src/github.com/xiang90/probing" @@ -68,6 +69,11 @@ type Transporter interface { // It is the caller's responsibility to ensure the urls are all valid, // or it panics. UpdatePeer(id types.ID, urls []string) + // ActiveSince returns the time that the connection with the peer + // of the given id becomes active. + // If the connection is active since peer was added, it returns the adding time. + // If the connection is currently inactive, it returns zero time. + ActiveSince(id types.ID) time.Time // Stop closes the connections and stops the transporter. Stop() } @@ -248,6 +254,15 @@ func (t *transport) UpdatePeer(id types.ID, us []string) { addPeerToProber(t.prober, id.String(), us) } +func (t *transport) ActiveSince(id types.ID) time.Time { + t.mu.Lock() + defer t.mu.Unlock() + if p, ok := t.peers[id]; ok { + return p.activeSince() + } + return time.Time{} +} + type Pausable interface { Pause() Resume()