From 936e991f9f62c1395ef4a74c29fe0040166cc625 Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Thu, 18 Feb 2016 16:01:32 -0800 Subject: [PATCH 1/3] contrib/recipes: use clientv3 watcher API --- contrib/recipes/client.go | 1 + contrib/recipes/watch.go | 151 +++++++------------------------------- 2 files changed, 28 insertions(+), 124 deletions(-) diff --git a/contrib/recipes/client.go b/contrib/recipes/client.go index 8cf357197..428782d22 100644 --- a/contrib/recipes/client.go +++ b/contrib/recipes/client.go @@ -26,6 +26,7 @@ var ( ErrKeyExists = errors.New("key already exists") ErrWaitMismatch = errors.New("unexpected wait result") ErrTooManyClients = errors.New("too many clients") + ErrNoWatcher = errors.New("no watcher channel") ) // deleteRevKey deletes a key by revision, returning false if key is missing diff --git a/contrib/recipes/watch.go b/contrib/recipes/watch.go index 444073f94..7a869736c 100644 --- a/contrib/recipes/watch.go +++ b/contrib/recipes/watch.go @@ -17,138 +17,41 @@ package recipe import ( "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" "github.com/coreos/etcd/clientv3" - pb "github.com/coreos/etcd/etcdserver/etcdserverpb" - "github.com/coreos/etcd/storage" "github.com/coreos/etcd/storage/storagepb" ) -type Watcher struct { - wstream pb.Watch_WatchClient - cancel context.CancelFunc - donec chan struct{} - id storage.WatchID - recvc chan *storagepb.Event - lastErr error -} - -func NewWatcher(c *clientv3.Client, key string, rev int64) (*Watcher, error) { - return newWatcher(c, key, rev, false) -} - -func NewPrefixWatcher(c *clientv3.Client, prefix string, rev int64) (*Watcher, error) { - return newWatcher(c, prefix, rev, true) -} - -func newWatcher(c *clientv3.Client, key string, rev int64, isPrefix bool) (*Watcher, error) { - ctx, cancel := context.WithCancel(context.Background()) - w, err := c.Watch.Watch(ctx) - if err != nil { - return nil, err - } - - req := &pb.WatchCreateRequest{StartRevision: rev} - if isPrefix { - req.Prefix = []byte(key) - } else { - req.Key = []byte(key) - } - - if err := w.Send(&pb.WatchRequest{RequestUnion: &pb.WatchRequest_CreateRequest{CreateRequest: req}}); err != nil { - return nil, err - } - - wresp, err := w.Recv() - if err != nil { - return nil, err - } - if len(wresp.Events) != 0 || wresp.Created != true { - return nil, ErrWaitMismatch - } - ret := &Watcher{ - wstream: w, - cancel: cancel, - donec: make(chan struct{}), - id: storage.WatchID(wresp.WatchId), - recvc: make(chan *storagepb.Event), - } - go ret.recvLoop() - return ret, nil -} - -func (w *Watcher) Close() error { - defer w.cancel() - if w.wstream == nil { - return w.lastErr - } - req := &pb.WatchRequest{RequestUnion: &pb.WatchRequest_CancelRequest{ - CancelRequest: &pb.WatchCancelRequest{ - WatchId: int64(w.id)}}} - err := w.wstream.Send(req) - if err != nil && w.lastErr == nil { - return err - } - w.wstream.CloseSend() - w.donec <- struct{}{} - <-w.donec - w.wstream = nil - return w.lastErr -} - -func (w *Watcher) Chan() <-chan *storagepb.Event { return w.recvc } - -func (w *Watcher) recvLoop() { - defer close(w.donec) - for { - wresp, err := w.wstream.Recv() - if err != nil { - w.lastErr = err - break - } - for i := range wresp.Events { - select { - case <-w.donec: - close(w.recvc) - return - case w.recvc <- wresp.Events[i]: - } - } - } - close(w.recvc) - <-w.donec -} - -func (w *Watcher) waitEvents(evs []storagepb.Event_EventType) (*storagepb.Event, error) { - i := 0 - for { - ev, ok := <-w.recvc - if !ok { - break - } - if ev.Type == evs[i] { - i++ - if i == len(evs) { - return ev, nil - } - } - } - return nil, w.Close() -} - // WaitEvents waits on a key until it observes the given events and returns the final one. func WaitEvents(c *clientv3.Client, key string, rev int64, evs []storagepb.Event_EventType) (*storagepb.Event, error) { - w, err := NewWatcher(c, key, rev) - if err != nil { - return nil, err + w := clientv3.NewWatcher(c) + wc := w.Watch(context.Background(), key, rev) + if wc == nil { + w.Close() + return nil, ErrNoWatcher } - defer w.Close() - return w.waitEvents(evs) + return waitEvents(wc, evs), w.Close() } func WaitPrefixEvents(c *clientv3.Client, prefix string, rev int64, evs []storagepb.Event_EventType) (*storagepb.Event, error) { - w, err := NewPrefixWatcher(c, prefix, rev) - if err != nil { - return nil, err + w := clientv3.NewWatcher(c) + wc := w.WatchPrefix(context.Background(), prefix, rev) + if wc == nil { + w.Close() + return nil, ErrNoWatcher } - defer w.Close() - return w.waitEvents(evs) + return waitEvents(wc, evs), w.Close() +} + +func waitEvents(wc clientv3.WatchChan, evs []storagepb.Event_EventType) *storagepb.Event { + i := 0 + for wresp := range wc { + for _, ev := range wresp.Events { + if ev.Type == evs[i] { + i++ + if i == len(evs) { + return ev + } + } + } + } + return nil } From 16420cfe63e16ae6e0e1bd9c65ce1119b8a25334 Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Thu, 18 Feb 2016 16:16:13 -0800 Subject: [PATCH 2/3] contrib/recipes: use clientv3 lease API --- contrib/recipes/lease.go | 54 ++++++++++++++-------------------------- 1 file changed, 19 insertions(+), 35 deletions(-) diff --git a/contrib/recipes/lease.go b/contrib/recipes/lease.go index 20ff2c598..327d12fb7 100644 --- a/contrib/recipes/lease.go +++ b/contrib/recipes/lease.go @@ -15,11 +15,9 @@ package recipe import ( "sync" - "time" "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" "github.com/coreos/etcd/clientv3" - pb "github.com/coreos/etcd/etcdserver/etcdserverpb" "github.com/coreos/etcd/lease" ) @@ -32,8 +30,9 @@ type clientLeaseMgr struct { } type leaseKeepAlive struct { - id lease.LeaseID - donec chan struct{} + id lease.LeaseID + cancel context.CancelFunc + donec <-chan struct{} } func SessionLease(client *clientv3.Client) (lease.LeaseID, error) { @@ -49,13 +48,10 @@ func SessionLeaseTTL(client *clientv3.Client, ttl int64) (lease.LeaseID, error) // would fail) or if transferring lease ownership. func StopSessionLease(client *clientv3.Client) { clientLeases.mu.Lock() - lka, ok := clientLeases.leases[client] - if ok { - delete(clientLeases.leases, client) - } + lka := clientLeases.leases[client] clientLeases.mu.Unlock() if lka != nil { - lka.donec <- struct{}{} + lka.cancel() <-lka.donec } } @@ -67,8 +63,7 @@ func RevokeSessionLease(client *clientv3.Client) (err error) { clientLeases.mu.Unlock() StopSessionLease(client) if lka != nil { - req := &pb.LeaseRevokeRequest{ID: int64(lka.id)} - _, err = client.Lease.LeaseRevoke(context.TODO(), req) + _, err = clientv3.NewLease(client).Revoke(context.TODO(), lka.id) } return err } @@ -80,48 +75,37 @@ func (clm *clientLeaseMgr) sessionLease(client *clientv3.Client, ttl int64) (lea return lka.id, nil } - resp, err := client.Lease.LeaseCreate(context.TODO(), &pb.LeaseCreateRequest{TTL: ttl}) + lc := clientv3.NewLease(client) + resp, err := lc.Create(context.TODO(), ttl) if err != nil { return lease.NoLease, err } id := lease.LeaseID(resp.ID) ctx, cancel := context.WithCancel(context.Background()) - keepAlive, err := client.Lease.LeaseKeepAlive(ctx) + keepAlive, err := lc.KeepAlive(ctx, id) if err != nil || keepAlive == nil { return lease.NoLease, err } - lka := &leaseKeepAlive{id: id, donec: make(chan struct{})} + donec := make(chan struct{}) + lka := &leaseKeepAlive{ + id: id, + cancel: cancel, + donec: donec} clm.leases[client] = lka - // keep the lease alive until client error + // keep the lease alive until client error or cancelled context go func() { defer func() { - keepAlive.CloseSend() clm.mu.Lock() delete(clm.leases, client) clm.mu.Unlock() - cancel() - close(lka.donec) + lc.Close() + close(donec) }() - - ttl := resp.TTL - for { - lreq := &pb.LeaseKeepAliveRequest{ID: int64(id)} - select { - case <-lka.donec: - return - case <-time.After(time.Duration(ttl/2) * time.Second): - } - if err := keepAlive.Send(lreq); err != nil { - break - } - resp, err := keepAlive.Recv() - if err != nil { - break - } - ttl = resp.TTL + for range keepAlive { + // eat messages until keep alive channel closes } }() From 3bb3351ca0cce55db1d8f7748555c907cdef0cdf Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Sat, 20 Feb 2016 16:07:22 -0800 Subject: [PATCH 3/3] contrib/recipes: use clientv3 kv API --- contrib/recipes/barrier.go | 20 +++-- contrib/recipes/client.go | 43 ++-------- contrib/recipes/double_barrier.go | 27 +++--- contrib/recipes/election.go | 22 ++--- contrib/recipes/key.go | 134 ++++++++++-------------------- contrib/recipes/mutex.go | 25 +++--- contrib/recipes/priority_queue.go | 19 +++-- contrib/recipes/queue.go | 20 +++-- contrib/recipes/range.go | 100 +++------------------- contrib/recipes/rwmutex.go | 33 ++++---- contrib/recipes/stm.go | 38 ++++----- integration/v3_stm_test.go | 9 +- 12 files changed, 185 insertions(+), 305 deletions(-) diff --git a/contrib/recipes/barrier.go b/contrib/recipes/barrier.go index 9d080d9a1..dcfd025a6 100644 --- a/contrib/recipes/barrier.go +++ b/contrib/recipes/barrier.go @@ -16,38 +16,40 @@ package recipe import ( "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" - "github.com/coreos/etcd/clientv3" - pb "github.com/coreos/etcd/etcdserver/etcdserverpb" + v3 "github.com/coreos/etcd/clientv3" "github.com/coreos/etcd/storage/storagepb" ) // Barrier creates a key in etcd to block processes, then deletes the key to // release all blocked processes. type Barrier struct { - client *clientv3.Client - key string + client *v3.Client + kv v3.KV + ctx context.Context + + key string } -func NewBarrier(client *clientv3.Client, key string) *Barrier { - return &Barrier{client, key} +func NewBarrier(client *v3.Client, key string) *Barrier { + return &Barrier{client, v3.NewKV(client), context.TODO(), key} } // Hold creates the barrier key causing processes to block on Wait. func (b *Barrier) Hold() error { - _, err := NewKey(b.client, b.key, 0) + _, err := NewKey(b.kv, b.key, 0) return err } // Release deletes the barrier key to unblock all waiting processes. func (b *Barrier) Release() error { - _, err := b.client.KV.DeleteRange(context.TODO(), &pb.DeleteRangeRequest{Key: []byte(b.key)}) + _, err := b.kv.Delete(b.ctx, b.key) return err } // Wait blocks on the barrier key until it is deleted. If there is no key, Wait // assumes Release has already been called and returns immediately. func (b *Barrier) Wait() error { - resp, err := NewRange(b.client, b.key).FirstKey() + resp, err := b.kv.Get(b.ctx, b.key, withFirstKey()...) if err != nil { return err } diff --git a/contrib/recipes/client.go b/contrib/recipes/client.go index 428782d22..79c9344bb 100644 --- a/contrib/recipes/client.go +++ b/contrib/recipes/client.go @@ -18,7 +18,7 @@ import ( "errors" "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" - pb "github.com/coreos/etcd/etcdserver/etcdserverpb" + v3 "github.com/coreos/etcd/clientv3" spb "github.com/coreos/etcd/storage/storagepb" ) @@ -30,22 +30,10 @@ var ( ) // deleteRevKey deletes a key by revision, returning false if key is missing -func deleteRevKey(kvc pb.KVClient, key string, rev int64) (bool, error) { - cmp := &pb.Compare{ - Result: pb.Compare_EQUAL, - Target: pb.Compare_MOD, - Key: []byte(key), - TargetUnion: &pb.Compare_ModRevision{ModRevision: rev}, - } - req := &pb.RequestUnion{Request: &pb.RequestUnion_RequestDeleteRange{ - RequestDeleteRange: &pb.DeleteRangeRequest{Key: []byte(key)}}} - txnresp, err := kvc.Txn( - context.TODO(), - &pb.TxnRequest{ - Compare: []*pb.Compare{cmp}, - Success: []*pb.RequestUnion{req}, - Failure: nil, - }) +func deleteRevKey(kv v3.KV, key string, rev int64) (bool, error) { + cmp := v3.Compare(v3.ModifiedRevision(key), "=", rev) + req := v3.OpDelete(key) + txnresp, err := kv.Txn(context.TODO()).If(cmp).Then(req).Commit() if err != nil { return false, err } else if txnresp.Succeeded == false { @@ -54,27 +42,14 @@ func deleteRevKey(kvc pb.KVClient, key string, rev int64) (bool, error) { return true, nil } -func claimFirstKey(kvc pb.KVClient, kvs []*spb.KeyValue) (*spb.KeyValue, error) { - for _, kv := range kvs { - ok, err := deleteRevKey(kvc, string(kv.Key), kv.ModRevision) +func claimFirstKey(kv v3.KV, kvs []*spb.KeyValue) (*spb.KeyValue, error) { + for _, k := range kvs { + ok, err := deleteRevKey(kv, string(k.Key), k.ModRevision) if err != nil { return nil, err } else if ok { - return kv, nil + return k, nil } } return nil, nil } - -func putEmptyKey(kv pb.KVClient, key string) (*pb.PutResponse, error) { - return kv.Put(context.TODO(), &pb.PutRequest{Key: []byte(key), Value: []byte{}}) -} - -// deletePrefix performs a RangeRequest to get keys on a given prefix -func deletePrefix(kv pb.KVClient, prefix string) (*pb.DeleteRangeResponse, error) { - return kv.DeleteRange( - context.TODO(), - &pb.DeleteRangeRequest{ - Key: []byte(prefix), - RangeEnd: []byte(prefixEnd(prefix))}) -} diff --git a/contrib/recipes/double_barrier.go b/contrib/recipes/double_barrier.go index 142049fd8..1be537d1c 100644 --- a/contrib/recipes/double_barrier.go +++ b/contrib/recipes/double_barrier.go @@ -17,7 +17,6 @@ package recipe import ( "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" "github.com/coreos/etcd/clientv3" - pb "github.com/coreos/etcd/etcdserver/etcdserverpb" "github.com/coreos/etcd/storage/storagepb" ) @@ -25,13 +24,22 @@ import ( // blocks again on Leave until all processes have left. type DoubleBarrier struct { client *clientv3.Client - key string // key for the collective barrier - count int - myKey *EphemeralKV // current key for this process on the barrier + kv clientv3.KV + ctx context.Context + + key string // key for the collective barrier + count int + myKey *EphemeralKV // current key for this process on the barrier } func NewDoubleBarrier(client *clientv3.Client, key string, count int) *DoubleBarrier { - return &DoubleBarrier{client, key, count, nil} + return &DoubleBarrier{ + client: client, + kv: clientv3.NewKV(client), + ctx: context.TODO(), + key: key, + count: count, + } } // Enter waits for "count" processes to enter the barrier then returns @@ -42,7 +50,7 @@ func (b *DoubleBarrier) Enter() error { } b.myKey = ek - resp, err := NewRange(b.client, b.key+"/waiters").Prefix() + resp, err := b.kv.Get(b.ctx, b.key+"/waiters", clientv3.WithPrefix()) if err != nil { return err } @@ -53,7 +61,7 @@ func (b *DoubleBarrier) Enter() error { if len(resp.Kvs) == b.count { // unblock waiters - _, err = putEmptyKey(b.client.KV, b.key+"/ready") + _, err = b.kv.Put(b.ctx, b.key+"/ready", "") return err } @@ -67,7 +75,7 @@ func (b *DoubleBarrier) Enter() error { // Leave waits for "count" processes to leave the barrier then returns func (b *DoubleBarrier) Leave() error { - resp, err := NewRange(b.client, b.key+"/waiters").Prefix() + resp, err := b.kv.Get(b.ctx, b.key+"/waiters", clientv3.WithPrefix()) if len(resp.Kvs) == 0 { return nil } @@ -85,8 +93,7 @@ func (b *DoubleBarrier) Leave() error { if len(resp.Kvs) == 1 { // this is the only node in the barrier; finish up - req := &pb.DeleteRangeRequest{Key: []byte(b.key + "/ready")} - if _, err = b.client.KV.DeleteRange(context.TODO(), req); err != nil { + if _, err = b.kv.Delete(b.ctx, b.key+"/ready"); err != nil { return err } return b.myKey.Delete() diff --git a/contrib/recipes/election.go b/contrib/recipes/election.go index 563fbd574..163f49867 100644 --- a/contrib/recipes/election.go +++ b/contrib/recipes/election.go @@ -14,20 +14,24 @@ package recipe import ( - "github.com/coreos/etcd/clientv3" + "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" + v3 "github.com/coreos/etcd/clientv3" "github.com/coreos/etcd/etcdserver" "github.com/coreos/etcd/storage/storagepb" ) type Election struct { - client *clientv3.Client + client *v3.Client + kv v3.KV + ctx context.Context + keyPrefix string leaderKey *EphemeralKV } // NewElection returns a new election on a given key prefix. -func NewElection(client *clientv3.Client, keyPrefix string) *Election { - return &Election{client, keyPrefix, nil} +func NewElection(client *v3.Client, keyPrefix string) *Election { + return &Election{client, v3.NewKV(client), context.TODO(), keyPrefix, nil} } // Volunteer puts a value as eligible for the election. It blocks until @@ -58,7 +62,7 @@ func (e *Election) Resign() (err error) { // Leader returns the leader value for the current election. func (e *Election) Leader() (string, error) { - resp, err := NewRange(e.client, e.keyPrefix).FirstCreate() + resp, err := e.kv.Get(e.ctx, e.keyPrefix, withFirstCreate()...) if err != nil { return "", err } else if len(resp.Kvs) == 0 { @@ -70,7 +74,7 @@ func (e *Election) Leader() (string, error) { // Wait waits for a leader to be elected, returning the leader value. func (e *Election) Wait() (string, error) { - resp, err := NewRange(e.client, e.keyPrefix).FirstCreate() + resp, err := e.kv.Get(e.ctx, e.keyPrefix, withFirstCreate()...) if err != nil { return "", err } else if len(resp.Kvs) != 0 { @@ -89,10 +93,8 @@ func (e *Election) Wait() (string, error) { } func (e *Election) waitLeadership(tryKey *EphemeralKV) error { - resp, err := NewRangeRev( - e.client, - e.keyPrefix, - tryKey.Revision()-1).LastCreate() + opts := append(withLastCreate(), v3.WithRev(tryKey.Revision()-1)) + resp, err := e.kv.Get(e.ctx, e.keyPrefix, opts...) if err != nil { return err } else if len(resp.Kvs) == 0 { diff --git a/contrib/recipes/key.go b/contrib/recipes/key.go index 28339e580..5c086213f 100644 --- a/contrib/recipes/key.go +++ b/contrib/recipes/key.go @@ -20,36 +20,32 @@ import ( "time" "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" - "github.com/coreos/etcd/clientv3" - pb "github.com/coreos/etcd/etcdserver/etcdserverpb" + v3 "github.com/coreos/etcd/clientv3" "github.com/coreos/etcd/lease" ) // Key is a key/revision pair created by the client and stored on etcd type RemoteKV struct { - client *clientv3.Client - key string - rev int64 - val string + kv v3.KV + key string + rev int64 + val string } -func NewKey(client *clientv3.Client, key string, leaseID lease.LeaseID) (*RemoteKV, error) { - return NewKV(client, key, "", leaseID) +func NewKey(kv v3.KV, key string, leaseID lease.LeaseID) (*RemoteKV, error) { + return NewKV(kv, key, "", leaseID) } -func NewKV(client *clientv3.Client, key, val string, leaseID lease.LeaseID) (*RemoteKV, error) { - rev, err := putNewKV(client, key, val, leaseID) +func NewKV(kv v3.KV, key, val string, leaseID lease.LeaseID) (*RemoteKV, error) { + rev, err := putNewKV(kv, key, val, leaseID) if err != nil { return nil, err } - return &RemoteKV{client, key, rev, val}, nil + return &RemoteKV{kv, key, rev, val}, nil } -func GetRemoteKV(client *clientv3.Client, key string) (*RemoteKV, error) { - resp, err := client.KV.Range( - context.TODO(), - &pb.RangeRequest{Key: []byte(key)}, - ) +func GetRemoteKV(kv v3.KV, key string) (*RemoteKV, error) { + resp, err := kv.Get(context.TODO(), key) if err != nil { return nil, err } @@ -59,23 +55,19 @@ func GetRemoteKV(client *clientv3.Client, key string) (*RemoteKV, error) { rev = resp.Kvs[0].ModRevision val = string(resp.Kvs[0].Value) } - return &RemoteKV{ - client: client, - key: key, - rev: rev, - val: val}, nil + return &RemoteKV{kv: kv, key: key, rev: rev, val: val}, nil } -func NewUniqueKey(client *clientv3.Client, prefix string) (*RemoteKV, error) { - return NewUniqueKV(client, prefix, "", 0) +func NewUniqueKey(kv v3.KV, prefix string) (*RemoteKV, error) { + return NewUniqueKV(kv, prefix, "", 0) } -func NewUniqueKV(client *clientv3.Client, prefix string, val string, leaseID lease.LeaseID) (*RemoteKV, error) { +func NewUniqueKV(kv v3.KV, prefix string, val string, leaseID lease.LeaseID) (*RemoteKV, error) { for { newKey := fmt.Sprintf("%s/%v", prefix, time.Now().UnixNano()) - rev, err := putNewKV(client, newKey, val, 0) + rev, err := putNewKV(kv, newKey, val, 0) if err == nil { - return &RemoteKV{client, newKey, rev, val}, nil + return &RemoteKV{kv, newKey, rev, val}, nil } if err != ErrKeyExists { return nil, err @@ -85,22 +77,10 @@ func NewUniqueKV(client *clientv3.Client, prefix string, val string, leaseID lea // putNewKV attempts to create the given key, only succeeding if the key did // not yet exist. -func putNewKV(ec *clientv3.Client, key, val string, leaseID lease.LeaseID) (int64, error) { - cmp := &pb.Compare{ - Result: pb.Compare_EQUAL, - Target: pb.Compare_VERSION, - Key: []byte(key), - TargetUnion: &pb.Compare_Version{Version: 0}} - - req := &pb.RequestUnion{ - Request: &pb.RequestUnion_RequestPut{ - RequestPut: &pb.PutRequest{ - Key: []byte(key), - Value: []byte(val), - Lease: int64(leaseID)}}} - txnresp, err := ec.KV.Txn( - context.TODO(), - &pb.TxnRequest{[]*pb.Compare{cmp}, []*pb.RequestUnion{req}, nil}) +func putNewKV(kv v3.KV, key, val string, leaseID lease.LeaseID) (int64, error) { + cmp := v3.Compare(v3.Version(key), "=", 0) + req := v3.OpPut(key, val, v3.WithLease(leaseID)) + txnresp, err := kv.Txn(context.TODO()).If(cmp).Then(req).Commit() if err != nil { return 0, err } @@ -111,14 +91,14 @@ func putNewKV(ec *clientv3.Client, key, val string, leaseID lease.LeaseID) (int6 } // NewSequentialKV allocates a new sequential key-value pair at /nnnnn -func NewSequentialKV(client *clientv3.Client, prefix, val string) (*RemoteKV, error) { - return newSequentialKV(client, prefix, val, 0) +func NewSequentialKV(kv v3.KV, prefix, val string) (*RemoteKV, error) { + return newSequentialKV(kv, prefix, val, 0) } // newSequentialKV allocates a new sequential key /nnnnn with a given // value and lease. Note: a bookkeeping node __ is also allocated. -func newSequentialKV(client *clientv3.Client, prefix, val string, leaseID lease.LeaseID) (*RemoteKV, error) { - resp, err := NewRange(client, prefix).LastKey() +func newSequentialKV(kv v3.KV, prefix, val string, leaseID lease.LeaseID) (*RemoteKV, error) { + resp, err := kv.Get(context.TODO(), prefix, withLastKey()...) if err != nil { return nil, err } @@ -127,9 +107,9 @@ func newSequentialKV(client *clientv3.Client, prefix, val string, leaseID lease. newSeqNum := 0 if len(resp.Kvs) != 0 { fields := strings.Split(string(resp.Kvs[0].Key), "/") - _, err := fmt.Sscanf(fields[len(fields)-1], "%d", &newSeqNum) - if err != nil { - return nil, err + _, serr := fmt.Sscanf(fields[len(fields)-1], "%d", &newSeqNum) + if serr != nil { + return nil, serr } newSeqNum++ } @@ -140,42 +120,22 @@ func newSequentialKV(client *clientv3.Client, prefix, val string, leaseID lease. // N1: LastKey() == 1, start txn. // N2: New Key 2, New Key 3, Delete Key 2 // N1: txn succeeds allocating key 2 when it shouldn't - baseKey := []byte("__" + prefix) - cmp := &pb.Compare{ - Result: pb.Compare_LESS, - Target: pb.Compare_MOD, - Key: []byte(baseKey), - // current revision might contain modification so +1 - TargetUnion: &pb.Compare_ModRevision{ModRevision: resp.Header.Revision + 1}, - } + baseKey := "__" + prefix - reqPrefix := &pb.RequestUnion{ - Request: &pb.RequestUnion_RequestPut{ - RequestPut: &pb.PutRequest{ - Key: baseKey, - Lease: int64(leaseID), - }}} + // current revision might contain modification so +1 + cmp := v3.Compare(v3.ModifiedRevision(baseKey), "<", resp.Header.Revision+1) + reqPrefix := v3.OpPut(baseKey, "", v3.WithLease(leaseID)) + reqNewKey := v3.OpPut(newKey, val, v3.WithLease(leaseID)) - reqNewKey := &pb.RequestUnion{ - Request: &pb.RequestUnion_RequestPut{ - RequestPut: &pb.PutRequest{ - Key: []byte(newKey), - Value: []byte(val), - Lease: int64(leaseID), - }}} - - txnresp, err := client.KV.Txn( - context.TODO(), - &pb.TxnRequest{ - []*pb.Compare{cmp}, - []*pb.RequestUnion{reqPrefix, reqNewKey}, nil}) + txn := kv.Txn(context.TODO()) + txnresp, err := txn.If(cmp).Then(reqPrefix, reqNewKey).Commit() if err != nil { return nil, err } if txnresp.Succeeded == false { - return newSequentialKV(client, prefix, val, leaseID) + return newSequentialKV(kv, prefix, val, leaseID) } - return &RemoteKV{client, newKey, txnresp.Header.Revision, val}, nil + return &RemoteKV{kv, newKey, txnresp.Header.Revision, val}, nil } func (rk *RemoteKV) Key() string { return rk.key } @@ -183,18 +143,16 @@ func (rk *RemoteKV) Revision() int64 { return rk.rev } func (rk *RemoteKV) Value() string { return rk.val } func (rk *RemoteKV) Delete() error { - if rk.client == nil { + if rk.kv == nil { return nil } - req := &pb.DeleteRangeRequest{Key: []byte(rk.key)} - _, err := rk.client.KV.DeleteRange(context.TODO(), req) - rk.client = nil + _, err := rk.kv.Delete(context.TODO(), rk.key) + rk.kv = nil return err } func (rk *RemoteKV) Put(val string) error { - req := &pb.PutRequest{Key: []byte(rk.key), Value: []byte(val)} - _, err := rk.client.KV.Put(context.TODO(), req) + _, err := rk.kv.Put(context.TODO(), rk.key, val) return err } @@ -202,12 +160,12 @@ func (rk *RemoteKV) Put(val string) error { type EphemeralKV struct{ RemoteKV } // NewEphemeralKV creates a new key/value pair associated with a session lease -func NewEphemeralKV(client *clientv3.Client, key, val string) (*EphemeralKV, error) { +func NewEphemeralKV(client *v3.Client, key, val string) (*EphemeralKV, error) { leaseID, err := SessionLease(client) if err != nil { return nil, err } - k, err := NewKV(client, key, val, leaseID) + k, err := NewKV(v3.NewKV(client), key, val, leaseID) if err != nil { return nil, err } @@ -215,12 +173,12 @@ func NewEphemeralKV(client *clientv3.Client, key, val string) (*EphemeralKV, err } // NewUniqueEphemeralKey creates a new unique valueless key associated with a session lease -func NewUniqueEphemeralKey(client *clientv3.Client, prefix string) (*EphemeralKV, error) { +func NewUniqueEphemeralKey(client *v3.Client, prefix string) (*EphemeralKV, error) { return NewUniqueEphemeralKV(client, prefix, "") } // NewUniqueEphemeralKV creates a new unique key/value pair associated with a session lease -func NewUniqueEphemeralKV(client *clientv3.Client, prefix, val string) (ek *EphemeralKV, err error) { +func NewUniqueEphemeralKV(client *v3.Client, prefix, val string) (ek *EphemeralKV, err error) { for { newKey := fmt.Sprintf("%s/%v", prefix, time.Now().UnixNano()) ek, err = NewEphemeralKV(client, newKey, val) diff --git a/contrib/recipes/mutex.go b/contrib/recipes/mutex.go index 277f0afd2..f872e6d4d 100644 --- a/contrib/recipes/mutex.go +++ b/contrib/recipes/mutex.go @@ -17,29 +17,33 @@ package recipe import ( "sync" - "github.com/coreos/etcd/clientv3" + "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" + v3 "github.com/coreos/etcd/clientv3" "github.com/coreos/etcd/storage/storagepb" ) // Mutex implements the sync Locker interface with etcd type Mutex struct { - client *clientv3.Client - key string - myKey *RemoteKV + client *v3.Client + kv v3.KV + ctx context.Context + + key string + myKey *EphemeralKV } -func NewMutex(client *clientv3.Client, key string) *Mutex { - return &Mutex{client, key, nil} +func NewMutex(client *v3.Client, key string) *Mutex { + return &Mutex{client, v3.NewKV(client), context.TODO(), key, nil} } func (m *Mutex) Lock() (err error) { // put self in lock waiters via myKey; oldest waiter holds lock - m.myKey, err = NewUniqueKey(m.client, m.key) + m.myKey, err = NewUniqueEphemeralKey(m.client, m.key) if err != nil { return err } // find oldest element in waiters via revision of insertion - resp, err := NewRange(m.client, m.key).FirstRev() + resp, err := m.kv.Get(m.ctx, m.key, withFirstRev()...) if err != nil { return err } @@ -48,7 +52,8 @@ func (m *Mutex) Lock() (err error) { return nil } // otherwise myKey isn't lowest, so there must be a key prior to myKey - lastKey, err := NewRangeRev(m.client, m.key, m.myKey.Revision()-1).LastRev() + opts := append(withLastRev(), v3.WithRev(m.myKey.Revision()-1)) + lastKey, err := m.kv.Get(m.ctx, m.key, opts...) if err != nil { return err } @@ -81,6 +86,6 @@ func (lm *lockerMutex) Unlock() { } } -func NewLocker(client *clientv3.Client, key string) sync.Locker { +func NewLocker(client *v3.Client, key string) sync.Locker { return &lockerMutex{NewMutex(client, key)} } diff --git a/contrib/recipes/priority_queue.go b/contrib/recipes/priority_queue.go index a5ac7db5b..e7cce4ec0 100644 --- a/contrib/recipes/priority_queue.go +++ b/contrib/recipes/priority_queue.go @@ -17,25 +17,28 @@ package recipe import ( "fmt" - "github.com/coreos/etcd/clientv3" + "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" + v3 "github.com/coreos/etcd/clientv3" "github.com/coreos/etcd/storage/storagepb" ) // PriorityQueue implements a multi-reader, multi-writer distributed queue. type PriorityQueue struct { - client *clientv3.Client + client *v3.Client + kv v3.KV + ctx context.Context key string } // NewPriorityQueue creates an etcd priority queue. -func NewPriorityQueue(client *clientv3.Client, key string) *PriorityQueue { - return &PriorityQueue{client, key + "/"} +func NewPriorityQueue(client *v3.Client, key string) *PriorityQueue { + return &PriorityQueue{client, v3.NewKV(client), context.TODO(), key + "/"} } // Enqueue puts a value into a queue with a given priority. func (q *PriorityQueue) Enqueue(val string, pr uint16) error { prefix := fmt.Sprintf("%s%05d", q.key, pr) - _, err := NewSequentialKV(q.client, prefix, val) + _, err := NewSequentialKV(q.kv, prefix, val) return err } @@ -43,12 +46,12 @@ func (q *PriorityQueue) Enqueue(val string, pr uint16) error { // queue is empty, Dequeue blocks until items are available. func (q *PriorityQueue) Dequeue() (string, error) { // TODO: fewer round trips by fetching more than one key - resp, err := NewRange(q.client, q.key).FirstKey() + resp, err := q.kv.Get(q.ctx, q.key, withFirstKey()...) if err != nil { return "", err } - kv, err := claimFirstKey(q.client.KV, resp.Kvs) + kv, err := claimFirstKey(q.kv, resp.Kvs) if err != nil { return "", err } else if kv != nil { @@ -68,7 +71,7 @@ func (q *PriorityQueue) Dequeue() (string, error) { return "", err } - ok, err := deleteRevKey(q.client.KV, string(ev.Kv.Key), ev.Kv.ModRevision) + ok, err := deleteRevKey(q.kv, string(ev.Kv.Key), ev.Kv.ModRevision) if err != nil { return "", err } else if !ok { diff --git a/contrib/recipes/queue.go b/contrib/recipes/queue.go index b0a218a33..396420d2c 100644 --- a/contrib/recipes/queue.go +++ b/contrib/recipes/queue.go @@ -15,22 +15,26 @@ package recipe import ( - "github.com/coreos/etcd/clientv3" + "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" + v3 "github.com/coreos/etcd/clientv3" "github.com/coreos/etcd/storage/storagepb" ) // Queue implements a multi-reader, multi-writer distributed queue. type Queue struct { - client *clientv3.Client + client *v3.Client + kv v3.KV + ctx context.Context + keyPrefix string } -func NewQueue(client *clientv3.Client, keyPrefix string) *Queue { - return &Queue{client, keyPrefix} +func NewQueue(client *v3.Client, keyPrefix string) *Queue { + return &Queue{client, v3.NewKV(client), context.TODO(), keyPrefix} } func (q *Queue) Enqueue(val string) error { - _, err := NewUniqueKV(q.client, q.keyPrefix, val, 0) + _, err := NewUniqueKV(q.kv, q.keyPrefix, val, 0) return err } @@ -38,12 +42,12 @@ func (q *Queue) Enqueue(val string) error { // queue is empty, Dequeue blocks until elements are available. func (q *Queue) Dequeue() (string, error) { // TODO: fewer round trips by fetching more than one key - resp, err := NewRange(q.client, q.keyPrefix).FirstRev() + resp, err := q.kv.Get(q.ctx, q.keyPrefix, withFirstRev()...) if err != nil { return "", err } - kv, err := claimFirstKey(q.client.KV, resp.Kvs) + kv, err := claimFirstKey(q.kv, resp.Kvs) if err != nil { return "", err } else if kv != nil { @@ -63,7 +67,7 @@ func (q *Queue) Dequeue() (string, error) { return "", err } - ok, err := deleteRevKey(q.client.KV, string(ev.Kv.Key), ev.Kv.ModRevision) + ok, err := deleteRevKey(q.kv, string(ev.Kv.Key), ev.Kv.ModRevision) if err != nil { return "", err } else if !ok { diff --git a/contrib/recipes/range.go b/contrib/recipes/range.go index cd1e3d3b5..155a644e8 100644 --- a/contrib/recipes/range.go +++ b/contrib/recipes/range.go @@ -15,94 +15,20 @@ package recipe import ( - "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" - "github.com/coreos/etcd/clientv3" - pb "github.com/coreos/etcd/etcdserver/etcdserverpb" + v3 "github.com/coreos/etcd/clientv3" ) -type Range struct { - kv pb.KVClient - key []byte - rev int64 - keyEnd []byte -} +func withFirstCreate() []v3.OpOption { return withTop(v3.SortByCreatedRev, v3.SortAscend) } +func withLastCreate() []v3.OpOption { return withTop(v3.SortByCreatedRev, v3.SortDescend) } +func withFirstKey() []v3.OpOption { return withTop(v3.SortByKey, v3.SortAscend) } +func withLastKey() []v3.OpOption { return withTop(v3.SortByKey, v3.SortDescend) } +func withFirstRev() []v3.OpOption { return withTop(v3.SortByModifiedRev, v3.SortAscend) } +func withLastRev() []v3.OpOption { return withTop(v3.SortByModifiedRev, v3.SortDescend) } -func NewRange(client *clientv3.Client, key string) *Range { - return NewRangeRev(client, key, 0) -} - -func NewRangeRev(client *clientv3.Client, key string, rev int64) *Range { - return &Range{client.KV, []byte(key), rev, prefixEnd(key)} -} - -// Prefix performs a RangeRequest to get keys matching * -func (r *Range) Prefix() (*pb.RangeResponse, error) { - return r.kv.Range( - context.TODO(), - &pb.RangeRequest{ - Key: prefixNext(string(r.key)), - RangeEnd: r.keyEnd, - Revision: r.rev}) -} - -// OpenInterval gets the keys in the set * - -func (r *Range) OpenInterval() (*pb.RangeResponse, error) { - return r.kv.Range( - context.TODO(), - &pb.RangeRequest{Key: r.key, RangeEnd: r.keyEnd, Revision: r.rev}) -} - -func (r *Range) FirstKey() (*pb.RangeResponse, error) { - return r.topTarget(pb.RangeRequest_ASCEND, pb.RangeRequest_KEY) -} - -func (r *Range) LastKey() (*pb.RangeResponse, error) { - return r.topTarget(pb.RangeRequest_DESCEND, pb.RangeRequest_KEY) -} - -func (r *Range) FirstRev() (*pb.RangeResponse, error) { - return r.topTarget(pb.RangeRequest_ASCEND, pb.RangeRequest_MOD) -} - -func (r *Range) LastRev() (*pb.RangeResponse, error) { - return r.topTarget(pb.RangeRequest_DESCEND, pb.RangeRequest_MOD) -} - -func (r *Range) FirstCreate() (*pb.RangeResponse, error) { - return r.topTarget(pb.RangeRequest_ASCEND, pb.RangeRequest_MOD) -} - -func (r *Range) LastCreate() (*pb.RangeResponse, error) { - return r.topTarget(pb.RangeRequest_DESCEND, pb.RangeRequest_MOD) -} - -// topTarget gets the first key for a given sort order and target -func (r *Range) topTarget(order pb.RangeRequest_SortOrder, target pb.RangeRequest_SortTarget) (*pb.RangeResponse, error) { - return r.kv.Range( - context.TODO(), - &pb.RangeRequest{ - Key: r.key, - RangeEnd: r.keyEnd, - Limit: 1, - Revision: r.rev, - SortOrder: order, - SortTarget: target}) -} - -// prefixNext returns the first key possibly matched by * - -func prefixNext(prefix string) []byte { - return append([]byte(prefix), 0) -} - -// prefixEnd returns the last key possibly matched by * -func prefixEnd(prefix string) []byte { - keyEnd := []byte(prefix) - for i := len(keyEnd) - 1; i >= 0; i-- { - if keyEnd[i] < 0xff { - keyEnd[i] = keyEnd[i] + 1 - keyEnd = keyEnd[:i+1] - break - } - } - return keyEnd +// withTop gets the first key over the get's prefix given a sort order +func withTop(target v3.SortTarget, order v3.SortOrder) []v3.OpOption { + return []v3.OpOption{ + v3.WithPrefix(), + v3.WithSort(target, order), + v3.WithLimit(1)} } diff --git a/contrib/recipes/rwmutex.go b/contrib/recipes/rwmutex.go index a7b93235c..93aff10cd 100644 --- a/contrib/recipes/rwmutex.go +++ b/contrib/recipes/rwmutex.go @@ -15,23 +15,26 @@ package recipe import ( - "github.com/coreos/etcd/clientv3" + "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" + v3 "github.com/coreos/etcd/clientv3" "github.com/coreos/etcd/storage/storagepb" ) type RWMutex struct { - client *clientv3.Client - key string - myKey *RemoteKV + client *v3.Client + kv v3.KV + ctx context.Context + + key string + myKey *EphemeralKV } -func NewRWMutex(client *clientv3.Client, key string) *RWMutex { - return &RWMutex{client, key, nil} +func NewRWMutex(client *v3.Client, key string) *RWMutex { + return &RWMutex{client, v3.NewKV(client), context.TODO(), key, nil} } func (rwm *RWMutex) RLock() error { - // XXX: make reads ephemeral locks? - rk, err := NewUniqueKey(rwm.client, rwm.key+"/read") + rk, err := NewUniqueEphemeralKey(rwm.client, rwm.key+"/read") if err != nil { return err } @@ -39,7 +42,7 @@ func (rwm *RWMutex) RLock() error { // if there are nodes with "write-" and a lower // revision number than us we must wait - resp, err := NewRange(rwm.client, rwm.key+"/write").FirstRev() + resp, err := rwm.kv.Get(rwm.ctx, rwm.key+"/write", withFirstRev()...) if err != nil { return err } @@ -51,21 +54,22 @@ func (rwm *RWMutex) RLock() error { } func (rwm *RWMutex) Lock() error { - rk, err := NewUniqueKey(rwm.client, rwm.key+"/write") + rk, err := NewUniqueEphemeralKey(rwm.client, rwm.key+"/write") if err != nil { return err } rwm.myKey = rk for { - // any key of lower rev number blocks the write lock - resp, err := NewRangeRev(rwm.client, rwm.key, rk.Revision()-1).LastRev() + // find any key of lower rev number blocks the write lock + opts := append(withLastRev(), v3.WithRev(rk.Revision()-1)) + resp, err := rwm.kv.Get(rwm.ctx, rwm.key, opts...) if err != nil { return err } if len(resp.Kvs) == 0 { // no matching for revision before myKey; acquired - return nil + break } if err := rwm.waitOnLowest(); err != nil { return err @@ -78,7 +82,8 @@ func (rwm *RWMutex) Lock() error { func (rwm *RWMutex) waitOnLowest() error { // must block; get key before ek for waiting - lastKey, err := NewRangeRev(rwm.client, rwm.key, rwm.myKey.Revision()-1).LastRev() + opts := append(withLastRev(), v3.WithRev(rwm.myKey.Revision()-1)) + lastKey, err := rwm.kv.Get(rwm.ctx, rwm.key, opts...) if err != nil { return err } diff --git a/contrib/recipes/stm.go b/contrib/recipes/stm.go index 6821f454d..03c090b3d 100644 --- a/contrib/recipes/stm.go +++ b/contrib/recipes/stm.go @@ -16,13 +16,13 @@ package recipe import ( "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" - "github.com/coreos/etcd/clientv3" - pb "github.com/coreos/etcd/etcdserver/etcdserverpb" + v3 "github.com/coreos/etcd/clientv3" ) // STM implements software transactional memory over etcd type STM struct { - client *clientv3.Client + client *v3.Client + kv v3.KV // rset holds the read key's value and revision of read rset map[string]*RemoteKV // wset holds the write key and its value @@ -33,8 +33,8 @@ type STM struct { } // NewSTM creates new transaction loop for a given apply function. -func NewSTM(client *clientv3.Client, apply func(*STM) error) <-chan error { - s := &STM{client: client, apply: apply} +func NewSTM(client *v3.Client, apply func(*STM) error) <-chan error { + s := &STM{client: client, kv: v3.NewKV(client), apply: apply} errc := make(chan error, 1) go func() { var err error @@ -43,7 +43,8 @@ func NewSTM(client *clientv3.Client, apply func(*STM) error) <-chan error { if err = apply(s); err != nil || s.aborted { break } - if ok, err := s.commit(); ok || err != nil { + if ok, cerr := s.commit(); ok || cerr != nil { + err = cerr break } } @@ -63,7 +64,7 @@ func (s *STM) Get(key string) (string, error) { if rk, ok := s.rset[key]; ok { return rk.Value(), nil } - rk, err := GetRemoteKV(s.client, key) + rk, err := GetRemoteKV(s.kv, key) if err != nil { return "", err } @@ -76,30 +77,21 @@ func (s *STM) Get(key string) (string, error) { func (s *STM) Put(key string, val string) { s.wset[key] = val } // commit attempts to apply the txn's changes to the server. -func (s *STM) commit() (ok bool, err error) { +func (s *STM) commit() (ok bool, rr error) { // read set must not change - cmps := []*pb.Compare{} + cmps := make([]v3.Cmp, 0, len(s.rset)) for k, rk := range s.rset { // use < to support updating keys that don't exist yet - cmp := &pb.Compare{ - Result: pb.Compare_LESS, - Target: pb.Compare_MOD, - Key: []byte(k), - TargetUnion: &pb.Compare_ModRevision{ModRevision: rk.Revision() + 1}, - } + cmp := v3.Compare(v3.ModifiedRevision(k), "<", rk.Revision()+1) cmps = append(cmps, cmp) } + // apply all writes - puts := []*pb.RequestUnion{} + puts := make([]v3.Op, 0, len(s.wset)) for k, v := range s.wset { - puts = append(puts, &pb.RequestUnion{ - Request: &pb.RequestUnion_RequestPut{ - RequestPut: &pb.PutRequest{ - Key: []byte(k), - Value: []byte(v), - }}}) + puts = append(puts, v3.OpPut(k, v)) } - txnresp, err := s.client.KV.Txn(context.TODO(), &pb.TxnRequest{cmps, puts, nil}) + txnresp, err := s.kv.Txn(context.TODO()).If(cmps...).Then(puts...).Commit() return txnresp.Succeeded, err } diff --git a/integration/v3_stm_test.go b/integration/v3_stm_test.go index a056fda9e..b496664dc 100644 --- a/integration/v3_stm_test.go +++ b/integration/v3_stm_test.go @@ -19,6 +19,7 @@ import ( "strconv" "testing" + v3 "github.com/coreos/etcd/clientv3" "github.com/coreos/etcd/contrib/recipes" ) @@ -30,7 +31,7 @@ func TestSTMConflict(t *testing.T) { etcdc := clus.RandClient() keys := make([]*recipe.RemoteKV, 5) for i := 0; i < len(keys); i++ { - rk, err := recipe.NewKV(etcdc, fmt.Sprintf("foo-%d", i), "100", 0) + rk, err := recipe.NewKV(v3.NewKV(etcdc), fmt.Sprintf("foo-%d", i), "100", 0) if err != nil { t.Fatalf("could not make key (%v)", err) } @@ -75,7 +76,7 @@ func TestSTMConflict(t *testing.T) { // ensure sum matches initial sum sum := 0 for _, oldRK := range keys { - rk, err := recipe.GetRemoteKV(etcdc, oldRK.Key()) + rk, err := recipe.GetRemoteKV(v3.NewKV(etcdc), oldRK.Key()) if err != nil { t.Fatalf("couldn't fetch key %s (%v)", oldRK.Key(), err) } @@ -102,7 +103,7 @@ func TestSTMPutNewKey(t *testing.T) { t.Fatalf("error on stm txn (%v)", err) } - rk, err := recipe.GetRemoteKV(etcdc, "foo") + rk, err := recipe.GetRemoteKV(v3.NewKV(etcdc), "foo") if err != nil { t.Fatalf("error fetching key (%v)", err) } @@ -128,7 +129,7 @@ func TestSTMAbort(t *testing.T) { t.Fatalf("error on stm txn (%v)", err) } - rk, err := recipe.GetRemoteKV(etcdc, "foo") + rk, err := recipe.GetRemoteKV(v3.NewKV(etcdc), "foo") if err != nil { t.Fatalf("error fetching key (%v)", err) }