diff --git a/clientv3/lease.go b/clientv3/lease.go index 53f779fd2..2bd660558 100644 --- a/clientv3/lease.go +++ b/clientv3/lease.go @@ -71,8 +71,17 @@ type lessor struct { stopCtx context.Context stopCancel context.CancelFunc - keepAlives map[lease.LeaseID]chan *LeaseKeepAliveResponse - deadlines map[lease.LeaseID]time.Time + keepAlives map[lease.LeaseID]*keepAlive +} + +// keepAlive multiplexes a keepalive for a lease over multiple channels +type keepAlive struct { + chs []chan<- *LeaseKeepAliveResponse + ctxs []context.Context + // deadline is the next time to send a keep alive message + deadline time.Time + // donec is closed on lease revoke, expiration, or cancel. + donec chan struct{} } func NewLease(c *Client) Lease { @@ -81,8 +90,7 @@ func NewLease(c *Client) Lease { conn: c.ActiveConnection(), donec: make(chan struct{}), - keepAlives: make(map[lease.LeaseID]chan *LeaseKeepAliveResponse), - deadlines: make(map[lease.LeaseID]time.Time), + keepAlives: make(map[lease.LeaseID]*keepAlive), } l.remote = pb.NewLeaseClient(l.conn) @@ -138,26 +146,29 @@ func (l *lessor) Revoke(ctx context.Context, id lease.LeaseID) (*LeaseRevokeResp } func (l *lessor) KeepAlive(ctx context.Context, id lease.LeaseID) (<-chan *LeaseKeepAliveResponse, error) { - lc := make(chan *LeaseKeepAliveResponse, leaseResponseChSize) - - // todo: add concellation based on the passed in ctx + ch := make(chan *LeaseKeepAliveResponse, leaseResponseChSize) l.mu.Lock() - _, ok := l.keepAlives[id] + ka, ok := l.keepAlives[id] if !ok { - l.keepAlives[id] = lc - l.deadlines[id] = time.Now() - l.mu.Unlock() - return lc, nil + // create fresh keep alive + ka = &keepAlive{ + chs: []chan<- *LeaseKeepAliveResponse{ch}, + ctxs: []context.Context{ctx}, + deadline: time.Now(), + donec: make(chan struct{}), + } + l.keepAlives[id] = ka + } else { + // add channel and context to existing keep alive + ka.ctxs = append(ka.ctxs, ctx) + ka.chs = append(ka.chs, ch) } l.mu.Unlock() - resp, err := l.KeepAliveOnce(ctx, id) - if err != nil { - return nil, err - } - lc <- resp - return lc, nil + go l.keepAliveCtxCloser(id, ctx, ka.donec) + + return ch, nil } func (l *lessor) KeepAliveOnce(ctx context.Context, id lease.LeaseID) (*LeaseKeepAliveResponse, error) { @@ -184,6 +195,38 @@ func (l *lessor) Close() error { return nil } +func (l *lessor) keepAliveCtxCloser(id lease.LeaseID, ctx context.Context, donec <-chan struct{}) { + select { + case <-donec: + return + case <-l.donec: + return + case <-ctx.Done(): + } + + l.mu.Lock() + defer l.mu.Unlock() + + ka, ok := l.keepAlives[id] + if !ok { + return + } + + // close channel and remove context if still associated with keep alive + for i, c := range ka.ctxs { + if c == ctx { + close(ka.chs[i]) + ka.ctxs = append(ka.ctxs[:i], ka.ctxs[i+1:]...) + ka.chs = append(ka.chs[:i], ka.chs[i+1:]...) + break + } + } + // remove if no one more listeners + if len(ka.chs) == 0 { + delete(l.keepAlives, id) + } +} + func (l *lessor) keepAliveOnce(ctx context.Context, id lease.LeaseID) (*LeaseKeepAliveResponse, error) { stream, err := l.getRemote().LeaseKeepAlive(ctx) if err != nil { @@ -205,10 +248,13 @@ func (l *lessor) keepAliveOnce(ctx context.Context, id lease.LeaseID) (*LeaseKee func (l *lessor) recvKeepAliveLoop() { defer func() { l.stopCancel() + l.mu.Lock() close(l.donec) - for _, ch := range l.keepAlives { - close(ch) + for _, ka := range l.keepAlives { + ka.Close() } + l.keepAlives = make(map[lease.LeaseID]*keepAlive) + l.mu.Unlock() }() stream, serr := l.resetRecv() @@ -239,26 +285,31 @@ func (l *lessor) resetRecv() (pb.Lease_LeaseKeepAliveClient, error) { // recvKeepAlive updates a lease based on its LeaseKeepAliveResponse func (l *lessor) recvKeepAlive(resp *pb.LeaseKeepAliveResponse) { + id := lease.LeaseID(resp.ID) + l.mu.Lock() defer l.mu.Unlock() - lch, ok := l.keepAlives[lease.LeaseID(resp.ID)] + ka, ok := l.keepAlives[id] if !ok { return } if resp.TTL <= 0 { - close(lch) - delete(l.deadlines, lease.LeaseID(resp.ID)) - delete(l.keepAlives, lease.LeaseID(resp.ID)) + // lease expired; close all keep alive channels + delete(l.keepAlives, id) + ka.Close() return } - select { - case lch <- (*LeaseKeepAliveResponse)(resp): - l.deadlines[lease.LeaseID(resp.ID)] = - time.Now().Add(1 + time.Duration(resp.TTL/3)*time.Second) - default: + // send update to all channels + nextDeadline := time.Now().Add(1 + time.Duration(resp.TTL/3)*time.Second) + for _, ch := range ka.chs { + select { + case ch <- (*LeaseKeepAliveResponse)(resp): + ka.deadline = nextDeadline + default: + } } } @@ -277,8 +328,8 @@ func (l *lessor) sendKeepAliveLoop(stream pb.Lease_LeaseKeepAliveClient) { now := time.Now() l.mu.Lock() - for id, d := range l.deadlines { - if d.Before(now) { + for id, ka := range l.keepAlives { + if ka.deadline.Before(now) { tosend = append(tosend, id) } } @@ -359,6 +410,13 @@ func (l *lessor) newStream() error { return nil } +func (ka *keepAlive) Close() { + close(ka.donec) + for _, ch := range ka.chs { + close(ch) + } +} + // cancelWhenStop calls cancel when the given stopc fires. It returns a done chan. done // should be closed when the work is finished. When done fires, cancelWhenStop will release // its internal resource.