diff --git a/etcdserver/errors.go b/etcdserver/errors.go
index f570837cc..22b443aba 100644
--- a/etcdserver/errors.go
+++ b/etcdserver/errors.go
@@ -21,15 +21,16 @@ import (
 )
 
 var (
-	ErrUnknownMethod          = errors.New("etcdserver: unknown method")
-	ErrStopped                = errors.New("etcdserver: server stopped")
-	ErrIDRemoved              = errors.New("etcdserver: ID removed")
-	ErrIDExists               = errors.New("etcdserver: ID exists")
-	ErrIDNotFound             = errors.New("etcdserver: ID not found")
-	ErrPeerURLexists          = errors.New("etcdserver: peerURL exists")
-	ErrCanceled               = errors.New("etcdserver: request cancelled")
-	ErrTimeout                = errors.New("etcdserver: request timed out")
-	ErrTimeoutDueToLeaderFail = errors.New("etcdserver: request timed out, possibly due to previous leader failure")
+	ErrUnknownMethod              = errors.New("etcdserver: unknown method")
+	ErrStopped                    = errors.New("etcdserver: server stopped")
+	ErrIDRemoved                  = errors.New("etcdserver: ID removed")
+	ErrIDExists                   = errors.New("etcdserver: ID exists")
+	ErrIDNotFound                 = errors.New("etcdserver: ID not found")
+	ErrPeerURLexists              = errors.New("etcdserver: peerURL exists")
+	ErrCanceled                   = errors.New("etcdserver: request cancelled")
+	ErrTimeout                    = errors.New("etcdserver: request timed out")
+	ErrTimeoutDueToLeaderFail     = errors.New("etcdserver: request timed out, possibly due to previous leader failure")
+	ErrTimeoutDueToConnectionLost = errors.New("etcdserver: request timed out, possibly due to connection lost")
 )
 
 func isKeyNotFound(err error) bool {
diff --git a/etcdserver/etcdhttp/client.go b/etcdserver/etcdhttp/client.go
index cd3ecb1be..64963798f 100644
--- a/etcdserver/etcdhttp/client.go
+++ b/etcdserver/etcdhttp/client.go
@@ -593,9 +593,10 @@ func writeKeyError(w http.ResponseWriter, err error) {
 	case *etcdErr.Error:
 		e.WriteTo(w)
 	default:
-		if err == etcdserver.ErrTimeoutDueToLeaderFail {
+		switch err {
+		case etcdserver.ErrTimeoutDueToLeaderFail, etcdserver.ErrTimeoutDueToConnectionLost:
 			plog.Error(err)
-		} else {
+		default:
 			plog.Errorf("got unexpected response error (%v)", err)
 		}
 		ee := etcdErr.NewError(etcdErr.EcodeRaftInternal, err.Error(), 0)
diff --git a/etcdserver/etcdhttp/http.go b/etcdserver/etcdhttp/http.go
index 062621095..4350a8eba 100644
--- a/etcdserver/etcdhttp/http.go
+++ b/etcdserver/etcdhttp/http.go
@@ -54,9 +54,10 @@ func writeError(w http.ResponseWriter, err error) {
 		herr := httptypes.NewHTTPError(e.HTTPStatus(), e.Error())
 		herr.WriteTo(w)
 	default:
-		if err == etcdserver.ErrTimeoutDueToLeaderFail {
+		switch err {
+		case etcdserver.ErrTimeoutDueToLeaderFail, etcdserver.ErrTimeoutDueToConnectionLost:
 			plog.Error(err)
-		} else {
+		default:
 			plog.Errorf("got unexpected response error (%v)", err)
 		}
 		herr := httptypes.NewHTTPError(http.StatusInternalServerError, "Internal Server Error")
diff --git a/etcdserver/server.go b/etcdserver/server.go
index e3d0f20f6..8e591e596 100644
--- a/etcdserver/server.go
+++ b/etcdserver/server.go
@@ -1024,8 +1024,46 @@ func (s *EtcdServer) parseProposeCtxErr(err error, start time.Time) error {
 		if start.After(prevLeadLost) && start.Before(curLeadElected) {
 			return ErrTimeoutDueToLeaderFail
 		}
+
+		lead := types.ID(atomic.LoadUint64(&s.r.lead))
+		switch lead {
+		case types.ID(raft.None):
+			// TODO: return error to specify it happens because the cluster does not have leader now
+		case s.ID():
+			if !isConnectedToQuorumSince(s.r.transport, start, s.ID(), s.cluster.Members()) {
+				return ErrTimeoutDueToConnectionLost
+			}
+		default:
+			if !isConnectedSince(s.r.transport, start, lead) {
+				return ErrTimeoutDueToConnectionLost
+			}
+		}
+
 		return ErrTimeout
 	default:
 		return err
 	}
 }
+
+// isConnectedToQuorumSince checks whether the local member is connected to the
+// quorum of the cluster since the given time.
+// The local member (self) always counts as connected to itself.
+func isConnectedToQuorumSince(transport rafthttp.Transporter, since time.Time, self types.ID, members []*Member) bool {
+	var connectedNum int
+	for _, m := range members {
+		if m.ID == self || isConnectedSince(transport, since, m.ID) {
+			connectedNum++
+		}
+	}
+	// A quorum is a strict majority, i.e. len(members)/2 + 1. Rounding half
+	// up with (len(members)+1)/2 is one short for even-sized clusters.
+	return connectedNum >= len(members)/2+1
+}
+
+// isConnectedSince checks whether the local member is connected to the
+// remote member since the given time.
+// A zero time from the transport is treated as not connected.
+func isConnectedSince(transport rafthttp.Transporter, since time.Time, remote types.ID) bool {
+	t := transport.ActiveSince(remote)
+	return !t.IsZero() && t.Before(since)
+}
diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go
index cba869f7f..f88a7c8c8 100644
--- a/etcdserver/server_test.go
+++ b/etcdserver/server_test.go
@@ -1417,6 +1417,7 @@ func (s *nopTransporter) AddPeer(id types.ID, us []string) {}
 func (s *nopTransporter) RemovePeer(id types.ID)              {}
 func (s *nopTransporter) RemoveAllPeers()                     {}
 func (s *nopTransporter) UpdatePeer(id types.ID, us []string) {}
+func (s *nopTransporter) ActiveSince(id types.ID) time.Time   { return time.Time{} }
 func (s *nopTransporter) Stop()                               {}
 func (s *nopTransporter) Pause()                              {}
 func (s *nopTransporter) Resume()                             {}
diff --git a/rafthttp/http_test.go b/rafthttp/http_test.go
index a3aedbabc..dbeb5cb54 100644
--- a/rafthttp/http_test.go
+++ b/rafthttp/http_test.go
@@ -366,4 +366,5 @@ func (pr *fakePeer) Send(m raftpb.Message) { pr.msgs = append(pr
 func (pr *fakePeer) Update(urls types.URLs)                { pr.urls = urls }
 func (pr *fakePeer) setTerm(term uint64)                   { pr.term = term }
 func (pr *fakePeer) attachOutgoingConn(conn *outgoingConn) { pr.connc <- conn }
+func (pr *fakePeer) activeSince() time.Time                { return time.Time{} }
 func (pr *fakePeer) Stop()                                 {}
diff --git a/rafthttp/peer.go b/rafthttp/peer.go
index 78af0e3f5..2be3fbeea 100644
--- a/rafthttp/peer.go
+++ b/rafthttp/peer.go
@@ -66,6 +66,9 @@ type Peer interface {
 	// connection hands over to the peer. The peer will close the connection
 	// when it is no longer used.
 	attachOutgoingConn(conn *outgoingConn)
+	// activeSince returns the time that the connection with the
+	// peer becomes active.
+	activeSince() time.Time
 	// Stop performs any necessary finalization and terminates the peer
 	// elegantly.
 	Stop()
@@ -87,6 +90,8 @@ type peer struct {
 	id types.ID
 	r  Raft
 
+	status *peerStatus
+
 	msgAppWriter *streamWriter
 	writer       *streamWriter
 	pipeline     *pipeline
@@ -112,6 +117,7 @@ func startPeer(tr http.RoundTripper, urls types.URLs, local, to, cid types.ID, r
 	p := &peer{
 		id:           to,
 		r:            r,
+		status:       status,
 		msgAppWriter: startStreamWriter(to, status, fs, r),
 		writer:       startStreamWriter(to, status, fs, r),
 		pipeline:     newPipeline(tr, picker, local, to, cid, status, fs, r, errorc),
@@ -223,6 +229,15 @@ func (p *peer) attachOutgoingConn(conn *outgoingConn) {
 	}
 }
 
+// activeSince returns the time recorded in the peer's status for when the
+// connection became active. It holds status.mu while reading because
+// peerStatus documents that mutex as protecting the activeSince field.
+func (p *peer) activeSince() time.Time {
+	p.status.mu.Lock()
+	defer p.status.mu.Unlock()
+	return p.status.activeSince
+}
+
 // Pause pauses the peer. The peer will simply drops all incoming
 // messages without retruning an error.
 func (p *peer) Pause() {
diff --git a/rafthttp/peer_status.go b/rafthttp/peer_status.go
index c7b8e108f..97893fc92 100644
--- a/rafthttp/peer_status.go
+++ b/rafthttp/peer_status.go
@@ -17,6 +17,7 @@ package rafthttp
 import (
 	"fmt"
 	"sync"
+	"time"
 
 	"github.com/coreos/etcd/pkg/types"
 )
@@ -27,10 +28,11 @@ type failureType struct {
 }
 
 type peerStatus struct {
-	id         types.ID
-	mu         sync.Mutex // protect active and failureMap
-	active     bool
-	failureMap map[failureType]string
+	id          types.ID
+	mu          sync.Mutex // protect variables below
+	active      bool
+	failureMap  map[failureType]string
+	activeSince time.Time
 }
 
 func newPeerStatus(id types.ID) *peerStatus {
@@ -46,6 +48,7 @@ func (s *peerStatus) activate() {
 	if !s.active {
 		plog.Infof("the connection with %s became active", s.id)
 		s.active = true
+		s.activeSince = time.Now()
 		s.failureMap = make(map[failureType]string)
 	}
 }
@@ -56,6 +59,7 @@ func (s *peerStatus) deactivate(failure failureType, reason string) {
 	if s.active {
 		plog.Infof("the connection with %s became inactive", s.id)
 		s.active = false
+		s.activeSince = time.Time{}
 	}
 	logline := fmt.Sprintf("failed to %s %s on %s (%s)", failure.action, s.id, failure.source, reason)
 	if r, ok := s.failureMap[failure]; ok && r == reason {
diff --git a/rafthttp/transport.go b/rafthttp/transport.go
index cafe7525e..32ac5f764 100644
--- a/rafthttp/transport.go
+++ b/rafthttp/transport.go
@@ -17,6 +17,7 @@ package rafthttp
 import (
 	"net/http"
 	"sync"
+	"time"
 
 	"github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/pkg/capnslog"
 	"github.com/coreos/etcd/Godeps/_workspace/src/github.com/xiang90/probing"
@@ -68,6 +69,11 @@ type Transporter interface {
 	// It is the caller's responsibility to ensure the urls are all valid,
 	// or it panics.
 	UpdatePeer(id types.ID, urls []string)
+	// ActiveSince returns the time that the connection with the peer
+	// of the given id becomes active.
+	// If the connection is active since peer was added, it returns the adding time.
+	// If the connection is currently inactive, it returns zero time.
+	ActiveSince(id types.ID) time.Time
 	// Stop closes the connections and stops the transporter.
 	Stop()
 }
@@ -248,6 +254,17 @@ func (t *transport) UpdatePeer(id types.ID, us []string) {
 	addPeerToProber(t.prober, id.String(), us)
 }
 
+// ActiveSince returns the active-since time reported by the peer with the
+// given id, or the zero time if the transport has no such peer.
+func (t *transport) ActiveSince(id types.ID) time.Time {
+	t.mu.Lock()
+	defer t.mu.Unlock()
+	if p, ok := t.peers[id]; ok {
+		return p.activeSince()
+	}
+	return time.Time{}
+}
+
 type Pausable interface {
 	Pause()
 	Resume()