From e534532523ac0f1d1d68ce0c2397ae2bec1f6121 Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Wed, 8 Jun 2016 15:01:09 -0700 Subject: [PATCH] clientv3: close keep alive channel if no response within TTL --- clientv3/lease.go | 61 ++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 50 insertions(+), 11 deletions(-) diff --git a/clientv3/lease.go b/clientv3/lease.go index 83763cae3..30552fb36 100644 --- a/clientv3/lease.go +++ b/clientv3/lease.go @@ -44,6 +44,9 @@ type LeaseKeepAliveResponse struct { } const ( + // defaultTTL is the assumed lease TTL used for the first keepalive + // deadline before the actual TTL is known to the client. + defaultTTL = 5 * time.Second // a small buffer to store unsent lease responses. leaseResponseChSize = 16 // NoLease is a lease ID for the absence of a lease. @@ -84,26 +87,38 @@ type lessor struct { stopCancel context.CancelFunc keepAlives map[LeaseID]*keepAlive + + // firstKeepAliveTimeout is the timeout for the first keepalive request + // before the actual TTL is known to the lease client + firstKeepAliveTimeout time.Duration } // keepAlive multiplexes a keepalive for a lease over multiple channels type keepAlive struct { chs []chan<- *LeaseKeepAliveResponse ctxs []context.Context - // deadline is the next time to send a keep alive message + // deadline is the time the keep alive channels close if no response deadline time.Time + // nextKeepAlive is when to send the next keep alive message + nextKeepAlive time.Time // donec is closed on lease revoke, expiration, or cancel. donec chan struct{} } func NewLease(c *Client) Lease { l := &lessor{ - donec: make(chan struct{}), - keepAlives: make(map[LeaseID]*keepAlive), - remote: pb.NewLeaseClient(c.conn), + donec: make(chan struct{}), + keepAlives: make(map[LeaseID]*keepAlive), + remote: pb.NewLeaseClient(c.conn), + firstKeepAliveTimeout: c.cfg.DialTimeout + time.Second, } + if l.firstKeepAliveTimeout == time.Second { + l.firstKeepAliveTimeout = defaultTTL + } + l.stopCtx, l.stopCancel = context.WithCancel(context.Background()) go l.recvKeepAliveLoop() + go l.deadlineLoop() return l } @@ -162,10 +177,11 @@ func (l *lessor) KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAl if !ok { // create fresh keep alive ka = &keepAlive{ - chs: []chan<- *LeaseKeepAliveResponse{ch}, - ctxs: []context.Context{ctx}, - deadline: time.Now(), - donec: make(chan struct{}), + chs: []chan<- *LeaseKeepAliveResponse{ch}, + ctxs: []context.Context{ctx}, + deadline: time.Now().Add(l.firstKeepAliveTimeout), + nextKeepAlive: time.Now(), + donec: make(chan struct{}), } l.keepAlives[id] = ka } else { @@ -327,16 +343,39 @@ func (l *lessor) recvKeepAlive(resp *pb.LeaseKeepAliveResponse) { } // send update to all channels - nextDeadline := time.Now().Add(1 + time.Duration(karesp.TTL/3)*time.Second) + nextKeepAlive := time.Now().Add(1 + time.Duration(karesp.TTL/3)*time.Second) + ka.deadline = time.Now().Add(time.Duration(karesp.TTL) * time.Second) for _, ch := range ka.chs { select { case ch <- karesp: - ka.deadline = nextDeadline + ka.nextKeepAlive = nextKeepAlive default: } } } +// deadlineLoop reaps any keep alive channels that have not recieved a resposne within +// the lease TTL +func (l *lessor) deadlineLoop() { + for { + select { + case <-time.After(time.Second): + case <-l.donec: + return + } + now := time.Now() + l.mu.Lock() + for id, ka := range l.keepAlives { + if ka.deadline.Before(now) { + // waited too long for response; lease may be expired + ka.Close() + delete(l.keepAlives, id) + } + } + l.mu.Unlock() + } +} + // sendKeepAliveLoop sends LeaseKeepAliveRequests for the lifetime of a lease stream func (l *lessor) sendKeepAliveLoop(stream pb.Lease_LeaseKeepAliveClient) { for { @@ -355,7 +394,7 @@ func (l *lessor) sendKeepAliveLoop(stream pb.Lease_LeaseKeepAliveClient) { now := time.Now() l.mu.Lock() for id, ka := range l.keepAlives { - if ka.deadline.Before(now) { + if ka.nextKeepAlive.Before(now) { tosend = append(tosend, id) } }