clientv3: support context cancellation on lease keep alives

This commit is contained in:
Anthony Romano 2016-02-20 02:11:30 -08:00
parent 5dfcdae521
commit 50ad181477

View File

@ -71,8 +71,17 @@ type lessor struct {
stopCtx context.Context stopCtx context.Context
stopCancel context.CancelFunc stopCancel context.CancelFunc
keepAlives map[lease.LeaseID]chan *LeaseKeepAliveResponse keepAlives map[lease.LeaseID]*keepAlive
deadlines map[lease.LeaseID]time.Time }
// keepAlive multiplexes a keepalive for a lease over multiple channels
type keepAlive struct {
chs []chan<- *LeaseKeepAliveResponse
ctxs []context.Context
// deadline is the next time to send a keep alive message
deadline time.Time
// donec is closed on lease revoke, expiration, or cancel.
donec chan struct{}
} }
func NewLease(c *Client) Lease { func NewLease(c *Client) Lease {
@ -81,8 +90,7 @@ func NewLease(c *Client) Lease {
conn: c.ActiveConnection(), conn: c.ActiveConnection(),
donec: make(chan struct{}), donec: make(chan struct{}),
keepAlives: make(map[lease.LeaseID]chan *LeaseKeepAliveResponse), keepAlives: make(map[lease.LeaseID]*keepAlive),
deadlines: make(map[lease.LeaseID]time.Time),
} }
l.remote = pb.NewLeaseClient(l.conn) l.remote = pb.NewLeaseClient(l.conn)
@ -138,26 +146,29 @@ func (l *lessor) Revoke(ctx context.Context, id lease.LeaseID) (*LeaseRevokeResp
} }
func (l *lessor) KeepAlive(ctx context.Context, id lease.LeaseID) (<-chan *LeaseKeepAliveResponse, error) { func (l *lessor) KeepAlive(ctx context.Context, id lease.LeaseID) (<-chan *LeaseKeepAliveResponse, error) {
lc := make(chan *LeaseKeepAliveResponse, leaseResponseChSize) ch := make(chan *LeaseKeepAliveResponse, leaseResponseChSize)
// todo: add concellation based on the passed in ctx
l.mu.Lock() l.mu.Lock()
_, ok := l.keepAlives[id] ka, ok := l.keepAlives[id]
if !ok { if !ok {
l.keepAlives[id] = lc // create fresh keep alive
l.deadlines[id] = time.Now() ka = &keepAlive{
l.mu.Unlock() chs: []chan<- *LeaseKeepAliveResponse{ch},
return lc, nil ctxs: []context.Context{ctx},
deadline: time.Now(),
donec: make(chan struct{}),
}
l.keepAlives[id] = ka
} else {
// add channel and context to existing keep alive
ka.ctxs = append(ka.ctxs, ctx)
ka.chs = append(ka.chs, ch)
} }
l.mu.Unlock() l.mu.Unlock()
resp, err := l.KeepAliveOnce(ctx, id) go l.keepAliveCtxCloser(id, ctx, ka.donec)
if err != nil {
return nil, err return ch, nil
}
lc <- resp
return lc, nil
} }
func (l *lessor) KeepAliveOnce(ctx context.Context, id lease.LeaseID) (*LeaseKeepAliveResponse, error) { func (l *lessor) KeepAliveOnce(ctx context.Context, id lease.LeaseID) (*LeaseKeepAliveResponse, error) {
@ -184,6 +195,38 @@ func (l *lessor) Close() error {
return nil return nil
} }
func (l *lessor) keepAliveCtxCloser(id lease.LeaseID, ctx context.Context, donec <-chan struct{}) {
select {
case <-donec:
return
case <-l.donec:
return
case <-ctx.Done():
}
l.mu.Lock()
defer l.mu.Unlock()
ka, ok := l.keepAlives[id]
if !ok {
return
}
// close channel and remove context if still associated with keep alive
for i, c := range ka.ctxs {
if c == ctx {
close(ka.chs[i])
ka.ctxs = append(ka.ctxs[:i], ka.ctxs[i+1:]...)
ka.chs = append(ka.chs[:i], ka.chs[i+1:]...)
break
}
}
// remove if no one more listeners
if len(ka.chs) == 0 {
delete(l.keepAlives, id)
}
}
func (l *lessor) keepAliveOnce(ctx context.Context, id lease.LeaseID) (*LeaseKeepAliveResponse, error) { func (l *lessor) keepAliveOnce(ctx context.Context, id lease.LeaseID) (*LeaseKeepAliveResponse, error) {
stream, err := l.getRemote().LeaseKeepAlive(ctx) stream, err := l.getRemote().LeaseKeepAlive(ctx)
if err != nil { if err != nil {
@ -205,10 +248,13 @@ func (l *lessor) keepAliveOnce(ctx context.Context, id lease.LeaseID) (*LeaseKee
func (l *lessor) recvKeepAliveLoop() { func (l *lessor) recvKeepAliveLoop() {
defer func() { defer func() {
l.stopCancel() l.stopCancel()
l.mu.Lock()
close(l.donec) close(l.donec)
for _, ch := range l.keepAlives { for _, ka := range l.keepAlives {
close(ch) ka.Close()
} }
l.keepAlives = make(map[lease.LeaseID]*keepAlive)
l.mu.Unlock()
}() }()
stream, serr := l.resetRecv() stream, serr := l.resetRecv()
@ -239,26 +285,31 @@ func (l *lessor) resetRecv() (pb.Lease_LeaseKeepAliveClient, error) {
// recvKeepAlive updates a lease based on its LeaseKeepAliveResponse // recvKeepAlive updates a lease based on its LeaseKeepAliveResponse
func (l *lessor) recvKeepAlive(resp *pb.LeaseKeepAliveResponse) { func (l *lessor) recvKeepAlive(resp *pb.LeaseKeepAliveResponse) {
id := lease.LeaseID(resp.ID)
l.mu.Lock() l.mu.Lock()
defer l.mu.Unlock() defer l.mu.Unlock()
lch, ok := l.keepAlives[lease.LeaseID(resp.ID)] ka, ok := l.keepAlives[id]
if !ok { if !ok {
return return
} }
if resp.TTL <= 0 { if resp.TTL <= 0 {
close(lch) // lease expired; close all keep alive channels
delete(l.deadlines, lease.LeaseID(resp.ID)) delete(l.keepAlives, id)
delete(l.keepAlives, lease.LeaseID(resp.ID)) ka.Close()
return return
} }
select { // send update to all channels
case lch <- (*LeaseKeepAliveResponse)(resp): nextDeadline := time.Now().Add(1 + time.Duration(resp.TTL/3)*time.Second)
l.deadlines[lease.LeaseID(resp.ID)] = for _, ch := range ka.chs {
time.Now().Add(1 + time.Duration(resp.TTL/3)*time.Second) select {
default: case ch <- (*LeaseKeepAliveResponse)(resp):
ka.deadline = nextDeadline
default:
}
} }
} }
@ -277,8 +328,8 @@ func (l *lessor) sendKeepAliveLoop(stream pb.Lease_LeaseKeepAliveClient) {
now := time.Now() now := time.Now()
l.mu.Lock() l.mu.Lock()
for id, d := range l.deadlines { for id, ka := range l.keepAlives {
if d.Before(now) { if ka.deadline.Before(now) {
tosend = append(tosend, id) tosend = append(tosend, id)
} }
} }
@ -359,6 +410,13 @@ func (l *lessor) newStream() error {
return nil return nil
} }
func (ka *keepAlive) Close() {
close(ka.donec)
for _, ch := range ka.chs {
close(ch)
}
}
// cancelWhenStop calls cancel when the given stopc fires. It returns a done chan. done // cancelWhenStop calls cancel when the given stopc fires. It returns a done chan. done
// should be closed when the work is finished. When done fires, cancelWhenStop will release // should be closed when the work is finished. When done fires, cancelWhenStop will release
// its internal resource. // its internal resource.