mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
integration: add TestV3WatchMultiple
For https://github.com/coreos/etcd/issues/4216.
This commit is contained in:
parent
22dd738228
commit
0f3573a57e
@ -491,3 +491,116 @@ func TestV3WatchCancel(t *testing.T) {
|
||||
|
||||
clus.Terminate(t)
|
||||
}
|
||||
|
||||
// TestV3WatchMultiple tests multiple watchers on the same key
|
||||
// and one watcher with matching prefix. It first puts the key
|
||||
// that matches all watchers, and another key that matches only
|
||||
// one watcher to test if it receives expected events.
|
||||
func TestV3WatchMultiple(t *testing.T) {
|
||||
clus := newClusterGRPC(t, &clusterConfig{size: 3})
|
||||
wAPI := pb.NewWatchClient(clus.RandConn())
|
||||
kvc := pb.NewKVClient(clus.RandConn())
|
||||
|
||||
wStream, errW := wAPI.Watch(context.TODO())
|
||||
if errW != nil {
|
||||
t.Fatalf("wAPI.Watch error: %v", errW)
|
||||
}
|
||||
|
||||
watchKeyN := 4
|
||||
for i := 0; i < watchKeyN+1; i++ {
|
||||
var wreq *pb.WatchRequest
|
||||
if i < watchKeyN {
|
||||
wreq = &pb.WatchRequest{CreateRequest: &pb.WatchCreateRequest{Key: []byte("foo")}}
|
||||
} else {
|
||||
wreq = &pb.WatchRequest{CreateRequest: &pb.WatchCreateRequest{Prefix: []byte("fo")}}
|
||||
}
|
||||
if err := wStream.Send(wreq); err != nil {
|
||||
t.Fatalf("wStream.Send error: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
ids := make(map[int64]struct{})
|
||||
for i := 0; i < watchKeyN+1; i++ {
|
||||
wresp, err := wStream.Recv()
|
||||
if err != nil {
|
||||
t.Fatalf("wStream.Recv error: %v", err)
|
||||
}
|
||||
if !wresp.Created {
|
||||
t.Fatalf("wresp.Created got = %v, want = true", wresp.Created)
|
||||
}
|
||||
ids[wresp.WatchId] = struct{}{}
|
||||
}
|
||||
|
||||
if _, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}); err != nil {
|
||||
t.Fatalf("couldn't put key (%v)", err)
|
||||
}
|
||||
|
||||
for i := 0; i < watchKeyN+1; i++ {
|
||||
wresp, err := wStream.Recv()
|
||||
if err != nil {
|
||||
t.Fatalf("wStream.Recv error: %v", err)
|
||||
}
|
||||
if _, ok := ids[wresp.WatchId]; !ok {
|
||||
t.Errorf("watchId %d is not created!", wresp.WatchId)
|
||||
} else {
|
||||
delete(ids, wresp.WatchId)
|
||||
}
|
||||
if len(wresp.Events) == 0 {
|
||||
t.Errorf("#%d: no events received", i)
|
||||
}
|
||||
for _, ev := range wresp.Events {
|
||||
if string(ev.Kv.Key) != "foo" {
|
||||
t.Errorf("ev.Kv.Key got = %s, want = foo", ev.Kv.Key)
|
||||
}
|
||||
if string(ev.Kv.Value) != "bar" {
|
||||
t.Errorf("ev.Kv.Value got = %s, want = bar", ev.Kv.Value)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// now put one key that has only one matching watcher
|
||||
if _, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: []byte("fo"), Value: []byte("bar")}); err != nil {
|
||||
t.Fatalf("couldn't put key (%v)", err)
|
||||
}
|
||||
wresp, err := wStream.Recv()
|
||||
if err != nil {
|
||||
t.Errorf("wStream.Recv error: %v", err)
|
||||
}
|
||||
if len(wresp.Events) != 1 {
|
||||
t.Fatalf("len(wresp.Events) got = %d, want = 1", len(wresp.Events))
|
||||
}
|
||||
if string(wresp.Events[0].Kv.Key) != "fo" {
|
||||
t.Errorf("wresp.Events[0].Kv.Key got = %s, want = fo", wresp.Events[0].Kv.Key)
|
||||
}
|
||||
|
||||
// now Recv should block because there is no more events coming
|
||||
rok, nr := WaitResponse(wStream, 1*time.Second)
|
||||
if !rok {
|
||||
t.Errorf("unexpected pb.WatchResponse is received %+v", nr)
|
||||
}
|
||||
|
||||
clus.Terminate(t)
|
||||
}
|
||||
|
||||
// WaitResponse waits on the given stream for given duration.
|
||||
// If there is no more events, true and a nil response will be
|
||||
// returned closing the WatchClient stream. Or the response will
|
||||
// be returned.
|
||||
func WaitResponse(wc pb.Watch_WatchClient, timeout time.Duration) (bool, *pb.WatchResponse) {
|
||||
rCh := make(chan *pb.WatchResponse)
|
||||
go func() {
|
||||
resp, _ := wc.Recv()
|
||||
rCh <- resp
|
||||
}()
|
||||
select {
|
||||
case nr := <-rCh:
|
||||
return false, nr
|
||||
case <-time.After(timeout):
|
||||
}
|
||||
wc.CloseSend()
|
||||
rv, ok := <-rCh
|
||||
if rv != nil || !ok {
|
||||
return false, rv
|
||||
}
|
||||
return true, nil
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user