diff --git a/tests/integration/v3_grpc_test.go b/tests/integration/v3_grpc_test.go index 920a759bd..298ee9428 100644 --- a/tests/integration/v3_grpc_test.go +++ b/tests/integration/v3_grpc_test.go @@ -1012,6 +1012,8 @@ func TestV3PutMissingLease(t *testing.T) { func TestV3DeleteRange(t *testing.T) { BeforeTest(t) tests := []struct { + name string + keySet []string begin string end string @@ -1020,44 +1022,44 @@ func TestV3DeleteRange(t *testing.T) { wantSet [][]byte deleted int64 }{ - // delete middle { + "delete middle", []string{"foo", "foo/abc", "fop"}, "foo/", "fop", false, [][]byte{[]byte("foo"), []byte("fop")}, 1, }, - // no delete { + "no delete", []string{"foo", "foo/abc", "fop"}, "foo/", "foo/", false, [][]byte{[]byte("foo"), []byte("foo/abc"), []byte("fop")}, 0, }, - // delete first { + "delete first", []string{"foo", "foo/abc", "fop"}, "fo", "fop", false, [][]byte{[]byte("fop")}, 2, }, - // delete tail { + "delete tail", []string{"foo", "foo/abc", "fop"}, "foo/", "fos", false, [][]byte{[]byte("foo")}, 2, }, - // delete exact { + "delete exact", []string{"foo", "foo/abc", "fop"}, "foo/abc", "", false, [][]byte{[]byte("foo"), []byte("fop")}, 1, }, - // delete none, [x,x) { + "delete none [x,x)", []string{"foo"}, "foo", "foo", false, [][]byte{[]byte("foo")}, 0, }, - // delete middle with preserveKVs set { + "delete middle with preserveKVs set", []string{"foo", "foo/abc", "fop"}, "foo/", "fop", true, [][]byte{[]byte("foo"), []byte("fop")}, 1, @@ -1065,55 +1067,56 @@ func TestV3DeleteRange(t *testing.T) { } for i, tt := range tests { - clus := NewClusterV3(t, &ClusterConfig{Size: 3}) - kvc := toGRPC(clus.RandClient()).KV + t.Run(tt.name, func(t *testing.T) { + clus := NewClusterV3(t, &ClusterConfig{Size: 3}) + kvc := toGRPC(clus.RandClient()).KV + defer clus.Terminate(t) - ks := tt.keySet - for j := range ks { - reqput := &pb.PutRequest{Key: []byte(ks[j]), Value: []byte{}} - _, err := kvc.Put(context.TODO(), reqput) + ks := tt.keySet + for j := range ks { + reqput := &pb.PutRequest{Key: []byte(ks[j]), Value: []byte{}} + _, err := kvc.Put(context.TODO(), reqput) + if err != nil { + t.Fatalf("couldn't put key (%v)", err) + } + } + + dreq := &pb.DeleteRangeRequest{ + Key: []byte(tt.begin), + RangeEnd: []byte(tt.end), + PrevKv: tt.prevKV, + } + dresp, err := kvc.DeleteRange(context.TODO(), dreq) if err != nil { - t.Fatalf("couldn't put key (%v)", err) + t.Fatalf("couldn't delete range on test %d (%v)", i, err) } - } - - dreq := &pb.DeleteRangeRequest{ - Key: []byte(tt.begin), - RangeEnd: []byte(tt.end), - PrevKv: tt.prevKV, - } - dresp, err := kvc.DeleteRange(context.TODO(), dreq) - if err != nil { - t.Fatalf("couldn't delete range on test %d (%v)", i, err) - } - if tt.deleted != dresp.Deleted { - t.Errorf("expected %d on test %v, got %d", tt.deleted, i, dresp.Deleted) - } - if tt.prevKV { - if len(dresp.PrevKvs) != int(dresp.Deleted) { - t.Errorf("preserve %d keys, want %d", len(dresp.PrevKvs), dresp.Deleted) + if tt.deleted != dresp.Deleted { + t.Errorf("expected %d on test %v, got %d", tt.deleted, i, dresp.Deleted) + } + if tt.prevKV { + if len(dresp.PrevKvs) != int(dresp.Deleted) { + t.Errorf("preserve %d keys, want %d", len(dresp.PrevKvs), dresp.Deleted) + } } - } - rreq := &pb.RangeRequest{Key: []byte{0x0}, RangeEnd: []byte{0xff}} - rresp, err := kvc.Range(context.TODO(), rreq) - if err != nil { - t.Errorf("couldn't get range on test %v (%v)", i, err) - } - if dresp.Header.Revision != rresp.Header.Revision { - t.Errorf("expected revision %v, got %v", - dresp.Header.Revision, rresp.Header.Revision) - } + rreq := &pb.RangeRequest{Key: []byte{0x0}, RangeEnd: []byte{0xff}} + rresp, err := kvc.Range(context.TODO(), rreq) + if err != nil { + t.Errorf("couldn't get range on test %v (%v)", i, err) + } + if dresp.Header.Revision != rresp.Header.Revision { + t.Errorf("expected revision %v, got %v", + dresp.Header.Revision, rresp.Header.Revision) + } - keys := [][]byte{} - for j := range rresp.Kvs { - keys = append(keys, rresp.Kvs[j].Key) - } - if !reflect.DeepEqual(tt.wantSet, keys) { - t.Errorf("expected %v on test %v, got %v", tt.wantSet, i, keys) - } - // can't defer because tcp ports will be in use - clus.Terminate(t) + keys := [][]byte{} + for j := range rresp.Kvs { + keys = append(keys, rresp.Kvs[j].Key) + } + if !reflect.DeepEqual(tt.wantSet, keys) { + t.Errorf("expected %v on test %v, got %v", tt.wantSet, i, keys) + } + }) } } @@ -1286,6 +1289,8 @@ func TestV3StorageQuotaAPI(t *testing.T) { func TestV3RangeRequest(t *testing.T) { BeforeTest(t) tests := []struct { + name string + putKeys []string reqs []pb.RangeRequest @@ -1293,8 +1298,8 @@ func TestV3RangeRequest(t *testing.T) { wmores []bool wcounts []int64 }{ - // single key { + "single key", []string{"foo", "bar"}, []pb.RangeRequest{ // exists @@ -1310,8 +1315,8 @@ func TestV3RangeRequest(t *testing.T) { []bool{false, false}, []int64{1, 0}, }, - // multi-key { + "multi-key", []string{"a", "b", "c", "d", "e"}, []pb.RangeRequest{ // all in range @@ -1339,8 +1344,8 @@ func TestV3RangeRequest(t *testing.T) { []bool{false, false, false, false, false, false}, []int64{5, 2, 0, 0, 0, 5}, }, - // revision { + "revision", []string{"a", "b", "c", "d", "e"}, []pb.RangeRequest{ {Key: []byte("a"), RangeEnd: []byte("z"), Revision: 0}, @@ -1358,8 +1363,8 @@ func TestV3RangeRequest(t *testing.T) { []bool{false, false, false, false}, []int64{5, 0, 1, 2}, }, - // limit { + "limit", []string{"a", "b", "c"}, []pb.RangeRequest{ // more @@ -1381,8 +1386,8 @@ func TestV3RangeRequest(t *testing.T) { []bool{true, true, false, false}, []int64{3, 3, 3, 3}, }, - // sort { + "sort", []string{"b", "a", "c", "d", "c"}, []pb.RangeRequest{ { @@ -1434,8 +1439,8 @@ func TestV3RangeRequest(t *testing.T) { []bool{true, true, true, true, false, false}, []int64{4, 4, 4, 4, 0, 4}, }, - // min/max mod rev { + "min/max mod rev", []string{"rev2", "rev3", "rev4", "rev5", "rev6"}, []pb.RangeRequest{ { @@ -1466,8 +1471,8 @@ func TestV3RangeRequest(t *testing.T) { []bool{false, false, false, false}, []int64{5, 5, 5, 5}, }, - // min/max create rev { + "min/max create rev", []string{"rev2", "rev3", "rev2", "rev2", "rev6", "rev3"}, []pb.RangeRequest{ { @@ -1501,44 +1506,46 @@ func TestV3RangeRequest(t *testing.T) { } for i, tt := range tests { - clus := NewClusterV3(t, &ClusterConfig{Size: 3}) - for _, k := range tt.putKeys { - kvc := toGRPC(clus.RandClient()).KV - req := &pb.PutRequest{Key: []byte(k), Value: []byte("bar")} - if _, err := kvc.Put(context.TODO(), req); err != nil { - t.Fatalf("#%d: couldn't put key (%v)", i, err) - } - } - - for j, req := range tt.reqs { - kvc := toGRPC(clus.RandClient()).KV - resp, err := kvc.Range(context.TODO(), &req) - if err != nil { - t.Errorf("#%d.%d: Range error: %v", i, j, err) - continue - } - if len(resp.Kvs) != len(tt.wresps[j]) { - t.Errorf("#%d.%d: bad len(resp.Kvs). got = %d, want = %d, ", i, j, len(resp.Kvs), len(tt.wresps[j])) - continue - } - for k, wKey := range tt.wresps[j] { - respKey := string(resp.Kvs[k].Key) - if respKey != wKey { - t.Errorf("#%d.%d: key[%d]. got = %v, want = %v, ", i, j, k, respKey, wKey) + t.Run(tt.name, func(t *testing.T) { + clus := NewClusterV3(t, &ClusterConfig{Size: 3}) + defer clus.Terminate(t) + for _, k := range tt.putKeys { + kvc := toGRPC(clus.RandClient()).KV + req := &pb.PutRequest{Key: []byte(k), Value: []byte("bar")} + if _, err := kvc.Put(context.TODO(), req); err != nil { + t.Fatalf("#%d: couldn't put key (%v)", i, err) } } - if resp.More != tt.wmores[j] { - t.Errorf("#%d.%d: bad more. got = %v, want = %v, ", i, j, resp.More, tt.wmores[j]) + + for j, req := range tt.reqs { + kvc := toGRPC(clus.RandClient()).KV + resp, err := kvc.Range(context.TODO(), &req) + if err != nil { + t.Errorf("#%d.%d: Range error: %v", i, j, err) + continue + } + if len(resp.Kvs) != len(tt.wresps[j]) { + t.Errorf("#%d.%d: bad len(resp.Kvs). got = %d, want = %d, ", i, j, len(resp.Kvs), len(tt.wresps[j])) + continue + } + for k, wKey := range tt.wresps[j] { + respKey := string(resp.Kvs[k].Key) + if respKey != wKey { + t.Errorf("#%d.%d: key[%d]. got = %v, want = %v, ", i, j, k, respKey, wKey) + } + } + if resp.More != tt.wmores[j] { + t.Errorf("#%d.%d: bad more. got = %v, want = %v, ", i, j, resp.More, tt.wmores[j]) + } + if resp.GetCount() != tt.wcounts[j] { + t.Errorf("#%d.%d: bad count. got = %v, want = %v, ", i, j, resp.GetCount(), tt.wcounts[j]) + } + wrev := int64(len(tt.putKeys) + 1) + if resp.Header.Revision != wrev { + t.Errorf("#%d.%d: bad header revision. got = %d. want = %d", i, j, resp.Header.Revision, wrev) + } } - if resp.GetCount() != tt.wcounts[j] { - t.Errorf("#%d.%d: bad count. got = %v, want = %v, ", i, j, resp.GetCount(), tt.wcounts[j]) - } - wrev := int64(len(tt.putKeys) + 1) - if resp.Header.Revision != wrev { - t.Errorf("#%d.%d: bad header revision. got = %d. want = %d", i, j, resp.Header.Revision, wrev) - } - } - clus.Terminate(t) + }) } } @@ -1925,25 +1932,27 @@ func TestV3LargeRequests(t *testing.T) { {10 * 1024 * 1024, 10*1024*1024 + 5, rpctypes.ErrGRPCRequestTooLarge}, } for i, test := range tests { - clus := NewClusterV3(t, &ClusterConfig{Size: 1, MaxRequestBytes: test.maxRequestBytes}) - kvcli := toGRPC(clus.Client(0)).KV - reqput := &pb.PutRequest{Key: []byte("foo"), Value: make([]byte, test.valueSize)} - _, err := kvcli.Put(context.TODO(), reqput) - if !eqErrGRPC(err, test.expectError) { - t.Errorf("#%d: expected error %v, got %v", i, test.expectError, err) - } - - // request went through, expect large response back from server - if test.expectError == nil { - reqget := &pb.RangeRequest{Key: []byte("foo")} - // limit receive call size with original value + gRPC overhead bytes - _, err = kvcli.Range(context.TODO(), reqget, grpc.MaxCallRecvMsgSize(test.valueSize+512*1024)) - if err != nil { - t.Errorf("#%d: range expected no error, got %v", i, err) + t.Run(fmt.Sprintf("#%d", i), func(t *testing.T) { + clus := NewClusterV3(t, &ClusterConfig{Size: 1, MaxRequestBytes: test.maxRequestBytes}) + defer clus.Terminate(t) + kvcli := toGRPC(clus.Client(0)).KV + reqput := &pb.PutRequest{Key: []byte("foo"), Value: make([]byte, test.valueSize)} + _, err := kvcli.Put(context.TODO(), reqput) + if !eqErrGRPC(err, test.expectError) { + t.Errorf("#%d: expected error %v, got %v", i, test.expectError, err) } - } - clus.Terminate(t) + // request went through, expect large response back from server + if test.expectError == nil { + reqget := &pb.RangeRequest{Key: []byte("foo")} + // limit receive call size with original value + gRPC overhead bytes + _, err = kvcli.Range(context.TODO(), reqget, grpc.MaxCallRecvMsgSize(test.valueSize+512*1024)) + if err != nil { + t.Errorf("#%d: range expected no error, got %v", i, err) + } + } + + }) } } diff --git a/tests/integration/v3_watch_test.go b/tests/integration/v3_watch_test.go index 240af36f4..b2a31cc2f 100644 --- a/tests/integration/v3_watch_test.go +++ b/tests/integration/v3_watch_test.go @@ -33,13 +33,15 @@ import ( func TestV3WatchFromCurrentRevision(t *testing.T) { BeforeTest(t) tests := []struct { + name string + putKeys []string watchRequest *pb.WatchRequest wresps []*pb.WatchResponse }{ - // watch the key, matching { + "watch the key, matching", []string{"foo"}, &pb.WatchRequest{RequestUnion: &pb.WatchRequest_CreateRequest{ CreateRequest: &pb.WatchCreateRequest{ @@ -58,8 +60,8 @@ func TestV3WatchFromCurrentRevision(t *testing.T) { }, }, }, - // watch the key, non-matching { + "watch the key, non-matching", []string{"foo"}, &pb.WatchRequest{RequestUnion: &pb.WatchRequest_CreateRequest{ CreateRequest: &pb.WatchCreateRequest{ @@ -67,8 +69,8 @@ func TestV3WatchFromCurrentRevision(t *testing.T) { []*pb.WatchResponse{}, }, - // watch the prefix, matching { + "watch the prefix, matching", []string{"fooLong"}, &pb.WatchRequest{RequestUnion: &pb.WatchRequest_CreateRequest{ CreateRequest: &pb.WatchCreateRequest{ @@ -88,8 +90,8 @@ func TestV3WatchFromCurrentRevision(t *testing.T) { }, }, }, - // watch the prefix, non-matching { + "watch the prefix, non-matching", []string{"foo"}, &pb.WatchRequest{RequestUnion: &pb.WatchRequest_CreateRequest{ CreateRequest: &pb.WatchCreateRequest{ @@ -98,8 +100,8 @@ func TestV3WatchFromCurrentRevision(t *testing.T) { []*pb.WatchResponse{}, }, - // watch full range, matching { + "watch full range, matching", []string{"fooLong"}, &pb.WatchRequest{RequestUnion: &pb.WatchRequest_CreateRequest{ CreateRequest: &pb.WatchCreateRequest{ @@ -119,8 +121,8 @@ func TestV3WatchFromCurrentRevision(t *testing.T) { }, }, }, - // multiple puts, one watcher with matching key { + "multiple puts, one watcher with matching key", []string{"foo", "foo", "foo"}, &pb.WatchRequest{RequestUnion: &pb.WatchRequest_CreateRequest{ CreateRequest: &pb.WatchCreateRequest{ @@ -159,8 +161,8 @@ func TestV3WatchFromCurrentRevision(t *testing.T) { }, }, }, - // multiple puts, one watcher with matching prefix { + "multiple puts, one watcher with matching perfix", []string{"foo", "foo", "foo"}, &pb.WatchRequest{RequestUnion: &pb.WatchRequest_CreateRequest{ CreateRequest: &pb.WatchCreateRequest{ @@ -203,95 +205,87 @@ func TestV3WatchFromCurrentRevision(t *testing.T) { } for i, tt := range tests { - clus := NewClusterV3(t, &ClusterConfig{Size: 3}) + t.Run(tt.name, func(t *testing.T) { + clus := NewClusterV3(t, &ClusterConfig{Size: 3}) + defer clus.Terminate(t) - wAPI := toGRPC(clus.RandClient()).Watch - ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) - defer cancel() - wStream, err := wAPI.Watch(ctx) - if err != nil { - t.Fatalf("#%d: wAPI.Watch error: %v", i, err) - } + wAPI := toGRPC(clus.RandClient()).Watch + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + wStream, err := wAPI.Watch(ctx) + if err != nil { + t.Fatalf("#%d: wAPI.Watch error: %v", i, err) + } - err = wStream.Send(tt.watchRequest) - if err != nil { - t.Fatalf("#%d: wStream.Send error: %v", i, err) - } + err = wStream.Send(tt.watchRequest) + if err != nil { + t.Fatalf("#%d: wStream.Send error: %v", i, err) + } - // ensure watcher request created a new watcher - cresp, err := wStream.Recv() - if err != nil { - t.Errorf("#%d: wStream.Recv error: %v", i, err) - clus.Terminate(t) - continue - } - if !cresp.Created { - t.Errorf("#%d: did not create watchid, got %+v", i, cresp) - clus.Terminate(t) - continue - } - if cresp.Canceled { - t.Errorf("#%d: canceled watcher on create %+v", i, cresp) - clus.Terminate(t) - continue - } + // ensure watcher request created a new watcher + cresp, err := wStream.Recv() + if err != nil { + t.Fatalf("#%d: wStream.Recv error: %v", i, err) + } + if !cresp.Created { + t.Fatalf("#%d: did not create watchid, got %+v", i, cresp) + } + if cresp.Canceled { + t.Fatalf("#%d: canceled watcher on create %+v", i, cresp) + } - createdWatchId := cresp.WatchId - if cresp.Header == nil || cresp.Header.Revision != 1 { - t.Errorf("#%d: header revision got +%v, wanted revison 1", i, cresp) - clus.Terminate(t) - continue - } + createdWatchId := cresp.WatchId + if cresp.Header == nil || cresp.Header.Revision != 1 { + t.Fatalf("#%d: header revision got +%v, wanted revison 1", i, cresp) + } - // asynchronously create keys - ch := make(chan struct{}, 1) - go func() { - for _, k := range tt.putKeys { - kvc := toGRPC(clus.RandClient()).KV - req := &pb.PutRequest{Key: []byte(k), Value: []byte("bar")} - if _, err := kvc.Put(context.TODO(), req); err != nil { - t.Errorf("#%d: couldn't put key (%v)", i, err) + // asynchronously create keys + ch := make(chan struct{}, 1) + go func() { + for _, k := range tt.putKeys { + kvc := toGRPC(clus.RandClient()).KV + req := &pb.PutRequest{Key: []byte(k), Value: []byte("bar")} + if _, err := kvc.Put(context.TODO(), req); err != nil { + t.Errorf("#%d: couldn't put key (%v)", i, err) + } + } + ch <- struct{}{} + }() + + // check stream results + for j, wresp := range tt.wresps { + resp, err := wStream.Recv() + if err != nil { + t.Errorf("#%d.%d: wStream.Recv error: %v", i, j, err) + } + + if resp.Header == nil { + t.Fatalf("#%d.%d: unexpected nil resp.Header", i, j) + } + if resp.Header.Revision != wresp.Header.Revision { + t.Errorf("#%d.%d: resp.Header.Revision got = %d, want = %d", i, j, resp.Header.Revision, wresp.Header.Revision) + } + + if wresp.Created != resp.Created { + t.Errorf("#%d.%d: resp.Created got = %v, want = %v", i, j, resp.Created, wresp.Created) + } + if resp.WatchId != createdWatchId { + t.Errorf("#%d.%d: resp.WatchId got = %d, want = %d", i, j, resp.WatchId, createdWatchId) + } + + if !reflect.DeepEqual(resp.Events, wresp.Events) { + t.Errorf("#%d.%d: resp.Events got = %+v, want = %+v", i, j, resp.Events, wresp.Events) } } - ch <- struct{}{} - }() - // check stream results - for j, wresp := range tt.wresps { - resp, err := wStream.Recv() - if err != nil { - t.Errorf("#%d.%d: wStream.Recv error: %v", i, j, err) + rok, nr := waitResponse(wStream, 1*time.Second) + if !rok { + t.Errorf("unexpected pb.WatchResponse is received %+v", nr) } - if resp.Header == nil { - t.Fatalf("#%d.%d: unexpected nil resp.Header", i, j) - } - if resp.Header.Revision != wresp.Header.Revision { - t.Errorf("#%d.%d: resp.Header.Revision got = %d, want = %d", i, j, resp.Header.Revision, wresp.Header.Revision) - } - - if wresp.Created != resp.Created { - t.Errorf("#%d.%d: resp.Created got = %v, want = %v", i, j, resp.Created, wresp.Created) - } - if resp.WatchId != createdWatchId { - t.Errorf("#%d.%d: resp.WatchId got = %d, want = %d", i, j, resp.WatchId, createdWatchId) - } - - if !reflect.DeepEqual(resp.Events, wresp.Events) { - t.Errorf("#%d.%d: resp.Events got = %+v, want = %+v", i, j, resp.Events, wresp.Events) - } - } - - rok, nr := waitResponse(wStream, 1*time.Second) - if !rok { - t.Errorf("unexpected pb.WatchResponse is received %+v", nr) - } - - // wait for the client to finish sending the keys before terminating the cluster - <-ch - - // can't defer because tcp ports will be in use - clus.Terminate(t) + // wait for the client to finish sending the keys before terminating the cluster + <-ch + }) } }