From a213b3abf5b23e5ce56e972893cce4dda0769872 Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Mon, 6 Feb 2017 17:21:46 -0800 Subject: [PATCH] clientv3: remove cancelWhenStop from lease implementation Only have Close() cancel out outstanding goroutines. Canceling out single-shot RPCs will mask connection close on client.Close(). --- clientv3/lease.go | 49 ++++++++--------------------------------------- 1 file changed, 8 insertions(+), 41 deletions(-) diff --git a/clientv3/lease.go b/clientv3/lease.go index 90802a5cb..10374e20c 100644 --- a/clientv3/lease.go +++ b/clientv3/lease.go @@ -158,13 +158,9 @@ func NewLease(c *Client) Lease { } func (l *lessor) Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, error) { - cctx, cancel := context.WithCancel(ctx) - done := cancelWhenStop(cancel, l.stopCtx.Done()) - defer close(done) - for { r := &pb.LeaseGrantRequest{TTL: ttl} - resp, err := l.remote.LeaseGrant(cctx, r) + resp, err := l.remote.LeaseGrant(ctx, r) if err == nil { gresp := &LeaseGrantResponse{ ResponseHeader: resp.GetHeader(), @@ -174,20 +170,16 @@ func (l *lessor) Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, err } return gresp, nil } - if isHaltErr(cctx, err) { - return nil, toErr(cctx, err) + if isHaltErr(ctx, err) { + return nil, toErr(ctx, err) } } } func (l *lessor) Revoke(ctx context.Context, id LeaseID) (*LeaseRevokeResponse, error) { - cctx, cancel := context.WithCancel(ctx) - done := cancelWhenStop(cancel, l.stopCtx.Done()) - defer close(done) - for { r := &pb.LeaseRevokeRequest{ID: int64(id)} - resp, err := l.remote.LeaseRevoke(cctx, r) + resp, err := l.remote.LeaseRevoke(ctx, r) if err == nil { return (*LeaseRevokeResponse)(resp), nil @@ -199,13 +191,9 @@ func (l *lessor) Revoke(ctx context.Context, id LeaseID) (*LeaseRevokeResponse, } func (l *lessor) TimeToLive(ctx context.Context, id LeaseID, opts ...LeaseOption) (*LeaseTimeToLiveResponse, error) { - cctx, cancel := context.WithCancel(ctx) - done := cancelWhenStop(cancel, l.stopCtx.Done()) - defer close(done) - for { r := toLeaseTimeToLiveRequest(id, opts...) - resp, err := l.remote.LeaseTimeToLive(cctx, r, grpc.FailFast(false)) + resp, err := l.remote.LeaseTimeToLive(ctx, r, grpc.FailFast(false)) if err == nil { gresp := &LeaseTimeToLiveResponse{ ResponseHeader: resp.GetHeader(), @@ -216,8 +204,8 @@ func (l *lessor) TimeToLive(ctx context.Context, id LeaseID, opts ...LeaseOption } return gresp, nil } - if isHaltErr(cctx, err) { - return nil, toErr(cctx, err) + if isHaltErr(ctx, err) { + return nil, toErr(ctx, err) } } } @@ -259,12 +247,8 @@ func (l *lessor) KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAl } func (l *lessor) KeepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAliveResponse, error) { - cctx, cancel := context.WithCancel(ctx) - done := cancelWhenStop(cancel, l.stopCtx.Done()) - defer close(done) - for { - resp, err := l.keepAliveOnce(cctx, id) + resp, err := l.keepAliveOnce(ctx, id) if err == nil { if resp.TTL == 0 { err = rpctypes.ErrLeaseNotFound @@ -489,20 +473,3 @@ func (ka *keepAlive) Close() { close(ch) } } - -// cancelWhenStop calls cancel when the given stopc fires. It returns a done chan. done -// should be closed when the work is finished. When done fires, cancelWhenStop will release -// its internal resource. -func cancelWhenStop(cancel context.CancelFunc, stopc <-chan struct{}) chan<- struct{} { - done := make(chan struct{}, 1) - - go func() { - select { - case <-stopc: - case <-done: - } - cancel() - }() - - return done -}