From 8f718e2e5ab064c9e703dea74e3f9b06c9339702 Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Thu, 27 Oct 2016 15:02:58 -0400 Subject: [PATCH] contrib/recipes: unexport and clean up keys.go Fixes #6731 --- contrib/recipes/barrier.go | 2 +- contrib/recipes/double_barrier.go | 2 +- contrib/recipes/key.go | 61 ++++++++++--------------------- contrib/recipes/priority_queue.go | 2 +- contrib/recipes/queue.go | 2 +- contrib/recipes/rwmutex.go | 4 +- 6 files changed, 25 insertions(+), 48 deletions(-) diff --git a/contrib/recipes/barrier.go b/contrib/recipes/barrier.go index 7f8dd372c..33aedf6c5 100644 --- a/contrib/recipes/barrier.go +++ b/contrib/recipes/barrier.go @@ -35,7 +35,7 @@ func NewBarrier(client *v3.Client, key string) *Barrier { // Hold creates the barrier key causing processes to block on Wait. func (b *Barrier) Hold() error { - _, err := NewKey(b.client, b.key, 0) + _, err := newKey(b.client, b.key, 0) return err } diff --git a/contrib/recipes/double_barrier.go b/contrib/recipes/double_barrier.go index e31b649a1..7690ba1d4 100644 --- a/contrib/recipes/double_barrier.go +++ b/contrib/recipes/double_barrier.go @@ -44,7 +44,7 @@ func NewDoubleBarrier(s *concurrency.Session, key string, count int) *DoubleBarr // Enter waits for "count" processes to enter the barrier then returns func (b *DoubleBarrier) Enter() error { client := b.s.Client() - ek, err := NewUniqueEphemeralKey(b.s, b.key+"/waiters") + ek, err := newUniqueEphemeralKey(b.s, b.key+"/waiters") if err != nil { return err } diff --git a/contrib/recipes/key.go b/contrib/recipes/key.go index 90e45762f..514f729aa 100644 --- a/contrib/recipes/key.go +++ b/contrib/recipes/key.go @@ -32,11 +32,11 @@ type RemoteKV struct { val string } -func NewKey(kv v3.KV, key string, leaseID v3.LeaseID) (*RemoteKV, error) { - return NewKV(kv, key, "", leaseID) +func newKey(kv v3.KV, key string, leaseID v3.LeaseID) (*RemoteKV, error) { + return newKV(kv, key, "", leaseID) } -func NewKV(kv v3.KV, key, val string, leaseID v3.LeaseID) (*RemoteKV, error) { +func newKV(kv v3.KV, key, val string, leaseID v3.LeaseID) (*RemoteKV, error) { rev, err := putNewKV(kv, key, val, leaseID) if err != nil { return nil, err @@ -44,25 +44,7 @@ func NewKV(kv v3.KV, key, val string, leaseID v3.LeaseID) (*RemoteKV, error) { return &RemoteKV{kv, key, rev, val}, nil } -func GetRemoteKV(kv v3.KV, key string) (*RemoteKV, error) { - resp, err := kv.Get(context.TODO(), key) - if err != nil { - return nil, err - } - rev := resp.Header.Revision - val := "" - if len(resp.Kvs) > 0 { - rev = resp.Kvs[0].ModRevision - val = string(resp.Kvs[0].Value) - } - return &RemoteKV{kv: kv, key: key, rev: rev, val: val}, nil -} - -func NewUniqueKey(kv v3.KV, prefix string) (*RemoteKV, error) { - return NewUniqueKV(kv, prefix, "", 0) -} - -func NewUniqueKV(kv v3.KV, prefix string, val string, leaseID v3.LeaseID) (*RemoteKV, error) { +func newUniqueKV(kv v3.KV, prefix string, val string) (*RemoteKV, error) { for { newKey := fmt.Sprintf("%s/%v", prefix, time.Now().UnixNano()) rev, err := putNewKV(kv, newKey, val, 0) @@ -90,14 +72,9 @@ func putNewKV(kv v3.KV, key, val string, leaseID v3.LeaseID) (int64, error) { return txnresp.Header.Revision, nil } -// NewSequentialKV allocates a new sequential key-value pair at /nnnnn -func NewSequentialKV(kv v3.KV, prefix, val string) (*RemoteKV, error) { - return newSequentialKV(kv, prefix, val, 0) -} - // newSequentialKV allocates a new sequential key /nnnnn with a given // value and lease. Note: a bookkeeping node __ is also allocated. -func newSequentialKV(kv v3.KV, prefix, val string, leaseID v3.LeaseID) (*RemoteKV, error) { +func newSequentialKV(kv v3.KV, prefix, val string) (*RemoteKV, error) { resp, err := kv.Get(context.TODO(), prefix, v3.WithLastKey()...) if err != nil { return nil, err @@ -118,22 +95,22 @@ func newSequentialKV(kv v3.KV, prefix, val string, leaseID v3.LeaseID) (*RemoteK // base prefix key must be current (i.e., <=) with the server update; // the base key is important to avoid the following: // N1: LastKey() == 1, start txn. - // N2: New Key 2, New Key 3, Delete Key 2 + // N2: new Key 2, new Key 3, Delete Key 2 // N1: txn succeeds allocating key 2 when it shouldn't baseKey := "__" + prefix // current revision might contain modification so +1 cmp := v3.Compare(v3.ModRevision(baseKey), "<", resp.Header.Revision+1) - reqPrefix := v3.OpPut(baseKey, "", v3.WithLease(leaseID)) - reqNewKey := v3.OpPut(newKey, val, v3.WithLease(leaseID)) + reqPrefix := v3.OpPut(baseKey, "") + reqnewKey := v3.OpPut(newKey, val) txn := kv.Txn(context.TODO()) - txnresp, err := txn.If(cmp).Then(reqPrefix, reqNewKey).Commit() + txnresp, err := txn.If(cmp).Then(reqPrefix, reqnewKey).Commit() if err != nil { return nil, err } if !txnresp.Succeeded { - return newSequentialKV(kv, prefix, val, leaseID) + return newSequentialKV(kv, prefix, val) } return &RemoteKV{kv, newKey, txnresp.Header.Revision, val}, nil } @@ -159,25 +136,25 @@ func (rk *RemoteKV) Put(val string) error { // EphemeralKV is a new key associated with a session lease type EphemeralKV struct{ RemoteKV } -// NewEphemeralKV creates a new key/value pair associated with a session lease -func NewEphemeralKV(s *concurrency.Session, key, val string) (*EphemeralKV, error) { - k, err := NewKV(s.Client(), key, val, s.Lease()) +// newEphemeralKV creates a new key/value pair associated with a session lease +func newEphemeralKV(s *concurrency.Session, key, val string) (*EphemeralKV, error) { + k, err := newKV(s.Client(), key, val, s.Lease()) if err != nil { return nil, err } return &EphemeralKV{*k}, nil } -// NewUniqueEphemeralKey creates a new unique valueless key associated with a session lease -func NewUniqueEphemeralKey(s *concurrency.Session, prefix string) (*EphemeralKV, error) { - return NewUniqueEphemeralKV(s, prefix, "") +// newUniqueEphemeralKey creates a new unique valueless key associated with a session lease +func newUniqueEphemeralKey(s *concurrency.Session, prefix string) (*EphemeralKV, error) { + return newUniqueEphemeralKV(s, prefix, "") } -// NewUniqueEphemeralKV creates a new unique key/value pair associated with a session lease -func NewUniqueEphemeralKV(s *concurrency.Session, prefix, val string) (ek *EphemeralKV, err error) { +// newUniqueEphemeralKV creates a new unique key/value pair associated with a session lease +func newUniqueEphemeralKV(s *concurrency.Session, prefix, val string) (ek *EphemeralKV, err error) { for { newKey := fmt.Sprintf("%s/%v", prefix, time.Now().UnixNano()) - ek, err = NewEphemeralKV(s, newKey, val) + ek, err = newEphemeralKV(s, newKey, val) if err == nil || err != ErrKeyExists { break } diff --git a/contrib/recipes/priority_queue.go b/contrib/recipes/priority_queue.go index 896bd15ab..a62fb02af 100644 --- a/contrib/recipes/priority_queue.go +++ b/contrib/recipes/priority_queue.go @@ -37,7 +37,7 @@ func NewPriorityQueue(client *v3.Client, key string) *PriorityQueue { // Enqueue puts a value into a queue with a given priority. func (q *PriorityQueue) Enqueue(val string, pr uint16) error { prefix := fmt.Sprintf("%s%05d", q.key, pr) - _, err := NewSequentialKV(q.client, prefix, val) + _, err := newSequentialKV(q.client, prefix, val) return err } diff --git a/contrib/recipes/queue.go b/contrib/recipes/queue.go index c02a1af5d..714c40604 100644 --- a/contrib/recipes/queue.go +++ b/contrib/recipes/queue.go @@ -33,7 +33,7 @@ func NewQueue(client *v3.Client, keyPrefix string) *Queue { } func (q *Queue) Enqueue(val string) error { - _, err := NewUniqueKV(q.client, q.keyPrefix, val, 0) + _, err := newUniqueKV(q.client, q.keyPrefix, val) return err } diff --git a/contrib/recipes/rwmutex.go b/contrib/recipes/rwmutex.go index 8a03307ed..2714305df 100644 --- a/contrib/recipes/rwmutex.go +++ b/contrib/recipes/rwmutex.go @@ -34,7 +34,7 @@ func NewRWMutex(s *concurrency.Session, prefix string) *RWMutex { } func (rwm *RWMutex) RLock() error { - rk, err := NewUniqueEphemeralKey(rwm.s, rwm.pfx+"read") + rk, err := newUniqueEphemeralKey(rwm.s, rwm.pfx+"read") if err != nil { return err } @@ -48,7 +48,7 @@ func (rwm *RWMutex) RLock() error { } func (rwm *RWMutex) Lock() error { - rk, err := NewUniqueEphemeralKey(rwm.s, rwm.pfx+"write") + rk, err := newUniqueEphemeralKey(rwm.s, rwm.pfx+"write") if err != nil { return err }