mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
integration: TestV3WatchMultipleEventsPutUnsynced
This commit is contained in:
parent
d26b1460c5
commit
39116e2e20
@ -666,6 +666,80 @@ func (evs eventsSortByKey) Len() int { return len(evs) }
|
||||
func (evs eventsSortByKey) Swap(i, j int) { evs[i], evs[j] = evs[j], evs[i] }
|
||||
func (evs eventsSortByKey) Less(i, j int) bool { return bytes.Compare(evs[i].Kv.Key, evs[j].Kv.Key) < 0 }
|
||||
|
||||
func TestV3WatchMultipleEventsPutUnsynced(t *testing.T) {
|
||||
clus := newClusterGRPC(t, &clusterConfig{size: 3})
|
||||
defer clus.Terminate(t)
|
||||
|
||||
kvc := pb.NewKVClient(clus.RandConn())
|
||||
|
||||
if _, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: []byte("foo0"), Value: []byte("bar")}); err != nil {
|
||||
t.Fatalf("couldn't put key (%v)", err)
|
||||
}
|
||||
if _, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: []byte("foo1"), Value: []byte("bar")}); err != nil {
|
||||
t.Fatalf("couldn't put key (%v)", err)
|
||||
}
|
||||
|
||||
wAPI := pb.NewWatchClient(clus.RandConn())
|
||||
wStream, wErr := wAPI.Watch(context.TODO())
|
||||
if wErr != nil {
|
||||
t.Fatalf("wAPI.Watch error: %v", wErr)
|
||||
}
|
||||
|
||||
if err := wStream.Send(&pb.WatchRequest{CreateRequest: &pb.WatchCreateRequest{Prefix: []byte("foo"), StartRevision: 1}}); err != nil {
|
||||
t.Fatalf("wStream.Send error: %v", err)
|
||||
}
|
||||
|
||||
if _, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: []byte("foo0"), Value: []byte("bar")}); err != nil {
|
||||
t.Fatalf("couldn't put key (%v)", err)
|
||||
}
|
||||
if _, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: []byte("foo1"), Value: []byte("bar")}); err != nil {
|
||||
t.Fatalf("couldn't put key (%v)", err)
|
||||
}
|
||||
|
||||
allWevents := []*storagepb.Event{
|
||||
{
|
||||
Type: storagepb.PUT,
|
||||
Kv: &storagepb.KeyValue{Key: []byte("foo0"), Value: []byte("bar"), CreateRevision: 2, ModRevision: 2, Version: 1},
|
||||
},
|
||||
{
|
||||
Type: storagepb.PUT,
|
||||
Kv: &storagepb.KeyValue{Key: []byte("foo1"), Value: []byte("bar"), CreateRevision: 3, ModRevision: 3, Version: 1},
|
||||
},
|
||||
{
|
||||
Type: storagepb.PUT,
|
||||
Kv: &storagepb.KeyValue{Key: []byte("foo0"), Value: []byte("bar"), CreateRevision: 2, ModRevision: 4, Version: 2},
|
||||
},
|
||||
{
|
||||
Type: storagepb.PUT,
|
||||
Kv: &storagepb.KeyValue{Key: []byte("foo1"), Value: []byte("bar"), CreateRevision: 3, ModRevision: 5, Version: 2},
|
||||
},
|
||||
}
|
||||
|
||||
events := []*storagepb.Event{}
|
||||
for len(events) < 4 {
|
||||
resp, err := wStream.Recv()
|
||||
if err != nil {
|
||||
t.Errorf("wStream.Recv error: %v", err)
|
||||
}
|
||||
if resp.Created {
|
||||
continue
|
||||
}
|
||||
events = append(events, resp.Events...)
|
||||
// if PUT requests are committed by now, first receive would return
|
||||
// multiple events, but if not, it returns a single event. In SSD,
|
||||
// it should return 4 events at once.
|
||||
}
|
||||
|
||||
if !reflect.DeepEqual(events, allWevents) {
|
||||
t.Errorf("events got = %+v, want = %+v", events, allWevents)
|
||||
}
|
||||
|
||||
rok, nr := WaitResponse(wStream, 1*time.Second)
|
||||
if !rok {
|
||||
t.Errorf("unexpected pb.WatchResponse is received %+v", nr)
|
||||
}
|
||||
}
|
||||
|
||||
func TestV3WatchMultipleStreamsSynced(t *testing.T) {
|
||||
testV3WatchMultipleStreams(t, 0)
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user