From 51e62aa007e4ececea537e77687c60e4d0f5b756 Mon Sep 17 00:00:00 2001 From: Gyu-Ho Lee Date: Tue, 26 Jan 2016 17:41:27 -0800 Subject: [PATCH] integration: update gRPC, proto interface --- integration/cluster_test.go | 2 +- integration/v2_http_kv_test.go | 13 +++ integration/v3_barrier_test.go | 3 + integration/v3_grpc_test.go | 171 +++++++++++++++++++++++++-------- integration/v3_queue_test.go | 5 + 5 files changed, 154 insertions(+), 40 deletions(-) diff --git a/integration/cluster_test.go b/integration/cluster_test.go index 5f89b0489..0be389194 100644 --- a/integration/cluster_test.go +++ b/integration/cluster_test.go @@ -738,7 +738,7 @@ func NewGRPCClient(m *member) (*grpc.ClientConn, error) { return net.Dial("unix", a) } unixdialer := grpc.WithDialer(f) - return grpc.Dial(m.grpcAddr, unixdialer) + return grpc.Dial(m.grpcAddr, grpc.WithInsecure(), unixdialer) } // Clone returns a member with the same server configuration. The returned diff --git a/integration/v2_http_kv_test.go b/integration/v2_http_kv_test.go index e2db2414a..5be17b1a7 100644 --- a/integration/v2_http_kv_test.go +++ b/integration/v2_http_kv_test.go @@ -28,6 +28,7 @@ import ( "time" "github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/pkg/capnslog" + "github.com/coreos/etcd/pkg/testutil" ) func init() { @@ -35,6 +36,7 @@ func init() { } func TestV2Set(t *testing.T) { + defer testutil.AfterTest(t) cl := NewCluster(t, 1) cl.Launch(t) defer cl.Terminate(t) @@ -87,6 +89,7 @@ func TestV2Set(t *testing.T) { } func TestV2CreateUpdate(t *testing.T) { + defer testutil.AfterTest(t) cl := NewCluster(t, 1) cl.Launch(t) defer cl.Terminate(t) @@ -197,6 +200,7 @@ func TestV2CreateUpdate(t *testing.T) { } func TestV2CAS(t *testing.T) { + defer testutil.AfterTest(t) cl := NewCluster(t, 1) cl.Launch(t) defer cl.Terminate(t) @@ -325,6 +329,7 @@ func TestV2CAS(t *testing.T) { } func TestV2Delete(t *testing.T) { + defer testutil.AfterTest(t) cl := NewCluster(t, 1) cl.Launch(t) defer cl.Terminate(t) @@ -424,6 +429,7 @@ func TestV2Delete(t *testing.T) { } func TestV2CAD(t *testing.T) { + defer testutil.AfterTest(t) cl := NewCluster(t, 1) cl.Launch(t) defer cl.Terminate(t) @@ -523,6 +529,7 @@ func TestV2CAD(t *testing.T) { } func TestV2Unique(t *testing.T) { + defer testutil.AfterTest(t) cl := NewCluster(t, 1) cl.Launch(t) defer cl.Terminate(t) @@ -589,6 +596,7 @@ func TestV2Unique(t *testing.T) { } func TestV2Get(t *testing.T) { + defer testutil.AfterTest(t) cl := NewCluster(t, 1) cl.Launch(t) defer cl.Terminate(t) @@ -686,6 +694,7 @@ func TestV2Get(t *testing.T) { } func TestV2QuorumGet(t *testing.T) { + defer testutil.AfterTest(t) cl := NewCluster(t, 1) cl.Launch(t) defer cl.Terminate(t) @@ -783,6 +792,7 @@ func TestV2QuorumGet(t *testing.T) { } func TestV2Watch(t *testing.T) { + defer testutil.AfterTest(t) cl := NewCluster(t, 1) cl.Launch(t) defer cl.Terminate(t) @@ -820,6 +830,7 @@ func TestV2Watch(t *testing.T) { } func TestV2WatchWithIndex(t *testing.T) { + defer testutil.AfterTest(t) cl := NewCluster(t, 1) cl.Launch(t) defer cl.Terminate(t) @@ -886,6 +897,7 @@ func TestV2WatchWithIndex(t *testing.T) { } func TestV2WatchKeyInDir(t *testing.T) { + defer testutil.AfterTest(t) cl := NewCluster(t, 1) cl.Launch(t) defer cl.Terminate(t) @@ -946,6 +958,7 @@ func TestV2WatchKeyInDir(t *testing.T) { } func TestV2Head(t *testing.T) { + defer testutil.AfterTest(t) cl := NewCluster(t, 1) cl.Launch(t) defer cl.Terminate(t) diff --git a/integration/v3_barrier_test.go b/integration/v3_barrier_test.go index c2c8de0bd..c87e32db7 100644 --- a/integration/v3_barrier_test.go +++ b/integration/v3_barrier_test.go @@ -19,15 +19,18 @@ import ( "github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc" "github.com/coreos/etcd/contrib/recipes" + "github.com/coreos/etcd/pkg/testutil" ) func TestBarrierSingleNode(t *testing.T) { + defer testutil.AfterTest(t) clus := newClusterGRPC(t, &clusterConfig{size: 3}) defer clus.Terminate(t) testBarrier(t, 5, func() *grpc.ClientConn { return clus.conns[0] }) } func TestBarrierMultiNode(t *testing.T) { + defer testutil.AfterTest(t) clus := newClusterGRPC(t, &clusterConfig{size: 3}) defer clus.Terminate(t) testBarrier(t, 5, func() *grpc.ClientConn { return clus.RandConn() }) diff --git a/integration/v3_grpc_test.go b/integration/v3_grpc_test.go index 664fea16d..46ba8a50e 100644 --- a/integration/v3_grpc_test.go +++ b/integration/v3_grpc_test.go @@ -28,6 +28,7 @@ import ( "github.com/coreos/etcd/etcdserver/api/v3rpc" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" "github.com/coreos/etcd/lease" + "github.com/coreos/etcd/pkg/testutil" "github.com/coreos/etcd/storage/storagepb" ) @@ -69,6 +70,7 @@ func (c *clusterV3) RandConn() *grpc.ClientConn { // TestV3PutOverwrite puts a key with the v3 api to a random cluster member, // overwrites it, then checks that the change was applied. func TestV3PutOverwrite(t *testing.T) { + defer testutil.AfterTest(t) clus := newClusterGRPC(t, &clusterConfig{size: 3}) defer clus.Terminate(t) @@ -112,6 +114,7 @@ func TestV3PutOverwrite(t *testing.T) { } func TestV3TxnTooManyOps(t *testing.T) { + defer testutil.AfterTest(t) clus := newClusterGRPC(t, &clusterConfig{size: 3}) defer clus.Terminate(t) @@ -128,18 +131,22 @@ func TestV3TxnTooManyOps(t *testing.T) { addSuccessOps := func(txn *pb.TxnRequest) { txn.Success = append(txn.Success, &pb.RequestUnion{ - RequestPut: &pb.PutRequest{ - Key: []byte("bar"), - Value: []byte("bar"), + Request: &pb.RequestUnion_RequestPut{ + RequestPut: &pb.PutRequest{ + Key: []byte("bar"), + Value: []byte("bar"), + }, }, }) } addFailureOps := func(txn *pb.TxnRequest) { txn.Failure = append(txn.Failure, &pb.RequestUnion{ - RequestPut: &pb.PutRequest{ - Key: []byte("bar"), - Value: []byte("bar"), + Request: &pb.RequestUnion_RequestPut{ + RequestPut: &pb.PutRequest{ + Key: []byte("bar"), + Value: []byte("bar"), + }, }, }) } @@ -165,6 +172,7 @@ func TestV3TxnTooManyOps(t *testing.T) { // TestV3PutMissingLease ensures that a Put on a key with a bogus lease fails. func TestV3PutMissingLease(t *testing.T) { + defer testutil.AfterTest(t) clus := newClusterGRPC(t, &clusterConfig{size: 3}) defer clus.Terminate(t) @@ -181,7 +189,9 @@ func TestV3PutMissingLease(t *testing.T) { // txn success case func() { txn := &pb.TxnRequest{} - txn.Success = append(txn.Success, &pb.RequestUnion{RequestPut: preq}) + txn.Success = append(txn.Success, &pb.RequestUnion{ + Request: &pb.RequestUnion_RequestPut{ + RequestPut: preq}}) if tresp, err := kvc.Txn(context.TODO(), txn); err == nil { t.Errorf("succeeded txn success. req: %v. resp: %v", txn, tresp) } @@ -189,7 +199,9 @@ func TestV3PutMissingLease(t *testing.T) { // txn failure case func() { txn := &pb.TxnRequest{} - txn.Failure = append(txn.Failure, &pb.RequestUnion{RequestPut: preq}) + txn.Failure = append(txn.Failure, &pb.RequestUnion{ + Request: &pb.RequestUnion_RequestPut{ + RequestPut: preq}}) cmp := &pb.Compare{ Result: pb.Compare_GREATER, Target: pb.Compare_CREATE, @@ -204,8 +216,12 @@ func TestV3PutMissingLease(t *testing.T) { func() { txn := &pb.TxnRequest{} rreq := &pb.RangeRequest{Key: []byte("bar")} - txn.Success = append(txn.Success, &pb.RequestUnion{RequestRange: rreq}) - txn.Failure = append(txn.Failure, &pb.RequestUnion{RequestPut: preq}) + txn.Success = append(txn.Success, &pb.RequestUnion{ + Request: &pb.RequestUnion_RequestRange{ + RequestRange: rreq}}) + txn.Failure = append(txn.Failure, &pb.RequestUnion{ + Request: &pb.RequestUnion_RequestPut{ + RequestPut: preq}}) if tresp, err := kvc.Txn(context.TODO(), txn); err != nil { t.Errorf("failed good txn. req: %v. resp: %v", txn, tresp) } @@ -227,6 +243,7 @@ func TestV3PutMissingLease(t *testing.T) { // TestV3DeleteRange tests various edge cases in the DeleteRange API. func TestV3DeleteRange(t *testing.T) { + defer testutil.AfterTest(t) tests := []struct { keySet []string begin string @@ -318,6 +335,7 @@ func TestV3DeleteRange(t *testing.T) { // TestV3TxnInvaildRange tests txn func TestV3TxnInvaildRange(t *testing.T) { + defer testutil.AfterTest(t) clus := newClusterGRPC(t, &clusterConfig{size: 3}) defer clus.Terminate(t) @@ -338,17 +356,22 @@ func TestV3TxnInvaildRange(t *testing.T) { // future rev txn := &pb.TxnRequest{} - txn.Success = append(txn.Success, &pb.RequestUnion{RequestPut: preq}) + txn.Success = append(txn.Success, &pb.RequestUnion{ + Request: &pb.RequestUnion_RequestPut{ + RequestPut: preq}}) rreq := &pb.RangeRequest{Key: []byte("foo"), Revision: 100} - txn.Success = append(txn.Success, &pb.RequestUnion{RequestRange: rreq}) + txn.Success = append(txn.Success, &pb.RequestUnion{ + Request: &pb.RequestUnion_RequestRange{ + RequestRange: rreq}}) if _, err := kvc.Txn(context.TODO(), txn); err != v3rpc.ErrFutureRev { t.Errorf("err = %v, want %v", err, v3rpc.ErrFutureRev) } // compacted rev - txn.Success[1].RequestRange.Revision = 1 + tv, _ := txn.Success[1].Request.(*pb.RequestUnion_RequestRange) + tv.RequestRange.Revision = 1 if _, err := kvc.Txn(context.TODO(), txn); err != v3rpc.ErrCompacted { t.Errorf("err = %v, want %v", err, v3rpc.ErrCompacted) } @@ -356,6 +379,7 @@ func TestV3TxnInvaildRange(t *testing.T) { // TestV3WatchFromCurrentRevision tests Watch APIs from current revision. func TestV3WatchFromCurrentRevision(t *testing.T) { + defer testutil.AfterTest(t) tests := []struct { putKeys []string watchRequest *pb.WatchRequest @@ -365,7 +389,9 @@ func TestV3WatchFromCurrentRevision(t *testing.T) { // watch the key, matching { []string{"foo"}, - &pb.WatchRequest{CreateRequest: &pb.WatchCreateRequest{Key: []byte("foo")}}, + &pb.WatchRequest{RequestUnion: &pb.WatchRequest_CreateRequest{ + CreateRequest: &pb.WatchCreateRequest{ + Key: []byte("foo")}}}, []*pb.WatchResponse{ { @@ -387,7 +413,9 @@ func TestV3WatchFromCurrentRevision(t *testing.T) { // watch the key, non-matching { []string{"foo"}, - &pb.WatchRequest{CreateRequest: &pb.WatchCreateRequest{Key: []byte("helloworld")}}, + &pb.WatchRequest{RequestUnion: &pb.WatchRequest_CreateRequest{ + CreateRequest: &pb.WatchCreateRequest{ + Key: []byte("helloworld")}}}, []*pb.WatchResponse{ { @@ -399,7 +427,9 @@ func TestV3WatchFromCurrentRevision(t *testing.T) { // watch the prefix, matching { []string{"fooLong"}, - &pb.WatchRequest{CreateRequest: &pb.WatchCreateRequest{Prefix: []byte("foo")}}, + &pb.WatchRequest{RequestUnion: &pb.WatchRequest_CreateRequest{ + CreateRequest: &pb.WatchCreateRequest{ + Prefix: []byte("foo")}}}, []*pb.WatchResponse{ { @@ -421,7 +451,9 @@ func TestV3WatchFromCurrentRevision(t *testing.T) { // watch the prefix, non-matching { []string{"foo"}, - &pb.WatchRequest{CreateRequest: &pb.WatchCreateRequest{Prefix: []byte("helloworld")}}, + &pb.WatchRequest{RequestUnion: &pb.WatchRequest_CreateRequest{ + CreateRequest: &pb.WatchCreateRequest{ + Prefix: []byte("helloworld")}}}, []*pb.WatchResponse{ { @@ -433,7 +465,9 @@ func TestV3WatchFromCurrentRevision(t *testing.T) { // multiple puts, one watcher with matching key { []string{"foo", "foo", "foo"}, - &pb.WatchRequest{CreateRequest: &pb.WatchCreateRequest{Key: []byte("foo")}}, + &pb.WatchRequest{RequestUnion: &pb.WatchRequest_CreateRequest{ + CreateRequest: &pb.WatchCreateRequest{ + Key: []byte("foo")}}}, []*pb.WatchResponse{ { @@ -475,7 +509,9 @@ func TestV3WatchFromCurrentRevision(t *testing.T) { // multiple puts, one watcher with matching prefix { []string{"foo", "foo", "foo"}, - &pb.WatchRequest{CreateRequest: &pb.WatchCreateRequest{Prefix: []byte("foo")}}, + &pb.WatchRequest{RequestUnion: &pb.WatchRequest_CreateRequest{ + CreateRequest: &pb.WatchCreateRequest{ + Prefix: []byte("foo")}}}, []*pb.WatchResponse{ { @@ -520,7 +556,9 @@ func TestV3WatchFromCurrentRevision(t *testing.T) { clus := newClusterGRPC(t, &clusterConfig{size: 3}) wAPI := pb.NewWatchClient(clus.RandConn()) - wStream, err := wAPI.Watch(context.TODO()) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + wStream, err := wAPI.Watch(ctx) if err != nil { t.Fatalf("#%d: wAPI.Watch error: %v", i, err) } @@ -580,11 +618,13 @@ func TestV3WatchFromCurrentRevision(t *testing.T) { // TestV3WatchCancelSynced tests Watch APIs cancellation from synced map. func TestV3WatchCancelSynced(t *testing.T) { + defer testutil.AfterTest(t) testV3WatchCancel(t, 0) } // TestV3WatchCancelUnsynced tests Watch APIs cancellation from unsynced map. func TestV3WatchCancelUnsynced(t *testing.T) { + defer testutil.AfterTest(t) testV3WatchCancel(t, 1) } @@ -592,12 +632,17 @@ func testV3WatchCancel(t *testing.T, startRev int64) { clus := newClusterGRPC(t, &clusterConfig{size: 3}) wAPI := pb.NewWatchClient(clus.RandConn()) - wStream, errW := wAPI.Watch(context.TODO()) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + wStream, errW := wAPI.Watch(ctx) if errW != nil { t.Fatalf("wAPI.Watch error: %v", errW) } - if err := wStream.Send(&pb.WatchRequest{CreateRequest: &pb.WatchCreateRequest{Key: []byte("foo"), StartRevision: startRev}}); err != nil { + wreq := &pb.WatchRequest{RequestUnion: &pb.WatchRequest_CreateRequest{ + CreateRequest: &pb.WatchCreateRequest{ + Key: []byte("foo"), StartRevision: startRev}}} + if err := wStream.Send(wreq); err != nil { t.Fatalf("wStream.Send error: %v", err) } @@ -609,7 +654,10 @@ func testV3WatchCancel(t *testing.T, startRev int64) { t.Errorf("wresp.Created got = %v, want = true", wresp.Created) } - if err := wStream.Send(&pb.WatchRequest{CancelRequest: &pb.WatchCancelRequest{WatchId: wresp.WatchId}}); err != nil { + creq := &pb.WatchRequest{RequestUnion: &pb.WatchRequest_CancelRequest{ + CancelRequest: &pb.WatchCancelRequest{ + WatchId: wresp.WatchId}}} + if err := wStream.Send(creq); err != nil { t.Fatalf("wStream.Send error: %v", err) } @@ -636,10 +684,12 @@ func testV3WatchCancel(t *testing.T, startRev int64) { } func TestV3WatchMultipleWatchersSynced(t *testing.T) { + defer testutil.AfterTest(t) testV3WatchMultipleWatchers(t, 0) } func TestV3WatchMultipleWatchersUnsynced(t *testing.T) { + defer testutil.AfterTest(t) testV3WatchMultipleWatchers(t, 1) } @@ -652,7 +702,9 @@ func testV3WatchMultipleWatchers(t *testing.T, startRev int64) { wAPI := pb.NewWatchClient(clus.RandConn()) kvc := pb.NewKVClient(clus.RandConn()) - wStream, errW := wAPI.Watch(context.TODO()) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + wStream, errW := wAPI.Watch(ctx) if errW != nil { t.Fatalf("wAPI.Watch error: %v", errW) } @@ -661,9 +713,13 @@ func testV3WatchMultipleWatchers(t *testing.T, startRev int64) { for i := 0; i < watchKeyN+1; i++ { var wreq *pb.WatchRequest if i < watchKeyN { - wreq = &pb.WatchRequest{CreateRequest: &pb.WatchCreateRequest{Key: []byte("foo"), StartRevision: startRev}} + wreq = &pb.WatchRequest{RequestUnion: &pb.WatchRequest_CreateRequest{ + CreateRequest: &pb.WatchCreateRequest{ + Key: []byte("foo"), StartRevision: startRev}}} } else { - wreq = &pb.WatchRequest{CreateRequest: &pb.WatchCreateRequest{Prefix: []byte("fo"), StartRevision: startRev}} + wreq = &pb.WatchRequest{RequestUnion: &pb.WatchRequest_CreateRequest{ + CreateRequest: &pb.WatchCreateRequest{ + Prefix: []byte("fo"), StartRevision: startRev}}} } if err := wStream.Send(wreq); err != nil { t.Fatalf("wStream.Send error: %v", err) @@ -734,10 +790,12 @@ func testV3WatchMultipleWatchers(t *testing.T, startRev int64) { } func TestV3WatchMultipleEventsTxnSynced(t *testing.T) { + defer testutil.AfterTest(t) testV3WatchMultipleEventsTxn(t, 0) } func TestV3WatchMultipleEventsTxnUnsynced(t *testing.T) { + defer testutil.AfterTest(t) testV3WatchMultipleEventsTxn(t, 1) } @@ -746,12 +804,17 @@ func testV3WatchMultipleEventsTxn(t *testing.T, startRev int64) { clus := newClusterGRPC(t, &clusterConfig{size: 3}) wAPI := pb.NewWatchClient(clus.RandConn()) - wStream, wErr := wAPI.Watch(context.TODO()) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + wStream, wErr := wAPI.Watch(ctx) if wErr != nil { t.Fatalf("wAPI.Watch error: %v", wErr) } - if err := wStream.Send(&pb.WatchRequest{CreateRequest: &pb.WatchCreateRequest{Prefix: []byte("foo"), StartRevision: startRev}}); err != nil { + wreq := &pb.WatchRequest{RequestUnion: &pb.WatchRequest_CreateRequest{ + CreateRequest: &pb.WatchCreateRequest{ + Prefix: []byte("foo"), StartRevision: startRev}}} + if err := wStream.Send(wreq); err != nil { t.Fatalf("wStream.Send error: %v", err) } @@ -759,7 +822,9 @@ func testV3WatchMultipleEventsTxn(t *testing.T, startRev int64) { txn := pb.TxnRequest{} for i := 0; i < 3; i++ { ru := &pb.RequestUnion{} - ru.RequestPut = &pb.PutRequest{Key: []byte(fmt.Sprintf("foo%d", i)), Value: []byte("bar")} + ru.Request = &pb.RequestUnion_RequestPut{ + RequestPut: &pb.PutRequest{ + Key: []byte(fmt.Sprintf("foo%d", i)), Value: []byte("bar")}} txn.Success = append(txn.Success, ru) } @@ -819,6 +884,7 @@ func (evs eventsSortByKey) Swap(i, j int) { evs[i], evs[j] = evs[j], evs[i] func (evs eventsSortByKey) Less(i, j int) bool { return bytes.Compare(evs[i].Kv.Key, evs[j].Kv.Key) < 0 } func TestV3WatchMultipleEventsPutUnsynced(t *testing.T) { + defer testutil.AfterTest(t) clus := newClusterGRPC(t, &clusterConfig{size: 3}) defer clus.Terminate(t) @@ -832,12 +898,17 @@ func TestV3WatchMultipleEventsPutUnsynced(t *testing.T) { } wAPI := pb.NewWatchClient(clus.RandConn()) - wStream, wErr := wAPI.Watch(context.TODO()) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + wStream, wErr := wAPI.Watch(ctx) if wErr != nil { t.Fatalf("wAPI.Watch error: %v", wErr) } - if err := wStream.Send(&pb.WatchRequest{CreateRequest: &pb.WatchCreateRequest{Prefix: []byte("foo"), StartRevision: 1}}); err != nil { + wreq := &pb.WatchRequest{RequestUnion: &pb.WatchRequest_CreateRequest{ + CreateRequest: &pb.WatchCreateRequest{ + Prefix: []byte("foo"), StartRevision: 1}}} + if err := wStream.Send(wreq); err != nil { t.Fatalf("wStream.Send error: %v", err) } @@ -893,10 +964,12 @@ func TestV3WatchMultipleEventsPutUnsynced(t *testing.T) { } func TestV3WatchMultipleStreamsSynced(t *testing.T) { + defer testutil.AfterTest(t) testV3WatchMultipleStreams(t, 0) } func TestV3WatchMultipleStreamsUnsynced(t *testing.T) { + defer testutil.AfterTest(t) testV3WatchMultipleStreams(t, 1) } @@ -908,11 +981,16 @@ func testV3WatchMultipleStreams(t *testing.T, startRev int64) { streams := make([]pb.Watch_WatchClient, 5) for i := range streams { - wStream, errW := wAPI.Watch(context.TODO()) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + wStream, errW := wAPI.Watch(ctx) if errW != nil { t.Fatalf("wAPI.Watch error: %v", errW) } - if err := wStream.Send(&pb.WatchRequest{CreateRequest: &pb.WatchCreateRequest{Key: []byte("foo"), StartRevision: startRev}}); err != nil { + wreq := &pb.WatchRequest{RequestUnion: &pb.WatchRequest_CreateRequest{ + CreateRequest: &pb.WatchCreateRequest{ + Key: []byte("foo"), StartRevision: startRev}}} + if err := wStream.Send(wreq); err != nil { t.Fatalf("wStream.Send error: %v", err) } streams[i] = wStream @@ -990,6 +1068,7 @@ func WaitResponse(wc pb.Watch_WatchClient, timeout time.Duration) (bool, *pb.Wat } func TestV3RangeRequest(t *testing.T) { + defer testutil.AfterTest(t) tests := []struct { putKeys []string reqs []pb.RangeRequest @@ -1163,6 +1242,7 @@ func TestV3RangeRequest(t *testing.T) { // TestV3LeaseRevoke ensures a key is deleted once its lease is revoked. func TestV3LeaseRevoke(t *testing.T) { + defer testutil.AfterTest(t) testLeaseRemoveLeasedKey(t, func(clus *clusterV3, leaseID int64) error { lc := pb.NewLeaseClient(clus.RandConn()) _, err := lc.LeaseRevoke(context.TODO(), &pb.LeaseRevokeRequest{ID: leaseID}) @@ -1172,6 +1252,7 @@ func TestV3LeaseRevoke(t *testing.T) { // TestV3LeaseCreateById ensures leases may be created by a given id. func TestV3LeaseCreateByID(t *testing.T) { + defer testutil.AfterTest(t) clus := newClusterGRPC(t, &clusterConfig{size: 3}) defer clus.Terminate(t) @@ -1212,17 +1293,21 @@ func TestV3LeaseCreateByID(t *testing.T) { // TestV3LeaseExpire ensures a key is deleted once a key expires. func TestV3LeaseExpire(t *testing.T) { + defer testutil.AfterTest(t) testLeaseRemoveLeasedKey(t, func(clus *clusterV3, leaseID int64) error { // let lease lapse; wait for deleted key wAPI := pb.NewWatchClient(clus.RandConn()) - wStream, err := wAPI.Watch(context.TODO()) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + wStream, err := wAPI.Watch(ctx) if err != nil { return err } - creq := &pb.WatchCreateRequest{Key: []byte("foo"), StartRevision: 1} - wreq := &pb.WatchRequest{CreateRequest: creq} + wreq := &pb.WatchRequest{RequestUnion: &pb.WatchRequest_CreateRequest{ + CreateRequest: &pb.WatchCreateRequest{ + Key: []byte("foo"), StartRevision: 1}}} if err := wStream.Send(wreq); err != nil { return err } @@ -1261,10 +1346,13 @@ func TestV3LeaseExpire(t *testing.T) { // TestV3LeaseKeepAlive ensures keepalive keeps the lease alive. func TestV3LeaseKeepAlive(t *testing.T) { + defer testutil.AfterTest(t) testLeaseRemoveLeasedKey(t, func(clus *clusterV3, leaseID int64) error { lc := pb.NewLeaseClient(clus.RandConn()) lreq := &pb.LeaseKeepAliveRequest{ID: leaseID} - lac, err := lc.LeaseKeepAlive(context.TODO()) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + lac, err := lc.LeaseKeepAlive(ctx) if err != nil { return err } @@ -1292,12 +1380,15 @@ func TestV3LeaseKeepAlive(t *testing.T) { // TestV3LeaseExists creates a lease on a random client, then sends a keepalive on another // client to confirm it's visible to the whole cluster. func TestV3LeaseExists(t *testing.T) { + defer testutil.AfterTest(t) clus := newClusterGRPC(t, &clusterConfig{size: 3}) defer clus.Terminate(t) // create lease + ctx0, cancel0 := context.WithCancel(context.Background()) + defer cancel0() lresp, err := pb.NewLeaseClient(clus.RandConn()).LeaseCreate( - context.TODO(), + ctx0, &pb.LeaseCreateRequest{TTL: 30}) if err != nil { t.Fatal(err) @@ -1307,7 +1398,9 @@ func TestV3LeaseExists(t *testing.T) { } // confirm keepalive - lac, err := pb.NewLeaseClient(clus.RandConn()).LeaseKeepAlive(context.TODO()) + ctx1, cancel1 := context.WithCancel(context.Background()) + defer cancel1() + lac, err := pb.NewLeaseClient(clus.RandConn()).LeaseKeepAlive(ctx1) if err != nil { t.Fatal(err) } diff --git a/integration/v3_queue_test.go b/integration/v3_queue_test.go index 4b745adbd..ad13ee3fd 100644 --- a/integration/v3_queue_test.go +++ b/integration/v3_queue_test.go @@ -32,7 +32,11 @@ func TestQueueOneReaderOneWriter(t *testing.T) { clus := newClusterGRPC(t, &clusterConfig{size: 1}) defer clus.Terminate(t) + done := make(chan struct{}) go func() { + defer func() { + done <- struct{}{} + }() etcdc := recipe.NewEtcdClient(clus.RandConn()) q := recipe.NewQueue(etcdc, "testq") for i := 0; i < 5; i++ { @@ -53,6 +57,7 @@ func TestQueueOneReaderOneWriter(t *testing.T) { t.Fatalf("expected dequeue value %v, got %v", s, i) } } + <-done } func TestQueueManyReaderOneWriter(t *testing.T) {