mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
clientv3: don't reset stream on keepaliveonce or revoke failure
Would cause the keepalive loop to cancel out. Fixes #7082
This commit is contained in:
parent
824277cb3a
commit
5e3b20e70c
@ -195,9 +195,6 @@ func (l *lessor) Revoke(ctx context.Context, id LeaseID) (*LeaseRevokeResponse,
|
|||||||
if isHaltErr(ctx, err) {
|
if isHaltErr(ctx, err) {
|
||||||
return nil, toErr(ctx, err)
|
return nil, toErr(ctx, err)
|
||||||
}
|
}
|
||||||
if nerr := l.newStream(); nerr != nil {
|
|
||||||
return nil, nerr
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -277,10 +274,6 @@ func (l *lessor) KeepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAlive
|
|||||||
if isHaltErr(ctx, err) {
|
if isHaltErr(ctx, err) {
|
||||||
return nil, toErr(ctx, err)
|
return nil, toErr(ctx, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if nerr := l.newStream(); nerr != nil {
|
|
||||||
return nil, nerr
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -378,10 +371,23 @@ func (l *lessor) recvKeepAliveLoop() (gerr error) {
|
|||||||
|
|
||||||
// resetRecv opens a new lease stream and starts sending LeaseKeepAliveRequests
|
// resetRecv opens a new lease stream and starts sending LeaseKeepAliveRequests
|
||||||
func (l *lessor) resetRecv() (pb.Lease_LeaseKeepAliveClient, error) {
|
func (l *lessor) resetRecv() (pb.Lease_LeaseKeepAliveClient, error) {
|
||||||
if err := l.newStream(); err != nil {
|
sctx, cancel := context.WithCancel(l.stopCtx)
|
||||||
|
stream, err := l.remote.LeaseKeepAlive(sctx, grpc.FailFast(false))
|
||||||
|
if err = toErr(sctx, err); err != nil {
|
||||||
|
cancel()
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
stream := l.getKeepAliveStream()
|
|
||||||
|
l.mu.Lock()
|
||||||
|
defer l.mu.Unlock()
|
||||||
|
if l.stream != nil && l.streamCancel != nil {
|
||||||
|
l.stream.CloseSend()
|
||||||
|
l.streamCancel()
|
||||||
|
}
|
||||||
|
|
||||||
|
l.streamCancel = cancel
|
||||||
|
l.stream = stream
|
||||||
|
|
||||||
go l.sendKeepAliveLoop(stream)
|
go l.sendKeepAliveLoop(stream)
|
||||||
return stream, nil
|
return stream, nil
|
||||||
}
|
}
|
||||||
@ -477,32 +483,6 @@ func (l *lessor) sendKeepAliveLoop(stream pb.Lease_LeaseKeepAliveClient) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *lessor) getKeepAliveStream() pb.Lease_LeaseKeepAliveClient {
|
|
||||||
l.mu.Lock()
|
|
||||||
defer l.mu.Unlock()
|
|
||||||
return l.stream
|
|
||||||
}
|
|
||||||
|
|
||||||
func (l *lessor) newStream() error {
|
|
||||||
sctx, cancel := context.WithCancel(l.stopCtx)
|
|
||||||
stream, err := l.remote.LeaseKeepAlive(sctx, grpc.FailFast(false))
|
|
||||||
if err != nil {
|
|
||||||
cancel()
|
|
||||||
return toErr(sctx, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
l.mu.Lock()
|
|
||||||
defer l.mu.Unlock()
|
|
||||||
if l.stream != nil && l.streamCancel != nil {
|
|
||||||
l.stream.CloseSend()
|
|
||||||
l.streamCancel()
|
|
||||||
}
|
|
||||||
|
|
||||||
l.streamCancel = cancel
|
|
||||||
l.stream = stream
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (ka *keepAlive) Close() {
|
func (ka *keepAlive) Close() {
|
||||||
close(ka.donec)
|
close(ka.donec)
|
||||||
for _, ch := range ka.chs {
|
for _, ch := range ka.chs {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user