From 8024a0d15f3a2dd17677c5aa1de6f96a8bec8f9f Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Wed, 29 Mar 2017 22:09:04 -0700 Subject: [PATCH] clientv3: support WithRequireLeader in lease client Unconditionally opens a WithRequireLeader stream in the lease client. Any keep alive channels opened using WithRequireLeader will be closed when the leader is lost. Fixes #7275 --- clientv3/lease.go | 67 +++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 59 insertions(+), 8 deletions(-) diff --git a/clientv3/lease.go b/clientv3/lease.go index 900769386..a6494ceee 100644 --- a/clientv3/lease.go +++ b/clientv3/lease.go @@ -22,6 +22,7 @@ import ( pb "github.com/coreos/etcd/etcdserver/etcdserverpb" "golang.org/x/net/context" "google.golang.org/grpc" + "google.golang.org/grpc/metadata" ) type ( @@ -67,6 +68,9 @@ const ( leaseResponseChSize = 16 // NoLease is a lease ID for the absence of a lease. NoLease LeaseID = 0 + + // retryConnWait is how long to wait before retrying on a lost leader + retryConnWait = 500 * time.Millisecond ) // ErrKeepAliveHalted is returned if client keep alive loop halts with an unexpected error. @@ -157,7 +161,8 @@ func NewLeaseFromLeaseClient(remote pb.LeaseClient, keepAliveTimeout time.Durati if l.firstKeepAliveTimeout == time.Second { l.firstKeepAliveTimeout = defaultTTL } - l.stopCtx, l.stopCancel = context.WithCancel(context.Background()) + reqLeaderCtx := WithRequireLeader(context.Background()) + l.stopCtx, l.stopCancel = context.WithCancel(reqLeaderCtx) return l } @@ -309,6 +314,45 @@ func (l *lessor) keepAliveCtxCloser(id LeaseID, ctx context.Context, donec <-cha } } +// closeRequireLeader scans all keep alives for ctxs that have require leader +// and closes the associated channels. 
+func (l *lessor) closeRequireLeader() {
+	l.mu.Lock()
+	defer l.mu.Unlock()
+	for _, ka := range l.keepAlives {
+		reqIdxs := 0
+		// close every channel whose ctx carries require-leader metadata; nil marks the slot
+		for i, ctx := range ka.ctxs {
+			md, ok := metadata.FromContext(ctx)
+			if !ok {
+				continue
+			}
+			ks := md[rpctypes.MetadataRequireLeaderKey]
+			if len(ks) < 1 || ks[0] != rpctypes.MetadataHasLeader {
+				continue
+			}
+			close(ka.chs[i])
+			ka.chs[i] = nil
+			reqIdxs++
+		}
+		if reqIdxs == 0 {
+			continue
+		}
+		// compact chs and ctxs in lockstep, dropping the slots that were closed above
+		newChs := make([]chan<- *LeaseKeepAliveResponse, len(ka.chs)-reqIdxs)
+		newCtxs := make([]context.Context, len(newChs))
+		newIdx := 0
+		for i := range ka.chs {
+			if ka.chs[i] == nil {
+				continue
+			}
+			newChs[newIdx], newCtxs[newIdx] = ka.chs[i], ka.ctxs[i] // ctxs is indexed by the source slot i, same as chs
+			newIdx++
+		}
+		ka.chs, ka.ctxs = newChs, newCtxs
+	}
+}
+
 func (l *lessor) keepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAliveResponse, error) {
 	cctx, cancel := context.WithCancel(ctx)
 	defer cancel()
@@ -351,14 +395,22 @@ func (l *lessor) recvKeepAliveLoop() (gerr error) {
 	stream, serr := l.resetRecv()
 	for serr == nil {
 		resp, err := stream.Recv()
-		if err != nil {
-			if isHaltErr(l.stopCtx, err) {
-				return err
-			}
-			stream, serr = l.resetRecv()
+		if err == nil {
+			l.recvKeepAlive(resp)
 			continue
 		}
-		l.recvKeepAlive(resp)
+		err = toErr(l.stopCtx, err)
+		if err == rpctypes.ErrNoLeader {
+			l.closeRequireLeader()
+			select {
+			case <-time.After(retryConnWait): // wait before reopening the stream
+			case <-l.stopCtx.Done():
+				return err
+			}
+		} else if isHaltErr(l.stopCtx, err) {
+			return err
+		}
+		stream, serr = l.resetRecv()
 	}
 	return serr
 }
@@ -375,7 +427,6 @@ func (l *lessor) resetRecv() (pb.Lease_LeaseKeepAliveClient, error) {
 	l.mu.Lock()
 	defer l.mu.Unlock()
 	if l.stream != nil && l.streamCancel != nil {
-		l.stream.CloseSend()
 		l.streamCancel()
 	}
 