From d51c8bb6406a03b4d8401493a79db55a459a5751 Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Tue, 7 Mar 2017 13:11:05 -0800 Subject: [PATCH] concurrency: support returning response header for mutex --- clientv3/concurrency/election.go | 2 +- clientv3/concurrency/key.go | 9 +++++---- clientv3/concurrency/mutex.go | 14 ++++++++++---- 3 files changed, 16 insertions(+), 9 deletions(-) diff --git a/clientv3/concurrency/election.go b/clientv3/concurrency/election.go index e1728402c..f69508e14 100644 --- a/clientv3/concurrency/election.go +++ b/clientv3/concurrency/election.go @@ -69,7 +69,7 @@ func (e *Election) Campaign(ctx context.Context, val string) error { } } - err = waitDeletes(ctx, client, e.keyPrefix, e.leaderRev-1) + _, err = waitDeletes(ctx, client, e.keyPrefix, e.leaderRev-1) if err != nil { // clean up in case of context cancel select { diff --git a/clientv3/concurrency/key.go b/clientv3/concurrency/key.go index a44fbfda5..cf006d70e 100644 --- a/clientv3/concurrency/key.go +++ b/clientv3/concurrency/key.go @@ -18,6 +18,7 @@ import ( "fmt" v3 "github.com/coreos/etcd/clientv3" + pb "github.com/coreos/etcd/etcdserver/etcdserverpb" "github.com/coreos/etcd/mvcc/mvccpb" "golang.org/x/net/context" ) @@ -46,19 +47,19 @@ func waitDelete(ctx context.Context, client *v3.Client, key string, rev int64) e // waitDeletes efficiently waits until all keys matching the prefix and no greater // than the create revision. -func waitDeletes(ctx context.Context, client *v3.Client, pfx string, maxCreateRev int64) error { +func waitDeletes(ctx context.Context, client *v3.Client, pfx string, maxCreateRev int64) (*pb.ResponseHeader, error) { getOpts := append(v3.WithLastCreate(), v3.WithMaxCreateRev(maxCreateRev)) for { resp, err := client.Get(ctx, pfx, getOpts...) if err != nil { - return err + return nil, err } if len(resp.Kvs) == 0 { - return nil + return resp.Header, nil } lastKey := string(resp.Kvs[0].Key) if err = waitDelete(ctx, client, lastKey, resp.Header.Revision); err != nil { - return err + return nil, err } } } diff --git a/clientv3/concurrency/mutex.go b/clientv3/concurrency/mutex.go index 1e9e36a35..cee15db7b 100644 --- a/clientv3/concurrency/mutex.go +++ b/clientv3/concurrency/mutex.go @@ -19,6 +19,7 @@ import ( "sync" v3 "github.com/coreos/etcd/clientv3" + pb "github.com/coreos/etcd/etcdserver/etcdserverpb" "golang.org/x/net/context" ) @@ -29,13 +30,14 @@ type Mutex struct { pfx string myKey string myRev int64 + hdr *pb.ResponseHeader } func NewMutex(s *Session, pfx string) *Mutex { - return &Mutex{s, pfx + "/", "", -1} + return &Mutex{s, pfx + "/", "", -1, nil} } -// Lock locks the mutex with a cancellable context. If the context is cancelled +// Lock locks the mutex with a cancelable context. If the context is canceled // while trying to acquire the lock, the mutex tries to clean its stale lock entry. func (m *Mutex) Lock(ctx context.Context) error { s := m.s @@ -57,14 +59,15 @@ func (m *Mutex) Lock(ctx context.Context) error { } // wait for deletion revisions prior to myKey - err = waitDeletes(ctx, client, m.pfx, m.myRev-1) + hdr, werr := waitDeletes(ctx, client, m.pfx, m.myRev-1) // release lock key if cancelled select { case <-ctx.Done(): m.Unlock(client.Ctx()) default: + m.hdr = hdr } - return err + return werr } func (m *Mutex) Unlock(ctx context.Context) error { @@ -83,6 +86,9 @@ func (m *Mutex) IsOwner() v3.Cmp { func (m *Mutex) Key() string { return m.myKey } +// Header is the response header received from etcd on acquiring the lock. +func (m *Mutex) Header() *pb.ResponseHeader { return m.hdr } + type lockerMutex struct{ *Mutex } func (lm *lockerMutex) Lock() {