diff --git a/clientv3/client.go b/clientv3/client.go index ae9d8b4df..d7bc03129 100644 --- a/clientv3/client.go +++ b/clientv3/client.go @@ -24,7 +24,6 @@ import ( "github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc" "github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc/credentials" - pb "github.com/coreos/etcd/etcdserver/etcdserverpb" "github.com/coreos/etcd/pkg/transport" ) @@ -34,14 +33,10 @@ var ( // Client provides and manages an etcd v3 client session. type Client struct { - // KV is the keyvalue API for the client's connection. - KV pb.KVClient - // Lease is the lease API for the client's connection. - Lease pb.LeaseClient - // Watch is the watch API for the client's connection. - Watch pb.WatchClient - // Cluster is the cluster API for the client's connection. - Cluster pb.ClusterClient + Cluster + KV + Lease + Watcher conn *grpc.ClientConn cfg Config @@ -86,6 +81,8 @@ func NewFromURL(url string) (*Client, error) { // Close shuts down the client's etcd connections. func (c *Client) Close() error { + c.Watcher.Close() + c.Lease.Close() return c.conn.Close() } @@ -146,15 +143,17 @@ func newClient(cfg *Config) (*Client, error) { if err != nil { return nil, err } - return &Client{ - KV: pb.NewKVClient(conn), - Lease: pb.NewLeaseClient(conn), - Watch: pb.NewWatchClient(conn), - Cluster: pb.NewClusterClient(conn), - conn: conn, - cfg: *cfg, - creds: creds, - }, nil + client := &Client{ + conn: conn, + cfg: *cfg, + creds: creds, + } + client.Cluster = NewCluster(client) + client.KV = NewKV(client) + client.Lease = NewLease(client) + client.Watcher = NewWatcher(client) + + return client, nil } // ActiveConnection returns the current in-use connection diff --git a/clientv3/concurrency/key.go b/clientv3/concurrency/key.go index b741f97b4..e16de51b4 100644 --- a/clientv3/concurrency/key.go +++ b/clientv3/concurrency/key.go @@ -40,9 +40,7 @@ func NewUniqueKey(ctx context.Context, kv v3.KV, pfx string, opts ...v3.OpOption } func waitUpdate(ctx context.Context, client *v3.Client, key string, opts ...v3.OpOption) error { - w := v3.NewWatcher(client) - defer w.Close() - wc := w.Watch(ctx, key, opts...) + wc := client.Watch(ctx, key, opts...) if wc == nil { return ctx.Err() } diff --git a/clientv3/concurrency/mutex.go b/clientv3/concurrency/mutex.go index 6c18d99c7..603a3e6db 100644 --- a/clientv3/concurrency/mutex.go +++ b/clientv3/concurrency/mutex.go @@ -24,7 +24,6 @@ import ( // Mutex implements the sync Locker interface with etcd type Mutex struct { client *v3.Client - kv v3.KV ctx context.Context pfx string @@ -33,7 +32,7 @@ type Mutex struct { } func NewMutex(ctx context.Context, client *v3.Client, pfx string) *Mutex { - return &Mutex{client, v3.NewKV(client), ctx, pfx, "", -1} + return &Mutex{client, ctx, pfx, "", -1} } // Lock locks the mutex with a cancellable context. If the context is cancelled @@ -44,12 +43,12 @@ func (m *Mutex) Lock(ctx context.Context) error { return err } // put self in lock waiters via myKey; oldest waiter holds lock - m.myKey, m.myRev, err = NewUniqueKey(ctx, m.kv, m.pfx, v3.WithLease(s.Lease())) + m.myKey, m.myRev, err = NewUniqueKey(ctx, m.client, m.pfx, v3.WithLease(s.Lease())) // wait for lock to become available for err == nil { // find oldest element in waiters via revision of insertion var resp *v3.GetResponse - resp, err = m.kv.Get(ctx, m.pfx, v3.WithFirstRev()...) + resp, err = m.client.Get(ctx, m.pfx, v3.WithFirstRev()...) if err != nil { break } @@ -59,7 +58,7 @@ func (m *Mutex) Lock(ctx context.Context) error { } // otherwise myKey isn't lowest, so there must be a pfx prior to myKey opts := append(v3.WithLastRev(), v3.WithRev(m.myRev-1)) - resp, err = m.kv.Get(ctx, m.pfx, opts...) + resp, err = m.client.Get(ctx, m.pfx, opts...) if err != nil { break } @@ -80,7 +79,7 @@ func (m *Mutex) Lock(ctx context.Context) error { } func (m *Mutex) Unlock() error { - if _, err := m.kv.Delete(m.ctx, m.myKey); err != nil { + if _, err := m.client.Delete(m.ctx, m.myKey); err != nil { return err } m.myKey = "\x00" diff --git a/clientv3/concurrency/session.go b/clientv3/concurrency/session.go index 739733547..b1c2d2159 100644 --- a/clientv3/concurrency/session.go +++ b/clientv3/concurrency/session.go @@ -49,15 +49,14 @@ func NewSession(client *v3.Client) (*Session, error) { return s, nil } - lc := v3.NewLease(client) - resp, err := lc.Create(context.TODO(), sessionTTL) + resp, err := client.Create(context.TODO(), sessionTTL) if err != nil { return nil, err } id := lease.LeaseID(resp.ID) ctx, cancel := context.WithCancel(context.Background()) - keepAlive, err := lc.KeepAlive(ctx, id) + keepAlive, err := client.KeepAlive(ctx, id) if err != nil || keepAlive == nil { return nil, err } @@ -72,7 +71,6 @@ func NewSession(client *v3.Client) (*Session, error) { clientSessions.mu.Lock() delete(clientSessions.sessions, client) clientSessions.mu.Unlock() - lc.Close() close(donec) }() for range keepAlive { @@ -101,6 +99,6 @@ func (s *Session) Orphan() { // Close orphans the session and revokes the session lease. func (s *Session) Close() error { s.Orphan() - _, err := v3.NewLease(s.client).Revoke(context.TODO(), s.id) + _, err := s.client.Revoke(context.TODO(), s.id) return err } diff --git a/clientv3/example_cluster_test.go b/clientv3/example_cluster_test.go index 04bcf990f..2db168fa7 100644 --- a/clientv3/example_cluster_test.go +++ b/clientv3/example_cluster_test.go @@ -32,9 +32,7 @@ func ExampleCluster_memberList() { } defer cli.Close() - capi := clientv3.NewCluster(cli) - - resp, err := capi.MemberList(context.Background()) + resp, err := cli.MemberList(context.Background()) if err != nil { log.Fatal(err) } @@ -52,9 +50,7 @@ func ExampleCluster_memberLeader() { } defer cli.Close() - capi := clientv3.NewCluster(cli) - - resp, err := capi.MemberLeader(context.Background()) + resp, err := cli.MemberLeader(context.Background()) if err != nil { log.Fatal(err) } @@ -72,10 +68,8 @@ func ExampleCluster_memberAdd() { } defer cli.Close() - capi := clientv3.NewCluster(cli) - peerURLs := endpoints[2:] - mresp, err := capi.MemberAdd(context.Background(), peerURLs) + mresp, err := cli.MemberAdd(context.Background(), peerURLs) if err != nil { log.Fatal(err) } @@ -93,14 +87,12 @@ func ExampleCluster_memberRemove() { } defer cli.Close() - capi := clientv3.NewCluster(cli) - - resp, err := capi.MemberList(context.Background()) + resp, err := cli.MemberList(context.Background()) if err != nil { log.Fatal(err) } - _, err = capi.MemberRemove(context.Background(), resp.Members[0].ID) + _, err = cli.MemberRemove(context.Background(), resp.Members[0].ID) if err != nil { log.Fatal(err) } @@ -116,15 +108,13 @@ func ExampleCluster_memberUpdate() { } defer cli.Close() - capi := clientv3.NewCluster(cli) - - resp, err := capi.MemberList(context.Background()) + resp, err := cli.MemberList(context.Background()) if err != nil { log.Fatal(err) } peerURLs := []string{"http://localhost:12378"} - _, err = capi.MemberUpdate(context.Background(), resp.Members[0].ID, peerURLs) + _, err = cli.MemberUpdate(context.Background(), resp.Members[0].ID, peerURLs) if err != nil { log.Fatal(err) } diff --git a/clientv3/example_kv_test.go b/clientv3/example_kv_test.go index 7c57b7bfe..9eb3377e2 100644 --- a/clientv3/example_kv_test.go +++ b/clientv3/example_kv_test.go @@ -32,10 +32,8 @@ func ExampleKV_put() { } defer cli.Close() - kvc := clientv3.NewKV(cli) - ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) - resp, err := kvc.Put(ctx, "sample_key", "sample_value") + resp, err := cli.Put(ctx, "sample_key", "sample_value") cancel() if err != nil { log.Fatal(err) @@ -54,15 +52,13 @@ func ExampleKV_get() { } defer cli.Close() - kvc := clientv3.NewKV(cli) - - _, err = kvc.Put(context.TODO(), "foo", "bar") + _, err = cli.Put(context.TODO(), "foo", "bar") if err != nil { log.Fatal(err) } ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) - resp, err := kvc.Get(ctx, "foo") + resp, err := cli.Get(ctx, "foo") cancel() if err != nil { log.Fatal(err) @@ -83,11 +79,9 @@ func ExampleKV_getSortedPrefix() { } defer cli.Close() - kvc := clientv3.NewKV(cli) - for i := range make([]int, 3) { ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) - _, err = kvc.Put(ctx, fmt.Sprintf("key_%d", i), "value") + _, err = cli.Put(ctx, fmt.Sprintf("key_%d", i), "value") cancel() if err != nil { log.Fatal(err) @@ -95,7 +89,7 @@ func ExampleKV_getSortedPrefix() { } ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) - resp, err := kvc.Get(ctx, "key", clientv3.WithPrefix(), clientv3.WithSort(clientv3.SortByKey, clientv3.SortDescend)) + resp, err := cli.Get(ctx, "key", clientv3.WithPrefix(), clientv3.WithSort(clientv3.SortByKey, clientv3.SortDescend)) cancel() if err != nil { log.Fatal(err) @@ -118,10 +112,8 @@ func ExampleKV_delete() { } defer cli.Close() - kvc := clientv3.NewKV(cli) - ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) - resp, err := kvc.Delete(ctx, "key", clientv3.WithPrefix()) + resp, err := cli.Delete(ctx, "key", clientv3.WithPrefix()) cancel() if err != nil { log.Fatal(err) @@ -140,10 +132,8 @@ func ExampleKV_compact() { } defer cli.Close() - kvc := clientv3.NewKV(cli) - ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) - resp, err := kvc.Get(ctx, "foo") + resp, err := cli.Get(ctx, "foo") cancel() if err != nil { log.Fatal(err) @@ -151,7 +141,7 @@ func ExampleKV_compact() { compRev := resp.Header.Revision // specify compact revision of your choice ctx, cancel = context.WithTimeout(context.Background(), requestTimeout) - err = kvc.Compact(ctx, compRev) + err = cli.Compact(ctx, compRev) cancel() if err != nil { log.Fatal(err) @@ -207,15 +197,13 @@ func ExampleKV_do() { } defer cli.Close() - kvc := clientv3.NewKV(cli) - ops := []clientv3.Op{ clientv3.OpPut("put-key", "123"), clientv3.OpGet("put-key"), clientv3.OpPut("put-key", "456")} for _, op := range ops { - if _, err := kvc.Do(context.TODO(), op); err != nil { + if _, err := cli.Do(context.TODO(), op); err != nil { log.Fatal(err) } } diff --git a/clientv3/example_lease_test.go b/clientv3/example_lease_test.go index 6a4ec1e4a..62c63a46d 100644 --- a/clientv3/example_lease_test.go +++ b/clientv3/example_lease_test.go @@ -33,18 +33,14 @@ func ExampleLease_create() { } defer cli.Close() - kvc := clientv3.NewKV(cli) - lapi := clientv3.NewLease(cli) - defer lapi.Close() - // minimum lease TTL is 5-second - resp, err := lapi.Create(context.TODO(), 5) + resp, err := cli.Create(context.TODO(), 5) if err != nil { log.Fatal(err) } // after 5 seconds, the key 'foo' will be removed - _, err = kvc.Put(context.TODO(), "foo", "bar", clientv3.WithLease(lease.LeaseID(resp.ID))) + _, err = cli.Put(context.TODO(), "foo", "bar", clientv3.WithLease(lease.LeaseID(resp.ID))) if err != nil { log.Fatal(err) } @@ -60,27 +56,23 @@ func ExampleLease_revoke() { } defer cli.Close() - kvc := clientv3.NewKV(cli) - lapi := clientv3.NewLease(cli) - defer lapi.Close() - - resp, err := lapi.Create(context.TODO(), 5) + resp, err := cli.Create(context.TODO(), 5) if err != nil { log.Fatal(err) } - _, err = kvc.Put(context.TODO(), "foo", "bar", clientv3.WithLease(lease.LeaseID(resp.ID))) + _, err = cli.Put(context.TODO(), "foo", "bar", clientv3.WithLease(lease.LeaseID(resp.ID))) if err != nil { log.Fatal(err) } // revoking lease expires the key attached to its lease ID - _, err = lapi.Revoke(context.TODO(), lease.LeaseID(resp.ID)) + _, err = cli.Revoke(context.TODO(), lease.LeaseID(resp.ID)) if err != nil { log.Fatal(err) } - gresp, err := kvc.Get(context.TODO(), "foo") + gresp, err := cli.Get(context.TODO(), "foo") if err != nil { log.Fatal(err) } @@ -98,22 +90,18 @@ func ExampleLease_keepAlive() { } defer cli.Close() - kvc := clientv3.NewKV(cli) - lapi := clientv3.NewLease(cli) - defer lapi.Close() - - resp, err := lapi.Create(context.TODO(), 5) + resp, err := cli.Create(context.TODO(), 5) if err != nil { log.Fatal(err) } - _, err = kvc.Put(context.TODO(), "foo", "bar", clientv3.WithLease(lease.LeaseID(resp.ID))) + _, err = cli.Put(context.TODO(), "foo", "bar", clientv3.WithLease(lease.LeaseID(resp.ID))) if err != nil { log.Fatal(err) } // the key 'foo' will be kept forever - _, err = lapi.KeepAlive(context.TODO(), lease.LeaseID(resp.ID)) + _, err = cli.KeepAlive(context.TODO(), lease.LeaseID(resp.ID)) if err != nil { log.Fatal(err) } @@ -129,22 +117,18 @@ func ExampleLease_keepAliveOnce() { } defer cli.Close() - kvc := clientv3.NewKV(cli) - lapi := clientv3.NewLease(cli) - defer lapi.Close() - - resp, err := lapi.Create(context.TODO(), 5) + resp, err := cli.Create(context.TODO(), 5) if err != nil { log.Fatal(err) } - _, err = kvc.Put(context.TODO(), "foo", "bar", clientv3.WithLease(lease.LeaseID(resp.ID))) + _, err = cli.Put(context.TODO(), "foo", "bar", clientv3.WithLease(lease.LeaseID(resp.ID))) if err != nil { log.Fatal(err) } // to renew the lease only once - _, err = lapi.KeepAliveOnce(context.TODO(), lease.LeaseID(resp.ID)) + _, err = cli.KeepAliveOnce(context.TODO(), lease.LeaseID(resp.ID)) if err != nil { log.Fatal(err) } diff --git a/clientv3/example_test.go b/clientv3/example_test.go index e7eb3b6b7..b1da6f4bf 100644 --- a/clientv3/example_test.go +++ b/clientv3/example_test.go @@ -38,9 +38,7 @@ func Example() { } defer cli.Close() // make sure to close the client - kvc := clientv3.NewKV(cli) - - _, err = kvc.Put(context.TODO(), "foo", "bar") + _, err = cli.Put(context.TODO(), "foo", "bar") if err != nil { log.Fatal(err) } diff --git a/clientv3/example_watch_test.go b/clientv3/example_watch_test.go index 8c49821ac..4cee69d78 100644 --- a/clientv3/example_watch_test.go +++ b/clientv3/example_watch_test.go @@ -32,10 +32,7 @@ func ExampleWatcher_watch() { } defer cli.Close() - wc := clientv3.NewWatcher(cli) - defer wc.Close() - - rch := wc.Watch(context.Background(), "foo") + rch := cli.Watch(context.Background(), "foo") for wresp := range rch { for _, ev := range wresp.Events { fmt.Printf("%s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value) @@ -54,10 +51,7 @@ func ExampleWatcher_watchPrefix() { } defer cli.Close() - wc := clientv3.NewWatcher(cli) - defer wc.Close() - - rch := wc.Watch(context.Background(), "foo", clientv3.WithPrefix()) + rch := cli.Watch(context.Background(), "foo", clientv3.WithPrefix()) for wresp := range rch { for _, ev := range wresp.Events { fmt.Printf("%s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value) diff --git a/clientv3/mirror/syncer.go b/clientv3/mirror/syncer.go index f9faaed3d..4363da522 100644 --- a/clientv3/mirror/syncer.go +++ b/clientv3/mirror/syncer.go @@ -48,10 +48,9 @@ func (s *syncer) SyncBase(ctx context.Context) (<-chan clientv3.GetResponse, cha respchan := make(chan clientv3.GetResponse, 1024) errchan := make(chan error, 1) - kapi := clientv3.NewKV(s.c) // if rev is not specified, we will choose the most recent revision. if s.rev == 0 { - resp, err := kapi.Get(ctx, "foo") + resp, err := s.c.Get(ctx, "foo") if err != nil { errchan <- err close(respchan) @@ -83,7 +82,7 @@ func (s *syncer) SyncBase(ctx context.Context) (<-chan clientv3.GetResponse, cha } for { - resp, err := kapi.Get(ctx, key, opts...) + resp, err := s.c.Get(ctx, key, opts...) if err != nil { errchan <- err return @@ -106,21 +105,5 @@ func (s *syncer) SyncUpdates(ctx context.Context) clientv3.WatchChan { if s.rev == 0 { panic("unexpected revision = 0. Calling SyncUpdates before SyncBase finishes?") } - - respchan := make(chan clientv3.WatchResponse, 1024) - - go func() { - wapi := clientv3.NewWatcher(s.c) - defer wapi.Close() - defer close(respchan) - - // get all events since revision (or get non-compacted revision, if - // rev is too far behind) - wch := wapi.Watch(ctx, s.prefix, clientv3.WithPrefix(), clientv3.WithRev(s.rev)) - for wr := range wch { - respchan <- wr - } - }() - - return respchan + return s.c.Watch(ctx, s.prefix, clientv3.WithPrefix(), clientv3.WithRev(s.rev)) } diff --git a/contrib/recipes/barrier.go b/contrib/recipes/barrier.go index 56fde2dfd..b0ffd29e3 100644 --- a/contrib/recipes/barrier.go +++ b/contrib/recipes/barrier.go @@ -24,32 +24,31 @@ import ( // release all blocked processes. type Barrier struct { client *v3.Client - kv v3.KV ctx context.Context key string } func NewBarrier(client *v3.Client, key string) *Barrier { - return &Barrier{client, v3.NewKV(client), context.TODO(), key} + return &Barrier{client, context.TODO(), key} } // Hold creates the barrier key causing processes to block on Wait. func (b *Barrier) Hold() error { - _, err := NewKey(b.kv, b.key, 0) + _, err := NewKey(b.client, b.key, 0) return err } // Release deletes the barrier key to unblock all waiting processes. func (b *Barrier) Release() error { - _, err := b.kv.Delete(b.ctx, b.key) + _, err := b.client.Delete(b.ctx, b.key) return err } // Wait blocks on the barrier key until it is deleted. If there is no key, Wait // assumes Release has already been called and returns immediately. func (b *Barrier) Wait() error { - resp, err := b.kv.Get(b.ctx, b.key, v3.WithFirstKey()...) + resp, err := b.client.Get(b.ctx, b.key, v3.WithFirstKey()...) if err != nil { return err } diff --git a/contrib/recipes/double_barrier.go b/contrib/recipes/double_barrier.go index 1be537d1c..1d0fcacd1 100644 --- a/contrib/recipes/double_barrier.go +++ b/contrib/recipes/double_barrier.go @@ -24,7 +24,6 @@ import ( // blocks again on Leave until all processes have left. type DoubleBarrier struct { client *clientv3.Client - kv clientv3.KV ctx context.Context key string // key for the collective barrier @@ -35,7 +34,6 @@ type DoubleBarrier struct { func NewDoubleBarrier(client *clientv3.Client, key string, count int) *DoubleBarrier { return &DoubleBarrier{ client: client, - kv: clientv3.NewKV(client), ctx: context.TODO(), key: key, count: count, @@ -50,7 +48,7 @@ func (b *DoubleBarrier) Enter() error { } b.myKey = ek - resp, err := b.kv.Get(b.ctx, b.key+"/waiters", clientv3.WithPrefix()) + resp, err := b.client.Get(b.ctx, b.key+"/waiters", clientv3.WithPrefix()) if err != nil { return err } @@ -61,7 +59,7 @@ func (b *DoubleBarrier) Enter() error { if len(resp.Kvs) == b.count { // unblock waiters - _, err = b.kv.Put(b.ctx, b.key+"/ready", "") + _, err = b.client.Put(b.ctx, b.key+"/ready", "") return err } @@ -75,7 +73,7 @@ func (b *DoubleBarrier) Enter() error { // Leave waits for "count" processes to leave the barrier then returns func (b *DoubleBarrier) Leave() error { - resp, err := b.kv.Get(b.ctx, b.key+"/waiters", clientv3.WithPrefix()) + resp, err := b.client.Get(b.ctx, b.key+"/waiters", clientv3.WithPrefix()) if len(resp.Kvs) == 0 { return nil } @@ -93,7 +91,7 @@ func (b *DoubleBarrier) Leave() error { if len(resp.Kvs) == 1 { // this is the only node in the barrier; finish up - if _, err = b.kv.Delete(b.ctx, b.key+"/ready"); err != nil { + if _, err = b.client.Delete(b.ctx, b.key+"/ready"); err != nil { return err } return b.myKey.Delete() diff --git a/contrib/recipes/election.go b/contrib/recipes/election.go index d091a1975..8c41b610b 100644 --- a/contrib/recipes/election.go +++ b/contrib/recipes/election.go @@ -22,7 +22,6 @@ import ( type Election struct { client *v3.Client - kv v3.KV ctx context.Context keyPrefix string @@ -31,7 +30,7 @@ type Election struct { // NewElection returns a new election on a given key prefix. func NewElection(client *v3.Client, keyPrefix string) *Election { - return &Election{client, v3.NewKV(client), context.TODO(), keyPrefix, nil} + return &Election{client, context.TODO(), keyPrefix, nil} } // Volunteer puts a value as eligible for the election. It blocks until @@ -62,7 +61,7 @@ func (e *Election) Resign() (err error) { // Leader returns the leader value for the current election. func (e *Election) Leader() (string, error) { - resp, err := e.kv.Get(e.ctx, e.keyPrefix, v3.WithFirstCreate()...) + resp, err := e.client.Get(e.ctx, e.keyPrefix, v3.WithFirstCreate()...) if err != nil { return "", err } else if len(resp.Kvs) == 0 { @@ -74,7 +73,7 @@ func (e *Election) Leader() (string, error) { // Wait waits for a leader to be elected, returning the leader value. func (e *Election) Wait() (string, error) { - resp, err := e.kv.Get(e.ctx, e.keyPrefix, v3.WithFirstCreate()...) + resp, err := e.client.Get(e.ctx, e.keyPrefix, v3.WithFirstCreate()...) if err != nil { return "", err } else if len(resp.Kvs) != 0 { @@ -94,7 +93,7 @@ func (e *Election) Wait() (string, error) { func (e *Election) waitLeadership(tryKey *EphemeralKV) error { opts := append(v3.WithLastCreate(), v3.WithRev(tryKey.Revision()-1)) - resp, err := e.kv.Get(e.ctx, e.keyPrefix, opts...) + resp, err := e.client.Get(e.ctx, e.keyPrefix, opts...) if err != nil { return err } else if len(resp.Kvs) == 0 { diff --git a/contrib/recipes/key.go b/contrib/recipes/key.go index a7011c177..b58d7f2bb 100644 --- a/contrib/recipes/key.go +++ b/contrib/recipes/key.go @@ -166,7 +166,7 @@ func NewEphemeralKV(client *v3.Client, key, val string) (*EphemeralKV, error) { if err != nil { return nil, err } - k, err := NewKV(v3.NewKV(client), key, val, s.Lease()) + k, err := NewKV(client, key, val, s.Lease()) if err != nil { return nil, err } diff --git a/contrib/recipes/priority_queue.go b/contrib/recipes/priority_queue.go index 1a1c628a1..7cb28c0f5 100644 --- a/contrib/recipes/priority_queue.go +++ b/contrib/recipes/priority_queue.go @@ -25,20 +25,19 @@ import ( // PriorityQueue implements a multi-reader, multi-writer distributed queue. type PriorityQueue struct { client *v3.Client - kv v3.KV ctx context.Context key string } // NewPriorityQueue creates an etcd priority queue. func NewPriorityQueue(client *v3.Client, key string) *PriorityQueue { - return &PriorityQueue{client, v3.NewKV(client), context.TODO(), key + "/"} + return &PriorityQueue{client, context.TODO(), key + "/"} } // Enqueue puts a value into a queue with a given priority. func (q *PriorityQueue) Enqueue(val string, pr uint16) error { prefix := fmt.Sprintf("%s%05d", q.key, pr) - _, err := NewSequentialKV(q.kv, prefix, val) + _, err := NewSequentialKV(q.client, prefix, val) return err } @@ -46,12 +45,12 @@ func (q *PriorityQueue) Enqueue(val string, pr uint16) error { // queue is empty, Dequeue blocks until items are available. func (q *PriorityQueue) Dequeue() (string, error) { // TODO: fewer round trips by fetching more than one key - resp, err := q.kv.Get(q.ctx, q.key, v3.WithFirstKey()...) + resp, err := q.client.Get(q.ctx, q.key, v3.WithFirstKey()...) if err != nil { return "", err } - kv, err := claimFirstKey(q.kv, resp.Kvs) + kv, err := claimFirstKey(q.client, resp.Kvs) if err != nil { return "", err } else if kv != nil { @@ -71,7 +70,7 @@ func (q *PriorityQueue) Dequeue() (string, error) { return "", err } - ok, err := deleteRevKey(q.kv, string(ev.Kv.Key), ev.Kv.ModRevision) + ok, err := deleteRevKey(q.client, string(ev.Kv.Key), ev.Kv.ModRevision) if err != nil { return "", err } else if !ok { diff --git a/contrib/recipes/queue.go b/contrib/recipes/queue.go index c0a4977fb..8c9faedf1 100644 --- a/contrib/recipes/queue.go +++ b/contrib/recipes/queue.go @@ -23,18 +23,17 @@ import ( // Queue implements a multi-reader, multi-writer distributed queue. type Queue struct { client *v3.Client - kv v3.KV ctx context.Context keyPrefix string } func NewQueue(client *v3.Client, keyPrefix string) *Queue { - return &Queue{client, v3.NewKV(client), context.TODO(), keyPrefix} + return &Queue{client, context.TODO(), keyPrefix} } func (q *Queue) Enqueue(val string) error { - _, err := NewUniqueKV(q.kv, q.keyPrefix, val, 0) + _, err := NewUniqueKV(q.client, q.keyPrefix, val, 0) return err } @@ -42,12 +41,12 @@ func (q *Queue) Enqueue(val string) error { // queue is empty, Dequeue blocks until elements are available. func (q *Queue) Dequeue() (string, error) { // TODO: fewer round trips by fetching more than one key - resp, err := q.kv.Get(q.ctx, q.keyPrefix, v3.WithFirstRev()...) + resp, err := q.client.Get(q.ctx, q.keyPrefix, v3.WithFirstRev()...) if err != nil { return "", err } - kv, err := claimFirstKey(q.kv, resp.Kvs) + kv, err := claimFirstKey(q.client, resp.Kvs) if err != nil { return "", err } else if kv != nil { @@ -67,7 +66,7 @@ func (q *Queue) Dequeue() (string, error) { return "", err } - ok, err := deleteRevKey(q.kv, string(ev.Kv.Key), ev.Kv.ModRevision) + ok, err := deleteRevKey(q.client, string(ev.Kv.Key), ev.Kv.ModRevision) if err != nil { return "", err } else if !ok { diff --git a/contrib/recipes/rwmutex.go b/contrib/recipes/rwmutex.go index ea2425c69..921976610 100644 --- a/contrib/recipes/rwmutex.go +++ b/contrib/recipes/rwmutex.go @@ -22,7 +22,6 @@ import ( type RWMutex struct { client *v3.Client - kv v3.KV ctx context.Context key string @@ -30,7 +29,7 @@ type RWMutex struct { } func NewRWMutex(client *v3.Client, key string) *RWMutex { - return &RWMutex{client, v3.NewKV(client), context.TODO(), key, nil} + return &RWMutex{client, context.TODO(), key, nil} } func (rwm *RWMutex) RLock() error { @@ -42,7 +41,7 @@ func (rwm *RWMutex) RLock() error { // if there are nodes with "write-" and a lower // revision number than us we must wait - resp, err := rwm.kv.Get(rwm.ctx, rwm.key+"/write", v3.WithFirstRev()...) + resp, err := rwm.client.Get(rwm.ctx, rwm.key+"/write", v3.WithFirstRev()...) if err != nil { return err } @@ -63,7 +62,7 @@ func (rwm *RWMutex) Lock() error { for { // find any key of lower rev number blocks the write lock opts := append(v3.WithLastRev(), v3.WithRev(rk.Revision()-1)) - resp, err := rwm.kv.Get(rwm.ctx, rwm.key, opts...) + resp, err := rwm.client.Get(rwm.ctx, rwm.key, opts...) if err != nil { return err } @@ -83,7 +82,7 @@ func (rwm *RWMutex) Lock() error { func (rwm *RWMutex) waitOnLowest() error { // must block; get key before ek for waiting opts := append(v3.WithLastRev(), v3.WithRev(rwm.myKey.Revision()-1)) - lastKey, err := rwm.kv.Get(rwm.ctx, rwm.key, opts...) + lastKey, err := rwm.client.Get(rwm.ctx, rwm.key, opts...) if err != nil { return err } diff --git a/contrib/recipes/stm.go b/contrib/recipes/stm.go index 03c090b3d..8c271944c 100644 --- a/contrib/recipes/stm.go +++ b/contrib/recipes/stm.go @@ -22,7 +22,6 @@ import ( // STM implements software transactional memory over etcd type STM struct { client *v3.Client - kv v3.KV // rset holds the read key's value and revision of read rset map[string]*RemoteKV // wset holds the write key and its value @@ -34,7 +33,7 @@ type STM struct { // NewSTM creates new transaction loop for a given apply function. func NewSTM(client *v3.Client, apply func(*STM) error) <-chan error { - s := &STM{client: client, kv: v3.NewKV(client), apply: apply} + s := &STM{client: client, apply: apply} errc := make(chan error, 1) go func() { var err error @@ -64,7 +63,7 @@ func (s *STM) Get(key string) (string, error) { if rk, ok := s.rset[key]; ok { return rk.Value(), nil } - rk, err := GetRemoteKV(s.kv, key) + rk, err := GetRemoteKV(s.client, key) if err != nil { return "", err } @@ -91,7 +90,7 @@ func (s *STM) commit() (ok bool, rr error) { for k, v := range s.wset { puts = append(puts, v3.OpPut(k, v)) } - txnresp, err := s.kv.Txn(context.TODO()).If(cmps...).Then(puts...).Commit() + txnresp, err := s.client.Txn(context.TODO()).If(cmps...).Then(puts...).Commit() return txnresp.Succeeded, err } diff --git a/contrib/recipes/watch.go b/contrib/recipes/watch.go index 508582d6f..9fe70c795 100644 --- a/contrib/recipes/watch.go +++ b/contrib/recipes/watch.go @@ -22,23 +22,19 @@ import ( // WaitEvents waits on a key until it observes the given events and returns the final one. func WaitEvents(c *clientv3.Client, key string, rev int64, evs []storagepb.Event_EventType) (*storagepb.Event, error) { - w := clientv3.NewWatcher(c) - wc := w.Watch(context.Background(), key, clientv3.WithRev(rev)) + wc := c.Watch(context.Background(), key, clientv3.WithRev(rev)) if wc == nil { - w.Close() return nil, ErrNoWatcher } - return waitEvents(wc, evs), w.Close() + return waitEvents(wc, evs), nil } func WaitPrefixEvents(c *clientv3.Client, prefix string, rev int64, evs []storagepb.Event_EventType) (*storagepb.Event, error) { - w := clientv3.NewWatcher(c) - wc := w.Watch(context.Background(), prefix, clientv3.WithPrefix(), clientv3.WithRev(rev)) + wc := c.Watch(context.Background(), prefix, clientv3.WithPrefix(), clientv3.WithRev(rev)) if wc == nil { - w.Close() return nil, ErrNoWatcher } - return waitEvents(wc, evs), w.Close() + return waitEvents(wc, evs), nil } func waitEvents(wc clientv3.WatchChan, evs []storagepb.Event_EventType) *storagepb.Event { diff --git a/etcdctlv3/command/compaction_command.go b/etcdctlv3/command/compaction_command.go index d68f3595b..24b05da8b 100644 --- a/etcdctlv3/command/compaction_command.go +++ b/etcdctlv3/command/compaction_command.go @@ -20,7 +20,6 @@ import ( "github.com/coreos/etcd/Godeps/_workspace/src/github.com/spf13/cobra" "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" - "github.com/coreos/etcd/clientv3" ) // NewCompactionCommand returns the cobra command for "compaction". @@ -44,7 +43,7 @@ func compactionCommandFunc(cmd *cobra.Command, args []string) { } c := mustClientFromCmd(cmd) - if cerr := clientv3.NewKV(c).Compact(context.TODO(), rev); cerr != nil { + if cerr := c.Compact(context.TODO(), rev); cerr != nil { ExitWithError(ExitError, cerr) return } diff --git a/etcdctlv3/command/del_command.go b/etcdctlv3/command/del_command.go index cdabf8dd7..bc094367a 100644 --- a/etcdctlv3/command/del_command.go +++ b/etcdctlv3/command/del_command.go @@ -34,9 +34,7 @@ func NewDelCommand() *cobra.Command { // delCommandFunc executes the "del" command. func delCommandFunc(cmd *cobra.Command, args []string) { key, opts := getDelOp(cmd, args) - c := mustClientFromCmd(cmd) - kvapi := clientv3.NewKV(c) - resp, err := kvapi.Delete(context.TODO(), key, opts...) + resp, err := mustClientFromCmd(cmd).Delete(context.TODO(), key, opts...) if err != nil { ExitWithError(ExitError, err) } diff --git a/etcdctlv3/command/get_command.go b/etcdctlv3/command/get_command.go index 43a5f167b..9c1650cc8 100644 --- a/etcdctlv3/command/get_command.go +++ b/etcdctlv3/command/get_command.go @@ -49,9 +49,7 @@ func NewGetCommand() *cobra.Command { // getCommandFunc executes the "get" command. func getCommandFunc(cmd *cobra.Command, args []string) { key, opts := getGetOp(cmd, args) - c := mustClientFromCmd(cmd) - kvapi := clientv3.NewKV(c) - resp, err := kvapi.Get(context.TODO(), key, opts...) + resp, err := mustClientFromCmd(cmd).Get(context.TODO(), key, opts...) if err != nil { ExitWithError(ExitError, err) } diff --git a/etcdctlv3/command/lease_command.go b/etcdctlv3/command/lease_command.go index 97e861f73..65dd5f5ee 100644 --- a/etcdctlv3/command/lease_command.go +++ b/etcdctlv3/command/lease_command.go @@ -21,7 +21,6 @@ import ( "github.com/coreos/etcd/Godeps/_workspace/src/github.com/spf13/cobra" "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" - "github.com/coreos/etcd/clientv3" "github.com/coreos/etcd/lease" ) @@ -62,9 +61,7 @@ func leaseCreateCommandFunc(cmd *cobra.Command, args []string) { ExitWithError(ExitBadArgs, fmt.Errorf("bad TTL (%v)", err)) } - c := mustClientFromCmd(cmd) - l := clientv3.NewLease(c) - resp, err := l.Create(context.TODO(), ttl) + resp, err := mustClientFromCmd(cmd).Create(context.TODO(), ttl) if err != nil { fmt.Fprintf(os.Stderr, "failed to create lease (%v)\n", err) return @@ -95,9 +92,7 @@ func leaseRevokeCommandFunc(cmd *cobra.Command, args []string) { ExitWithError(ExitBadArgs, fmt.Errorf("bad lease ID arg (%v), expecting ID in Hex", err)) } - c := mustClientFromCmd(cmd) - l := clientv3.NewLease(c) - _, err = l.Revoke(context.TODO(), lease.LeaseID(id)) + _, err = mustClientFromCmd(cmd).Revoke(context.TODO(), lease.LeaseID(id)) if err != nil { fmt.Fprintf(os.Stderr, "failed to revoke lease (%v)\n", err) return @@ -128,9 +123,7 @@ func leaseKeepAliveCommandFunc(cmd *cobra.Command, args []string) { ExitWithError(ExitBadArgs, fmt.Errorf("bad lease ID arg (%v), expecting ID in Hex", err)) } - c := mustClientFromCmd(cmd) - l := clientv3.NewLease(c) - respc, kerr := l.KeepAlive(context.TODO(), lease.LeaseID(id)) + respc, kerr := mustClientFromCmd(cmd).KeepAlive(context.TODO(), lease.LeaseID(id)) if kerr != nil { ExitWithError(ExitBadConnection, kerr) } diff --git a/etcdctlv3/command/lock_command.go b/etcdctlv3/command/lock_command.go index b68841422..a07f2a773 100644 --- a/etcdctlv3/command/lock_command.go +++ b/etcdctlv3/command/lock_command.go @@ -68,7 +68,7 @@ func lockUntilSignal(c *clientv3.Client, lockname string) error { return err } - k, kerr := clientv3.NewKV(c).Get(ctx, m.Key()) + k, kerr := c.Get(ctx, m.Key()) if kerr != nil { return kerr } diff --git a/etcdctlv3/command/make_mirror_command.go b/etcdctlv3/command/make_mirror_command.go index abef26ecf..19ceb563a 100644 --- a/etcdctlv3/command/make_mirror_command.go +++ b/etcdctlv3/command/make_mirror_command.go @@ -75,15 +75,13 @@ func makeMirror(ctx context.Context, c *clientv3.Client, dc *clientv3.Client) er }() // TODO: remove the prefix of the destination cluster? - dkv := clientv3.NewKV(dc) - s := mirror.NewSyncer(c, mmprefix, 0) rc, errc := s.SyncBase(ctx) for r := range rc { for _, kv := range r.Kvs { - _, err := dkv.Put(ctx, string(kv.Key), string(kv.Value)) + _, err := dc.Put(ctx, string(kv.Key), string(kv.Value)) if err != nil { return err } @@ -109,7 +107,7 @@ func makeMirror(ctx context.Context, c *clientv3.Client, dc *clientv3.Client) er for _, ev := range wr.Events { nrev := ev.Kv.ModRevision if rev != 0 && nrev > rev { - _, err := dkv.Txn(ctx).Then(ops...).Commit() + _, err := dc.Txn(ctx).Then(ops...).Commit() if err != nil { return err } @@ -128,7 +126,7 @@ func makeMirror(ctx context.Context, c *clientv3.Client, dc *clientv3.Client) er } if len(ops) != 0 { - _, err := dkv.Txn(ctx).Then(ops...).Commit() + _, err := dc.Txn(ctx).Then(ops...).Commit() if err != nil { return err } diff --git a/etcdctlv3/command/member_command.go b/etcdctlv3/command/member_command.go index 5540e67a2..2148b4aba 100644 --- a/etcdctlv3/command/member_command.go +++ b/etcdctlv3/command/member_command.go @@ -21,7 +21,6 @@ import ( "github.com/coreos/etcd/Godeps/_workspace/src/github.com/spf13/cobra" "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" - pb "github.com/coreos/etcd/etcdserver/etcdserverpb" ) var ( @@ -108,8 +107,7 @@ func memberAddCommandFunc(cmd *cobra.Command, args []string) { urls := strings.Split(memberPeerURLs, ",") - req := &pb.MemberAddRequest{PeerURLs: urls} - resp, err := mustClientFromCmd(cmd).Cluster.MemberAdd(context.TODO(), req) + resp, err := mustClientFromCmd(cmd).MemberAdd(context.TODO(), urls) if err != nil { ExitWithError(ExitError, err) } @@ -128,8 +126,7 @@ func memberRemoveCommandFunc(cmd *cobra.Command, args []string) { ExitWithError(ExitBadArgs, fmt.Errorf("bad member ID arg (%v), expecting ID in Hex", err)) } - req := &pb.MemberRemoveRequest{ID: uint64(id)} - resp, err := mustClientFromCmd(cmd).Cluster.MemberRemove(context.TODO(), req) + resp, err := mustClientFromCmd(cmd).MemberRemove(context.TODO(), id) if err != nil { ExitWithError(ExitError, err) } @@ -154,8 +151,7 @@ func memberUpdateCommandFunc(cmd *cobra.Command, args []string) { urls := strings.Split(memberPeerURLs, ",") - req := &pb.MemberUpdateRequest{ID: uint64(id), PeerURLs: urls} - resp, err := mustClientFromCmd(cmd).Cluster.MemberUpdate(context.TODO(), req) + resp, err := mustClientFromCmd(cmd).MemberUpdate(context.TODO(), id, urls) if err != nil { ExitWithError(ExitError, err) } @@ -165,7 +161,7 @@ func memberUpdateCommandFunc(cmd *cobra.Command, args []string) { // memberListCommandFunc executes the "member list" command. func memberListCommandFunc(cmd *cobra.Command, args []string) { - resp, err := mustClientFromCmd(cmd).Cluster.MemberList(context.TODO(), &pb.MemberListRequest{}) + resp, err := mustClientFromCmd(cmd).MemberList(context.TODO()) if err != nil { ExitWithError(ExitError, err) } diff --git a/etcdctlv3/command/put_command.go b/etcdctlv3/command/put_command.go index 709675937..eb9e5ca77 100644 --- a/etcdctlv3/command/put_command.go +++ b/etcdctlv3/command/put_command.go @@ -58,9 +58,7 @@ will store the content of the file to . func putCommandFunc(cmd *cobra.Command, args []string) { key, value, opts := getPutOp(cmd, args) - c := mustClientFromCmd(cmd) - kvapi := clientv3.NewKV(c) - resp, err := kvapi.Put(context.TODO(), key, value, opts...) + resp, err := mustClientFromCmd(cmd).Put(context.TODO(), key, value, opts...) if err != nil { ExitWithError(ExitError, err) } diff --git a/etcdctlv3/command/snapshot_command.go b/etcdctlv3/command/snapshot_command.go index 0dcc868cb..7931d8adb 100644 --- a/etcdctlv3/command/snapshot_command.go +++ b/etcdctlv3/command/snapshot_command.go @@ -52,9 +52,7 @@ func snapshotCommandFunc(cmd *cobra.Command, args []string) { // snapshotToStdout streams a snapshot over stdout func snapshotToStdout(c *clientv3.Client) { // must explicitly fetch first revision since no retry on stdout - wapi := clientv3.NewWatcher(c) - defer wapi.Close() - wr := <-wapi.Watch(context.TODO(), "", clientv3.WithPrefix(), clientv3.WithRev(1)) + wr := <-c.Watch(context.TODO(), "", clientv3.WithPrefix(), clientv3.WithRev(1)) if len(wr.Events) > 0 { wr.CompactRevision = 1 } diff --git a/etcdctlv3/command/txn_command.go b/etcdctlv3/command/txn_command.go index 13950166b..d0ae629cf 100644 --- a/etcdctlv3/command/txn_command.go +++ b/etcdctlv3/command/txn_command.go @@ -53,7 +53,7 @@ func txnCommandFunc(cmd *cobra.Command, args []string) { reader := bufio.NewReader(os.Stdin) - txn := clientv3.NewKV(mustClientFromCmd(cmd)).Txn(context.Background()) + txn := mustClientFromCmd(cmd).Txn(context.Background()) fmt.Println("compares:") txn.If(readCompares(reader)...) fmt.Println("success requests (get, put, delete):") diff --git a/etcdctlv3/command/watch_command.go b/etcdctlv3/command/watch_command.go index 332345008..ba8b7c419 100644 --- a/etcdctlv3/command/watch_command.go +++ b/etcdctlv3/command/watch_command.go @@ -57,16 +57,14 @@ func watchCommandFunc(cmd *cobra.Command, args []string) { ExitWithError(ExitBadArgs, fmt.Errorf("watch in non-interactive mode requires an argument as key or prefix")) } - c := mustClientFromCmd(cmd) - w := clientv3.NewWatcher(c) - opts := []clientv3.OpOption{clientv3.WithRev(watchRev)} if watchPrefix { opts = append(opts, clientv3.WithPrefix()) } - wc := w.Watch(context.TODO(), args[0], opts...) + c := mustClientFromCmd(cmd) + wc := c.Watch(context.TODO(), args[0], opts...) printWatchCh(wc) - err := w.Close() + err := c.Close() if err == nil { ExitWithError(ExitInterrupted, fmt.Errorf("watch is canceled by the server")) } @@ -75,7 +73,6 @@ func watchCommandFunc(cmd *cobra.Command, args []string) { func watchInteractiveFunc(cmd *cobra.Command, args []string) { c := mustClientFromCmd(cmd) - w := clientv3.NewWatcher(c) reader := bufio.NewReader(os.Stdin) @@ -117,7 +114,7 @@ func watchInteractiveFunc(cmd *cobra.Command, args []string) { if watchPrefix { opts = append(opts, clientv3.WithPrefix()) } - ch := w.Watch(context.TODO(), key, opts...) + ch := c.Watch(context.TODO(), key, opts...) go printWatchCh(ch) } } diff --git a/integration/cluster.go b/integration/cluster.go index 6c884032e..7ec023d23 100644 --- a/integration/cluster.go +++ b/integration/cluster.go @@ -37,6 +37,7 @@ import ( "github.com/coreos/etcd/etcdserver" "github.com/coreos/etcd/etcdserver/api/v3rpc" "github.com/coreos/etcd/etcdserver/etcdhttp" + pb "github.com/coreos/etcd/etcdserver/etcdserverpb" "github.com/coreos/etcd/pkg/testutil" "github.com/coreos/etcd/pkg/transport" "github.com/coreos/etcd/pkg/types" @@ -733,3 +734,23 @@ func (c *ClusterV3) RandClient() *clientv3.Client { func (c *ClusterV3) Client(i int) *clientv3.Client { return c.clients[i] } + +type grpcAPI struct { + // Cluster is the cluster API for the client's connection. + Cluster pb.ClusterClient + // KV is the keyvalue API for the client's connection. + KV pb.KVClient + // Lease is the lease API for the client's connection. + Lease pb.LeaseClient + // Watch is the watch API for the client's connection. + Watch pb.WatchClient +} + +func toGRPC(c *clientv3.Client) grpcAPI { + return grpcAPI{ + pb.NewClusterClient(c.ActiveConnection()), + pb.NewKVClient(c.ActiveConnection()), + pb.NewLeaseClient(c.ActiveConnection()), + pb.NewWatchClient(c.ActiveConnection()), + } +} diff --git a/integration/v3_grpc_test.go b/integration/v3_grpc_test.go index 8e150d41e..a05b08e2a 100644 --- a/integration/v3_grpc_test.go +++ b/integration/v3_grpc_test.go @@ -33,7 +33,7 @@ func TestV3PutOverwrite(t *testing.T) { clus := NewClusterV3(t, &ClusterConfig{Size: 3}) defer clus.Terminate(t) - kvc := clus.RandClient().KV + kvc := toGRPC(clus.RandClient()).KV key := []byte("foo") reqput := &pb.PutRequest{Key: key, Value: []byte("bar")} @@ -77,7 +77,7 @@ func TestV3TxnTooManyOps(t *testing.T) { clus := NewClusterV3(t, &ClusterConfig{Size: 3}) defer clus.Terminate(t) - kvc := clus.RandClient().KV + kvc := toGRPC(clus.RandClient()).KV // unique keys i := new(int) @@ -161,7 +161,7 @@ func TestV3TxnDuplicateKeys(t *testing.T) { }, } - kvc := clus.RandClient().KV + kvc := toGRPC(clus.RandClient()).KV tests := []struct { txnSuccess []*pb.RequestUnion @@ -208,7 +208,7 @@ func TestV3PutMissingLease(t *testing.T) { clus := NewClusterV3(t, &ClusterConfig{Size: 3}) defer clus.Terminate(t) - kvc := clus.RandClient().KV + kvc := toGRPC(clus.RandClient()).KV key := []byte("foo") preq := &pb.PutRequest{Key: key, Lease: 123456} tests := []func(){ @@ -324,7 +324,7 @@ func TestV3DeleteRange(t *testing.T) { for i, tt := range tests { clus := NewClusterV3(t, &ClusterConfig{Size: 3}) - kvc := clus.RandClient().KV + kvc := toGRPC(clus.RandClient()).KV ks := tt.keySet for j := range ks { @@ -375,7 +375,7 @@ func TestV3TxnInvaildRange(t *testing.T) { clus := NewClusterV3(t, &ClusterConfig{Size: 3}) defer clus.Terminate(t) - kvc := clus.RandClient().KV + kvc := toGRPC(clus.RandClient()).KV preq := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")} for i := 0; i < 3; i++ { @@ -419,7 +419,7 @@ func TestV3TooLargeRequest(t *testing.T) { clus := NewClusterV3(t, &ClusterConfig{Size: 3}) defer clus.Terminate(t) - kvc := clus.RandClient().KV + kvc := toGRPC(clus.RandClient()).KV // 2MB request value largeV := make([]byte, 2*1024*1024) @@ -437,7 +437,7 @@ func TestV3Hash(t *testing.T) { clus := NewClusterV3(t, &ClusterConfig{Size: 3}) defer clus.Terminate(t) - kvc := clus.RandClient().KV + kvc := toGRPC(clus.RandClient()).KV preq := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")} for i := 0; i < 3; i++ { @@ -590,7 +590,7 @@ func TestV3RangeRequest(t *testing.T) { for i, tt := range tests { clus := NewClusterV3(t, &ClusterConfig{Size: 3}) for _, k := range tt.putKeys { - kvc := clus.RandClient().KV + kvc := toGRPC(clus.RandClient()).KV req := &pb.PutRequest{Key: []byte(k), Value: []byte("bar")} if _, err := kvc.Put(context.TODO(), req); err != nil { t.Fatalf("#%d: couldn't put key (%v)", i, err) @@ -598,7 +598,7 @@ func TestV3RangeRequest(t *testing.T) { } for j, req := range tt.reqs { - kvc := clus.RandClient().KV + kvc := toGRPC(clus.RandClient()).KV resp, err := kvc.Range(context.TODO(), &req) if err != nil { t.Errorf("#%d.%d: Range error: %v", i, j, err) @@ -668,7 +668,7 @@ func TestTLSGRPCRejectInsecureClient(t *testing.T) { donec := make(chan error, 1) go func() { reqput := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")} - _, perr := client.KV.Put(ctx, reqput) + _, perr := toGRPC(client).KV.Put(ctx, reqput) donec <- perr }() @@ -717,7 +717,7 @@ func TestTLSGRPCAcceptSecureAll(t *testing.T) { defer client.Close() reqput := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")} - if _, err := client.KV.Put(context.TODO(), reqput); err != nil { + if _, err := toGRPC(client).KV.Put(context.TODO(), reqput); err != nil { t.Fatalf("unexpected error on put over tls (%v)", err) } } diff --git a/integration/v3_lease_test.go b/integration/v3_lease_test.go index e893fb42b..516c28bd4 100644 --- a/integration/v3_lease_test.go +++ b/integration/v3_lease_test.go @@ -33,7 +33,7 @@ func TestV3LeasePrmote(t *testing.T) { defer clus.Terminate(t) // create lease - lresp, err := clus.RandClient().Lease.LeaseCreate(context.TODO(), &pb.LeaseCreateRequest{TTL: 5}) + lresp, err := toGRPC(clus.RandClient()).Lease.LeaseCreate(context.TODO(), &pb.LeaseCreateRequest{TTL: 5}) if err != nil { t.Fatal(err) } @@ -78,7 +78,7 @@ func TestV3LeasePrmote(t *testing.T) { func TestV3LeaseRevoke(t *testing.T) { defer testutil.AfterTest(t) testLeaseRemoveLeasedKey(t, func(clus *ClusterV3, leaseID int64) error { - lc := clus.RandClient().Lease + lc := toGRPC(clus.RandClient()).Lease _, err := lc.LeaseRevoke(context.TODO(), &pb.LeaseRevokeRequest{ID: leaseID}) return err }) @@ -91,7 +91,7 @@ func TestV3LeaseCreateByID(t *testing.T) { defer clus.Terminate(t) // create fixed lease - lresp, err := clus.RandClient().Lease.LeaseCreate( + lresp, err := toGRPC(clus.RandClient()).Lease.LeaseCreate( context.TODO(), &pb.LeaseCreateRequest{ID: 1, TTL: 1}) if err != nil { @@ -102,7 +102,7 @@ func TestV3LeaseCreateByID(t *testing.T) { } // create duplicate fixed lease - lresp, err = clus.RandClient().Lease.LeaseCreate( + lresp, err = toGRPC(clus.RandClient()).Lease.LeaseCreate( context.TODO(), &pb.LeaseCreateRequest{ID: 1, TTL: 1}) if err != v3rpc.ErrLeaseExist { @@ -110,7 +110,7 @@ func TestV3LeaseCreateByID(t *testing.T) { } // create fresh fixed lease - lresp, err = clus.RandClient().Lease.LeaseCreate( + lresp, err = toGRPC(clus.RandClient()).Lease.LeaseCreate( context.TODO(), &pb.LeaseCreateRequest{ID: 2, TTL: 1}) if err != nil { @@ -129,7 +129,7 @@ func TestV3LeaseExpire(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - wStream, err := clus.RandClient().Watch.Watch(ctx) + wStream, err := toGRPC(clus.RandClient()).Watch.Watch(ctx) if err != nil { return err } @@ -177,7 +177,7 @@ func TestV3LeaseExpire(t *testing.T) { func TestV3LeaseKeepAlive(t *testing.T) { defer testutil.AfterTest(t) testLeaseRemoveLeasedKey(t, func(clus *ClusterV3, leaseID int64) error { - lc := clus.RandClient().Lease + lc := toGRPC(clus.RandClient()).Lease lreq := &pb.LeaseKeepAliveRequest{ID: leaseID} ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -215,7 +215,7 @@ func TestV3LeaseExists(t *testing.T) { // create lease ctx0, cancel0 := context.WithCancel(context.Background()) defer cancel0() - lresp, err := clus.RandClient().Lease.LeaseCreate( + lresp, err := toGRPC(clus.RandClient()).Lease.LeaseCreate( ctx0, &pb.LeaseCreateRequest{TTL: 30}) if err != nil { @@ -241,34 +241,34 @@ func TestV3LeaseSwitch(t *testing.T) { // create lease ctx, cancel := context.WithCancel(context.Background()) defer cancel() - lresp1, err1 := clus.RandClient().Lease.LeaseCreate(ctx, &pb.LeaseCreateRequest{TTL: 30}) + lresp1, err1 := toGRPC(clus.RandClient()).Lease.LeaseCreate(ctx, &pb.LeaseCreateRequest{TTL: 30}) if err1 != nil { t.Fatal(err1) } - lresp2, err2 := clus.RandClient().Lease.LeaseCreate(ctx, &pb.LeaseCreateRequest{TTL: 30}) + lresp2, err2 := toGRPC(clus.RandClient()).Lease.LeaseCreate(ctx, &pb.LeaseCreateRequest{TTL: 30}) if err2 != nil { t.Fatal(err2) } // attach key on lease1 then switch it to lease2 put1 := &pb.PutRequest{Key: []byte(key), Lease: lresp1.ID} - _, err := clus.RandClient().KV.Put(ctx, put1) + _, err := toGRPC(clus.RandClient()).KV.Put(ctx, put1) if err != nil { t.Fatal(err) } put2 := &pb.PutRequest{Key: []byte(key), Lease: lresp2.ID} - _, err = clus.RandClient().KV.Put(ctx, put2) + _, err = toGRPC(clus.RandClient()).KV.Put(ctx, put2) if err != nil { t.Fatal(err) } // revoke lease1 should not remove key - _, err = clus.RandClient().Lease.LeaseRevoke(ctx, &pb.LeaseRevokeRequest{ID: lresp1.ID}) + _, err = toGRPC(clus.RandClient()).Lease.LeaseRevoke(ctx, &pb.LeaseRevokeRequest{ID: lresp1.ID}) if err != nil { t.Fatal(err) } rreq := &pb.RangeRequest{Key: []byte("foo")} - rresp, err := clus.RandClient().KV.Range(context.TODO(), rreq) + rresp, err := toGRPC(clus.RandClient()).KV.Range(context.TODO(), rreq) if err != nil { t.Fatal(err) } @@ -277,11 +277,11 @@ func TestV3LeaseSwitch(t *testing.T) { } // revoke lease2 should remove key - _, err = clus.RandClient().Lease.LeaseRevoke(ctx, &pb.LeaseRevokeRequest{ID: lresp2.ID}) + _, err = toGRPC(clus.RandClient()).Lease.LeaseRevoke(ctx, &pb.LeaseRevokeRequest{ID: lresp2.ID}) if err != nil { t.Fatal(err) } - rresp, err = clus.RandClient().KV.Range(context.TODO(), rreq) + rresp, err = toGRPC(clus.RandClient()).KV.Range(context.TODO(), rreq) if err != nil { t.Fatal(err) } @@ -293,7 +293,7 @@ func TestV3LeaseSwitch(t *testing.T) { // acquireLeaseAndKey creates a new lease and creates an attached key. func acquireLeaseAndKey(clus *ClusterV3, key string) (int64, error) { // create lease - lresp, err := clus.RandClient().Lease.LeaseCreate( + lresp, err := toGRPC(clus.RandClient()).Lease.LeaseCreate( context.TODO(), &pb.LeaseCreateRequest{TTL: 1}) if err != nil { @@ -304,7 +304,7 @@ func acquireLeaseAndKey(clus *ClusterV3, key string) (int64, error) { } // attach to key put := &pb.PutRequest{Key: []byte(key), Lease: lresp.ID} - if _, err := clus.RandClient().KV.Put(context.TODO(), put); err != nil { + if _, err := toGRPC(clus.RandClient()).KV.Put(context.TODO(), put); err != nil { return 0, err } return lresp.ID, nil @@ -327,7 +327,7 @@ func testLeaseRemoveLeasedKey(t *testing.T, act func(*ClusterV3, int64) error) { // confirm no key rreq := &pb.RangeRequest{Key: []byte("foo")} - rresp, err := clus.RandClient().KV.Range(context.TODO(), rreq) + rresp, err := toGRPC(clus.RandClient()).KV.Range(context.TODO(), rreq) if err != nil { t.Fatal(err) } @@ -337,7 +337,7 @@ func testLeaseRemoveLeasedKey(t *testing.T, act func(*ClusterV3, int64) error) { } func leaseExist(t *testing.T, clus *ClusterV3, leaseID int64) bool { - l := clus.RandClient().Lease + l := toGRPC(clus.RandClient()).Lease _, err := l.LeaseCreate(context.Background(), &pb.LeaseCreateRequest{ID: leaseID, TTL: 5}) if err == nil { diff --git a/integration/v3_stm_test.go b/integration/v3_stm_test.go index b496664dc..a056fda9e 100644 --- a/integration/v3_stm_test.go +++ b/integration/v3_stm_test.go @@ -19,7 +19,6 @@ import ( "strconv" "testing" - v3 "github.com/coreos/etcd/clientv3" "github.com/coreos/etcd/contrib/recipes" ) @@ -31,7 +30,7 @@ func TestSTMConflict(t *testing.T) { etcdc := clus.RandClient() keys := make([]*recipe.RemoteKV, 5) for i := 0; i < len(keys); i++ { - rk, err := recipe.NewKV(v3.NewKV(etcdc), fmt.Sprintf("foo-%d", i), "100", 0) + rk, err := recipe.NewKV(etcdc, fmt.Sprintf("foo-%d", i), "100", 0) if err != nil { t.Fatalf("could not make key (%v)", err) } @@ -76,7 +75,7 @@ func TestSTMConflict(t *testing.T) { // ensure sum matches initial sum sum := 0 for _, oldRK := range keys { - rk, err := recipe.GetRemoteKV(v3.NewKV(etcdc), oldRK.Key()) + rk, err := recipe.GetRemoteKV(etcdc, oldRK.Key()) if err != nil { t.Fatalf("couldn't fetch key %s (%v)", oldRK.Key(), err) } @@ -103,7 +102,7 @@ func TestSTMPutNewKey(t *testing.T) { t.Fatalf("error on stm txn (%v)", err) } - rk, err := recipe.GetRemoteKV(v3.NewKV(etcdc), "foo") + rk, err := recipe.GetRemoteKV(etcdc, "foo") if err != nil { t.Fatalf("error fetching key (%v)", err) } @@ -129,7 +128,7 @@ func TestSTMAbort(t *testing.T) { t.Fatalf("error on stm txn (%v)", err) } - rk, err := recipe.GetRemoteKV(v3.NewKV(etcdc), "foo") + rk, err := recipe.GetRemoteKV(etcdc, "foo") if err != nil { t.Fatalf("error fetching key (%v)", err) } diff --git a/integration/v3_watch_test.go b/integration/v3_watch_test.go index 17513ca7e..7ca7b830b 100644 --- a/integration/v3_watch_test.go +++ b/integration/v3_watch_test.go @@ -180,7 +180,7 @@ func TestV3WatchFromCurrentRevision(t *testing.T) { for i, tt := range tests { clus := NewClusterV3(t, &ClusterConfig{Size: 3}) - wAPI := clus.RandClient().Watch + wAPI := toGRPC(clus.RandClient()).Watch ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() wStream, err := wAPI.Watch(ctx) @@ -212,7 +212,7 @@ func TestV3WatchFromCurrentRevision(t *testing.T) { // asynchronously create keys go func() { for _, k := range tt.putKeys { - kvc := clus.RandClient().KV + kvc := toGRPC(clus.RandClient()).KV req := &pb.PutRequest{Key: []byte(k), Value: []byte("bar")} if _, err := kvc.Put(context.TODO(), req); err != nil { t.Fatalf("#%d: couldn't put key (%v)", i, err) @@ -273,7 +273,7 @@ func testV3WatchCancel(t *testing.T, startRev int64) { ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() - wStream, errW := clus.RandClient().Watch.Watch(ctx) + wStream, errW := toGRPC(clus.RandClient()).Watch.Watch(ctx) if errW != nil { t.Fatalf("wAPI.Watch error: %v", errW) } @@ -308,7 +308,7 @@ func testV3WatchCancel(t *testing.T, startRev int64) { t.Errorf("cresp.Canceled got = %v, want = true", cresp.Canceled) } - kvc := clus.RandClient().KV + kvc := toGRPC(clus.RandClient()).KV if _, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}); err != nil { t.Errorf("couldn't put key (%v)", err) } @@ -331,7 +331,7 @@ func TestV3WatchCurrentPutOverlap(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() - wStream, wErr := clus.RandClient().Watch.Watch(ctx) + wStream, wErr := toGRPC(clus.RandClient()).Watch.Watch(ctx) if wErr != nil { t.Fatalf("wAPI.Watch error: %v", wErr) } @@ -341,7 +341,7 @@ func TestV3WatchCurrentPutOverlap(t *testing.T) { // first revision already allocated as empty revision for i := 1; i < nrRevisions; i++ { go func() { - kvc := clus.RandClient().KV + kvc := toGRPC(clus.RandClient()).KV req := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")} if _, err := kvc.Put(context.TODO(), req); err != nil { t.Fatalf("couldn't put key (%v)", err) @@ -418,11 +418,11 @@ func TestV3WatchMultipleWatchersUnsynced(t *testing.T) { // one watcher to test if it receives expected events. func testV3WatchMultipleWatchers(t *testing.T, startRev int64) { clus := NewClusterV3(t, &ClusterConfig{Size: 3}) - kvc := clus.RandClient().KV + kvc := toGRPC(clus.RandClient()).KV ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() - wStream, errW := clus.RandClient().Watch.Watch(ctx) + wStream, errW := toGRPC(clus.RandClient()).Watch.Watch(ctx) if errW != nil { t.Fatalf("wAPI.Watch error: %v", errW) } @@ -523,7 +523,7 @@ func testV3WatchMultipleEventsTxn(t *testing.T, startRev int64) { ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() - wStream, wErr := clus.RandClient().Watch.Watch(ctx) + wStream, wErr := toGRPC(clus.RandClient()).Watch.Watch(ctx) if wErr != nil { t.Fatalf("wAPI.Watch error: %v", wErr) } @@ -535,7 +535,7 @@ func testV3WatchMultipleEventsTxn(t *testing.T, startRev int64) { t.Fatalf("wStream.Send error: %v", err) } - kvc := clus.RandClient().KV + kvc := toGRPC(clus.RandClient()).KV txn := pb.TxnRequest{} for i := 0; i < 3; i++ { ru := &pb.RequestUnion{} @@ -605,7 +605,7 @@ func TestV3WatchMultipleEventsPutUnsynced(t *testing.T) { clus := NewClusterV3(t, &ClusterConfig{Size: 3}) defer clus.Terminate(t) - kvc := clus.RandClient().KV + kvc := toGRPC(clus.RandClient()).KV if _, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: []byte("foo0"), Value: []byte("bar")}); err != nil { t.Fatalf("couldn't put key (%v)", err) @@ -616,7 +616,7 @@ func TestV3WatchMultipleEventsPutUnsynced(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() - wStream, wErr := clus.RandClient().Watch.Watch(ctx) + wStream, wErr := toGRPC(clus.RandClient()).Watch.Watch(ctx) if wErr != nil { t.Fatalf("wAPI.Watch error: %v", wErr) } @@ -692,8 +692,8 @@ func TestV3WatchMultipleStreamsUnsynced(t *testing.T) { // testV3WatchMultipleStreams tests multiple watchers on the same key on multiple streams. func testV3WatchMultipleStreams(t *testing.T, startRev int64) { clus := NewClusterV3(t, &ClusterConfig{Size: 3}) - wAPI := clus.RandClient().Watch - kvc := clus.RandClient().KV + wAPI := toGRPC(clus.RandClient()).Watch + kvc := toGRPC(clus.RandClient()).KV streams := make([]pb.Watch_WatchClient, 5) for i := range streams { @@ -792,7 +792,7 @@ func TestV3WatchInvalidFutureRevision(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() - wStream, wErr := clus.RandClient().Watch.Watch(ctx) + wStream, wErr := toGRPC(clus.RandClient()).Watch.Watch(ctx) if wErr != nil { t.Fatalf("wAPI.Watch error: %v", wErr) } diff --git a/tools/benchmark/cmd/put.go b/tools/benchmark/cmd/put.go index aa0904e44..85c8792c7 100644 --- a/tools/benchmark/cmd/put.go +++ b/tools/benchmark/cmd/put.go @@ -24,7 +24,7 @@ import ( "github.com/coreos/etcd/Godeps/_workspace/src/github.com/cheggaaa/pb" "github.com/coreos/etcd/Godeps/_workspace/src/github.com/spf13/cobra" "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" - "github.com/coreos/etcd/etcdserver/etcdserverpb" + v3 "github.com/coreos/etcd/clientv3" ) // putCmd represents the put command @@ -61,10 +61,10 @@ func putFunc(cmd *cobra.Command, args []string) { } results = make(chan result) - requests := make(chan etcdserverpb.PutRequest, totalClients) + requests := make(chan v3.Op, totalClients) bar = pb.New(putTotal) - k, v := make([]byte, keySize), mustRandBytes(valSize) + k, v := make([]byte, keySize), string(mustRandBytes(valSize)) clients := mustCreateClients(totalClients, totalConns) @@ -73,7 +73,7 @@ func putFunc(cmd *cobra.Command, args []string) { for i := range clients { wg.Add(1) - go doPut(context.Background(), clients[i].KV, requests) + go doPut(context.Background(), clients[i], requests) } pdoneC := printReport(results) @@ -85,7 +85,7 @@ func putFunc(cmd *cobra.Command, args []string) { } else { binary.PutVarint(k, int64(rand.Intn(keySpaceSize))) } - requests <- etcdserverpb.PutRequest{Key: k, Value: v} + requests <- v3.OpPut(string(k), v) } close(requests) }() @@ -98,12 +98,12 @@ func putFunc(cmd *cobra.Command, args []string) { <-pdoneC } -func doPut(ctx context.Context, client etcdserverpb.KVClient, requests <-chan etcdserverpb.PutRequest) { +func doPut(ctx context.Context, client v3.KV, requests <-chan v3.Op) { defer wg.Done() - for r := range requests { + for op := range requests { st := time.Now() - _, err := client.Put(ctx, &r) + _, err := client.Do(ctx, op) var errStr string if err != nil { diff --git a/tools/benchmark/cmd/range.go b/tools/benchmark/cmd/range.go index da04720dd..3471e2f43 100644 --- a/tools/benchmark/cmd/range.go +++ b/tools/benchmark/cmd/range.go @@ -22,7 +22,7 @@ import ( "github.com/coreos/etcd/Godeps/_workspace/src/github.com/cheggaaa/pb" "github.com/coreos/etcd/Godeps/_workspace/src/github.com/spf13/cobra" "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" - "github.com/coreos/etcd/etcdserver/etcdserverpb" + v3 "github.com/coreos/etcd/clientv3" ) // rangeCmd represents the range command @@ -50,10 +50,10 @@ func rangeFunc(cmd *cobra.Command, args []string) { os.Exit(1) } - k := []byte(args[0]) - var end []byte + k := args[0] + end := "" if len(args) == 2 { - end = []byte(args[1]) + end = args[1] } if rangeConsistency == "l" { @@ -66,7 +66,7 @@ func rangeFunc(cmd *cobra.Command, args []string) { } results = make(chan result) - requests := make(chan etcdserverpb.RangeRequest, totalClients) + requests := make(chan v3.Op, totalClients) bar = pb.New(rangeTotal) clients := mustCreateClients(totalClients, totalConns) @@ -83,11 +83,12 @@ func rangeFunc(cmd *cobra.Command, args []string) { go func() { for i := 0; i < rangeTotal; i++ { - r := etcdserverpb.RangeRequest{Key: k, RangeEnd: end} + opts := []v3.OpOption{v3.WithRange(end)} if rangeConsistency == "s" { - r.Serializable = true + opts = append(opts, v3.WithSerializable()) } - requests <- r + op := v3.OpGet(k, opts...) + requests <- op } close(requests) }() @@ -100,12 +101,12 @@ func rangeFunc(cmd *cobra.Command, args []string) { <-pdoneC } -func doRange(client etcdserverpb.KVClient, requests <-chan etcdserverpb.RangeRequest) { +func doRange(client v3.KV, requests <-chan v3.Op) { defer wg.Done() - for req := range requests { + for op := range requests { st := time.Now() - _, err := client.Range(context.Background(), &req) + _, err := client.Do(context.Background(), op) var errStr string if err != nil { diff --git a/tools/benchmark/cmd/watch.go b/tools/benchmark/cmd/watch.go index 1a73f33c0..d80a6d4b7 100644 --- a/tools/benchmark/cmd/watch.go +++ b/tools/benchmark/cmd/watch.go @@ -20,7 +20,7 @@ import ( "sync/atomic" "time" - "github.com/coreos/etcd/etcdserver/etcdserverpb" + v3 "github.com/coreos/etcd/clientv3" "github.com/coreos/etcd/Godeps/_workspace/src/github.com/cheggaaa/pb" "github.com/coreos/etcd/Godeps/_workspace/src/github.com/spf13/cobra" @@ -73,23 +73,18 @@ func init() { } func watchFunc(cmd *cobra.Command, args []string) { - watched := make([][]byte, watchedKeyTotal) + watched := make([]string, watchedKeyTotal) for i := range watched { - watched[i] = mustRandBytes(32) + watched[i] = string(mustRandBytes(32)) } - requests := make(chan etcdserverpb.WatchRequest, totalClients) + requests := make(chan string, totalClients) clients := mustCreateClients(totalClients, totalConns) - streams := make([]etcdserverpb.Watch_WatchClient, watchTotalStreams) - var err error + streams := make([]v3.Watcher, watchTotalStreams) for i := range streams { - streams[i], err = clients[i%len(clients)].Watch.Watch(context.TODO()) - if err != nil { - fmt.Fprintln(os.Stderr, "Failed to create watch stream:", err) - os.Exit(1) - } + streams[i] = v3.NewWatcher(clients[i%len(clients)]) } putStartNotifier = make(chan struct{}) @@ -111,10 +106,7 @@ func watchFunc(cmd *cobra.Command, args []string) { go func() { for i := 0; i < watchTotal; i++ { - requests <- etcdserverpb.WatchRequest{ - RequestUnion: &etcdserverpb.WatchRequest_CreateRequest{ - CreateRequest: &etcdserverpb.WatchCreateRequest{ - Key: watched[i%(len(watched))]}}} + requests <- watched[i%len(watched)] } close(requests) }() @@ -139,7 +131,7 @@ func watchFunc(cmd *cobra.Command, args []string) { recvCompletedNotifier = make(chan struct{}) close(putStartNotifier) - putreqc := make(chan etcdserverpb.PutRequest) + putreqc := make(chan v3.Op) for i := 0; i < watchPutTotal; i++ { go doPutForWatch(context.TODO(), clients[i%len(clients)].KV, putreqc) @@ -149,10 +141,7 @@ func watchFunc(cmd *cobra.Command, args []string) { go func() { for i := 0; i < eventsTotal; i++ { - putreqc <- etcdserverpb.PutRequest{ - Key: watched[i%(len(watched))], - Value: []byte("data"), - } + putreqc <- v3.OpPut(watched[i%(len(watched))], "data") // TODO: use a real rate-limiter instead of sleep. time.Sleep(time.Second / time.Duration(watchPutRate)) } @@ -166,16 +155,17 @@ func watchFunc(cmd *cobra.Command, args []string) { <-pdoneC } -func doWatch(stream etcdserverpb.Watch_WatchClient, requests <-chan etcdserverpb.WatchRequest) { +func doWatch(stream v3.Watcher, requests <-chan string) { for r := range requests { st := time.Now() - err := stream.Send(&r) + wch := stream.Watch(context.TODO(), r) var errStr string - if err != nil { - errStr = err.Error() + if wch == nil { + errStr = "could not open watch channel" } results <- result{errStr: errStr, duration: time.Since(st)} bar.Increment() + go recvWatchChan(wch) } atomic.AddInt32(&nrWatchCompleted, 1) if atomic.LoadInt32(&nrWatchCompleted) == int32(watchTotalStreams) { @@ -183,15 +173,12 @@ func doWatch(stream etcdserverpb.Watch_WatchClient, requests <-chan etcdserverpb } <-putStartNotifier +} - for { +func recvWatchChan(wch v3.WatchChan) { + for range wch { st := time.Now() - _, err := stream.Recv() - var errStr string - if err != nil { - errStr = err.Error() - } - results <- result{errStr: errStr, duration: time.Since(st)} + results <- result{duration: time.Since(st)} bar.Increment() atomic.AddInt32(&nrRecvCompleted, 1) @@ -201,11 +188,11 @@ func doWatch(stream etcdserverpb.Watch_WatchClient, requests <-chan etcdserverpb } } -func doPutForWatch(ctx context.Context, client etcdserverpb.KVClient, requests <-chan etcdserverpb.PutRequest) { - for r := range requests { - _, err := client.Put(ctx, &r) +func doPutForWatch(ctx context.Context, client v3.KV, requests <-chan v3.Op) { + for op := range requests { + _, err := client.Do(ctx, op) if err != nil { - fmt.Fprintln(os.Stderr, "failed to Put for watch benchmark: %s", err) + fmt.Fprintf(os.Stderr, "failed to Put for watch benchmark: %v\n", err) os.Exit(1) } }