From 166055b443b14953502d36af19af277d6d8bbeb6 Mon Sep 17 00:00:00 2001 From: Gyu-Ho Lee Date: Tue, 19 Jan 2016 12:52:25 -0800 Subject: [PATCH] integration: watch test for multi-events with txn Related to https://github.com/coreos/etcd/issues/4216. --- integration/v3_grpc_test.go | 81 +++++++++++++++++++++++++++++++++++++ 1 file changed, 81 insertions(+) diff --git a/integration/v3_grpc_test.go b/integration/v3_grpc_test.go index ebd66d8b7..2cc899b6d 100644 --- a/integration/v3_grpc_test.go +++ b/integration/v3_grpc_test.go @@ -14,8 +14,11 @@ package integration import ( + "bytes" + "fmt" "math/rand" "reflect" + "sort" "testing" "time" @@ -560,6 +563,84 @@ func TestV3WatchMultiple(t *testing.T) { clus.Terminate(t) } +// TestV3WatchMultipleEventsFromCurrentRevision tests Watch APIs from current revision +// in cases it receives multiple events. +func TestV3WatchMultipleEventsFromCurrentRevision(t *testing.T) { + clus := newClusterGRPC(t, &clusterConfig{size: 3}) + + wAPI := pb.NewWatchClient(clus.RandConn()) + wStream, err := wAPI.Watch(context.TODO()) + if err != nil { + t.Fatalf("wAPI.Watch error: %v", err) + } + + if err := wStream.Send(&pb.WatchRequest{CreateRequest: &pb.WatchCreateRequest{Prefix: []byte("foo")}}); err != nil { + t.Fatalf("wStream.Send error: %v", err) + } + + kvc := pb.NewKVClient(clus.RandConn()) + txn := pb.TxnRequest{} + for i := 0; i < 3; i++ { + ru := &pb.RequestUnion{} + ru.RequestPut = &pb.PutRequest{Key: []byte(fmt.Sprintf("foo%d", i)), Value: []byte("bar")} + txn.Success = append(txn.Success, ru) + } + + tresp, err := kvc.Txn(context.Background(), &txn) + if err != nil { + t.Fatalf("kvc.Txn error: %v", err) + } + if !tresp.Succeeded { + t.Fatalf("kvc.Txn failed: %+v", tresp) + } + + events := []*storagepb.Event{} + for len(events) < 3 { + resp, err := wStream.Recv() + if err != nil { + t.Errorf("wStream.Recv error: %v", err) + } + if resp.Created { + continue + } + events = append(events, resp.Events...) + } + sort.Sort(eventsSortByKey(events)) + + wevents := []*storagepb.Event{ + { + Type: storagepb.PUT, + Kv: &storagepb.KeyValue{Key: []byte("foo0"), Value: []byte("bar"), CreateRevision: 2, ModRevision: 2, Version: 1}, + }, + { + Type: storagepb.PUT, + Kv: &storagepb.KeyValue{Key: []byte("foo1"), Value: []byte("bar"), CreateRevision: 2, ModRevision: 2, Version: 1}, + }, + { + Type: storagepb.PUT, + Kv: &storagepb.KeyValue{Key: []byte("foo2"), Value: []byte("bar"), CreateRevision: 2, ModRevision: 2, Version: 1}, + }, + } + + if !reflect.DeepEqual(events, wevents) { + t.Errorf("events got = %+v, want = %+v", events, wevents) + } + + rok, nr := WaitResponse(wStream, 1*time.Second) + if !rok { + t.Errorf("unexpected pb.WatchResponse is received %+v", nr) + } + + // can't defer because tcp ports will be in use + clus.Terminate(t) +} + +type eventsSortByKey []*storagepb.Event + +func (evs eventsSortByKey) Len() int { return len(evs) } +func (evs eventsSortByKey) Swap(i, j int) { evs[i], evs[j] = evs[j], evs[i] } +func (evs eventsSortByKey) Less(i, j int) bool { return bytes.Compare(evs[i].Kv.Key, evs[j].Kv.Key) < 0 } + // WaitResponse waits on the given stream for given duration. // If there is no more events, true and a nil response will be // returned closing the WatchClient stream. Or the response will