From 16420cfe63e16ae6e0e1bd9c65ce1119b8a25334 Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Thu, 18 Feb 2016 16:16:13 -0800 Subject: [PATCH] contrib/recipes: use clientv3 lease API --- contrib/recipes/lease.go | 54 ++++++++++++++-------------------------- 1 file changed, 19 insertions(+), 35 deletions(-) diff --git a/contrib/recipes/lease.go b/contrib/recipes/lease.go index 20ff2c598..327d12fb7 100644 --- a/contrib/recipes/lease.go +++ b/contrib/recipes/lease.go @@ -15,11 +15,9 @@ package recipe import ( "sync" - "time" "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" "github.com/coreos/etcd/clientv3" - pb "github.com/coreos/etcd/etcdserver/etcdserverpb" "github.com/coreos/etcd/lease" ) @@ -32,8 +30,9 @@ type clientLeaseMgr struct { } type leaseKeepAlive struct { - id lease.LeaseID - donec chan struct{} + id lease.LeaseID + cancel context.CancelFunc + donec <-chan struct{} } func SessionLease(client *clientv3.Client) (lease.LeaseID, error) { @@ -49,13 +48,10 @@ func SessionLeaseTTL(client *clientv3.Client, ttl int64) (lease.LeaseID, error) // would fail) or if transferring lease ownership. func StopSessionLease(client *clientv3.Client) { clientLeases.mu.Lock() - lka, ok := clientLeases.leases[client] - if ok { - delete(clientLeases.leases, client) - } + lka := clientLeases.leases[client] clientLeases.mu.Unlock() if lka != nil { - lka.donec <- struct{}{} + lka.cancel() <-lka.donec } } @@ -67,8 +63,7 @@ func RevokeSessionLease(client *clientv3.Client) (err error) { clientLeases.mu.Unlock() StopSessionLease(client) if lka != nil { - req := &pb.LeaseRevokeRequest{ID: int64(lka.id)} - _, err = client.Lease.LeaseRevoke(context.TODO(), req) + _, err = clientv3.NewLease(client).Revoke(context.TODO(), lka.id) } return err } @@ -80,48 +75,37 @@ func (clm *clientLeaseMgr) sessionLease(client *clientv3.Client, ttl int64) (lea return lka.id, nil } - resp, err := client.Lease.LeaseCreate(context.TODO(), &pb.LeaseCreateRequest{TTL: ttl}) + lc := clientv3.NewLease(client) + resp, err := lc.Create(context.TODO(), ttl) if err != nil { return lease.NoLease, err } id := lease.LeaseID(resp.ID) ctx, cancel := context.WithCancel(context.Background()) - keepAlive, err := client.Lease.LeaseKeepAlive(ctx) + keepAlive, err := lc.KeepAlive(ctx, id) if err != nil || keepAlive == nil { return lease.NoLease, err } - lka := &leaseKeepAlive{id: id, donec: make(chan struct{})} + donec := make(chan struct{}) + lka := &leaseKeepAlive{ + id: id, + cancel: cancel, + donec: donec} clm.leases[client] = lka - // keep the lease alive until client error + // keep the lease alive until client error or cancelled context go func() { defer func() { - keepAlive.CloseSend() clm.mu.Lock() delete(clm.leases, client) clm.mu.Unlock() - cancel() - close(lka.donec) + lc.Close() + close(donec) }() - - ttl := resp.TTL - for { - lreq := &pb.LeaseKeepAliveRequest{ID: int64(id)} - select { - case <-lka.donec: - return - case <-time.After(time.Duration(ttl/2) * time.Second): - } - if err := keepAlive.Send(lreq); err != nil { - break - } - resp, err := keepAlive.Recv() - if err != nil { - break - } - ttl = resp.TTL + for range keepAlive { + // eat messages until keep alive channel closes } }()