Mirror of https://github.com/etcd-io/etcd.git
Merge pull request #13063 from serathius/integration-defer
integration: Use subtests to defer cluster.Terminate call
Commit 4c6c506575
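
The change applies one mechanical pattern to several table-driven integration tests: each case is wrapped in t.Run so that cluster.Terminate can be deferred inside the subtest, instead of being called by hand at the bottom of the loop (and skipped entirely whenever an early t.Fatalf aborted the iteration, which is why the old code carried the "can't defer because tcp ports will be in use" workaround). The sketch below is a minimal, self-contained illustration of that pattern, not etcd code; fakeCluster and newFakeCluster are hypothetical stand-ins for the integration package's cluster type.

// subtest_defer_sketch_test.go — hypothetical sketch of the t.Run + defer
// cleanup pattern this PR applies; fakeCluster is an assumed stand-in.
package example

import "testing"

// fakeCluster stands in for the integration test cluster; Terminate plays
// the role of cluster.Terminate (releasing ports and per-case resources).
type fakeCluster struct{ terminated bool }

func newFakeCluster(t *testing.T) *fakeCluster {
	t.Helper()
	return &fakeCluster{}
}

func (c *fakeCluster) Terminate(t *testing.T) {
	t.Helper()
	c.terminated = true
}

func TestWithSubtests(t *testing.T) {
	tests := []struct {
		name string
		key  string
	}{
		{"first case", "foo"},
		{"second case", "bar"},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			clus := newFakeCluster(t)
			// Deferring inside the subtest closure is safe: the deferred
			// Terminate runs when this subtest returns, even if it fails
			// with t.Fatalf, so the next case never starts while the old
			// cluster's resources are still in use.
			defer clus.Terminate(t)

			if tt.key == "" {
				t.Fatalf("empty key in case %q", tt.name)
			}
		})
	}
}

Without subtests, a t.Fatalf exits the whole test function's goroutine, so a trailing manual Terminate never runs for that iteration; with t.Run, only the subtest returns and the deferred cleanup still fires before the next case.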
@@ -1012,6 +1012,8 @@ func TestV3PutMissingLease(t *testing.T) {
 func TestV3DeleteRange(t *testing.T) {
 	BeforeTest(t)
 	tests := []struct {
+		name string
+
 		keySet []string
 		begin  string
 		end    string
@@ -1020,44 +1022,44 @@ func TestV3DeleteRange(t *testing.T) {
 		wantSet [][]byte
 		deleted int64
 	}{
-		// delete middle
 		{
+			"delete middle",
 			[]string{"foo", "foo/abc", "fop"},
 			"foo/", "fop", false,
 			[][]byte{[]byte("foo"), []byte("fop")}, 1,
 		},
-		// no delete
 		{
+			"no delete",
 			[]string{"foo", "foo/abc", "fop"},
 			"foo/", "foo/", false,
 			[][]byte{[]byte("foo"), []byte("foo/abc"), []byte("fop")}, 0,
 		},
-		// delete first
 		{
+			"delete first",
 			[]string{"foo", "foo/abc", "fop"},
 			"fo", "fop", false,
 			[][]byte{[]byte("fop")}, 2,
 		},
-		// delete tail
 		{
+			"delete tail",
 			[]string{"foo", "foo/abc", "fop"},
 			"foo/", "fos", false,
 			[][]byte{[]byte("foo")}, 2,
 		},
-		// delete exact
 		{
+			"delete exact",
 			[]string{"foo", "foo/abc", "fop"},
 			"foo/abc", "", false,
 			[][]byte{[]byte("foo"), []byte("fop")}, 1,
 		},
-		// delete none, [x,x)
 		{
+			"delete none [x,x)",
 			[]string{"foo"},
 			"foo", "foo", false,
 			[][]byte{[]byte("foo")}, 0,
 		},
-		// delete middle with preserveKVs set
 		{
+			"delete middle with preserveKVs set",
 			[]string{"foo", "foo/abc", "fop"},
 			"foo/", "fop", true,
 			[][]byte{[]byte("foo"), []byte("fop")}, 1,
@@ -1065,55 +1067,56 @@ func TestV3DeleteRange(t *testing.T) {
 	}
 
 	for i, tt := range tests {
-		clus := NewClusterV3(t, &ClusterConfig{Size: 3})
-		kvc := toGRPC(clus.RandClient()).KV
+		t.Run(tt.name, func(t *testing.T) {
+			clus := NewClusterV3(t, &ClusterConfig{Size: 3})
+			kvc := toGRPC(clus.RandClient()).KV
+			defer clus.Terminate(t)
 
-		ks := tt.keySet
-		for j := range ks {
-			reqput := &pb.PutRequest{Key: []byte(ks[j]), Value: []byte{}}
-			_, err := kvc.Put(context.TODO(), reqput)
-			if err != nil {
-				t.Fatalf("couldn't put key (%v)", err)
-			}
-		}
+			ks := tt.keySet
+			for j := range ks {
+				reqput := &pb.PutRequest{Key: []byte(ks[j]), Value: []byte{}}
+				_, err := kvc.Put(context.TODO(), reqput)
+				if err != nil {
+					t.Fatalf("couldn't put key (%v)", err)
+				}
+			}
 
-		dreq := &pb.DeleteRangeRequest{
-			Key:      []byte(tt.begin),
-			RangeEnd: []byte(tt.end),
-			PrevKv:   tt.prevKV,
-		}
-		dresp, err := kvc.DeleteRange(context.TODO(), dreq)
-		if err != nil {
-			t.Fatalf("couldn't delete range on test %d (%v)", i, err)
-		}
-		if tt.deleted != dresp.Deleted {
-			t.Errorf("expected %d on test %v, got %d", tt.deleted, i, dresp.Deleted)
-		}
-		if tt.prevKV {
-			if len(dresp.PrevKvs) != int(dresp.Deleted) {
-				t.Errorf("preserve %d keys, want %d", len(dresp.PrevKvs), dresp.Deleted)
-			}
-		}
+			dreq := &pb.DeleteRangeRequest{
+				Key:      []byte(tt.begin),
+				RangeEnd: []byte(tt.end),
+				PrevKv:   tt.prevKV,
+			}
+			dresp, err := kvc.DeleteRange(context.TODO(), dreq)
+			if err != nil {
+				t.Fatalf("couldn't delete range on test %d (%v)", i, err)
+			}
+			if tt.deleted != dresp.Deleted {
+				t.Errorf("expected %d on test %v, got %d", tt.deleted, i, dresp.Deleted)
+			}
+			if tt.prevKV {
+				if len(dresp.PrevKvs) != int(dresp.Deleted) {
+					t.Errorf("preserve %d keys, want %d", len(dresp.PrevKvs), dresp.Deleted)
+				}
+			}
 
-		rreq := &pb.RangeRequest{Key: []byte{0x0}, RangeEnd: []byte{0xff}}
-		rresp, err := kvc.Range(context.TODO(), rreq)
-		if err != nil {
-			t.Errorf("couldn't get range on test %v (%v)", i, err)
-		}
-		if dresp.Header.Revision != rresp.Header.Revision {
-			t.Errorf("expected revision %v, got %v",
-				dresp.Header.Revision, rresp.Header.Revision)
-		}
+			rreq := &pb.RangeRequest{Key: []byte{0x0}, RangeEnd: []byte{0xff}}
+			rresp, err := kvc.Range(context.TODO(), rreq)
+			if err != nil {
+				t.Errorf("couldn't get range on test %v (%v)", i, err)
+			}
+			if dresp.Header.Revision != rresp.Header.Revision {
+				t.Errorf("expected revision %v, got %v",
+					dresp.Header.Revision, rresp.Header.Revision)
+			}
 
-		keys := [][]byte{}
-		for j := range rresp.Kvs {
-			keys = append(keys, rresp.Kvs[j].Key)
-		}
-		if !reflect.DeepEqual(tt.wantSet, keys) {
-			t.Errorf("expected %v on test %v, got %v", tt.wantSet, i, keys)
-		}
-		// can't defer because tcp ports will be in use
-		clus.Terminate(t)
+			keys := [][]byte{}
+			for j := range rresp.Kvs {
+				keys = append(keys, rresp.Kvs[j].Key)
+			}
+			if !reflect.DeepEqual(tt.wantSet, keys) {
+				t.Errorf("expected %v on test %v, got %v", tt.wantSet, i, keys)
+			}
+		})
 	}
 }
 
@@ -1286,6 +1289,8 @@ func TestV3StorageQuotaAPI(t *testing.T) {
 func TestV3RangeRequest(t *testing.T) {
 	BeforeTest(t)
 	tests := []struct {
+		name string
+
 		putKeys []string
 		reqs    []pb.RangeRequest
 
@@ -1293,8 +1298,8 @@ func TestV3RangeRequest(t *testing.T) {
 		wmores  []bool
 		wcounts []int64
 	}{
-		// single key
 		{
+			"single key",
 			[]string{"foo", "bar"},
 			[]pb.RangeRequest{
 				// exists
@@ -1310,8 +1315,8 @@ func TestV3RangeRequest(t *testing.T) {
 			[]bool{false, false},
 			[]int64{1, 0},
 		},
-		// multi-key
 		{
+			"multi-key",
 			[]string{"a", "b", "c", "d", "e"},
 			[]pb.RangeRequest{
 				// all in range
@@ -1339,8 +1344,8 @@ func TestV3RangeRequest(t *testing.T) {
 			[]bool{false, false, false, false, false, false},
 			[]int64{5, 2, 0, 0, 0, 5},
 		},
-		// revision
 		{
+			"revision",
 			[]string{"a", "b", "c", "d", "e"},
 			[]pb.RangeRequest{
 				{Key: []byte("a"), RangeEnd: []byte("z"), Revision: 0},
@@ -1358,8 +1363,8 @@ func TestV3RangeRequest(t *testing.T) {
 			[]bool{false, false, false, false},
 			[]int64{5, 0, 1, 2},
 		},
-		// limit
 		{
+			"limit",
 			[]string{"a", "b", "c"},
 			[]pb.RangeRequest{
 				// more
@@ -1381,8 +1386,8 @@ func TestV3RangeRequest(t *testing.T) {
 			[]bool{true, true, false, false},
 			[]int64{3, 3, 3, 3},
 		},
-		// sort
 		{
+			"sort",
 			[]string{"b", "a", "c", "d", "c"},
 			[]pb.RangeRequest{
 				{
@@ -1434,8 +1439,8 @@ func TestV3RangeRequest(t *testing.T) {
 			[]bool{true, true, true, true, false, false},
 			[]int64{4, 4, 4, 4, 0, 4},
 		},
-		// min/max mod rev
 		{
+			"min/max mod rev",
 			[]string{"rev2", "rev3", "rev4", "rev5", "rev6"},
 			[]pb.RangeRequest{
 				{
@@ -1466,8 +1471,8 @@ func TestV3RangeRequest(t *testing.T) {
 			[]bool{false, false, false, false},
 			[]int64{5, 5, 5, 5},
 		},
-		// min/max create rev
 		{
+			"min/max create rev",
 			[]string{"rev2", "rev3", "rev2", "rev2", "rev6", "rev3"},
 			[]pb.RangeRequest{
 				{
@@ -1501,44 +1506,46 @@ func TestV3RangeRequest(t *testing.T) {
 	}
 
 	for i, tt := range tests {
-		clus := NewClusterV3(t, &ClusterConfig{Size: 3})
-		for _, k := range tt.putKeys {
-			kvc := toGRPC(clus.RandClient()).KV
-			req := &pb.PutRequest{Key: []byte(k), Value: []byte("bar")}
-			if _, err := kvc.Put(context.TODO(), req); err != nil {
-				t.Fatalf("#%d: couldn't put key (%v)", i, err)
-			}
-		}
+		t.Run(tt.name, func(t *testing.T) {
+			clus := NewClusterV3(t, &ClusterConfig{Size: 3})
+			defer clus.Terminate(t)
+			for _, k := range tt.putKeys {
+				kvc := toGRPC(clus.RandClient()).KV
+				req := &pb.PutRequest{Key: []byte(k), Value: []byte("bar")}
+				if _, err := kvc.Put(context.TODO(), req); err != nil {
+					t.Fatalf("#%d: couldn't put key (%v)", i, err)
+				}
+			}
 
-		for j, req := range tt.reqs {
-			kvc := toGRPC(clus.RandClient()).KV
-			resp, err := kvc.Range(context.TODO(), &req)
-			if err != nil {
-				t.Errorf("#%d.%d: Range error: %v", i, j, err)
-				continue
-			}
-			if len(resp.Kvs) != len(tt.wresps[j]) {
-				t.Errorf("#%d.%d: bad len(resp.Kvs). got = %d, want = %d, ", i, j, len(resp.Kvs), len(tt.wresps[j]))
-				continue
-			}
-			for k, wKey := range tt.wresps[j] {
-				respKey := string(resp.Kvs[k].Key)
-				if respKey != wKey {
-					t.Errorf("#%d.%d: key[%d]. got = %v, want = %v, ", i, j, k, respKey, wKey)
-				}
-			}
-			if resp.More != tt.wmores[j] {
-				t.Errorf("#%d.%d: bad more. got = %v, want = %v, ", i, j, resp.More, tt.wmores[j])
-			}
-			if resp.GetCount() != tt.wcounts[j] {
-				t.Errorf("#%d.%d: bad count. got = %v, want = %v, ", i, j, resp.GetCount(), tt.wcounts[j])
-			}
-			wrev := int64(len(tt.putKeys) + 1)
-			if resp.Header.Revision != wrev {
-				t.Errorf("#%d.%d: bad header revision. got = %d. want = %d", i, j, resp.Header.Revision, wrev)
-			}
-		}
-		clus.Terminate(t)
+			for j, req := range tt.reqs {
+				kvc := toGRPC(clus.RandClient()).KV
+				resp, err := kvc.Range(context.TODO(), &req)
+				if err != nil {
+					t.Errorf("#%d.%d: Range error: %v", i, j, err)
+					continue
+				}
+				if len(resp.Kvs) != len(tt.wresps[j]) {
+					t.Errorf("#%d.%d: bad len(resp.Kvs). got = %d, want = %d, ", i, j, len(resp.Kvs), len(tt.wresps[j]))
+					continue
+				}
+				for k, wKey := range tt.wresps[j] {
+					respKey := string(resp.Kvs[k].Key)
+					if respKey != wKey {
+						t.Errorf("#%d.%d: key[%d]. got = %v, want = %v, ", i, j, k, respKey, wKey)
+					}
+				}
+				if resp.More != tt.wmores[j] {
+					t.Errorf("#%d.%d: bad more. got = %v, want = %v, ", i, j, resp.More, tt.wmores[j])
+				}
+				if resp.GetCount() != tt.wcounts[j] {
+					t.Errorf("#%d.%d: bad count. got = %v, want = %v, ", i, j, resp.GetCount(), tt.wcounts[j])
+				}
+				wrev := int64(len(tt.putKeys) + 1)
+				if resp.Header.Revision != wrev {
+					t.Errorf("#%d.%d: bad header revision. got = %d. want = %d", i, j, resp.Header.Revision, wrev)
+				}
+			}
+		})
 	}
 }
 
@@ -1925,25 +1932,27 @@ func TestV3LargeRequests(t *testing.T) {
 		{10 * 1024 * 1024, 10*1024*1024 + 5, rpctypes.ErrGRPCRequestTooLarge},
 	}
 	for i, test := range tests {
-		clus := NewClusterV3(t, &ClusterConfig{Size: 1, MaxRequestBytes: test.maxRequestBytes})
-		kvcli := toGRPC(clus.Client(0)).KV
-		reqput := &pb.PutRequest{Key: []byte("foo"), Value: make([]byte, test.valueSize)}
-		_, err := kvcli.Put(context.TODO(), reqput)
-		if !eqErrGRPC(err, test.expectError) {
-			t.Errorf("#%d: expected error %v, got %v", i, test.expectError, err)
-		}
+		t.Run(fmt.Sprintf("#%d", i), func(t *testing.T) {
+			clus := NewClusterV3(t, &ClusterConfig{Size: 1, MaxRequestBytes: test.maxRequestBytes})
+			defer clus.Terminate(t)
+			kvcli := toGRPC(clus.Client(0)).KV
+			reqput := &pb.PutRequest{Key: []byte("foo"), Value: make([]byte, test.valueSize)}
+			_, err := kvcli.Put(context.TODO(), reqput)
+			if !eqErrGRPC(err, test.expectError) {
+				t.Errorf("#%d: expected error %v, got %v", i, test.expectError, err)
+			}
 
-		// request went through, expect large response back from server
-		if test.expectError == nil {
-			reqget := &pb.RangeRequest{Key: []byte("foo")}
-			// limit receive call size with original value + gRPC overhead bytes
-			_, err = kvcli.Range(context.TODO(), reqget, grpc.MaxCallRecvMsgSize(test.valueSize+512*1024))
-			if err != nil {
-				t.Errorf("#%d: range expected no error, got %v", i, err)
-			}
-		}
+			// request went through, expect large response back from server
+			if test.expectError == nil {
+				reqget := &pb.RangeRequest{Key: []byte("foo")}
+				// limit receive call size with original value + gRPC overhead bytes
+				_, err = kvcli.Range(context.TODO(), reqget, grpc.MaxCallRecvMsgSize(test.valueSize+512*1024))
+				if err != nil {
+					t.Errorf("#%d: range expected no error, got %v", i, err)
+				}
+			}
 
-		clus.Terminate(t)
+		})
 	}
 }
 
@@ -33,13 +33,15 @@ import (
 func TestV3WatchFromCurrentRevision(t *testing.T) {
 	BeforeTest(t)
 	tests := []struct {
+		name string
+
 		putKeys      []string
 		watchRequest *pb.WatchRequest
 
 		wresps []*pb.WatchResponse
 	}{
-		// watch the key, matching
 		{
+			"watch the key, matching",
 			[]string{"foo"},
 			&pb.WatchRequest{RequestUnion: &pb.WatchRequest_CreateRequest{
 				CreateRequest: &pb.WatchCreateRequest{
@@ -58,8 +60,8 @@ func TestV3WatchFromCurrentRevision(t *testing.T) {
 				},
 			},
 		},
-		// watch the key, non-matching
 		{
+			"watch the key, non-matching",
 			[]string{"foo"},
 			&pb.WatchRequest{RequestUnion: &pb.WatchRequest_CreateRequest{
 				CreateRequest: &pb.WatchCreateRequest{
@@ -67,8 +69,8 @@ func TestV3WatchFromCurrentRevision(t *testing.T) {
 
 			[]*pb.WatchResponse{},
 		},
-		// watch the prefix, matching
 		{
+			"watch the prefix, matching",
 			[]string{"fooLong"},
 			&pb.WatchRequest{RequestUnion: &pb.WatchRequest_CreateRequest{
 				CreateRequest: &pb.WatchCreateRequest{
@@ -88,8 +90,8 @@ func TestV3WatchFromCurrentRevision(t *testing.T) {
 				},
 			},
 		},
-		// watch the prefix, non-matching
 		{
+			"watch the prefix, non-matching",
 			[]string{"foo"},
 			&pb.WatchRequest{RequestUnion: &pb.WatchRequest_CreateRequest{
 				CreateRequest: &pb.WatchCreateRequest{
@@ -98,8 +100,8 @@ func TestV3WatchFromCurrentRevision(t *testing.T) {
 
 			[]*pb.WatchResponse{},
 		},
-		// watch full range, matching
 		{
+			"watch full range, matching",
 			[]string{"fooLong"},
 			&pb.WatchRequest{RequestUnion: &pb.WatchRequest_CreateRequest{
 				CreateRequest: &pb.WatchCreateRequest{
@@ -119,8 +121,8 @@ func TestV3WatchFromCurrentRevision(t *testing.T) {
 				},
 			},
 		},
-		// multiple puts, one watcher with matching key
 		{
+			"multiple puts, one watcher with matching key",
 			[]string{"foo", "foo", "foo"},
 			&pb.WatchRequest{RequestUnion: &pb.WatchRequest_CreateRequest{
 				CreateRequest: &pb.WatchCreateRequest{
@@ -159,8 +161,8 @@ func TestV3WatchFromCurrentRevision(t *testing.T) {
 				},
 			},
 		},
-		// multiple puts, one watcher with matching prefix
 		{
+			"multiple puts, one watcher with matching perfix",
 			[]string{"foo", "foo", "foo"},
 			&pb.WatchRequest{RequestUnion: &pb.WatchRequest_CreateRequest{
 				CreateRequest: &pb.WatchCreateRequest{
@@ -203,95 +205,87 @@ func TestV3WatchFromCurrentRevision(t *testing.T) {
 	}
 
 	for i, tt := range tests {
-		clus := NewClusterV3(t, &ClusterConfig{Size: 3})
+		t.Run(tt.name, func(t *testing.T) {
+			clus := NewClusterV3(t, &ClusterConfig{Size: 3})
+			defer clus.Terminate(t)
 
-		wAPI := toGRPC(clus.RandClient()).Watch
-		ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
-		defer cancel()
-		wStream, err := wAPI.Watch(ctx)
-		if err != nil {
-			t.Fatalf("#%d: wAPI.Watch error: %v", i, err)
-		}
+			wAPI := toGRPC(clus.RandClient()).Watch
+			ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+			defer cancel()
+			wStream, err := wAPI.Watch(ctx)
+			if err != nil {
+				t.Fatalf("#%d: wAPI.Watch error: %v", i, err)
+			}
 
-		err = wStream.Send(tt.watchRequest)
-		if err != nil {
-			t.Fatalf("#%d: wStream.Send error: %v", i, err)
-		}
+			err = wStream.Send(tt.watchRequest)
+			if err != nil {
+				t.Fatalf("#%d: wStream.Send error: %v", i, err)
+			}
 
-		// ensure watcher request created a new watcher
-		cresp, err := wStream.Recv()
-		if err != nil {
-			t.Errorf("#%d: wStream.Recv error: %v", i, err)
-			clus.Terminate(t)
-			continue
-		}
-		if !cresp.Created {
-			t.Errorf("#%d: did not create watchid, got %+v", i, cresp)
-			clus.Terminate(t)
-			continue
-		}
-		if cresp.Canceled {
-			t.Errorf("#%d: canceled watcher on create %+v", i, cresp)
-			clus.Terminate(t)
-			continue
-		}
+			// ensure watcher request created a new watcher
+			cresp, err := wStream.Recv()
+			if err != nil {
+				t.Fatalf("#%d: wStream.Recv error: %v", i, err)
+			}
+			if !cresp.Created {
+				t.Fatalf("#%d: did not create watchid, got %+v", i, cresp)
+			}
+			if cresp.Canceled {
+				t.Fatalf("#%d: canceled watcher on create %+v", i, cresp)
+			}
 
-		createdWatchId := cresp.WatchId
-		if cresp.Header == nil || cresp.Header.Revision != 1 {
-			t.Errorf("#%d: header revision got +%v, wanted revison 1", i, cresp)
-			clus.Terminate(t)
-			continue
-		}
+			createdWatchId := cresp.WatchId
+			if cresp.Header == nil || cresp.Header.Revision != 1 {
+				t.Fatalf("#%d: header revision got +%v, wanted revison 1", i, cresp)
+			}
 
-		// asynchronously create keys
-		ch := make(chan struct{}, 1)
-		go func() {
-			for _, k := range tt.putKeys {
-				kvc := toGRPC(clus.RandClient()).KV
-				req := &pb.PutRequest{Key: []byte(k), Value: []byte("bar")}
-				if _, err := kvc.Put(context.TODO(), req); err != nil {
-					t.Errorf("#%d: couldn't put key (%v)", i, err)
-				}
-			}
-			ch <- struct{}{}
-		}()
+			// asynchronously create keys
+			ch := make(chan struct{}, 1)
+			go func() {
+				for _, k := range tt.putKeys {
+					kvc := toGRPC(clus.RandClient()).KV
+					req := &pb.PutRequest{Key: []byte(k), Value: []byte("bar")}
+					if _, err := kvc.Put(context.TODO(), req); err != nil {
+						t.Errorf("#%d: couldn't put key (%v)", i, err)
+					}
+				}
+				ch <- struct{}{}
+			}()
 
-		// check stream results
-		for j, wresp := range tt.wresps {
-			resp, err := wStream.Recv()
-			if err != nil {
-				t.Errorf("#%d.%d: wStream.Recv error: %v", i, j, err)
-			}
+			// check stream results
+			for j, wresp := range tt.wresps {
+				resp, err := wStream.Recv()
+				if err != nil {
+					t.Errorf("#%d.%d: wStream.Recv error: %v", i, j, err)
+				}
 
-			if resp.Header == nil {
-				t.Fatalf("#%d.%d: unexpected nil resp.Header", i, j)
-			}
-			if resp.Header.Revision != wresp.Header.Revision {
-				t.Errorf("#%d.%d: resp.Header.Revision got = %d, want = %d", i, j, resp.Header.Revision, wresp.Header.Revision)
-			}
+				if resp.Header == nil {
+					t.Fatalf("#%d.%d: unexpected nil resp.Header", i, j)
+				}
+				if resp.Header.Revision != wresp.Header.Revision {
+					t.Errorf("#%d.%d: resp.Header.Revision got = %d, want = %d", i, j, resp.Header.Revision, wresp.Header.Revision)
+				}
 
-			if wresp.Created != resp.Created {
-				t.Errorf("#%d.%d: resp.Created got = %v, want = %v", i, j, resp.Created, wresp.Created)
-			}
-			if resp.WatchId != createdWatchId {
-				t.Errorf("#%d.%d: resp.WatchId got = %d, want = %d", i, j, resp.WatchId, createdWatchId)
-			}
+				if wresp.Created != resp.Created {
+					t.Errorf("#%d.%d: resp.Created got = %v, want = %v", i, j, resp.Created, wresp.Created)
+				}
+				if resp.WatchId != createdWatchId {
+					t.Errorf("#%d.%d: resp.WatchId got = %d, want = %d", i, j, resp.WatchId, createdWatchId)
+				}
 
-			if !reflect.DeepEqual(resp.Events, wresp.Events) {
-				t.Errorf("#%d.%d: resp.Events got = %+v, want = %+v", i, j, resp.Events, wresp.Events)
-			}
-		}
+				if !reflect.DeepEqual(resp.Events, wresp.Events) {
+					t.Errorf("#%d.%d: resp.Events got = %+v, want = %+v", i, j, resp.Events, wresp.Events)
+				}
+			}
 
-		rok, nr := waitResponse(wStream, 1*time.Second)
-		if !rok {
-			t.Errorf("unexpected pb.WatchResponse is received %+v", nr)
-		}
+			rok, nr := waitResponse(wStream, 1*time.Second)
+			if !rok {
+				t.Errorf("unexpected pb.WatchResponse is received %+v", nr)
+			}
 
-		// wait for the client to finish sending the keys before terminating the cluster
-		<-ch
-
-		// can't defer because tcp ports will be in use
-		clus.Terminate(t)
+			// wait for the client to finish sending the keys before terminating the cluster
+			<-ch
+		})
 	}
 }