clientv3: remove cancelWhenStop from lease implementation

Only have Close() cancel out outstanding goroutines. Canceling out
single-shot RPCs will mask connection close on client.Close().
This commit is contained in:
Anthony Romano 2017-02-06 17:21:46 -08:00
parent 6fb99a8585
commit a213b3abf5

View File

@ -158,13 +158,9 @@ func NewLease(c *Client) Lease {
} }
func (l *lessor) Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, error) { func (l *lessor) Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, error) {
cctx, cancel := context.WithCancel(ctx)
done := cancelWhenStop(cancel, l.stopCtx.Done())
defer close(done)
for { for {
r := &pb.LeaseGrantRequest{TTL: ttl} r := &pb.LeaseGrantRequest{TTL: ttl}
resp, err := l.remote.LeaseGrant(cctx, r) resp, err := l.remote.LeaseGrant(ctx, r)
if err == nil { if err == nil {
gresp := &LeaseGrantResponse{ gresp := &LeaseGrantResponse{
ResponseHeader: resp.GetHeader(), ResponseHeader: resp.GetHeader(),
@ -174,20 +170,16 @@ func (l *lessor) Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, err
} }
return gresp, nil return gresp, nil
} }
if isHaltErr(cctx, err) { if isHaltErr(ctx, err) {
return nil, toErr(cctx, err) return nil, toErr(ctx, err)
} }
} }
} }
func (l *lessor) Revoke(ctx context.Context, id LeaseID) (*LeaseRevokeResponse, error) { func (l *lessor) Revoke(ctx context.Context, id LeaseID) (*LeaseRevokeResponse, error) {
cctx, cancel := context.WithCancel(ctx)
done := cancelWhenStop(cancel, l.stopCtx.Done())
defer close(done)
for { for {
r := &pb.LeaseRevokeRequest{ID: int64(id)} r := &pb.LeaseRevokeRequest{ID: int64(id)}
resp, err := l.remote.LeaseRevoke(cctx, r) resp, err := l.remote.LeaseRevoke(ctx, r)
if err == nil { if err == nil {
return (*LeaseRevokeResponse)(resp), nil return (*LeaseRevokeResponse)(resp), nil
@ -199,13 +191,9 @@ func (l *lessor) Revoke(ctx context.Context, id LeaseID) (*LeaseRevokeResponse,
} }
func (l *lessor) TimeToLive(ctx context.Context, id LeaseID, opts ...LeaseOption) (*LeaseTimeToLiveResponse, error) { func (l *lessor) TimeToLive(ctx context.Context, id LeaseID, opts ...LeaseOption) (*LeaseTimeToLiveResponse, error) {
cctx, cancel := context.WithCancel(ctx)
done := cancelWhenStop(cancel, l.stopCtx.Done())
defer close(done)
for { for {
r := toLeaseTimeToLiveRequest(id, opts...) r := toLeaseTimeToLiveRequest(id, opts...)
resp, err := l.remote.LeaseTimeToLive(cctx, r, grpc.FailFast(false)) resp, err := l.remote.LeaseTimeToLive(ctx, r, grpc.FailFast(false))
if err == nil { if err == nil {
gresp := &LeaseTimeToLiveResponse{ gresp := &LeaseTimeToLiveResponse{
ResponseHeader: resp.GetHeader(), ResponseHeader: resp.GetHeader(),
@ -216,8 +204,8 @@ func (l *lessor) TimeToLive(ctx context.Context, id LeaseID, opts ...LeaseOption
} }
return gresp, nil return gresp, nil
} }
if isHaltErr(cctx, err) { if isHaltErr(ctx, err) {
return nil, toErr(cctx, err) return nil, toErr(ctx, err)
} }
} }
} }
@ -259,12 +247,8 @@ func (l *lessor) KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAl
} }
func (l *lessor) KeepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAliveResponse, error) { func (l *lessor) KeepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAliveResponse, error) {
cctx, cancel := context.WithCancel(ctx)
done := cancelWhenStop(cancel, l.stopCtx.Done())
defer close(done)
for { for {
resp, err := l.keepAliveOnce(cctx, id) resp, err := l.keepAliveOnce(ctx, id)
if err == nil { if err == nil {
if resp.TTL == 0 { if resp.TTL == 0 {
err = rpctypes.ErrLeaseNotFound err = rpctypes.ErrLeaseNotFound
@ -489,20 +473,3 @@ func (ka *keepAlive) Close() {
close(ch) close(ch)
} }
} }
// cancelWhenStop calls cancel when the given stopc fires. It returns a done chan. done
// should be closed when the work is finished. When done fires, cancelWhenStop will release
// its internal resource.
func cancelWhenStop(cancel context.CancelFunc, stopc <-chan struct{}) chan<- struct{} {
done := make(chan struct{}, 1)
go func() {
select {
case <-stopc:
case <-done:
}
cancel()
}()
return done
}