etcdserver: specify timeout caused by leader election

Before this PR, the timeout caused by leader election returns:

```
14:45:37 etcd2 | 2015-08-12 14:45:37.786349 E | etcdhttp: got unexpected
response error (etcdserver: request timed out)
```

After this PR:

```
15:52:54 etcd1 | 2015-08-12 15:52:54.389523 E | etcdhttp: etcdserver:
request timed out, possibly due to leader down
```

Conflicts:
	etcdserver/raft.go
This commit is contained in:
Yicheng Qin 2015-08-12 13:38:43 -07:00
parent d2ecd9cecf
commit 084936a920
5 changed files with 52 additions and 24 deletions

View File

@ -18,32 +18,20 @@ import (
"errors" "errors"
etcdErr "github.com/coreos/etcd/error" etcdErr "github.com/coreos/etcd/error"
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
) )
var ( var (
ErrUnknownMethod = errors.New("etcdserver: unknown method") ErrUnknownMethod = errors.New("etcdserver: unknown method")
ErrStopped = errors.New("etcdserver: server stopped") ErrStopped = errors.New("etcdserver: server stopped")
ErrIDRemoved = errors.New("etcdserver: ID removed") ErrIDRemoved = errors.New("etcdserver: ID removed")
ErrIDExists = errors.New("etcdserver: ID exists") ErrIDExists = errors.New("etcdserver: ID exists")
ErrIDNotFound = errors.New("etcdserver: ID not found") ErrIDNotFound = errors.New("etcdserver: ID not found")
ErrPeerURLexists = errors.New("etcdserver: peerURL exists") ErrPeerURLexists = errors.New("etcdserver: peerURL exists")
ErrCanceled = errors.New("etcdserver: request cancelled") ErrCanceled = errors.New("etcdserver: request cancelled")
ErrTimeout = errors.New("etcdserver: request timed out") ErrTimeout = errors.New("etcdserver: request timed out")
ErrTimeoutDueToLeaderLost = errors.New("etcdserver: request timed out, possibly due to leader lost")
) )
func parseCtxErr(err error) error {
switch err {
case context.Canceled:
return ErrCanceled
case context.DeadlineExceeded:
return ErrTimeout
default:
return err
}
}
func isKeyNotFound(err error) bool { func isKeyNotFound(err error) bool {
e, ok := err.(*etcdErr.Error) e, ok := err.(*etcdErr.Error)
return ok && e.ErrorCode == etcdErr.EcodeKeyNotFound return ok && e.ErrorCode == etcdErr.EcodeKeyNotFound

View File

@ -23,6 +23,7 @@ import (
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/pkg/capnslog" "github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/pkg/capnslog"
etcdErr "github.com/coreos/etcd/error" etcdErr "github.com/coreos/etcd/error"
"github.com/coreos/etcd/etcdserver"
"github.com/coreos/etcd/etcdserver/auth" "github.com/coreos/etcd/etcdserver/auth"
"github.com/coreos/etcd/etcdserver/etcdhttp/httptypes" "github.com/coreos/etcd/etcdserver/etcdhttp/httptypes"
) )
@ -59,7 +60,11 @@ func writeError(w http.ResponseWriter, err error) {
herr := httptypes.NewHTTPError(e.HTTPStatus(), e.Error()) herr := httptypes.NewHTTPError(e.HTTPStatus(), e.Error())
herr.WriteTo(w) herr.WriteTo(w)
default: default:
plog.Errorf("got unexpected response error (%v)", err) if err == etcdserver.ErrTimeoutDueToLeaderLost {
plog.Error(err)
} else {
plog.Errorf("got unexpected response error (%v)", err)
}
herr := httptypes.NewHTTPError(http.StatusInternalServerError, "Internal Server Error") herr := httptypes.NewHTTPError(http.StatusInternalServerError, "Internal Server Error")
herr.WriteTo(w) herr.WriteTo(w)
} }

View File

