diff --git a/integration/v3_grpc_test.go b/integration/v3_grpc_test.go index 8577dc17f..50c4dcc7a 100644 --- a/integration/v3_grpc_test.go +++ b/integration/v3_grpc_test.go @@ -17,10 +17,12 @@ import ( "math/rand" "reflect" "testing" + "time" "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" "github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" + "github.com/coreos/etcd/storage/storagepb" ) type clusterV3 struct { @@ -193,3 +195,237 @@ func TestV3DeleteRange(t *testing.T) { clus.Terminate(t) } } + +// TestV3WatchFromCurrentRevision tests Watch APIs from current revision. +func TestV3WatchFromCurrentRevision(t *testing.T) { + tests := []struct { + putKeys []string + watchRequest *pb.WatchRequest + + wresps []*pb.WatchResponse + }{ + // watch the key, matching + { + []string{"foo"}, + &pb.WatchRequest{CreateRequest: &pb.WatchCreateRequest{Key: []byte("foo")}}, + + []*pb.WatchResponse{ + { + Header: &pb.ResponseHeader{Revision: 1}, + Created: true, + }, + { + Header: &pb.ResponseHeader{Revision: 1}, + Created: false, + Events: []*storagepb.Event{ + { + Type: storagepb.PUT, + Kv: &storagepb.KeyValue{Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 1, ModRevision: 1, Version: 1}, + }, + }, + }, + }, + }, + // watch the key, non-matching + { + []string{"foo"}, + &pb.WatchRequest{CreateRequest: &pb.WatchCreateRequest{Key: []byte("helloworld")}}, + + []*pb.WatchResponse{ + { + Header: &pb.ResponseHeader{Revision: 1}, + Created: true, + }, + }, + }, + // watch the prefix, matching + { + []string{"fooLong"}, + &pb.WatchRequest{CreateRequest: &pb.WatchCreateRequest{Prefix: []byte("foo")}}, + + []*pb.WatchResponse{ + { + Header: &pb.ResponseHeader{Revision: 1}, + Created: true, + }, + { + Header: &pb.ResponseHeader{Revision: 1}, + Created: false, + Events: []*storagepb.Event{ + { + Type: storagepb.PUT, + Kv: &storagepb.KeyValue{Key: []byte("fooLong"), Value: []byte("bar"), CreateRevision: 1, ModRevision: 1, Version: 1}, + }, + }, + }, + }, + }, + // watch the prefix, non-matching + { + []string{"foo"}, + &pb.WatchRequest{CreateRequest: &pb.WatchCreateRequest{Prefix: []byte("helloworld")}}, + + []*pb.WatchResponse{ + { + Header: &pb.ResponseHeader{Revision: 1}, + Created: true, + }, + }, + }, + // multiple puts, one watcher with matching key + { + []string{"foo", "foo", "foo"}, + &pb.WatchRequest{CreateRequest: &pb.WatchCreateRequest{Key: []byte("foo")}}, + + []*pb.WatchResponse{ + { + Header: &pb.ResponseHeader{Revision: 1}, + Created: true, + }, + { + Header: &pb.ResponseHeader{Revision: 1}, + Created: false, + Events: []*storagepb.Event{ + { + Type: storagepb.PUT, + Kv: &storagepb.KeyValue{Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 1, ModRevision: 1, Version: 1}, + }, + }, + }, + { + Header: &pb.ResponseHeader{Revision: 1}, + Created: false, + Events: []*storagepb.Event{ + { + Type: storagepb.PUT, + Kv: &storagepb.KeyValue{Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 1, ModRevision: 2, Version: 2}, + }, + }, + }, + { + Header: &pb.ResponseHeader{Revision: 1}, + Created: false, + Events: []*storagepb.Event{ + { + Type: storagepb.PUT, + Kv: &storagepb.KeyValue{Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 1, ModRevision: 3, Version: 3}, + }, + }, + }, + }, + }, + // multiple puts, one watcher with matching prefix + { + []string{"foo", "foo", "foo"}, + &pb.WatchRequest{CreateRequest: &pb.WatchCreateRequest{Prefix: []byte("foo")}}, + + []*pb.WatchResponse{ + { + Header: &pb.ResponseHeader{Revision: 1}, + Created: true, + }, + { + Header: &pb.ResponseHeader{Revision: 1}, + Created: false, + Events: []*storagepb.Event{ + { + Type: storagepb.PUT, + Kv: &storagepb.KeyValue{Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 1, ModRevision: 1, Version: 1}, + }, + }, + }, + { + Header: &pb.ResponseHeader{Revision: 1}, + Created: false, + Events: []*storagepb.Event{ + { + Type: storagepb.PUT, + Kv: &storagepb.KeyValue{Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 1, ModRevision: 2, Version: 2}, + }, + }, + }, + { + Header: &pb.ResponseHeader{Revision: 1}, + Created: false, + Events: []*storagepb.Event{ + { + Type: storagepb.PUT, + Kv: &storagepb.KeyValue{Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 1, ModRevision: 3, Version: 3}, + }, + }, + }, + }, + }, + + // TODO: watch and receive multiple-events from synced (need Txn) + } + + for i, tt := range tests { + clus := newClusterGRPC(t, &clusterConfig{size: 3}) + + wAPI := pb.NewWatchClient(clus.RandConn()) + wStream, err := wAPI.Watch(context.TODO()) + if err != nil { + t.Fatalf("#%d: wAPI.Watch error: %v", i, err) + } + + if err := wStream.Send(tt.watchRequest); err != nil { + t.Fatalf("#%d: wStream.Send error: %v", i, err) + } + + kvc := pb.NewKVClient(clus.RandConn()) + for _, k := range tt.putKeys { + if _, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: []byte(k), Value: []byte("bar")}); err != nil { + t.Fatalf("#%d: couldn't put key (%v)", i, err) + } + } + + var createdWatchId int64 + for j, wresp := range tt.wresps { + resp, err := wStream.Recv() + if err != nil { + t.Errorf("#%d.%d: wStream.Recv error: %v", i, j, err) + } + + if resp.Header == nil { + t.Fatalf("#%d.%d: unexpected nil resp.Header", i, j) + } + if resp.Header.Revision != wresp.Header.Revision { + t.Logf("[TODO - skip for now] #%d.%d: resp.Header.Revision got = %d, want = %d", i, j, resp.Header.Revision, wresp.Header.Revision) + } + + if wresp.Created != resp.Created { + t.Errorf("#%d.%d: resp.Created got = %v, want = %v", i, j, resp.Created, wresp.Created) + } + if resp.Created { + createdWatchId = resp.WatchId + } + if resp.WatchId != createdWatchId { + t.Errorf("#%d.%d: resp.WatchId got = %d, want = %d", i, j, resp.WatchId, createdWatchId) + } + + if !reflect.DeepEqual(resp.Events, wresp.Events) { + t.Errorf("#%d.%d: resp.Events got = %+v, want = %+v", i, j, resp.Events, wresp.Events) + } + } + + rCh := make(chan *pb.WatchResponse) + go func() { + resp, _ := wStream.Recv() + rCh <- resp + }() + select { + case nr := <-rCh: + t.Errorf("#%d: unexpected response is received %+v", i, nr) + case <-time.After(2 * time.Second): + } + wStream.CloseSend() + rv, ok := <-rCh + if rv != nil || !ok { + t.Errorf("#%d: rv, ok got = %v %v, want = nil true", i, rv, ok) + } + + // can't defer because tcp ports will be in use + clus.Terminate(t) + } +}