From 5f62c05a6df5f65bc3d2bae3c9e420904aafd6d2 Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Wed, 24 Feb 2016 22:33:28 -0800 Subject: [PATCH] clientv3: compose all clientv3 APIs into client struct --- clientv3/client.go | 35 +++++++++++++++---------------- integration/cluster.go | 21 +++++++++++++++++++ integration/v3_grpc_test.go | 24 +++++++++++----------- integration/v3_lease_test.go | 40 ++++++++++++++++++------------------ integration/v3_watch_test.go | 30 +++++++++++++-------------- 5 files changed, 85 insertions(+), 65 deletions(-) diff --git a/clientv3/client.go b/clientv3/client.go index ae9d8b4df..d7bc03129 100644 --- a/clientv3/client.go +++ b/clientv3/client.go @@ -24,7 +24,6 @@ import ( "github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc" "github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc/credentials" - pb "github.com/coreos/etcd/etcdserver/etcdserverpb" "github.com/coreos/etcd/pkg/transport" ) @@ -34,14 +33,10 @@ var ( // Client provides and manages an etcd v3 client session. type Client struct { - // KV is the keyvalue API for the client's connection. - KV pb.KVClient - // Lease is the lease API for the client's connection. - Lease pb.LeaseClient - // Watch is the watch API for the client's connection. - Watch pb.WatchClient - // Cluster is the cluster API for the client's connection. - Cluster pb.ClusterClient + Cluster + KV + Lease + Watcher conn *grpc.ClientConn cfg Config @@ -86,6 +81,8 @@ func NewFromURL(url string) (*Client, error) { // Close shuts down the client's etcd connections. func (c *Client) Close() error { + c.Watcher.Close() + c.Lease.Close() return c.conn.Close() } @@ -146,15 +143,17 @@ func newClient(cfg *Config) (*Client, error) { if err != nil { return nil, err } - return &Client{ - KV: pb.NewKVClient(conn), - Lease: pb.NewLeaseClient(conn), - Watch: pb.NewWatchClient(conn), - Cluster: pb.NewClusterClient(conn), - conn: conn, - cfg: *cfg, - creds: creds, - }, nil + client := &Client{ + conn: conn, + cfg: *cfg, + creds: creds, + } + client.Cluster = NewCluster(client) + client.KV = NewKV(client) + client.Lease = NewLease(client) + client.Watcher = NewWatcher(client) + + return client, nil } // ActiveConnection returns the current in-use connection diff --git a/integration/cluster.go b/integration/cluster.go index 6c884032e..7ec023d23 100644 --- a/integration/cluster.go +++ b/integration/cluster.go @@ -37,6 +37,7 @@ import ( "github.com/coreos/etcd/etcdserver" "github.com/coreos/etcd/etcdserver/api/v3rpc" "github.com/coreos/etcd/etcdserver/etcdhttp" + pb "github.com/coreos/etcd/etcdserver/etcdserverpb" "github.com/coreos/etcd/pkg/testutil" "github.com/coreos/etcd/pkg/transport" "github.com/coreos/etcd/pkg/types" @@ -733,3 +734,23 @@ func (c *ClusterV3) RandClient() *clientv3.Client { func (c *ClusterV3) Client(i int) *clientv3.Client { return c.clients[i] } + +type grpcAPI struct { + // Cluster is the cluster API for the client's connection. + Cluster pb.ClusterClient + // KV is the keyvalue API for the client's connection. + KV pb.KVClient + // Lease is the lease API for the client's connection. + Lease pb.LeaseClient + // Watch is the watch API for the client's connection. + Watch pb.WatchClient +} + +func toGRPC(c *clientv3.Client) grpcAPI { + return grpcAPI{ + pb.NewClusterClient(c.ActiveConnection()), + pb.NewKVClient(c.ActiveConnection()), + pb.NewLeaseClient(c.ActiveConnection()), + pb.NewWatchClient(c.ActiveConnection()), + } +} diff --git a/integration/v3_grpc_test.go b/integration/v3_grpc_test.go index 8e150d41e..a05b08e2a 100644 --- a/integration/v3_grpc_test.go +++ b/integration/v3_grpc_test.go @@ -33,7 +33,7 @@ func TestV3PutOverwrite(t *testing.T) { clus := NewClusterV3(t, &ClusterConfig{Size: 3}) defer clus.Terminate(t) - kvc := clus.RandClient().KV + kvc := toGRPC(clus.RandClient()).KV key := []byte("foo") reqput := &pb.PutRequest{Key: key, Value: []byte("bar")} @@ -77,7 +77,7 @@ func TestV3TxnTooManyOps(t *testing.T) { clus := NewClusterV3(t, &ClusterConfig{Size: 3}) defer clus.Terminate(t) - kvc := clus.RandClient().KV + kvc := toGRPC(clus.RandClient()).KV // unique keys i := new(int) @@ -161,7 +161,7 @@ func TestV3TxnDuplicateKeys(t *testing.T) { }, } - kvc := clus.RandClient().KV + kvc := toGRPC(clus.RandClient()).KV tests := []struct { txnSuccess []*pb.RequestUnion @@ -208,7 +208,7 @@ func TestV3PutMissingLease(t *testing.T) { clus := NewClusterV3(t, &ClusterConfig{Size: 3}) defer clus.Terminate(t) - kvc := clus.RandClient().KV + kvc := toGRPC(clus.RandClient()).KV key := []byte("foo") preq := &pb.PutRequest{Key: key, Lease: 123456} tests := []func(){ @@ -324,7 +324,7 @@ func TestV3DeleteRange(t *testing.T) { for i, tt := range tests { clus := NewClusterV3(t, &ClusterConfig{Size: 3}) - kvc := clus.RandClient().KV + kvc := toGRPC(clus.RandClient()).KV ks := tt.keySet for j := range ks { @@ -375,7 +375,7 @@ func TestV3TxnInvaildRange(t *testing.T) { clus := NewClusterV3(t, &ClusterConfig{Size: 3}) defer clus.Terminate(t) - kvc := clus.RandClient().KV + kvc := toGRPC(clus.RandClient()).KV preq := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")} for i := 0; i < 3; i++ { @@ -419,7 +419,7 @@ func TestV3TooLargeRequest(t *testing.T) { clus := NewClusterV3(t, &ClusterConfig{Size: 3}) defer clus.Terminate(t) - kvc := clus.RandClient().KV + kvc := toGRPC(clus.RandClient()).KV // 2MB request value largeV := make([]byte, 2*1024*1024) @@ -437,7 +437,7 @@ func TestV3Hash(t *testing.T) { clus := NewClusterV3(t, &ClusterConfig{Size: 3}) defer clus.Terminate(t) - kvc := clus.RandClient().KV + kvc := toGRPC(clus.RandClient()).KV preq := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")} for i := 0; i < 3; i++ { @@ -590,7 +590,7 @@ func TestV3RangeRequest(t *testing.T) { for i, tt := range tests { clus := NewClusterV3(t, &ClusterConfig{Size: 3}) for _, k := range tt.putKeys { - kvc := clus.RandClient().KV + kvc := toGRPC(clus.RandClient()).KV req := &pb.PutRequest{Key: []byte(k), Value: []byte("bar")} if _, err := kvc.Put(context.TODO(), req); err != nil { t.Fatalf("#%d: couldn't put key (%v)", i, err) @@ -598,7 +598,7 @@ func TestV3RangeRequest(t *testing.T) { } for j, req := range tt.reqs { - kvc := clus.RandClient().KV + kvc := toGRPC(clus.RandClient()).KV resp, err := kvc.Range(context.TODO(), &req) if err != nil { t.Errorf("#%d.%d: Range error: %v", i, j, err) @@ -668,7 +668,7 @@ func TestTLSGRPCRejectInsecureClient(t *testing.T) { donec := make(chan error, 1) go func() { reqput := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")} - _, perr := client.KV.Put(ctx, reqput) + _, perr := toGRPC(client).KV.Put(ctx, reqput) donec <- perr }() @@ -717,7 +717,7 @@ func TestTLSGRPCAcceptSecureAll(t *testing.T) { defer client.Close() reqput := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")} - if _, err := client.KV.Put(context.TODO(), reqput); err != nil { + if _, err := toGRPC(client).KV.Put(context.TODO(), reqput); err != nil { t.Fatalf("unexpected error on put over tls (%v)", err) } } diff --git a/integration/v3_lease_test.go b/integration/v3_lease_test.go index e893fb42b..516c28bd4 100644 --- a/integration/v3_lease_test.go +++ b/integration/v3_lease_test.go @@ -33,7 +33,7 @@ func TestV3LeasePrmote(t *testing.T) { defer clus.Terminate(t) // create lease - lresp, err := clus.RandClient().Lease.LeaseCreate(context.TODO(), &pb.LeaseCreateRequest{TTL: 5}) + lresp, err := toGRPC(clus.RandClient()).Lease.LeaseCreate(context.TODO(), &pb.LeaseCreateRequest{TTL: 5}) if err != nil { t.Fatal(err) } @@ -78,7 +78,7 @@ func TestV3LeasePrmote(t *testing.T) { func TestV3LeaseRevoke(t *testing.T) { defer testutil.AfterTest(t) testLeaseRemoveLeasedKey(t, func(clus *ClusterV3, leaseID int64) error { - lc := clus.RandClient().Lease + lc := toGRPC(clus.RandClient()).Lease _, err := lc.LeaseRevoke(context.TODO(), &pb.LeaseRevokeRequest{ID: leaseID}) return err }) @@ -91,7 +91,7 @@ func TestV3LeaseCreateByID(t *testing.T) { defer clus.Terminate(t) // create fixed lease - lresp, err := clus.RandClient().Lease.LeaseCreate( + lresp, err := toGRPC(clus.RandClient()).Lease.LeaseCreate( context.TODO(), &pb.LeaseCreateRequest{ID: 1, TTL: 1}) if err != nil { @@ -102,7 +102,7 @@ func TestV3LeaseCreateByID(t *testing.T) { } // create duplicate fixed lease - lresp, err = clus.RandClient().Lease.LeaseCreate( + lresp, err = toGRPC(clus.RandClient()).Lease.LeaseCreate( context.TODO(), &pb.LeaseCreateRequest{ID: 1, TTL: 1}) if err != v3rpc.ErrLeaseExist { @@ -110,7 +110,7 @@ func TestV3LeaseCreateByID(t *testing.T) { } // create fresh fixed lease - lresp, err = clus.RandClient().Lease.LeaseCreate( + lresp, err = toGRPC(clus.RandClient()).Lease.LeaseCreate( context.TODO(), &pb.LeaseCreateRequest{ID: 2, TTL: 1}) if err != nil { @@ -129,7 +129,7 @@ func TestV3LeaseExpire(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - wStream, err := clus.RandClient().Watch.Watch(ctx) + wStream, err := toGRPC(clus.RandClient()).Watch.Watch(ctx) if err != nil { return err } @@ -177,7 +177,7 @@ func TestV3LeaseExpire(t *testing.T) { func TestV3LeaseKeepAlive(t *testing.T) { defer testutil.AfterTest(t) testLeaseRemoveLeasedKey(t, func(clus *ClusterV3, leaseID int64) error { - lc := clus.RandClient().Lease + lc := toGRPC(clus.RandClient()).Lease lreq := &pb.LeaseKeepAliveRequest{ID: leaseID} ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -215,7 +215,7 @@ func TestV3LeaseExists(t *testing.T) { // create lease ctx0, cancel0 := context.WithCancel(context.Background()) defer cancel0() - lresp, err := clus.RandClient().Lease.LeaseCreate( + lresp, err := toGRPC(clus.RandClient()).Lease.LeaseCreate( ctx0, &pb.LeaseCreateRequest{TTL: 30}) if err != nil { @@ -241,34 +241,34 @@ func TestV3LeaseSwitch(t *testing.T) { // create lease ctx, cancel := context.WithCancel(context.Background()) defer cancel() - lresp1, err1 := clus.RandClient().Lease.LeaseCreate(ctx, &pb.LeaseCreateRequest{TTL: 30}) + lresp1, err1 := toGRPC(clus.RandClient()).Lease.LeaseCreate(ctx, &pb.LeaseCreateRequest{TTL: 30}) if err1 != nil { t.Fatal(err1) } - lresp2, err2 := clus.RandClient().Lease.LeaseCreate(ctx, &pb.LeaseCreateRequest{TTL: 30}) + lresp2, err2 := toGRPC(clus.RandClient()).Lease.LeaseCreate(ctx, &pb.LeaseCreateRequest{TTL: 30}) if err2 != nil { t.Fatal(err2) } // attach key on lease1 then switch it to lease2 put1 := &pb.PutRequest{Key: []byte(key), Lease: lresp1.ID} - _, err := clus.RandClient().KV.Put(ctx, put1) + _, err := toGRPC(clus.RandClient()).KV.Put(ctx, put1) if err != nil { t.Fatal(err) } put2 := &pb.PutRequest{Key: []byte(key), Lease: lresp2.ID} - _, err = clus.RandClient().KV.Put(ctx, put2) + _, err = toGRPC(clus.RandClient()).KV.Put(ctx, put2) if err != nil { t.Fatal(err) } // revoke lease1 should not remove key - _, err = clus.RandClient().Lease.LeaseRevoke(ctx, &pb.LeaseRevokeRequest{ID: lresp1.ID}) + _, err = toGRPC(clus.RandClient()).Lease.LeaseRevoke(ctx, &pb.LeaseRevokeRequest{ID: lresp1.ID}) if err != nil { t.Fatal(err) } rreq := &pb.RangeRequest{Key: []byte("foo")} - rresp, err := clus.RandClient().KV.Range(context.TODO(), rreq) + rresp, err := toGRPC(clus.RandClient()).KV.Range(context.TODO(), rreq) if err != nil { t.Fatal(err) } @@ -277,11 +277,11 @@ func TestV3LeaseSwitch(t *testing.T) { } // revoke lease2 should remove key - _, err = clus.RandClient().Lease.LeaseRevoke(ctx, &pb.LeaseRevokeRequest{ID: lresp2.ID}) + _, err = toGRPC(clus.RandClient()).Lease.LeaseRevoke(ctx, &pb.LeaseRevokeRequest{ID: lresp2.ID}) if err != nil { t.Fatal(err) } - rresp, err = clus.RandClient().KV.Range(context.TODO(), rreq) + rresp, err = toGRPC(clus.RandClient()).KV.Range(context.TODO(), rreq) if err != nil { t.Fatal(err) } @@ -293,7 +293,7 @@ func TestV3LeaseSwitch(t *testing.T) { // acquireLeaseAndKey creates a new lease and creates an attached key. func acquireLeaseAndKey(clus *ClusterV3, key string) (int64, error) { // create lease - lresp, err := clus.RandClient().Lease.LeaseCreate( + lresp, err := toGRPC(clus.RandClient()).Lease.LeaseCreate( context.TODO(), &pb.LeaseCreateRequest{TTL: 1}) if err != nil { @@ -304,7 +304,7 @@ func acquireLeaseAndKey(clus *ClusterV3, key string) (int64, error) { } // attach to key put := &pb.PutRequest{Key: []byte(key), Lease: lresp.ID} - if _, err := clus.RandClient().KV.Put(context.TODO(), put); err != nil { + if _, err := toGRPC(clus.RandClient()).KV.Put(context.TODO(), put); err != nil { return 0, err } return lresp.ID, nil @@ -327,7 +327,7 @@ func testLeaseRemoveLeasedKey(t *testing.T, act func(*ClusterV3, int64) error) { // confirm no key rreq := &pb.RangeRequest{Key: []byte("foo")} - rresp, err := clus.RandClient().KV.Range(context.TODO(), rreq) + rresp, err := toGRPC(clus.RandClient()).KV.Range(context.TODO(), rreq) if err != nil { t.Fatal(err) } @@ -337,7 +337,7 @@ func testLeaseRemoveLeasedKey(t *testing.T, act func(*ClusterV3, int64) error) { } func leaseExist(t *testing.T, clus *ClusterV3, leaseID int64) bool { - l := clus.RandClient().Lease + l := toGRPC(clus.RandClient()).Lease _, err := l.LeaseCreate(context.Background(), &pb.LeaseCreateRequest{ID: leaseID, TTL: 5}) if err == nil { diff --git a/integration/v3_watch_test.go b/integration/v3_watch_test.go index 65772b7a6..d36e55703 100644 --- a/integration/v3_watch_test.go +++ b/integration/v3_watch_test.go @@ -180,7 +180,7 @@ func TestV3WatchFromCurrentRevision(t *testing.T) { for i, tt := range tests { clus := NewClusterV3(t, &ClusterConfig{Size: 3}) - wAPI := clus.RandClient().Watch + wAPI := toGRPC(clus.RandClient()).Watch ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() wStream, err := wAPI.Watch(ctx) @@ -212,7 +212,7 @@ func TestV3WatchFromCurrentRevision(t *testing.T) { // asynchronously create keys go func() { for _, k := range tt.putKeys { - kvc := clus.RandClient().KV + kvc := toGRPC(clus.RandClient()).KV req := &pb.PutRequest{Key: []byte(k), Value: []byte("bar")} if _, err := kvc.Put(context.TODO(), req); err != nil { t.Fatalf("#%d: couldn't put key (%v)", i, err) @@ -273,7 +273,7 @@ func testV3WatchCancel(t *testing.T, startRev int64) { ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() - wStream, errW := clus.RandClient().Watch.Watch(ctx) + wStream, errW := toGRPC(clus.RandClient()).Watch.Watch(ctx) if errW != nil { t.Fatalf("wAPI.Watch error: %v", errW) } @@ -308,7 +308,7 @@ func testV3WatchCancel(t *testing.T, startRev int64) { t.Errorf("cresp.Canceled got = %v, want = true", cresp.Canceled) } - kvc := clus.RandClient().KV + kvc := toGRPC(clus.RandClient()).KV if _, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}); err != nil { t.Errorf("couldn't put key (%v)", err) } @@ -331,7 +331,7 @@ func TestV3WatchCurrentPutOverlap(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() - wStream, wErr := clus.RandClient().Watch.Watch(ctx) + wStream, wErr := toGRPC(clus.RandClient()).Watch.Watch(ctx) if wErr != nil { t.Fatalf("wAPI.Watch error: %v", wErr) } @@ -341,7 +341,7 @@ func TestV3WatchCurrentPutOverlap(t *testing.T) { // first revision already allocated as empty revision for i := 1; i < nrRevisions; i++ { go func() { - kvc := clus.RandClient().KV + kvc := toGRPC(clus.RandClient()).KV req := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")} if _, err := kvc.Put(context.TODO(), req); err != nil { t.Fatalf("couldn't put key (%v)", err) @@ -418,11 +418,11 @@ func TestV3WatchMultipleWatchersUnsynced(t *testing.T) { // one watcher to test if it receives expected events. func testV3WatchMultipleWatchers(t *testing.T, startRev int64) { clus := NewClusterV3(t, &ClusterConfig{Size: 3}) - kvc := clus.RandClient().KV + kvc := toGRPC(clus.RandClient()).KV ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() - wStream, errW := clus.RandClient().Watch.Watch(ctx) + wStream, errW := toGRPC(clus.RandClient()).Watch.Watch(ctx) if errW != nil { t.Fatalf("wAPI.Watch error: %v", errW) } @@ -523,7 +523,7 @@ func testV3WatchMultipleEventsTxn(t *testing.T, startRev int64) { ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() - wStream, wErr := clus.RandClient().Watch.Watch(ctx) + wStream, wErr := toGRPC(clus.RandClient()).Watch.Watch(ctx) if wErr != nil { t.Fatalf("wAPI.Watch error: %v", wErr) } @@ -535,7 +535,7 @@ func testV3WatchMultipleEventsTxn(t *testing.T, startRev int64) { t.Fatalf("wStream.Send error: %v", err) } - kvc := clus.RandClient().KV + kvc := toGRPC(clus.RandClient()).KV txn := pb.TxnRequest{} for i := 0; i < 3; i++ { ru := &pb.RequestUnion{} @@ -605,7 +605,7 @@ func TestV3WatchMultipleEventsPutUnsynced(t *testing.T) { clus := NewClusterV3(t, &ClusterConfig{Size: 3}) defer clus.Terminate(t) - kvc := clus.RandClient().KV + kvc := toGRPC(clus.RandClient()).KV if _, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: []byte("foo0"), Value: []byte("bar")}); err != nil { t.Fatalf("couldn't put key (%v)", err) @@ -616,7 +616,7 @@ func TestV3WatchMultipleEventsPutUnsynced(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() - wStream, wErr := clus.RandClient().Watch.Watch(ctx) + wStream, wErr := toGRPC(clus.RandClient()).Watch.Watch(ctx) if wErr != nil { t.Fatalf("wAPI.Watch error: %v", wErr) } @@ -692,8 +692,8 @@ func TestV3WatchMultipleStreamsUnsynced(t *testing.T) { // testV3WatchMultipleStreams tests multiple watchers on the same key on multiple streams. func testV3WatchMultipleStreams(t *testing.T, startRev int64) { clus := NewClusterV3(t, &ClusterConfig{Size: 3}) - wAPI := clus.RandClient().Watch - kvc := clus.RandClient().KV + wAPI := toGRPC(clus.RandClient()).Watch + kvc := toGRPC(clus.RandClient()).KV streams := make([]pb.Watch_WatchClient, 5) for i := range streams { @@ -792,7 +792,7 @@ func TestV3WatchInvalidFutureRevision(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() - wStream, wErr := clus.RandClient().Watch.Watch(ctx) + wStream, wErr := toGRPC(clus.RandClient()).Watch.Watch(ctx) if wErr != nil { t.Fatalf("wAPI.Watch error: %v", wErr) }