@ -19,6 +19,7 @@ import (
"expvar" "expvar"
"os" "os"
"sort" "sort"
"sync"
"sync/atomic" "sync/atomic"
"time" "time"
@ -86,6 +87,10 @@ type raftNode struct {
term uint64 term uint64
lead uint64 lead uint64
mu sync.Mutex
// last lead elected time
lt time.Time
raft.Node raft.Node
// a chan to send out apply // a chan to send out apply
@ -120,6 +125,11 @@ func (r *raftNode) run() {
r.Tick() r.Tick()
case rd := <-r.Ready(): case rd := <-r.Ready():
if rd.SoftState != nil { if rd.SoftState != nil {
if lead := atomic.LoadUint64(&r.lead); rd.SoftState.Lead != raft.None && lead != rd.SoftState.Lead {
r.mu.Lock()
r.lt = time.Now()
r.mu.Unlock()
}
atomic.StoreUint64(&r.lead, rd.SoftState.Lead) atomic.StoreUint64(&r.lead, rd.SoftState.Lead)
if rd.RaftState == raft.StateLeader { if rd.RaftState == raft.StateLeader {
syncC = r.s.SyncTicker syncC = r.s.SyncTicker
@ -177,6 +187,12 @@ func (r *raftNode) apply() chan apply {
return r.applyc return r.applyc
} }
func (r *raftNode) leadElectedTime() time.Time {
r.mu.Lock()
defer r.mu.Unlock()
return r.lt
}
func (r *raftNode) stop() { func (r *raftNode) stop() {
r.Stop() r.Stop()
r.transport.Stop() r.transport.Stop()

View File

@ -551,7 +551,7 @@ func (s *EtcdServer) Do(ctx context.Context, r pb.Request) (Response, error) {
case <-ctx.Done(): case <-ctx.Done():
proposeFailed.Inc() proposeFailed.Inc()
s.w.Trigger(r.ID, nil) // GC wait s.w.Trigger(r.ID, nil) // GC wait
return Response{}, parseCtxErr(ctx.Err()) return Response{}, s.parseProposeCtxErr(ctx.Err(), start)
case <-s.done: case <-s.done:
return Response{}, ErrStopped return Response{}, ErrStopped
} }
@ -646,6 +646,7 @@ func (s *EtcdServer) Leader() types.ID { return types.ID(s.Lead()) }
func (s *EtcdServer) configure(ctx context.Context, cc raftpb.ConfChange) error { func (s *EtcdServer) configure(ctx context.Context, cc raftpb.ConfChange) error {
cc.ID = s.reqIDGen.Next() cc.ID = s.reqIDGen.Next()
ch := s.w.Register(cc.ID) ch := s.w.Register(cc.ID)
start := time.Now()
if err := s.r.ProposeConfChange(ctx, cc); err != nil { if err := s.r.ProposeConfChange(ctx, cc); err != nil {
s.w.Trigger(cc.ID, nil) s.w.Trigger(cc.ID, nil)
return err return err
@ -661,7 +662,7 @@ func (s *EtcdServer) configure(ctx context.Context, cc raftpb.ConfChange) error
return nil return nil
case <-ctx.Done(): case <-ctx.Done():
s.w.Trigger(cc.ID, nil) // GC wait s.w.Trigger(cc.ID, nil) // GC wait
return parseCtxErr(ctx.Err()) return s.parseProposeCtxErr(ctx.Err(), start)
case <-s.done: case <-s.done:
return ErrStopped return ErrStopped
} }
@ -1002,3 +1003,19 @@ func (s *EtcdServer) updateClusterVersion(ver string) {
plog.Errorf("error updating cluster version (%v)", err) plog.Errorf("error updating cluster version (%v)", err)
} }
} }
func (s *EtcdServer) parseProposeCtxErr(err error, start time.Time) error {
switch err {
case context.Canceled:
return ErrCanceled
case context.DeadlineExceeded:
curLeadElected := s.r.leadElectedTime()
prevLeadLost := curLeadElected.Add(-2 * time.Duration(s.cfg.ElectionTicks) * time.Duration(s.cfg.TickMs) * time.Millisecond)
if start.After(prevLeadLost) && start.Before(curLeadElected) {
return ErrTimeoutDueToLeaderLost
}
return ErrTimeout
default:
return err
}
}

View File

@ -566,6 +566,7 @@ func TestDoProposalCancelled(t *testing.T) {
func TestDoProposalTimeout(t *testing.T) { func TestDoProposalTimeout(t *testing.T) {
srv := &EtcdServer{ srv := &EtcdServer{
cfg: &ServerConfig{TickMs: 1},
r: raftNode{Node: &nodeRecorder{}}, r: raftNode{Node: &nodeRecorder{}},
w: &waitRecorder{}, w: &waitRecorder{},
reqIDGen: idutil.NewGenerator(0, time.Time{}), reqIDGen: idutil.NewGenerator(0, time.Time{}),
@ -1023,6 +1024,7 @@ func TestPublishStopped(t *testing.T) {
func TestPublishRetry(t *testing.T) { func TestPublishRetry(t *testing.T) {
n := &nodeRecorder{} n := &nodeRecorder{}
srv := &EtcdServer{ srv := &EtcdServer{
cfg: &ServerConfig{TickMs: 1},
r: raftNode{Node: n}, r: raftNode{Node: n},
w: &waitRecorder{}, w: &waitRecorder{},
done: make(chan struct{}), done: make(chan struct{}),