From 9b1fe45853cefb6372ad220322e043560fcb5684 Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Tue, 13 Sep 2016 14:31:50 -0700 Subject: [PATCH] concurrency: use create max revision for locks and elections --- clientv3/concurrency/election.go | 2 +- clientv3/concurrency/key.go | 37 +++++++++++--------------------- clientv3/concurrency/mutex.go | 2 +- 3 files changed, 14 insertions(+), 27 deletions(-) diff --git a/clientv3/concurrency/election.go b/clientv3/concurrency/election.go index abf647aa6..e1728402c 100644 --- a/clientv3/concurrency/election.go +++ b/clientv3/concurrency/election.go @@ -69,7 +69,7 @@ func (e *Election) Campaign(ctx context.Context, val string) error { } } - err = waitDeletes(ctx, client, e.keyPrefix, v3.WithPrefix(), v3.WithRev(e.leaderRev-1)) + err = waitDeletes(ctx, client, e.keyPrefix, e.leaderRev-1) if err != nil { // clean up in case of context cancel select { diff --git a/clientv3/concurrency/key.go b/clientv3/concurrency/key.go index 56172bd08..a44fbfda5 100644 --- a/clientv3/concurrency/key.go +++ b/clientv3/concurrency/key.go @@ -16,7 +16,6 @@ package concurrency import ( "fmt" - "math" v3 "github.com/coreos/etcd/clientv3" "github.com/coreos/etcd/mvcc/mvccpb" @@ -39,39 +38,27 @@ func waitDelete(ctx context.Context, client *v3.Client, key string, rev int64) e if err := wr.Err(); err != nil { return err } - if err := ctx.Err(); err != nil { return err } return fmt.Errorf("lost watcher waiting for delete") } -// waitDeletes efficiently waits until all keys matched by Get(key, opts...) are deleted -func waitDeletes(ctx context.Context, client *v3.Client, key string, opts ...v3.OpOption) error { - getOpts := []v3.OpOption{v3.WithSort(v3.SortByCreateRevision, v3.SortAscend)} - getOpts = append(getOpts, opts...) - resp, err := client.Get(ctx, key, getOpts...) - maxRev := int64(math.MaxInt64) - getOpts = append(getOpts, v3.WithRev(0)) - for err == nil { - for len(resp.Kvs) > 0 { - i := len(resp.Kvs) - 1 - if resp.Kvs[i].CreateRevision <= maxRev { - break - } - resp.Kvs = resp.Kvs[:i] +// waitDeletes efficiently waits until all keys matching the prefix and no greater +// than the create revision. +func waitDeletes(ctx context.Context, client *v3.Client, pfx string, maxCreateRev int64) error { + getOpts := append(v3.WithLastCreate(), v3.WithMaxCreateRev(maxCreateRev)) + for { + resp, err := client.Get(ctx, pfx, getOpts...) + if err != nil { + return err } if len(resp.Kvs) == 0 { - break + return nil } - lastKV := resp.Kvs[len(resp.Kvs)-1] - maxRev = lastKV.CreateRevision - err = waitDelete(ctx, client, string(lastKV.Key), maxRev) - if err != nil || len(resp.Kvs) == 1 { - break + lastKey := string(resp.Kvs[0].Key) + if err = waitDelete(ctx, client, lastKey, resp.Header.Revision); err != nil { + return err } - getOpts = append(getOpts, v3.WithLimit(int64(len(resp.Kvs)-1))) - resp, err = client.Get(ctx, key, getOpts...) } - return err } diff --git a/clientv3/concurrency/mutex.go b/clientv3/concurrency/mutex.go index 39010e47b..1e9e36a35 100644 --- a/clientv3/concurrency/mutex.go +++ b/clientv3/concurrency/mutex.go @@ -57,7 +57,7 @@ func (m *Mutex) Lock(ctx context.Context) error { } // wait for deletion revisions prior to myKey - err = waitDeletes(ctx, client, m.pfx, v3.WithPrefix(), v3.WithRev(m.myRev-1)) + err = waitDeletes(ctx, client, m.pfx, m.myRev-1) // release lock key if cancelled select { case <-ctx.Done():