diff --git a/etcdserver/errors.go b/etcdserver/errors.go index faab90cfe..cc9f04a27 100644 --- a/etcdserver/errors.go +++ b/etcdserver/errors.go @@ -18,32 +18,20 @@ import ( "errors" etcdErr "github.com/coreos/etcd/error" - - "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" ) var ( - ErrUnknownMethod = errors.New("etcdserver: unknown method") - ErrStopped = errors.New("etcdserver: server stopped") - ErrIDRemoved = errors.New("etcdserver: ID removed") - ErrIDExists = errors.New("etcdserver: ID exists") - ErrIDNotFound = errors.New("etcdserver: ID not found") - ErrPeerURLexists = errors.New("etcdserver: peerURL exists") - ErrCanceled = errors.New("etcdserver: request cancelled") - ErrTimeout = errors.New("etcdserver: request timed out") + ErrUnknownMethod = errors.New("etcdserver: unknown method") + ErrStopped = errors.New("etcdserver: server stopped") + ErrIDRemoved = errors.New("etcdserver: ID removed") + ErrIDExists = errors.New("etcdserver: ID exists") + ErrIDNotFound = errors.New("etcdserver: ID not found") + ErrPeerURLexists = errors.New("etcdserver: peerURL exists") + ErrCanceled = errors.New("etcdserver: request cancelled") + ErrTimeout = errors.New("etcdserver: request timed out") + ErrTimeoutDueToLeaderLost = errors.New("etcdserver: request timed out, possibly due to leader lost") ) -func parseCtxErr(err error) error { - switch err { - case context.Canceled: - return ErrCanceled - case context.DeadlineExceeded: - return ErrTimeout - default: - return err - } -} - func isKeyNotFound(err error) bool { e, ok := err.(*etcdErr.Error) return ok && e.ErrorCode == etcdErr.EcodeKeyNotFound diff --git a/etcdserver/etcdhttp/http.go b/etcdserver/etcdhttp/http.go index 1393162d2..fb3aed5c7 100644 --- a/etcdserver/etcdhttp/http.go +++ b/etcdserver/etcdhttp/http.go @@ -23,6 +23,7 @@ import ( "github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/pkg/capnslog" etcdErr "github.com/coreos/etcd/error" + "github.com/coreos/etcd/etcdserver" "github.com/coreos/etcd/etcdserver/auth" "github.com/coreos/etcd/etcdserver/etcdhttp/httptypes" ) @@ -59,7 +60,11 @@ func writeError(w http.ResponseWriter, err error) { herr := httptypes.NewHTTPError(e.HTTPStatus(), e.Error()) herr.WriteTo(w) default: - plog.Errorf("got unexpected response error (%v)", err) + if err == etcdserver.ErrTimeoutDueToLeaderLost { + plog.Error(err) + } else { + plog.Errorf("got unexpected response error (%v)", err) + } herr := httptypes.NewHTTPError(http.StatusInternalServerError, "Internal Server Error") herr.WriteTo(w) } diff --git a/etcdserver/raft.go b/etcdserver/raft.go index 4f11a23ee..6546c3ccb 100644 --- a/etcdserver/raft.go +++ b/etcdserver/raft.go @@ -19,6 +19,7 @@ import ( "expvar" "os" "sort" + "sync" "sync/atomic" "time" @@ -86,6 +87,10 @@ type raftNode struct { term uint64 lead uint64 + mu sync.Mutex + // last lead elected time + lt time.Time + raft.Node // a chan to send out apply @@ -120,6 +125,11 @@ func (r *raftNode) run() { r.Tick() case rd := <-r.Ready(): if rd.SoftState != nil { + if lead := atomic.LoadUint64(&r.lead); rd.SoftState.Lead != raft.None && lead != rd.SoftState.Lead { + r.mu.Lock() + r.lt = time.Now() + r.mu.Unlock() + } atomic.StoreUint64(&r.lead, rd.SoftState.Lead) if rd.RaftState == raft.StateLeader { syncC = r.s.SyncTicker @@ -177,6 +187,12 @@ func (r *raftNode) apply() chan apply { return r.applyc } +func (r *raftNode) leadElectedTime() time.Time { + r.mu.Lock() + defer r.mu.Unlock() + return r.lt +} + func (r *raftNode) stop() { r.Stop() r.transport.Stop() diff --git a/etcdserver/server.go b/etcdserver/server.go index c8ad680da..1309107fd 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -551,7 +551,7 @@ func (s *EtcdServer) Do(ctx context.Context, r pb.Request) (Response, error) { case <-ctx.Done(): proposeFailed.Inc() s.w.Trigger(r.ID, nil) // GC wait - return Response{}, parseCtxErr(ctx.Err()) + return Response{}, s.parseProposeCtxErr(ctx.Err(), start) case <-s.done: return Response{}, ErrStopped } @@ -646,6 +646,7 @@ func (s *EtcdServer) Leader() types.ID { return types.ID(s.Lead()) } func (s *EtcdServer) configure(ctx context.Context, cc raftpb.ConfChange) error { cc.ID = s.reqIDGen.Next() ch := s.w.Register(cc.ID) + start := time.Now() if err := s.r.ProposeConfChange(ctx, cc); err != nil { s.w.Trigger(cc.ID, nil) return err @@ -661,7 +662,7 @@ func (s *EtcdServer) configure(ctx context.Context, cc raftpb.ConfChange) error return nil case <-ctx.Done(): s.w.Trigger(cc.ID, nil) // GC wait - return parseCtxErr(ctx.Err()) + return s.parseProposeCtxErr(ctx.Err(), start) case <-s.done: return ErrStopped } @@ -1002,3 +1003,19 @@ func (s *EtcdServer) updateClusterVersion(ver string) { plog.Errorf("error updating cluster version (%v)", err) } } + +func (s *EtcdServer) parseProposeCtxErr(err error, start time.Time) error { + switch err { + case context.Canceled: + return ErrCanceled + case context.DeadlineExceeded: + curLeadElected := s.r.leadElectedTime() + prevLeadLost := curLeadElected.Add(-2 * time.Duration(s.cfg.ElectionTicks) * time.Duration(s.cfg.TickMs) * time.Millisecond) + if start.After(prevLeadLost) && start.Before(curLeadElected) { + return ErrTimeoutDueToLeaderLost + } + return ErrTimeout + default: + return err + } +} diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index 71c4b4ea8..de1000d69 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -566,6 +566,7 @@ func TestDoProposalCancelled(t *testing.T) { func TestDoProposalTimeout(t *testing.T) { srv := &EtcdServer{ + cfg: &ServerConfig{TickMs: 1}, r: raftNode{Node: &nodeRecorder{}}, w: &waitRecorder{}, reqIDGen: idutil.NewGenerator(0, time.Time{}), @@ -1023,6 +1024,7 @@ func TestPublishStopped(t *testing.T) { func TestPublishRetry(t *testing.T) { n := &nodeRecorder{} srv := &EtcdServer{ + cfg: &ServerConfig{TickMs: 1}, r: raftNode{Node: n}, w: &waitRecorder{}, done: make(chan struct{}),