From 5e3b20e70c44fffda981c49446aaba433e13379c Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Thu, 12 Jan 2017 15:53:47 -0800 Subject: [PATCH] clientv3: don't reset stream on keepaliveonce or revoke failure Would cause the keepalive loop to cancel out. Fixes #7082 --- clientv3/lease.go | 50 ++++++++++++++--------------------------------- 1 file changed, 15 insertions(+), 35 deletions(-) diff --git a/clientv3/lease.go b/clientv3/lease.go index b1e155919..f5937fd4d 100644 --- a/clientv3/lease.go +++ b/clientv3/lease.go @@ -195,9 +195,6 @@ func (l *lessor) Revoke(ctx context.Context, id LeaseID) (*LeaseRevokeResponse, if isHaltErr(ctx, err) { return nil, toErr(ctx, err) } - if nerr := l.newStream(); nerr != nil { - return nil, nerr - } } } @@ -277,10 +274,6 @@ func (l *lessor) KeepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAlive if isHaltErr(ctx, err) { return nil, toErr(ctx, err) } - - if nerr := l.newStream(); nerr != nil { - return nil, nerr - } } } @@ -378,10 +371,23 @@ func (l *lessor) recvKeepAliveLoop() (gerr error) { // resetRecv opens a new lease stream and starts sending LeaseKeepAliveRequests func (l *lessor) resetRecv() (pb.Lease_LeaseKeepAliveClient, error) { - if err := l.newStream(); err != nil { + sctx, cancel := context.WithCancel(l.stopCtx) + stream, err := l.remote.LeaseKeepAlive(sctx, grpc.FailFast(false)) + if err = toErr(sctx, err); err != nil { + cancel() return nil, err } - stream := l.getKeepAliveStream() + + l.mu.Lock() + defer l.mu.Unlock() + if l.stream != nil && l.streamCancel != nil { + l.stream.CloseSend() + l.streamCancel() + } + + l.streamCancel = cancel + l.stream = stream + go l.sendKeepAliveLoop(stream) return stream, nil } @@ -477,32 +483,6 @@ func (l *lessor) sendKeepAliveLoop(stream pb.Lease_LeaseKeepAliveClient) { } } -func (l *lessor) getKeepAliveStream() pb.Lease_LeaseKeepAliveClient { - l.mu.Lock() - defer l.mu.Unlock() - return l.stream -} - -func (l *lessor) newStream() error { - sctx, cancel := context.WithCancel(l.stopCtx) - stream, err := l.remote.LeaseKeepAlive(sctx, grpc.FailFast(false)) - if err != nil { - cancel() - return toErr(sctx, err) - } - - l.mu.Lock() - defer l.mu.Unlock() - if l.stream != nil && l.streamCancel != nil { - l.stream.CloseSend() - l.streamCancel() - } - - l.streamCancel = cancel - l.stream = stream - return nil -} - func (ka *keepAlive) Close() { close(ka.donec) for _, ch := range ka.chs {