Merge pull request #7010 from dennwc/keepalive-exit-err

clientv3: ensure KeepAlive channel is closed or error is returned
This commit is contained in:
Xiang Li
2016-12-15 08:06:36 -08:00
committed by GitHub
2 changed files with 52 additions and 4 deletions

View File

@@ -553,3 +553,24 @@ func TestLeaseRenewLostQuorum(t *testing.T) {
t.Fatalf("timed out waiting for keepalive")
}
}
func TestLeaseKeepAliveLoopExit(t *testing.T) {
defer testutil.AfterTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer clus.Terminate(t)
ctx := context.Background()
cli := clus.Client(0)
resp, err := cli.Grant(ctx, 5)
if err != nil {
t.Fatal(err)
}
cli.Lease.Close()
_, err = cli.KeepAlive(ctx, resp.ID)
if _, ok := err.(clientv3.ErrKeepAliveHalted); !ok {
t.Fatalf("expected %T, got %v(%T)", clientv3.ErrKeepAliveHalted{}, err, err)
}
}

View File

@@ -69,6 +69,21 @@ const (
NoLease LeaseID = 0
)
// ErrKeepAliveHalted is returned if client keep alive loop halts with an unexpected error.
//
// This usually means that automatic lease renewal via KeepAlive is broken, but KeepAliveOnce will still work as expected.
type ErrKeepAliveHalted struct {
Reason error
}
func (e ErrKeepAliveHalted) Error() string {
s := "etcdclient: leases keep alive halted"
if e.Reason != nil {
s += ": " + e.Reason.Error()
}
return s
}
type Lease interface {
// Grant creates a new lease.
Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, error)
@@ -94,8 +109,9 @@ type Lease interface {
type lessor struct {
mu sync.Mutex // guards all fields
// donec is closed when recvKeepAliveLoop stops
donec chan struct{}
// donec is closed and loopErr is set when recvKeepAliveLoop stops
donec chan struct{}
loopErr error
remote pb.LeaseClient
@@ -216,6 +232,15 @@ func (l *lessor) KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAl
ch := make(chan *LeaseKeepAliveResponse, leaseResponseChSize)
l.mu.Lock()
// ensure that recvKeepAliveLoop is still running
select {
case <-l.donec:
err := l.loopErr
l.mu.Unlock()
close(ch)
return ch, ErrKeepAliveHalted{Reason: err}
default:
}
ka, ok := l.keepAlives[id]
if !ok {
// create fresh keep alive
@@ -327,10 +352,11 @@ func (l *lessor) keepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAlive
return karesp, nil
}
func (l *lessor) recvKeepAliveLoop() {
func (l *lessor) recvKeepAliveLoop() (gerr error) {
defer func() {
l.mu.Lock()
close(l.donec)
l.loopErr = gerr
for _, ka := range l.keepAlives {
ka.Close()
}
@@ -343,13 +369,14 @@ func (l *lessor) recvKeepAliveLoop() {
resp, err := stream.Recv()
if err != nil {
if isHaltErr(l.stopCtx, err) {
return
return err
}
stream, serr = l.resetRecv()
continue
}
l.recvKeepAlive(resp)
}
return serr
}
// resetRecv opens a new lease stream and starts sending LeaseKeepAliveRequests