concurrency: support returning response header for mutex

This commit is contained in:
Anthony Romano 2017-03-07 13:11:05 -08:00
parent a2cdd908dc
commit d51c8bb640
3 changed files with 16 additions and 9 deletions

View File

@ -69,7 +69,7 @@ func (e *Election) Campaign(ctx context.Context, val string) error {
}
}
err = waitDeletes(ctx, client, e.keyPrefix, e.leaderRev-1)
_, err = waitDeletes(ctx, client, e.keyPrefix, e.leaderRev-1)
if err != nil {
// clean up in case of context cancel
select {

View File

@ -18,6 +18,7 @@ import (
"fmt"
v3 "github.com/coreos/etcd/clientv3"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/mvcc/mvccpb"
"golang.org/x/net/context"
)
@ -46,19 +47,19 @@ func waitDelete(ctx context.Context, client *v3.Client, key string, rev int64) e
// waitDeletes efficiently waits until all keys matching the prefix and no greater
// than the create revision.
func waitDeletes(ctx context.Context, client *v3.Client, pfx string, maxCreateRev int64) error {
func waitDeletes(ctx context.Context, client *v3.Client, pfx string, maxCreateRev int64) (*pb.ResponseHeader, error) {
getOpts := append(v3.WithLastCreate(), v3.WithMaxCreateRev(maxCreateRev))
for {
resp, err := client.Get(ctx, pfx, getOpts...)
if err != nil {
return err
return nil, err
}
if len(resp.Kvs) == 0 {
return nil
return resp.Header, nil
}
lastKey := string(resp.Kvs[0].Key)
if err = waitDelete(ctx, client, lastKey, resp.Header.Revision); err != nil {
return err
return nil, err
}
}
}

View File

@ -19,6 +19,7 @@ import (
"sync"
v3 "github.com/coreos/etcd/clientv3"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"golang.org/x/net/context"
)
@ -29,13 +30,14 @@ type Mutex struct {
pfx string
myKey string
myRev int64
hdr *pb.ResponseHeader
}
func NewMutex(s *Session, pfx string) *Mutex {
return &Mutex{s, pfx + "/", "", -1}
return &Mutex{s, pfx + "/", "", -1, nil}
}
// Lock locks the mutex with a cancellable context. If the context is cancelled
// Lock locks the mutex with a cancelable context. If the context is canceled
// while trying to acquire the lock, the mutex tries to clean its stale lock entry.
func (m *Mutex) Lock(ctx context.Context) error {
s := m.s
@ -57,14 +59,15 @@ func (m *Mutex) Lock(ctx context.Context) error {
}
// wait for deletion revisions prior to myKey
err = waitDeletes(ctx, client, m.pfx, m.myRev-1)
hdr, werr := waitDeletes(ctx, client, m.pfx, m.myRev-1)
// release lock key if cancelled
select {
case <-ctx.Done():
m.Unlock(client.Ctx())
default:
m.hdr = hdr
}
return err
return werr
}
func (m *Mutex) Unlock(ctx context.Context) error {
@ -83,6 +86,9 @@ func (m *Mutex) IsOwner() v3.Cmp {
func (m *Mutex) Key() string { return m.myKey }
// Header is the response header received from etcd on acquiring the lock.
func (m *Mutex) Header() *pb.ResponseHeader { return m.hdr }
type lockerMutex struct{ *Mutex }
func (lm *lockerMutex) Lock() {