diff --git a/integration/v3_grpc_test.go b/integration/v3_grpc_test.go index 7c96595ff..238cf058e 100644 --- a/integration/v3_grpc_test.go +++ b/integration/v3_grpc_test.go @@ -491,3 +491,116 @@ func TestV3WatchCancel(t *testing.T) { clus.Terminate(t) } + +// TestV3WatchMultiple tests multiple watchers on the same key +// and one watcher with matching prefix. It first puts the key +// that matches all watchers, and another key that matches only +// one watcher to test if it receives expected events. +func TestV3WatchMultiple(t *testing.T) { + clus := newClusterGRPC(t, &clusterConfig{size: 3}) + wAPI := pb.NewWatchClient(clus.RandConn()) + kvc := pb.NewKVClient(clus.RandConn()) + + wStream, errW := wAPI.Watch(context.TODO()) + if errW != nil { + t.Fatalf("wAPI.Watch error: %v", errW) + } + + watchKeyN := 4 + for i := 0; i < watchKeyN+1; i++ { + var wreq *pb.WatchRequest + if i < watchKeyN { + wreq = &pb.WatchRequest{CreateRequest: &pb.WatchCreateRequest{Key: []byte("foo")}} + } else { + wreq = &pb.WatchRequest{CreateRequest: &pb.WatchCreateRequest{Prefix: []byte("fo")}} + } + if err := wStream.Send(wreq); err != nil { + t.Fatalf("wStream.Send error: %v", err) + } + } + + ids := make(map[int64]struct{}) + for i := 0; i < watchKeyN+1; i++ { + wresp, err := wStream.Recv() + if err != nil { + t.Fatalf("wStream.Recv error: %v", err) + } + if !wresp.Created { + t.Fatalf("wresp.Created got = %v, want = true", wresp.Created) + } + ids[wresp.WatchId] = struct{}{} + } + + if _, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}); err != nil { + t.Fatalf("couldn't put key (%v)", err) + } + + for i := 0; i < watchKeyN+1; i++ { + wresp, err := wStream.Recv() + if err != nil { + t.Fatalf("wStream.Recv error: %v", err) + } + if _, ok := ids[wresp.WatchId]; !ok { + t.Errorf("watchId %d is not created!", wresp.WatchId) + } else { + delete(ids, wresp.WatchId) + } + if len(wresp.Events) == 0 { + t.Errorf("#%d: no events received", i) + } + for _, ev := range wresp.Events { + if string(ev.Kv.Key) != "foo" { + t.Errorf("ev.Kv.Key got = %s, want = foo", ev.Kv.Key) + } + if string(ev.Kv.Value) != "bar" { + t.Errorf("ev.Kv.Value got = %s, want = bar", ev.Kv.Value) + } + } + } + + // now put one key that has only one matching watcher + if _, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: []byte("fo"), Value: []byte("bar")}); err != nil { + t.Fatalf("couldn't put key (%v)", err) + } + wresp, err := wStream.Recv() + if err != nil { + t.Errorf("wStream.Recv error: %v", err) + } + if len(wresp.Events) != 1 { + t.Fatalf("len(wresp.Events) got = %d, want = 1", len(wresp.Events)) + } + if string(wresp.Events[0].Kv.Key) != "fo" { + t.Errorf("wresp.Events[0].Kv.Key got = %s, want = fo", wresp.Events[0].Kv.Key) + } + + // now Recv should block because there is no more events coming + rok, nr := WaitResponse(wStream, 1*time.Second) + if !rok { + t.Errorf("unexpected pb.WatchResponse is received %+v", nr) + } + + clus.Terminate(t) +} + +// WaitResponse waits on the given stream for given duration. +// If there is no more events, true and a nil response will be +// returned closing the WatchClient stream. Or the response will +// be returned. +func WaitResponse(wc pb.Watch_WatchClient, timeout time.Duration) (bool, *pb.WatchResponse) { + rCh := make(chan *pb.WatchResponse) + go func() { + resp, _ := wc.Recv() + rCh <- resp + }() + select { + case nr := <-rCh: + return false, nr + case <-time.After(timeout): + } + wc.CloseSend() + rv, ok := <-rCh + if rv != nil || !ok { + return false, rv + } + return true, nil +}