clientv3: better error message for keep alive loop halt

This commit is contained in:
Denys Smirnov 2016-12-15 16:06:27 +02:00
parent 5183ce0118
commit b126e31132
2 changed files with 26 additions and 9 deletions

View File

@ -569,7 +569,8 @@ func TestLeaseKeepAliveLoopExit(t *testing.T) {
} }
cli.Lease.Close() cli.Lease.Close()
if _, err := cli.KeepAlive(ctx, resp.ID); err != clientv3.ErrLeaseHalted { _, err = cli.KeepAlive(ctx, resp.ID)
t.Fatalf("expected %v, got %v", clientv3.ErrLeaseHalted, err) if _, ok := err.(clientv3.ErrKeepAliveHalted); !ok {
t.Fatalf("expected %T, got %v(%T)", clientv3.ErrKeepAliveHalted{}, err, err)
} }
} }

View File

@ -15,7 +15,6 @@
package clientv3 package clientv3
import ( import (
"errors"
"sync" "sync"
"time" "time"
@ -70,7 +69,20 @@ const (
NoLease LeaseID = 0 NoLease LeaseID = 0
) )
var ErrLeaseHalted = errors.New("etcdclient: leases halted") // ErrKeepAliveHalted is returned if client keep alive loop halts with an unexpected error.
//
// This usually means that automatic lease renewal via KeepAlive is broken, but KeepAliveOnce will still work as expected.
type ErrKeepAliveHalted struct {
Reason error
}
func (e ErrKeepAliveHalted) Error() string {
s := "etcdclient: leases keep alive halted"
if e.Reason != nil {
s += ": " + e.Reason.Error()
}
return s
}
type Lease interface { type Lease interface {
// Grant creates a new lease. // Grant creates a new lease.
@ -97,8 +109,9 @@ type Lease interface {
type lessor struct { type lessor struct {
mu sync.Mutex // guards all fields mu sync.Mutex // guards all fields
// donec is closed when recvKeepAliveLoop stops // donec is closed and loopErr is set when recvKeepAliveLoop stops
donec chan struct{} donec chan struct{}
loopErr error
remote pb.LeaseClient remote pb.LeaseClient
@ -222,9 +235,10 @@ func (l *lessor) KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAl
// ensure that recvKeepAliveLoop is still running // ensure that recvKeepAliveLoop is still running
select { select {
case <-l.donec: case <-l.donec:
err := l.loopErr
l.mu.Unlock() l.mu.Unlock()
close(ch) close(ch)
return ch, ErrLeaseHalted return ch, ErrKeepAliveHalted{Reason: err}
default: default:
} }
ka, ok := l.keepAlives[id] ka, ok := l.keepAlives[id]
@ -338,10 +352,11 @@ func (l *lessor) keepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAlive
return karesp, nil return karesp, nil
} }
func (l *lessor) recvKeepAliveLoop() { func (l *lessor) recvKeepAliveLoop() (gerr error) {
defer func() { defer func() {
l.mu.Lock() l.mu.Lock()
close(l.donec) close(l.donec)
l.loopErr = gerr
for _, ka := range l.keepAlives { for _, ka := range l.keepAlives {
ka.Close() ka.Close()
} }
@ -354,13 +369,14 @@ func (l *lessor) recvKeepAliveLoop() {
resp, err := stream.Recv() resp, err := stream.Recv()
if err != nil { if err != nil {
if isHaltErr(l.stopCtx, err) { if isHaltErr(l.stopCtx, err) {
return return err
} }
stream, serr = l.resetRecv() stream, serr = l.resetRecv()
continue continue
} }
l.recvKeepAlive(resp) l.recvKeepAlive(resp)
} }
return serr
} }
// resetRecv opens a new lease stream and starts sending LeaseKeepAliveRequests // resetRecv opens a new lease stream and starts sending LeaseKeepAliveRequests