From 3bb3351ca0cce55db1d8f7748555c907cdef0cdf Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Sat, 20 Feb 2016 16:07:22 -0800 Subject: [PATCH] contrib/recipes: use clientv3 kv API --- contrib/recipes/barrier.go | 20 +++-- contrib/recipes/client.go | 43 ++-------- contrib/recipes/double_barrier.go | 27 +++--- contrib/recipes/election.go | 22 ++--- contrib/recipes/key.go | 134 ++++++++++-------------------- contrib/recipes/mutex.go | 25 +++--- contrib/recipes/priority_queue.go | 19 +++-- contrib/recipes/queue.go | 20 +++-- contrib/recipes/range.go | 100 +++------------------- contrib/recipes/rwmutex.go | 33 ++++---- contrib/recipes/stm.go | 38 ++++----- integration/v3_stm_test.go | 9 +- 12 files changed, 185 insertions(+), 305 deletions(-) diff --git a/contrib/recipes/barrier.go b/contrib/recipes/barrier.go index 9d080d9a1..dcfd025a6 100644 --- a/contrib/recipes/barrier.go +++ b/contrib/recipes/barrier.go @@ -16,38 +16,40 @@ package recipe import ( "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" - "github.com/coreos/etcd/clientv3" - pb "github.com/coreos/etcd/etcdserver/etcdserverpb" + v3 "github.com/coreos/etcd/clientv3" "github.com/coreos/etcd/storage/storagepb" ) // Barrier creates a key in etcd to block processes, then deletes the key to // release all blocked processes. type Barrier struct { - client *clientv3.Client - key string + client *v3.Client + kv v3.KV + ctx context.Context + + key string } -func NewBarrier(client *clientv3.Client, key string) *Barrier { - return &Barrier{client, key} +func NewBarrier(client *v3.Client, key string) *Barrier { + return &Barrier{client, v3.NewKV(client), context.TODO(), key} } // Hold creates the barrier key causing processes to block on Wait. func (b *Barrier) Hold() error { - _, err := NewKey(b.client, b.key, 0) + _, err := NewKey(b.kv, b.key, 0) return err } // Release deletes the barrier key to unblock all waiting processes. func (b *Barrier) Release() error { - _, err := b.client.KV.DeleteRange(context.TODO(), &pb.DeleteRangeRequest{Key: []byte(b.key)}) + _, err := b.kv.Delete(b.ctx, b.key) return err } // Wait blocks on the barrier key until it is deleted. If there is no key, Wait // assumes Release has already been called and returns immediately. func (b *Barrier) Wait() error { - resp, err := NewRange(b.client, b.key).FirstKey() + resp, err := b.kv.Get(b.ctx, b.key, withFirstKey()...) if err != nil { return err } diff --git a/contrib/recipes/client.go b/contrib/recipes/client.go index 428782d22..79c9344bb 100644 --- a/contrib/recipes/client.go +++ b/contrib/recipes/client.go @@ -18,7 +18,7 @@ import ( "errors" "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" - pb "github.com/coreos/etcd/etcdserver/etcdserverpb" + v3 "github.com/coreos/etcd/clientv3" spb "github.com/coreos/etcd/storage/storagepb" ) @@ -30,22 +30,10 @@ var ( ) // deleteRevKey deletes a key by revision, returning false if key is missing -func deleteRevKey(kvc pb.KVClient, key string, rev int64) (bool, error) { - cmp := &pb.Compare{ - Result: pb.Compare_EQUAL, - Target: pb.Compare_MOD, - Key: []byte(key), - TargetUnion: &pb.Compare_ModRevision{ModRevision: rev}, - } - req := &pb.RequestUnion{Request: &pb.RequestUnion_RequestDeleteRange{ - RequestDeleteRange: &pb.DeleteRangeRequest{Key: []byte(key)}}} - txnresp, err := kvc.Txn( - context.TODO(), - &pb.TxnRequest{ - Compare: []*pb.Compare{cmp}, - Success: []*pb.RequestUnion{req}, - Failure: nil, - }) +func deleteRevKey(kv v3.KV, key string, rev int64) (bool, error) { + cmp := v3.Compare(v3.ModifiedRevision(key), "=", rev) + req := v3.OpDelete(key) + txnresp, err := kv.Txn(context.TODO()).If(cmp).Then(req).Commit() if err != nil { return false, err } else if txnresp.Succeeded == false { @@ -54,27 +42,14 @@ func deleteRevKey(kvc pb.KVClient, key string, rev int64) (bool, error) { return true, nil } -func claimFirstKey(kvc pb.KVClient, kvs []*spb.KeyValue) (*spb.KeyValue, error) { - for _, kv := range kvs { - ok, err := deleteRevKey(kvc, string(kv.Key), kv.ModRevision) +func claimFirstKey(kv v3.KV, kvs []*spb.KeyValue) (*spb.KeyValue, error) { + for _, k := range kvs { + ok, err := deleteRevKey(kv, string(k.Key), k.ModRevision) if err != nil { return nil, err } else if ok { - return kv, nil + return k, nil } } return nil, nil } - -func putEmptyKey(kv pb.KVClient, key string) (*pb.PutResponse, error) { - return kv.Put(context.TODO(), &pb.PutRequest{Key: []byte(key), Value: []byte{}}) -} - -// deletePrefix performs a RangeRequest to get keys on a given prefix -func deletePrefix(kv pb.KVClient, prefix string) (*pb.DeleteRangeResponse, error) { - return kv.DeleteRange( - context.TODO(), - &pb.DeleteRangeRequest{ - Key: []byte(prefix), - RangeEnd: []byte(prefixEnd(prefix))}) -} diff --git a/contrib/recipes/double_barrier.go b/contrib/recipes/double_barrier.go index 142049fd8..1be537d1c 100644 --- a/contrib/recipes/double_barrier.go +++ b/contrib/recipes/double_barrier.go @@ -17,7 +17,6 @@ package recipe import ( "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" "github.com/coreos/etcd/clientv3" - pb "github.com/coreos/etcd/etcdserver/etcdserverpb" "github.com/coreos/etcd/storage/storagepb" ) @@ -25,13 +24,22 @@ import ( // blocks again on Leave until all processes have left. type DoubleBarrier struct { client *clientv3.Client - key string // key for the collective barrier - count int - myKey *EphemeralKV // current key for this process on the barrier + kv clientv3.KV + ctx context.Context + + key string // key for the collective barrier + count int + myKey *EphemeralKV // current key for this process on the barrier } func NewDoubleBarrier(client *clientv3.Client, key string, count int) *DoubleBarrier { - return &DoubleBarrier{client, key, count, nil} + return &DoubleBarrier{ + client: client, + kv: clientv3.NewKV(client), + ctx: context.TODO(), + key: key, + count: count, + } } // Enter waits for "count" processes to enter the barrier then returns @@ -42,7 +50,7 @@ func (b *DoubleBarrier) Enter() error { } b.myKey = ek - resp, err := NewRange(b.client, b.key+"/waiters").Prefix() + resp, err := b.kv.Get(b.ctx, b.key+"/waiters", clientv3.WithPrefix()) if err != nil { return err } @@ -53,7 +61,7 @@ func (b *DoubleBarrier) Enter() error { if len(resp.Kvs) == b.count { // unblock waiters - _, err = putEmptyKey(b.client.KV, b.key+"/ready") + _, err = b.kv.Put(b.ctx, b.key+"/ready", "") return err } @@ -67,7 +75,7 @@ func (b *DoubleBarrier) Enter() error { // Leave waits for "count" processes to leave the barrier then returns func (b *DoubleBarrier) Leave() error { - resp, err := NewRange(b.client, b.key+"/waiters").Prefix() + resp, err := b.kv.Get(b.ctx, b.key+"/waiters", clientv3.WithPrefix()) if len(resp.Kvs) == 0 { return nil } @@ -85,8 +93,7 @@ func (b *DoubleBarrier) Leave() error { if len(resp.Kvs) == 1 { // this is the only node in the barrier; finish up - req := &pb.DeleteRangeRequest{Key: []byte(b.key + "/ready")} - if _, err = b.client.KV.DeleteRange(context.TODO(), req); err != nil { + if _, err = b.kv.Delete(b.ctx, b.key+"/ready"); err != nil { return err } return b.myKey.Delete() diff --git a/contrib/recipes/election.go b/contrib/recipes/election.go index 563fbd574..163f49867 100644 --- a/contrib/recipes/election.go +++ b/contrib/recipes/election.go @@ -14,20 +14,24 @@ package recipe import ( - "github.com/coreos/etcd/clientv3" + "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" + v3 "github.com/coreos/etcd/clientv3" "github.com/coreos/etcd/etcdserver" "github.com/coreos/etcd/storage/storagepb" ) type Election struct { - client *clientv3.Client + client *v3.Client + kv v3.KV + ctx context.Context + keyPrefix string leaderKey *EphemeralKV } // NewElection returns a new election on a given key prefix. -func NewElection(client *clientv3.Client, keyPrefix string) *Election { - return &Election{client, keyPrefix, nil} +func NewElection(client *v3.Client, keyPrefix string) *Election { + return &Election{client, v3.NewKV(client), context.TODO(), keyPrefix, nil} } // Volunteer puts a value as eligible for the election. It blocks until @@ -58,7 +62,7 @@ func (e *Election) Resign() (err error) { // Leader returns the leader value for the current election. func (e *Election) Leader() (string, error) { - resp, err := NewRange(e.client, e.keyPrefix).FirstCreate() + resp, err := e.kv.Get(e.ctx, e.keyPrefix, withFirstCreate()...) if err != nil { return "", err } else if len(resp.Kvs) == 0 { @@ -70,7 +74,7 @@ func (e *Election) Leader() (string, error) { // Wait waits for a leader to be elected, returning the leader value. func (e *Election) Wait() (string, error) { - resp, err := NewRange(e.client, e.keyPrefix).FirstCreate() + resp, err := e.kv.Get(e.ctx, e.keyPrefix, withFirstCreate()...) if err != nil { return "", err } else if len(resp.Kvs) != 0 { @@ -89,10 +93,8 @@ func (e *Election) Wait() (string, error) { } func (e *Election) waitLeadership(tryKey *EphemeralKV) error { - resp, err := NewRangeRev( - e.client, - e.keyPrefix, - tryKey.Revision()-1).LastCreate() + opts := append(withLastCreate(), v3.WithRev(tryKey.Revision()-1)) + resp, err := e.kv.Get(e.ctx, e.keyPrefix, opts...) if err != nil { return err } else if len(resp.Kvs) == 0 { diff --git a/contrib/recipes/key.go b/contrib/recipes/key.go index 28339e580..5c086213f 100644 --- a/contrib/recipes/key.go +++ b/contrib/recipes/key.go @@ -20,36 +20,32 @@ import ( "time" "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" - "github.com/coreos/etcd/clientv3" - pb "github.com/coreos/etcd/etcdserver/etcdserverpb" + v3 "github.com/coreos/etcd/clientv3" "github.com/coreos/etcd/lease" ) // Key is a key/revision pair created by the client and stored on etcd type RemoteKV struct { - client *clientv3.Client - key string - rev int64 - val string + kv v3.KV + key string + rev int64 + val string } -func NewKey(client *clientv3.Client, key string, leaseID lease.LeaseID) (*RemoteKV, error) { - return NewKV(client, key, "", leaseID) +func NewKey(kv v3.KV, key string, leaseID lease.LeaseID) (*RemoteKV, error) { + return NewKV(kv, key, "", leaseID) } -func NewKV(client *clientv3.Client, key, val string, leaseID lease.LeaseID) (*RemoteKV, error) { - rev, err := putNewKV(client, key, val, leaseID) +func NewKV(kv v3.KV, key, val string, leaseID lease.LeaseID) (*RemoteKV, error) { + rev, err := putNewKV(kv, key, val, leaseID) if err != nil { return nil, err } - return &RemoteKV{client, key, rev, val}, nil + return &RemoteKV{kv, key, rev, val}, nil } -func GetRemoteKV(client *clientv3.Client, key string) (*RemoteKV, error) { - resp, err := client.KV.Range( - context.TODO(), - &pb.RangeRequest{Key: []byte(key)}, - ) +func GetRemoteKV(kv v3.KV, key string) (*RemoteKV, error) { + resp, err := kv.Get(context.TODO(), key) if err != nil { return nil, err } @@ -59,23 +55,19 @@ func GetRemoteKV(client *clientv3.Client, key string) (*RemoteKV, error) { rev = resp.Kvs[0].ModRevision val = string(resp.Kvs[0].Value) } - return &RemoteKV{ - client: client, - key: key, - rev: rev, - val: val}, nil + return &RemoteKV{kv: kv, key: key, rev: rev, val: val}, nil } -func NewUniqueKey(client *clientv3.Client, prefix string) (*RemoteKV, error) { - return NewUniqueKV(client, prefix, "", 0) +func NewUniqueKey(kv v3.KV, prefix string) (*RemoteKV, error) { + return NewUniqueKV(kv, prefix, "", 0) } -func NewUniqueKV(client *clientv3.Client, prefix string, val string, leaseID lease.LeaseID) (*RemoteKV, error) { +func NewUniqueKV(kv v3.KV, prefix string, val string, leaseID lease.LeaseID) (*RemoteKV, error) { for { newKey := fmt.Sprintf("%s/%v", prefix, time.Now().UnixNano()) - rev, err := putNewKV(client, newKey, val, 0) + rev, err := putNewKV(kv, newKey, val, 0) if err == nil { - return &RemoteKV{client, newKey, rev, val}, nil + return &RemoteKV{kv, newKey, rev, val}, nil } if err != ErrKeyExists { return nil, err @@ -85,22 +77,10 @@ func NewUniqueKV(client *clientv3.Client, prefix string, val string, leaseID lea // putNewKV attempts to create the given key, only succeeding if the key did // not yet exist. -func putNewKV(ec *clientv3.Client, key, val string, leaseID lease.LeaseID) (int64, error) { - cmp := &pb.Compare{ - Result: pb.Compare_EQUAL, - Target: pb.Compare_VERSION, - Key: []byte(key), - TargetUnion: &pb.Compare_Version{Version: 0}} - - req := &pb.RequestUnion{ - Request: &pb.RequestUnion_RequestPut{ - RequestPut: &pb.PutRequest{ - Key: []byte(key), - Value: []byte(val), - Lease: int64(leaseID)}}} - txnresp, err := ec.KV.Txn( - context.TODO(), - &pb.TxnRequest{[]*pb.Compare{cmp}, []*pb.RequestUnion{req}, nil}) +func putNewKV(kv v3.KV, key, val string, leaseID lease.LeaseID) (int64, error) { + cmp := v3.Compare(v3.Version(key), "=", 0) + req := v3.OpPut(key, val, v3.WithLease(leaseID)) + txnresp, err := kv.Txn(context.TODO()).If(cmp).Then(req).Commit() if err != nil { return 0, err } @@ -111,14 +91,14 @@ func putNewKV(ec *clientv3.Client, key, val string, leaseID lease.LeaseID) (int6 } // NewSequentialKV allocates a new sequential key-value pair at /nnnnn -func NewSequentialKV(client *clientv3.Client, prefix, val string) (*RemoteKV, error) { - return newSequentialKV(client, prefix, val, 0) +func NewSequentialKV(kv v3.KV, prefix, val string) (*RemoteKV, error) { + return newSequentialKV(kv, prefix, val, 0) } // newSequentialKV allocates a new sequential key /nnnnn with a given // value and lease. Note: a bookkeeping node __ is also allocated. -func newSequentialKV(client *clientv3.Client, prefix, val string, leaseID lease.LeaseID) (*RemoteKV, error) { - resp, err := NewRange(client, prefix).LastKey() +func newSequentialKV(kv v3.KV, prefix, val string, leaseID lease.LeaseID) (*RemoteKV, error) { + resp, err := kv.Get(context.TODO(), prefix, withLastKey()...) if err != nil { return nil, err } @@ -127,9 +107,9 @@ func newSequentialKV(client *clientv3.Client, prefix, val string, leaseID lease. newSeqNum := 0 if len(resp.Kvs) != 0 { fields := strings.Split(string(resp.Kvs[0].Key), "/") - _, err := fmt.Sscanf(fields[len(fields)-1], "%d", &newSeqNum) - if err != nil { - return nil, err + _, serr := fmt.Sscanf(fields[len(fields)-1], "%d", &newSeqNum) + if serr != nil { + return nil, serr } newSeqNum++ } @@ -140,42 +120,22 @@ func newSequentialKV(client *clientv3.Client, prefix, val string, leaseID lease. // N1: LastKey() == 1, start txn. // N2: New Key 2, New Key 3, Delete Key 2 // N1: txn succeeds allocating key 2 when it shouldn't - baseKey := []byte("__" + prefix) - cmp := &pb.Compare{ - Result: pb.Compare_LESS, - Target: pb.Compare_MOD, - Key: []byte(baseKey), - // current revision might contain modification so +1 - TargetUnion: &pb.Compare_ModRevision{ModRevision: resp.Header.Revision + 1}, - } + baseKey := "__" + prefix - reqPrefix := &pb.RequestUnion{ - Request: &pb.RequestUnion_RequestPut{ - RequestPut: &pb.PutRequest{ - Key: baseKey, - Lease: int64(leaseID), - }}} + // current revision might contain modification so +1 + cmp := v3.Compare(v3.ModifiedRevision(baseKey), "<", resp.Header.Revision+1) + reqPrefix := v3.OpPut(baseKey, "", v3.WithLease(leaseID)) + reqNewKey := v3.OpPut(newKey, val, v3.WithLease(leaseID)) - reqNewKey := &pb.RequestUnion{ - Request: &pb.RequestUnion_RequestPut{ - RequestPut: &pb.PutRequest{ - Key: []byte(newKey), - Value: []byte(val), - Lease: int64(leaseID), - }}} - - txnresp, err := client.KV.Txn( - context.TODO(), - &pb.TxnRequest{ - []*pb.Compare{cmp}, - []*pb.RequestUnion{reqPrefix, reqNewKey}, nil}) + txn := kv.Txn(context.TODO()) + txnresp, err := txn.If(cmp).Then(reqPrefix, reqNewKey).Commit() if err != nil { return nil, err } if txnresp.Succeeded == false { - return newSequentialKV(client, prefix, val, leaseID) + return newSequentialKV(kv, prefix, val, leaseID) } - return &RemoteKV{client, newKey, txnresp.Header.Revision, val}, nil + return &RemoteKV{kv, newKey, txnresp.Header.Revision, val}, nil } func (rk *RemoteKV) Key() string { return rk.key } @@ -183,18 +143,16 @@ func (rk *RemoteKV) Revision() int64 { return rk.rev } func (rk *RemoteKV) Value() string { return rk.val } func (rk *RemoteKV) Delete() error { - if rk.client == nil { + if rk.kv == nil { return nil } - req := &pb.DeleteRangeRequest{Key: []byte(rk.key)} - _, err := rk.client.KV.DeleteRange(context.TODO(), req) - rk.client = nil + _, err := rk.kv.Delete(context.TODO(), rk.key) + rk.kv = nil return err } func (rk *RemoteKV) Put(val string) error { - req := &pb.PutRequest{Key: []byte(rk.key), Value: []byte(val)} - _, err := rk.client.KV.Put(context.TODO(), req) + _, err := rk.kv.Put(context.TODO(), rk.key, val) return err } @@ -202,12 +160,12 @@ func (rk *RemoteKV) Put(val string) error { type EphemeralKV struct{ RemoteKV } // NewEphemeralKV creates a new key/value pair associated with a session lease -func NewEphemeralKV(client *clientv3.Client, key, val string) (*EphemeralKV, error) { +func NewEphemeralKV(client *v3.Client, key, val string) (*EphemeralKV, error) { leaseID, err := SessionLease(client) if err != nil { return nil, err } - k, err := NewKV(client, key, val, leaseID) + k, err := NewKV(v3.NewKV(client), key, val, leaseID) if err != nil { return nil, err } @@ -215,12 +173,12 @@ func NewEphemeralKV(client *clientv3.Client, key, val string) (*EphemeralKV, err } // NewUniqueEphemeralKey creates a new unique valueless key associated with a session lease -func NewUniqueEphemeralKey(client *clientv3.Client, prefix string) (*EphemeralKV, error) { +func NewUniqueEphemeralKey(client *v3.Client, prefix string) (*EphemeralKV, error) { return NewUniqueEphemeralKV(client, prefix, "") } // NewUniqueEphemeralKV creates a new unique key/value pair associated with a session lease -func NewUniqueEphemeralKV(client *clientv3.Client, prefix, val string) (ek *EphemeralKV, err error) { +func NewUniqueEphemeralKV(client *v3.Client, prefix, val string) (ek *EphemeralKV, err error) { for { newKey := fmt.Sprintf("%s/%v", prefix, time.Now().UnixNano()) ek, err = NewEphemeralKV(client, newKey, val) diff --git a/contrib/recipes/mutex.go b/contrib/recipes/mutex.go index 277f0afd2..f872e6d4d 100644 --- a/contrib/recipes/mutex.go +++ b/contrib/recipes/mutex.go @@ -17,29 +17,33 @@ package recipe import ( "sync" - "github.com/coreos/etcd/clientv3" + "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" + v3 "github.com/coreos/etcd/clientv3" "github.com/coreos/etcd/storage/storagepb" ) // Mutex implements the sync Locker interface with etcd type Mutex struct { - client *clientv3.Client - key string - myKey *RemoteKV + client *v3.Client + kv v3.KV + ctx context.Context + + key string + myKey *EphemeralKV } -func NewMutex(client *clientv3.Client, key string) *Mutex { - return &Mutex{client, key, nil} +func NewMutex(client *v3.Client, key string) *Mutex { + return &Mutex{client, v3.NewKV(client), context.TODO(), key, nil} } func (m *Mutex) Lock() (err error) { // put self in lock waiters via myKey; oldest waiter holds lock - m.myKey, err = NewUniqueKey(m.client, m.key) + m.myKey, err = NewUniqueEphemeralKey(m.client, m.key) if err != nil { return err } // find oldest element in waiters via revision of insertion - resp, err := NewRange(m.client, m.key).FirstRev() + resp, err := m.kv.Get(m.ctx, m.key, withFirstRev()...) if err != nil { return err } @@ -48,7 +52,8 @@ func (m *Mutex) Lock() (err error) { return nil } // otherwise myKey isn't lowest, so there must be a key prior to myKey - lastKey, err := NewRangeRev(m.client, m.key, m.myKey.Revision()-1).LastRev() + opts := append(withLastRev(), v3.WithRev(m.myKey.Revision()-1)) + lastKey, err := m.kv.Get(m.ctx, m.key, opts...) if err != nil { return err } @@ -81,6 +86,6 @@ func (lm *lockerMutex) Unlock() { } } -func NewLocker(client *clientv3.Client, key string) sync.Locker { +func NewLocker(client *v3.Client, key string) sync.Locker { return &lockerMutex{NewMutex(client, key)} } diff --git a/contrib/recipes/priority_queue.go b/contrib/recipes/priority_queue.go index a5ac7db5b..e7cce4ec0 100644 --- a/contrib/recipes/priority_queue.go +++ b/contrib/recipes/priority_queue.go @@ -17,25 +17,28 @@ package recipe import ( "fmt" - "github.com/coreos/etcd/clientv3" + "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" + v3 "github.com/coreos/etcd/clientv3" "github.com/coreos/etcd/storage/storagepb" ) // PriorityQueue implements a multi-reader, multi-writer distributed queue. type PriorityQueue struct { - client *clientv3.Client + client *v3.Client + kv v3.KV + ctx context.Context key string } // NewPriorityQueue creates an etcd priority queue. -func NewPriorityQueue(client *clientv3.Client, key string) *PriorityQueue { - return &PriorityQueue{client, key + "/"} +func NewPriorityQueue(client *v3.Client, key string) *PriorityQueue { + return &PriorityQueue{client, v3.NewKV(client), context.TODO(), key + "/"} } // Enqueue puts a value into a queue with a given priority. func (q *PriorityQueue) Enqueue(val string, pr uint16) error { prefix := fmt.Sprintf("%s%05d", q.key, pr) - _, err := NewSequentialKV(q.client, prefix, val) + _, err := NewSequentialKV(q.kv, prefix, val) return err } @@ -43,12 +46,12 @@ func (q *PriorityQueue) Enqueue(val string, pr uint16) error { // queue is empty, Dequeue blocks until items are available. func (q *PriorityQueue) Dequeue() (string, error) { // TODO: fewer round trips by fetching more than one key - resp, err := NewRange(q.client, q.key).FirstKey() + resp, err := q.kv.Get(q.ctx, q.key, withFirstKey()...) if err != nil { return "", err } - kv, err := claimFirstKey(q.client.KV, resp.Kvs) + kv, err := claimFirstKey(q.kv, resp.Kvs) if err != nil { return "", err } else if kv != nil { @@ -68,7 +71,7 @@ func (q *PriorityQueue) Dequeue() (string, error) { return "", err } - ok, err := deleteRevKey(q.client.KV, string(ev.Kv.Key), ev.Kv.ModRevision) + ok, err := deleteRevKey(q.kv, string(ev.Kv.Key), ev.Kv.ModRevision) if err != nil { return "", err } else if !ok { diff --git a/contrib/recipes/queue.go b/contrib/recipes/queue.go index b0a218a33..396420d2c 100644 --- a/contrib/recipes/queue.go +++ b/contrib/recipes/queue.go @@ -15,22 +15,26 @@ package recipe import ( - "github.com/coreos/etcd/clientv3" + "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" + v3 "github.com/coreos/etcd/clientv3" "github.com/coreos/etcd/storage/storagepb" ) // Queue implements a multi-reader, multi-writer distributed queue. type Queue struct { - client *clientv3.Client + client *v3.Client + kv v3.KV + ctx context.Context + keyPrefix string } -func NewQueue(client *clientv3.Client, keyPrefix string) *Queue { - return &Queue{client, keyPrefix} +func NewQueue(client *v3.Client, keyPrefix string) *Queue { + return &Queue{client, v3.NewKV(client), context.TODO(), keyPrefix} } func (q *Queue) Enqueue(val string) error { - _, err := NewUniqueKV(q.client, q.keyPrefix, val, 0) + _, err := NewUniqueKV(q.kv, q.keyPrefix, val, 0) return err } @@ -38,12 +42,12 @@ func (q *Queue) Enqueue(val string) error { // queue is empty, Dequeue blocks until elements are available. func (q *Queue) Dequeue() (string, error) { // TODO: fewer round trips by fetching more than one key - resp, err := NewRange(q.client, q.keyPrefix).FirstRev() + resp, err := q.kv.Get(q.ctx, q.keyPrefix, withFirstRev()...) if err != nil { return "", err } - kv, err := claimFirstKey(q.client.KV, resp.Kvs) + kv, err := claimFirstKey(q.kv, resp.Kvs) if err != nil { return "", err } else if kv != nil { @@ -63,7 +67,7 @@ func (q *Queue) Dequeue() (string, error) { return "", err } - ok, err := deleteRevKey(q.client.KV, string(ev.Kv.Key), ev.Kv.ModRevision) + ok, err := deleteRevKey(q.kv, string(ev.Kv.Key), ev.Kv.ModRevision) if err != nil { return "", err } else if !ok { diff --git a/contrib/recipes/range.go b/contrib/recipes/range.go index cd1e3d3b5..155a644e8 100644 --- a/contrib/recipes/range.go +++ b/contrib/recipes/range.go @@ -15,94 +15,20 @@ package recipe import ( - "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" - "github.com/coreos/etcd/clientv3" - pb "github.com/coreos/etcd/etcdserver/etcdserverpb" + v3 "github.com/coreos/etcd/clientv3" ) -type Range struct { - kv pb.KVClient - key []byte - rev int64 - keyEnd []byte -} +func withFirstCreate() []v3.OpOption { return withTop(v3.SortByCreatedRev, v3.SortAscend) } +func withLastCreate() []v3.OpOption { return withTop(v3.SortByCreatedRev, v3.SortDescend) } +func withFirstKey() []v3.OpOption { return withTop(v3.SortByKey, v3.SortAscend) } +func withLastKey() []v3.OpOption { return withTop(v3.SortByKey, v3.SortDescend) } +func withFirstRev() []v3.OpOption { return withTop(v3.SortByModifiedRev, v3.SortAscend) } +func withLastRev() []v3.OpOption { return withTop(v3.SortByModifiedRev, v3.SortDescend) } -func NewRange(client *clientv3.Client, key string) *Range { - return NewRangeRev(client, key, 0) -} - -func NewRangeRev(client *clientv3.Client, key string, rev int64) *Range { - return &Range{client.KV, []byte(key), rev, prefixEnd(key)} -} - -// Prefix performs a RangeRequest to get keys matching * -func (r *Range) Prefix() (*pb.RangeResponse, error) { - return r.kv.Range( - context.TODO(), - &pb.RangeRequest{ - Key: prefixNext(string(r.key)), - RangeEnd: r.keyEnd, - Revision: r.rev}) -} - -// OpenInterval gets the keys in the set * - -func (r *Range) OpenInterval() (*pb.RangeResponse, error) { - return r.kv.Range( - context.TODO(), - &pb.RangeRequest{Key: r.key, RangeEnd: r.keyEnd, Revision: r.rev}) -} - -func (r *Range) FirstKey() (*pb.RangeResponse, error) { - return r.topTarget(pb.RangeRequest_ASCEND, pb.RangeRequest_KEY) -} - -func (r *Range) LastKey() (*pb.RangeResponse, error) { - return r.topTarget(pb.RangeRequest_DESCEND, pb.RangeRequest_KEY) -} - -func (r *Range) FirstRev() (*pb.RangeResponse, error) { - return r.topTarget(pb.RangeRequest_ASCEND, pb.RangeRequest_MOD) -} - -func (r *Range) LastRev() (*pb.RangeResponse, error) { - return r.topTarget(pb.RangeRequest_DESCEND, pb.RangeRequest_MOD) -} - -func (r *Range) FirstCreate() (*pb.RangeResponse, error) { - return r.topTarget(pb.RangeRequest_ASCEND, pb.RangeRequest_MOD) -} - -func (r *Range) LastCreate() (*pb.RangeResponse, error) { - return r.topTarget(pb.RangeRequest_DESCEND, pb.RangeRequest_MOD) -} - -// topTarget gets the first key for a given sort order and target -func (r *Range) topTarget(order pb.RangeRequest_SortOrder, target pb.RangeRequest_SortTarget) (*pb.RangeResponse, error) { - return r.kv.Range( - context.TODO(), - &pb.RangeRequest{ - Key: r.key, - RangeEnd: r.keyEnd, - Limit: 1, - Revision: r.rev, - SortOrder: order, - SortTarget: target}) -} - -// prefixNext returns the first key possibly matched by * - -func prefixNext(prefix string) []byte { - return append([]byte(prefix), 0) -} - -// prefixEnd returns the last key possibly matched by * -func prefixEnd(prefix string) []byte { - keyEnd := []byte(prefix) - for i := len(keyEnd) - 1; i >= 0; i-- { - if keyEnd[i] < 0xff { - keyEnd[i] = keyEnd[i] + 1 - keyEnd = keyEnd[:i+1] - break - } - } - return keyEnd +// withTop gets the first key over the get's prefix given a sort order +func withTop(target v3.SortTarget, order v3.SortOrder) []v3.OpOption { + return []v3.OpOption{ + v3.WithPrefix(), + v3.WithSort(target, order), + v3.WithLimit(1)} } diff --git a/contrib/recipes/rwmutex.go b/contrib/recipes/rwmutex.go index a7b93235c..93aff10cd 100644 --- a/contrib/recipes/rwmutex.go +++ b/contrib/recipes/rwmutex.go @@ -15,23 +15,26 @@ package recipe import ( - "github.com/coreos/etcd/clientv3" + "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" + v3 "github.com/coreos/etcd/clientv3" "github.com/coreos/etcd/storage/storagepb" ) type RWMutex struct { - client *clientv3.Client - key string - myKey *RemoteKV + client *v3.Client + kv v3.KV + ctx context.Context + + key string + myKey *EphemeralKV } -func NewRWMutex(client *clientv3.Client, key string) *RWMutex { - return &RWMutex{client, key, nil} +func NewRWMutex(client *v3.Client, key string) *RWMutex { + return &RWMutex{client, v3.NewKV(client), context.TODO(), key, nil} } func (rwm *RWMutex) RLock() error { - // XXX: make reads ephemeral locks? - rk, err := NewUniqueKey(rwm.client, rwm.key+"/read") + rk, err := NewUniqueEphemeralKey(rwm.client, rwm.key+"/read") if err != nil { return err } @@ -39,7 +42,7 @@ func (rwm *RWMutex) RLock() error { // if there are nodes with "write-" and a lower // revision number than us we must wait - resp, err := NewRange(rwm.client, rwm.key+"/write").FirstRev() + resp, err := rwm.kv.Get(rwm.ctx, rwm.key+"/write", withFirstRev()...) if err != nil { return err } @@ -51,21 +54,22 @@ func (rwm *RWMutex) RLock() error { } func (rwm *RWMutex) Lock() error { - rk, err := NewUniqueKey(rwm.client, rwm.key+"/write") + rk, err := NewUniqueEphemeralKey(rwm.client, rwm.key+"/write") if err != nil { return err } rwm.myKey = rk for { - // any key of lower rev number blocks the write lock - resp, err := NewRangeRev(rwm.client, rwm.key, rk.Revision()-1).LastRev() + // find any key of lower rev number blocks the write lock + opts := append(withLastRev(), v3.WithRev(rk.Revision()-1)) + resp, err := rwm.kv.Get(rwm.ctx, rwm.key, opts...) if err != nil { return err } if len(resp.Kvs) == 0 { // no matching for revision before myKey; acquired - return nil + break } if err := rwm.waitOnLowest(); err != nil { return err @@ -78,7 +82,8 @@ func (rwm *RWMutex) Lock() error { func (rwm *RWMutex) waitOnLowest() error { // must block; get key before ek for waiting - lastKey, err := NewRangeRev(rwm.client, rwm.key, rwm.myKey.Revision()-1).LastRev() + opts := append(withLastRev(), v3.WithRev(rwm.myKey.Revision()-1)) + lastKey, err := rwm.kv.Get(rwm.ctx, rwm.key, opts...) if err != nil { return err } diff --git a/contrib/recipes/stm.go b/contrib/recipes/stm.go index 6821f454d..03c090b3d 100644 --- a/contrib/recipes/stm.go +++ b/contrib/recipes/stm.go @@ -16,13 +16,13 @@ package recipe import ( "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" - "github.com/coreos/etcd/clientv3" - pb "github.com/coreos/etcd/etcdserver/etcdserverpb" + v3 "github.com/coreos/etcd/clientv3" ) // STM implements software transactional memory over etcd type STM struct { - client *clientv3.Client + client *v3.Client + kv v3.KV // rset holds the read key's value and revision of read rset map[string]*RemoteKV // wset holds the write key and its value @@ -33,8 +33,8 @@ type STM struct { } // NewSTM creates new transaction loop for a given apply function. -func NewSTM(client *clientv3.Client, apply func(*STM) error) <-chan error { - s := &STM{client: client, apply: apply} +func NewSTM(client *v3.Client, apply func(*STM) error) <-chan error { + s := &STM{client: client, kv: v3.NewKV(client), apply: apply} errc := make(chan error, 1) go func() { var err error @@ -43,7 +43,8 @@ func NewSTM(client *clientv3.Client, apply func(*STM) error) <-chan error { if err = apply(s); err != nil || s.aborted { break } - if ok, err := s.commit(); ok || err != nil { + if ok, cerr := s.commit(); ok || cerr != nil { + err = cerr break } } @@ -63,7 +64,7 @@ func (s *STM) Get(key string) (string, error) { if rk, ok := s.rset[key]; ok { return rk.Value(), nil } - rk, err := GetRemoteKV(s.client, key) + rk, err := GetRemoteKV(s.kv, key) if err != nil { return "", err } @@ -76,30 +77,21 @@ func (s *STM) Get(key string) (string, error) { func (s *STM) Put(key string, val string) { s.wset[key] = val } // commit attempts to apply the txn's changes to the server. -func (s *STM) commit() (ok bool, err error) { +func (s *STM) commit() (ok bool, rr error) { // read set must not change - cmps := []*pb.Compare{} + cmps := make([]v3.Cmp, 0, len(s.rset)) for k, rk := range s.rset { // use < to support updating keys that don't exist yet - cmp := &pb.Compare{ - Result: pb.Compare_LESS, - Target: pb.Compare_MOD, - Key: []byte(k), - TargetUnion: &pb.Compare_ModRevision{ModRevision: rk.Revision() + 1}, - } + cmp := v3.Compare(v3.ModifiedRevision(k), "<", rk.Revision()+1) cmps = append(cmps, cmp) } + // apply all writes - puts := []*pb.RequestUnion{} + puts := make([]v3.Op, 0, len(s.wset)) for k, v := range s.wset { - puts = append(puts, &pb.RequestUnion{ - Request: &pb.RequestUnion_RequestPut{ - RequestPut: &pb.PutRequest{ - Key: []byte(k), - Value: []byte(v), - }}}) + puts = append(puts, v3.OpPut(k, v)) } - txnresp, err := s.client.KV.Txn(context.TODO(), &pb.TxnRequest{cmps, puts, nil}) + txnresp, err := s.kv.Txn(context.TODO()).If(cmps...).Then(puts...).Commit() return txnresp.Succeeded, err } diff --git a/integration/v3_stm_test.go b/integration/v3_stm_test.go index a056fda9e..b496664dc 100644 --- a/integration/v3_stm_test.go +++ b/integration/v3_stm_test.go @@ -19,6 +19,7 @@ import ( "strconv" "testing" + v3 "github.com/coreos/etcd/clientv3" "github.com/coreos/etcd/contrib/recipes" ) @@ -30,7 +31,7 @@ func TestSTMConflict(t *testing.T) { etcdc := clus.RandClient() keys := make([]*recipe.RemoteKV, 5) for i := 0; i < len(keys); i++ { - rk, err := recipe.NewKV(etcdc, fmt.Sprintf("foo-%d", i), "100", 0) + rk, err := recipe.NewKV(v3.NewKV(etcdc), fmt.Sprintf("foo-%d", i), "100", 0) if err != nil { t.Fatalf("could not make key (%v)", err) } @@ -75,7 +76,7 @@ func TestSTMConflict(t *testing.T) { // ensure sum matches initial sum sum := 0 for _, oldRK := range keys { - rk, err := recipe.GetRemoteKV(etcdc, oldRK.Key()) + rk, err := recipe.GetRemoteKV(v3.NewKV(etcdc), oldRK.Key()) if err != nil { t.Fatalf("couldn't fetch key %s (%v)", oldRK.Key(), err) } @@ -102,7 +103,7 @@ func TestSTMPutNewKey(t *testing.T) { t.Fatalf("error on stm txn (%v)", err) } - rk, err := recipe.GetRemoteKV(etcdc, "foo") + rk, err := recipe.GetRemoteKV(v3.NewKV(etcdc), "foo") if err != nil { t.Fatalf("error fetching key (%v)", err) } @@ -128,7 +129,7 @@ func TestSTMAbort(t *testing.T) { t.Fatalf("error on stm txn (%v)", err) } - rk, err := recipe.GetRemoteKV(etcdc, "foo") + rk, err := recipe.GetRemoteKV(v3.NewKV(etcdc), "foo") if err != nil { t.Fatalf("error fetching key (%v)", err) }