diff --git a/etcdserver/errors.go b/etcdserver/errors.go index faab90cfe..cc9f04a27 100644 --- a/etcdserver/errors.go +++ b/etcdserver/errors.go @@ -18,32 +18,20 @@ import ( "errors" etcdErr "github.com/coreos/etcd/error" - - "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" ) var ( - ErrUnknownMethod = errors.New("etcdserver: unknown method") - ErrStopped = errors.New("etcdserver: server stopped") - ErrIDRemoved = errors.New("etcdserver: ID removed") - ErrIDExists = errors.New("etcdserver: ID exists") - ErrIDNotFound = errors.New("etcdserver: ID not found") - ErrPeerURLexists = errors.New("etcdserver: peerURL exists") - ErrCanceled = errors.New("etcdserver: request cancelled") - ErrTimeout = errors.New("etcdserver: request timed out") + ErrUnknownMethod = errors.New("etcdserver: unknown method") + ErrStopped = errors.New("etcdserver: server stopped") + ErrIDRemoved = errors.New("etcdserver: ID removed") + ErrIDExists = errors.New("etcdserver: ID exists") + ErrIDNotFound = errors.New("etcdserver: ID not found") + ErrPeerURLexists = errors.New("etcdserver: peerURL exists") + ErrCanceled = errors.New("etcdserver: request cancelled") + ErrTimeout = errors.New("etcdserver: request timed out") + ErrTimeoutDueToLeaderLost = errors.New("etcdserver: request timed out, possibly due to leader lost") ) -func parseCtxErr(err error) error { - switch err { - case context.Canceled: - return ErrCanceled - case context.DeadlineExceeded: - return ErrTimeout - default: - return err - } -} - func isKeyNotFound(err error) bool { e, ok := err.(*etcdErr.Error) return ok && e.ErrorCode == etcdErr.EcodeKeyNotFound diff --git a/etcdserver/etcdhttp/http.go b/etcdserver/etcdhttp/http.go index ffa72881b..0f02954c6 100644 --- a/etcdserver/etcdhttp/http.go +++ b/etcdserver/etcdhttp/http.go @@ -23,6 +23,7 @@ import ( "github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/pkg/capnslog" etcdErr "github.com/coreos/etcd/error" + "github.com/coreos/etcd/etcdserver" "github.com/coreos/etcd/etcdserver/auth" "github.com/coreos/etcd/etcdserver/etcdhttp/httptypes" ) @@ -53,7 +54,11 @@ func writeError(w http.ResponseWriter, err error) { herr := httptypes.NewHTTPError(e.HTTPStatus(), e.Error()) herr.WriteTo(w) default: - plog.Errorf("got unexpected response error (%v)", err) + if err == etcdserver.ErrTimeoutDueToLeaderLost { + plog.Error(err) + } else { + plog.Errorf("got unexpected response error (%v)", err) + } herr := httptypes.NewHTTPError(http.StatusInternalServerError, "Internal Server Error") herr.WriteTo(w) } diff --git a/etcdserver/raft.go b/etcdserver/raft.go index 336be62bc..245943768 100644 --- a/etcdserver/raft.go +++ b/etcdserver/raft.go @@ -19,6 +19,7 @@ import ( "expvar" "os" "sort" + "sync" "sync/atomic" "time" @@ -86,6 +87,10 @@ type raftNode struct { term uint64 lead uint64 + mu sync.Mutex + // last lead elected time + lt time.Time + raft.Node // a chan to send out apply @@ -129,6 +134,11 @@ func (r *raftNode) start(s *EtcdServer) { r.Tick() case rd := <-r.Ready(): if rd.SoftState != nil { + if lead := atomic.LoadUint64(&r.lead); rd.SoftState.Lead != raft.None && lead != rd.SoftState.Lead { + r.mu.Lock() + r.lt = time.Now() + r.mu.Unlock() + } atomic.StoreUint64(&r.lead, rd.SoftState.Lead) if rd.RaftState == raft.StateLeader { syncC = r.s.SyncTicker @@ -187,6 +197,12 @@ func (r *raftNode) apply() chan apply { return r.applyc } +func (r *raftNode) leadElectedTime() time.Time { + r.mu.Lock() + defer r.mu.Unlock() + return r.lt +} + func (r *raftNode) stop() { r.stopped <- struct{}{} <-r.done diff --git a/etcdserver/server.go b/etcdserver/server.go index fe3fcd49f..0459a4e74 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -553,7 +553,7 @@ func (s *EtcdServer) Do(ctx context.Context, r pb.Request) (Response, error) { case <-ctx.Done(): proposeFailed.Inc() s.w.Trigger(r.ID, nil) // GC wait - return Response{}, parseCtxErr(ctx.Err()) + return Response{}, s.parseProposeCtxErr(ctx.Err(), start) case <-s.done: return Response{}, ErrStopped } @@ -648,6 +648,7 @@ func (s *EtcdServer) Leader() types.ID { return types.ID(s.Lead()) } func (s *EtcdServer) configure(ctx context.Context, cc raftpb.ConfChange) error { cc.ID = s.reqIDGen.Next() ch := s.w.Register(cc.ID) + start := time.Now() if err := s.r.ProposeConfChange(ctx, cc); err != nil { s.w.Trigger(cc.ID, nil) return err @@ -663,7 +664,7 @@ func (s *EtcdServer) configure(ctx context.Context, cc raftpb.ConfChange) error return nil case <-ctx.Done(): s.w.Trigger(cc.ID, nil) // GC wait - return parseCtxErr(ctx.Err()) + return s.parseProposeCtxErr(ctx.Err(), start) case <-s.done: return ErrStopped } @@ -1014,3 +1015,19 @@ func (s *EtcdServer) updateClusterVersion(ver string) { plog.Errorf("error updating cluster version (%v)", err) } } + +func (s *EtcdServer) parseProposeCtxErr(err error, start time.Time) error { + switch err { + case context.Canceled: + return ErrCanceled + case context.DeadlineExceeded: + curLeadElected := s.r.leadElectedTime() + prevLeadLost := curLeadElected.Add(-2 * time.Duration(s.cfg.ElectionTicks) * time.Duration(s.cfg.TickMs) * time.Millisecond) + if start.After(prevLeadLost) && start.Before(curLeadElected) { + return ErrTimeoutDueToLeaderLost + } + return ErrTimeout + default: + return err + } +}