contrib/recipes: unexport and clean up keys.go

Fixes #6731
This commit is contained in:
Anthony Romano 2016-10-27 15:02:58 -04:00
parent 1b36162659
commit 8f718e2e5a
6 changed files with 25 additions and 48 deletions

View File

@ -35,7 +35,7 @@ func NewBarrier(client *v3.Client, key string) *Barrier {
// Hold creates the barrier key causing processes to block on Wait. // Hold creates the barrier key causing processes to block on Wait.
func (b *Barrier) Hold() error { func (b *Barrier) Hold() error {
_, err := NewKey(b.client, b.key, 0) _, err := newKey(b.client, b.key, 0)
return err return err
} }

View File

@ -44,7 +44,7 @@ func NewDoubleBarrier(s *concurrency.Session, key string, count int) *DoubleBarr
// Enter waits for "count" processes to enter the barrier then returns // Enter waits for "count" processes to enter the barrier then returns
func (b *DoubleBarrier) Enter() error { func (b *DoubleBarrier) Enter() error {
client := b.s.Client() client := b.s.Client()
ek, err := NewUniqueEphemeralKey(b.s, b.key+"/waiters") ek, err := newUniqueEphemeralKey(b.s, b.key+"/waiters")
if err != nil { if err != nil {
return err return err
} }

View File

@ -32,11 +32,11 @@ type RemoteKV struct {
val string val string
} }
func NewKey(kv v3.KV, key string, leaseID v3.LeaseID) (*RemoteKV, error) { func newKey(kv v3.KV, key string, leaseID v3.LeaseID) (*RemoteKV, error) {
return NewKV(kv, key, "", leaseID) return newKV(kv, key, "", leaseID)
} }
func NewKV(kv v3.KV, key, val string, leaseID v3.LeaseID) (*RemoteKV, error) { func newKV(kv v3.KV, key, val string, leaseID v3.LeaseID) (*RemoteKV, error) {
rev, err := putNewKV(kv, key, val, leaseID) rev, err := putNewKV(kv, key, val, leaseID)
if err != nil { if err != nil {
return nil, err return nil, err
@ -44,25 +44,7 @@ func NewKV(kv v3.KV, key, val string, leaseID v3.LeaseID) (*RemoteKV, error) {
return &RemoteKV{kv, key, rev, val}, nil return &RemoteKV{kv, key, rev, val}, nil
} }
func GetRemoteKV(kv v3.KV, key string) (*RemoteKV, error) { func newUniqueKV(kv v3.KV, prefix string, val string) (*RemoteKV, error) {
resp, err := kv.Get(context.TODO(), key)
if err != nil {
return nil, err
}
rev := resp.Header.Revision
val := ""
if len(resp.Kvs) > 0 {
rev = resp.Kvs[0].ModRevision
val = string(resp.Kvs[0].Value)
}
return &RemoteKV{kv: kv, key: key, rev: rev, val: val}, nil
}
func NewUniqueKey(kv v3.KV, prefix string) (*RemoteKV, error) {
return NewUniqueKV(kv, prefix, "", 0)
}
func NewUniqueKV(kv v3.KV, prefix string, val string, leaseID v3.LeaseID) (*RemoteKV, error) {
for { for {
newKey := fmt.Sprintf("%s/%v", prefix, time.Now().UnixNano()) newKey := fmt.Sprintf("%s/%v", prefix, time.Now().UnixNano())
rev, err := putNewKV(kv, newKey, val, 0) rev, err := putNewKV(kv, newKey, val, 0)
@ -90,14 +72,9 @@ func putNewKV(kv v3.KV, key, val string, leaseID v3.LeaseID) (int64, error) {
return txnresp.Header.Revision, nil return txnresp.Header.Revision, nil
} }
// NewSequentialKV allocates a new sequential key-value pair at <prefix>/nnnnn
func NewSequentialKV(kv v3.KV, prefix, val string) (*RemoteKV, error) {
return newSequentialKV(kv, prefix, val, 0)
}
// newSequentialKV allocates a new sequential key <prefix>/nnnnn with a given // newSequentialKV allocates a new sequential key <prefix>/nnnnn with a given
// value and lease. Note: a bookkeeping node __<prefix> is also allocated. // value and lease. Note: a bookkeeping node __<prefix> is also allocated.
func newSequentialKV(kv v3.KV, prefix, val string, leaseID v3.LeaseID) (*RemoteKV, error) { func newSequentialKV(kv v3.KV, prefix, val string) (*RemoteKV, error) {
resp, err := kv.Get(context.TODO(), prefix, v3.WithLastKey()...) resp, err := kv.Get(context.TODO(), prefix, v3.WithLastKey()...)
if err != nil { if err != nil {
return nil, err return nil, err
@ -118,22 +95,22 @@ func newSequentialKV(kv v3.KV, prefix, val string, leaseID v3.LeaseID) (*RemoteK
// base prefix key must be current (i.e., <=) with the server update; // base prefix key must be current (i.e., <=) with the server update;
// the base key is important to avoid the following: // the base key is important to avoid the following:
// N1: LastKey() == 1, start txn. // N1: LastKey() == 1, start txn.
// N2: New Key 2, New Key 3, Delete Key 2 // N2: new Key 2, new Key 3, Delete Key 2
// N1: txn succeeds allocating key 2 when it shouldn't // N1: txn succeeds allocating key 2 when it shouldn't
baseKey := "__" + prefix baseKey := "__" + prefix
// current revision might contain modification so +1 // current revision might contain modification so +1
cmp := v3.Compare(v3.ModRevision(baseKey), "<", resp.Header.Revision+1) cmp := v3.Compare(v3.ModRevision(baseKey), "<", resp.Header.Revision+1)
reqPrefix := v3.OpPut(baseKey, "", v3.WithLease(leaseID)) reqPrefix := v3.OpPut(baseKey, "")
reqNewKey := v3.OpPut(newKey, val, v3.WithLease(leaseID)) reqnewKey := v3.OpPut(newKey, val)
txn := kv.Txn(context.TODO()) txn := kv.Txn(context.TODO())
txnresp, err := txn.If(cmp).Then(reqPrefix, reqNewKey).Commit() txnresp, err := txn.If(cmp).Then(reqPrefix, reqnewKey).Commit()
if err != nil { if err != nil {
return nil, err return nil, err
} }
if !txnresp.Succeeded { if !txnresp.Succeeded {
return newSequentialKV(kv, prefix, val, leaseID) return newSequentialKV(kv, prefix, val)
} }
return &RemoteKV{kv, newKey, txnresp.Header.Revision, val}, nil return &RemoteKV{kv, newKey, txnresp.Header.Revision, val}, nil
} }
@ -159,25 +136,25 @@ func (rk *RemoteKV) Put(val string) error {
// EphemeralKV is a new key associated with a session lease // EphemeralKV is a new key associated with a session lease
type EphemeralKV struct{ RemoteKV } type EphemeralKV struct{ RemoteKV }
// NewEphemeralKV creates a new key/value pair associated with a session lease // newEphemeralKV creates a new key/value pair associated with a session lease
func NewEphemeralKV(s *concurrency.Session, key, val string) (*EphemeralKV, error) { func newEphemeralKV(s *concurrency.Session, key, val string) (*EphemeralKV, error) {
k, err := NewKV(s.Client(), key, val, s.Lease()) k, err := newKV(s.Client(), key, val, s.Lease())
if err != nil { if err != nil {
return nil, err return nil, err
} }
return &EphemeralKV{*k}, nil return &EphemeralKV{*k}, nil
} }
// NewUniqueEphemeralKey creates a new unique valueless key associated with a session lease // newUniqueEphemeralKey creates a new unique valueless key associated with a session lease
func NewUniqueEphemeralKey(s *concurrency.Session, prefix string) (*EphemeralKV, error) { func newUniqueEphemeralKey(s *concurrency.Session, prefix string) (*EphemeralKV, error) {
return NewUniqueEphemeralKV(s, prefix, "") return newUniqueEphemeralKV(s, prefix, "")
} }
// NewUniqueEphemeralKV creates a new unique key/value pair associated with a session lease // newUniqueEphemeralKV creates a new unique key/value pair associated with a session lease
func NewUniqueEphemeralKV(s *concurrency.Session, prefix, val string) (ek *EphemeralKV, err error) { func newUniqueEphemeralKV(s *concurrency.Session, prefix, val string) (ek *EphemeralKV, err error) {
for { for {
newKey := fmt.Sprintf("%s/%v", prefix, time.Now().UnixNano()) newKey := fmt.Sprintf("%s/%v", prefix, time.Now().UnixNano())
ek, err = NewEphemeralKV(s, newKey, val) ek, err = newEphemeralKV(s, newKey, val)
if err == nil || err != ErrKeyExists { if err == nil || err != ErrKeyExists {
break break
} }

View File

@ -37,7 +37,7 @@ func NewPriorityQueue(client *v3.Client, key string) *PriorityQueue {
// Enqueue puts a value into a queue with a given priority. // Enqueue puts a value into a queue with a given priority.
func (q *PriorityQueue) Enqueue(val string, pr uint16) error { func (q *PriorityQueue) Enqueue(val string, pr uint16) error {
prefix := fmt.Sprintf("%s%05d", q.key, pr) prefix := fmt.Sprintf("%s%05d", q.key, pr)
_, err := NewSequentialKV(q.client, prefix, val) _, err := newSequentialKV(q.client, prefix, val)
return err return err
} }

View File

@ -33,7 +33,7 @@ func NewQueue(client *v3.Client, keyPrefix string) *Queue {
} }
func (q *Queue) Enqueue(val string) error { func (q *Queue) Enqueue(val string) error {
_, err := NewUniqueKV(q.client, q.keyPrefix, val, 0) _, err := newUniqueKV(q.client, q.keyPrefix, val)
return err return err
} }

View File

@ -34,7 +34,7 @@ func NewRWMutex(s *concurrency.Session, prefix string) *RWMutex {
} }
func (rwm *RWMutex) RLock() error { func (rwm *RWMutex) RLock() error {
rk, err := NewUniqueEphemeralKey(rwm.s, rwm.pfx+"read") rk, err := newUniqueEphemeralKey(rwm.s, rwm.pfx+"read")
if err != nil { if err != nil {
return err return err
} }
@ -48,7 +48,7 @@ func (rwm *RWMutex) RLock() error {
} }
func (rwm *RWMutex) Lock() error { func (rwm *RWMutex) Lock() error {
rk, err := NewUniqueEphemeralKey(rwm.s, rwm.pfx+"write") rk, err := newUniqueEphemeralKey(rwm.s, rwm.pfx+"write")
if err != nil { if err != nil {
return err return err
} }