etcd/clientv3/integration/watch_test.go
Ajit Yagaty da5bd04a1a clientv3: Remove superflous LeaseID casts in integration tests.
The integration tests under clientv3 have superflous LeaseID casts
that are not needed as the ID field of the lease responses are of
type LeaseID now.
2016-04-15 17:48:20 -07:00

505 lines
13 KiB
Go

// Copyright 2016 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package integration
import (
"fmt"
"reflect"
"sort"
"testing"
"time"
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/etcdserver/api/v3rpc"
"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
"github.com/coreos/etcd/integration"
"github.com/coreos/etcd/pkg/testutil"
storagepb "github.com/coreos/etcd/storage/storagepb"
"golang.org/x/net/context"
)
type watcherTest func(*testing.T, *watchctx)
type watchctx struct {
clus *integration.ClusterV3
w clientv3.Watcher
wclient *clientv3.Client
kv clientv3.KV
ch clientv3.WatchChan
}
func runWatchTest(t *testing.T, f watcherTest) {
defer testutil.AfterTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
defer clus.Terminate(t)
wclient := clus.RandClient()
w := clientv3.NewWatcher(wclient)
defer w.Close()
// select a different client from wclient so puts succeed if
// a test knocks out the watcher client
kvclient := clus.RandClient()
for kvclient == wclient {
kvclient = clus.RandClient()
}
kv := clientv3.NewKV(kvclient)
wctx := &watchctx{clus, w, wclient, kv, nil}
f(t, wctx)
}
// TestWatchMultiWatcher modifies multiple keys and observes the changes.
func TestWatchMultiWatcher(t *testing.T) {
runWatchTest(t, testWatchMultiWatcher)
}
func testWatchMultiWatcher(t *testing.T, wctx *watchctx) {
numKeyUpdates := 4
keys := []string{"foo", "bar", "baz"}
donec := make(chan struct{})
readyc := make(chan struct{})
for _, k := range keys {
// key watcher
go func(key string) {
ch := wctx.w.Watch(context.TODO(), key)
if ch == nil {
t.Fatalf("expected watcher channel, got nil")
}
readyc <- struct{}{}
for i := 0; i < numKeyUpdates; i++ {
resp, ok := <-ch
if !ok {
t.Fatalf("watcher unexpectedly closed")
}
v := fmt.Sprintf("%s-%d", key, i)
gotv := string(resp.Events[0].Kv.Value)
if gotv != v {
t.Errorf("#%d: got %s, wanted %s", i, gotv, v)
}
}
donec <- struct{}{}
}(k)
}
// prefix watcher on "b" (bar and baz)
go func() {
prefixc := wctx.w.Watch(context.TODO(), "b", clientv3.WithPrefix())
if prefixc == nil {
t.Fatalf("expected watcher channel, got nil")
}
readyc <- struct{}{}
evs := []*clientv3.Event{}
for i := 0; i < numKeyUpdates*2; i++ {
resp, ok := <-prefixc
if !ok {
t.Fatalf("watcher unexpectedly closed")
}
evs = append(evs, resp.Events...)
}
// check response
expected := []string{}
bkeys := []string{"bar", "baz"}
for _, k := range bkeys {
for i := 0; i < numKeyUpdates; i++ {
expected = append(expected, fmt.Sprintf("%s-%d", k, i))
}
}
got := []string{}
for _, ev := range evs {
got = append(got, string(ev.Kv.Value))
}
sort.Strings(got)
if !reflect.DeepEqual(expected, got) {
t.Errorf("got %v, expected %v", got, expected)
}
// ensure no extra data
select {
case resp, ok := <-prefixc:
if !ok {
t.Fatalf("watcher unexpectedly closed")
}
t.Fatalf("unexpected event %+v", resp)
case <-time.After(time.Second):
}
donec <- struct{}{}
}()
// wait for watcher bring up
for i := 0; i < len(keys)+1; i++ {
<-readyc
}
// generate events
ctx := context.TODO()
for i := 0; i < numKeyUpdates; i++ {
for _, k := range keys {
v := fmt.Sprintf("%s-%d", k, i)
if _, err := wctx.kv.Put(ctx, k, v); err != nil {
t.Fatal(err)
}
}
}
// wait for watcher shutdown
for i := 0; i < len(keys)+1; i++ {
<-donec
}
}
// TestWatchRange tests watcher creates ranges
func TestWatchRange(t *testing.T) {
runWatchTest(t, testWatchRange)
}
func testWatchRange(t *testing.T, wctx *watchctx) {
if wctx.ch = wctx.w.Watch(context.TODO(), "a", clientv3.WithRange("c")); wctx.ch == nil {
t.Fatalf("expected non-nil channel")
}
putAndWatch(t, wctx, "a", "a")
putAndWatch(t, wctx, "b", "b")
putAndWatch(t, wctx, "bar", "bar")
}
// TestWatchReconnRequest tests the send failure path when requesting a watcher.
func TestWatchReconnRequest(t *testing.T) {
runWatchTest(t, testWatchReconnRequest)
}
func testWatchReconnRequest(t *testing.T, wctx *watchctx) {
donec, stopc := make(chan struct{}), make(chan struct{}, 1)
go func() {
timer := time.After(2 * time.Second)
defer close(donec)
// take down watcher connection
for {
wctx.wclient.ActiveConnection().Close()
select {
case <-timer:
// spinning on close may live lock reconnection
return
case <-stopc:
return
default:
}
}
}()
// should reconnect when requesting watch
if wctx.ch = wctx.w.Watch(context.TODO(), "a"); wctx.ch == nil {
t.Fatalf("expected non-nil channel")
}
// wait for disconnections to stop
stopc <- struct{}{}
<-donec
// ensure watcher works
putAndWatch(t, wctx, "a", "a")
}
// TestWatchReconnInit tests watcher resumes correctly if connection lost
// before any data was sent.
func TestWatchReconnInit(t *testing.T) {
runWatchTest(t, testWatchReconnInit)
}
func testWatchReconnInit(t *testing.T, wctx *watchctx) {
if wctx.ch = wctx.w.Watch(context.TODO(), "a"); wctx.ch == nil {
t.Fatalf("expected non-nil channel")
}
// take down watcher connection
wctx.wclient.ActiveConnection().Close()
// watcher should recover
putAndWatch(t, wctx, "a", "a")
}
// TestWatchReconnRunning tests watcher resumes correctly if connection lost
// after data was sent.
func TestWatchReconnRunning(t *testing.T) {
runWatchTest(t, testWatchReconnRunning)
}
func testWatchReconnRunning(t *testing.T, wctx *watchctx) {
if wctx.ch = wctx.w.Watch(context.TODO(), "a"); wctx.ch == nil {
t.Fatalf("expected non-nil channel")
}
putAndWatch(t, wctx, "a", "a")
// take down watcher connection
wctx.wclient.ActiveConnection().Close()
// watcher should recover
putAndWatch(t, wctx, "a", "b")
}
// TestWatchCancelImmediate ensures a closed channel is returned
// if the context is cancelled.
func TestWatchCancelImmediate(t *testing.T) {
runWatchTest(t, testWatchCancelImmediate)
}
func testWatchCancelImmediate(t *testing.T, wctx *watchctx) {
ctx, cancel := context.WithCancel(context.Background())
cancel()
wch := wctx.w.Watch(ctx, "a")
select {
case wresp, ok := <-wch:
if ok {
t.Fatalf("read wch got %v; expected closed channel", wresp)
}
default:
t.Fatalf("closed watcher channel should not block")
}
}
// TestWatchCancelInit tests watcher closes correctly after no events.
func TestWatchCancelInit(t *testing.T) {
runWatchTest(t, testWatchCancelInit)
}
func testWatchCancelInit(t *testing.T, wctx *watchctx) {
ctx, cancel := context.WithCancel(context.Background())
if wctx.ch = wctx.w.Watch(ctx, "a"); wctx.ch == nil {
t.Fatalf("expected non-nil watcher channel")
}
cancel()
select {
case <-time.After(time.Second):
t.Fatalf("took too long to cancel")
case _, ok := <-wctx.ch:
if ok {
t.Fatalf("expected watcher channel to close")
}
}
}
// TestWatchCancelRunning tests watcher closes correctly after events.
func TestWatchCancelRunning(t *testing.T) {
runWatchTest(t, testWatchCancelRunning)
}
func testWatchCancelRunning(t *testing.T, wctx *watchctx) {
ctx, cancel := context.WithCancel(context.Background())
if wctx.ch = wctx.w.Watch(ctx, "a"); wctx.ch == nil {
t.Fatalf("expected non-nil watcher channel")
}
if _, err := wctx.kv.Put(ctx, "a", "a"); err != nil {
t.Fatal(err)
}
cancel()
select {
case <-time.After(time.Second):
t.Fatalf("took too long to cancel")
case v, ok := <-wctx.ch:
if !ok {
// closed before getting put; OK
break
}
// got the PUT; should close next
select {
case <-time.After(time.Second):
t.Fatalf("took too long to close")
case v, ok = <-wctx.ch:
if ok {
t.Fatalf("expected watcher channel to close, got %v", v)
}
}
}
}
func putAndWatch(t *testing.T, wctx *watchctx, key, val string) {
if _, err := wctx.kv.Put(context.TODO(), key, val); err != nil {
t.Fatal(err)
}
select {
case <-time.After(5 * time.Second):
t.Fatalf("watch timed out")
case v, ok := <-wctx.ch:
if !ok {
t.Fatalf("unexpected watch close")
}
if string(v.Events[0].Kv.Value) != val {
t.Fatalf("bad value got %v, wanted %v", v.Events[0].Kv.Value, val)
}
}
}
// TestWatchCompactRevision ensures the CompactRevision error is given on a
// compaction event ahead of a watcher.
func TestWatchCompactRevision(t *testing.T) {
defer testutil.AfterTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
defer clus.Terminate(t)
// set some keys
kv := clientv3.NewKV(clus.RandClient())
for i := 0; i < 5; i++ {
if _, err := kv.Put(context.TODO(), "foo", "bar"); err != nil {
t.Fatal(err)
}
}
w := clientv3.NewWatcher(clus.RandClient())
defer w.Close()
if err := kv.Compact(context.TODO(), 4); err != nil {
t.Fatal(err)
}
wch := w.Watch(context.Background(), "foo", clientv3.WithRev(2))
// get compacted error message
wresp, ok := <-wch
if !ok {
t.Fatalf("expected wresp, but got closed channel")
}
if wresp.Err() != rpctypes.ErrCompacted {
t.Fatalf("wresp.Err() expected ErrCompacteed, but got %v", wresp.Err())
}
// ensure the channel is closed
if wresp, ok = <-wch; ok {
t.Fatalf("expected closed channel, but got %v", wresp)
}
}
func TestWatchWithProgressNotify(t *testing.T) { testWatchWithProgressNotify(t, true) }
func TestWatchWithProgressNotifyNoEvent(t *testing.T) { testWatchWithProgressNotify(t, false) }
func testWatchWithProgressNotify(t *testing.T, watchOnPut bool) {
defer testutil.AfterTest(t)
// accelerate report interval so test terminates quickly
oldpi := v3rpc.GetProgressReportInterval()
// using atomics to avoid race warnings
v3rpc.SetProgressReportInterval(3 * time.Second)
pi := 3 * time.Second
defer func() { v3rpc.SetProgressReportInterval(oldpi) }()
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
defer clus.Terminate(t)
wc := clientv3.NewWatcher(clus.RandClient())
defer wc.Close()
opts := []clientv3.OpOption{clientv3.WithProgressNotify()}
if watchOnPut {
opts = append(opts, clientv3.WithPrefix())
}
rch := wc.Watch(context.Background(), "foo", opts...)
select {
case resp := <-rch: // wait for notification
if len(resp.Events) != 0 {
t.Fatalf("resp.Events expected none, got %+v", resp.Events)
}
case <-time.After(2 * pi):
t.Fatalf("watch response expected in %v, but timed out", pi)
}
kvc := clientv3.NewKV(clus.RandClient())
if _, err := kvc.Put(context.TODO(), "foox", "bar"); err != nil {
t.Fatal(err)
}
select {
case resp := <-rch:
if resp.Header.Revision != 2 {
t.Fatalf("resp.Header.Revision expected 2, got %d", resp.Header.Revision)
}
if watchOnPut { // wait for put if watch on the put key
ev := []*clientv3.Event{{Type: clientv3.EventTypePut,
Kv: &storagepb.KeyValue{Key: []byte("foox"), Value: []byte("bar"), CreateRevision: 2, ModRevision: 2, Version: 1}}}
if !reflect.DeepEqual(ev, resp.Events) {
t.Fatalf("expected %+v, got %+v", ev, resp.Events)
}
} else if len(resp.Events) != 0 { // wait for notification otherwise
t.Fatalf("expected no events, but got %+v", resp.Events)
}
case <-time.After(2 * pi):
t.Fatalf("watch response expected in %v, but timed out", pi)
}
}
func TestWatchEventType(t *testing.T) {
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer cluster.Terminate(t)
client := cluster.RandClient()
ctx := context.Background()
watchChan := client.Watch(ctx, "/", clientv3.WithPrefix())
if _, err := client.Put(ctx, "/toDelete", "foo"); err != nil {
t.Fatalf("Put failed: %v", err)
}
if _, err := client.Put(ctx, "/toDelete", "bar"); err != nil {
t.Fatalf("Put failed: %v", err)
}
if _, err := client.Delete(ctx, "/toDelete"); err != nil {
t.Fatalf("Delete failed: %v", err)
}
lcr, err := client.Lease.Grant(ctx, 1)
if err != nil {
t.Fatalf("lease create failed: %v", err)
}
if _, err := client.Put(ctx, "/toExpire", "foo", clientv3.WithLease(lcr.ID)); err != nil {
t.Fatalf("Put failed: %v", err)
}
tests := []struct {
et storagepb.Event_EventType
isCreate bool
isModify bool
}{{
et: clientv3.EventTypePut,
isCreate: true,
}, {
et: clientv3.EventTypePut,
isModify: true,
}, {
et: clientv3.EventTypeDelete,
}, {
et: clientv3.EventTypePut,
isCreate: true,
}, {
et: clientv3.EventTypeDelete,
}}
var res []*clientv3.Event
for {
select {
case wres := <-watchChan:
res = append(res, wres.Events...)
case <-time.After(10 * time.Second):
t.Fatalf("Should receive %d events and then break out loop", len(tests))
}
if len(res) == len(tests) {
break
}
}
for i, tt := range tests {
ev := res[i]
if tt.et != ev.Type {
t.Errorf("#%d: event type want=%s, get=%s", i, tt.et, ev.Type)
}
if tt.isCreate && !ev.IsCreate() {
t.Errorf("#%d: event should be CreateEvent", i)
}
if tt.isModify && !ev.IsModify() {
t.Errorf("#%d: event should be ModifyEvent", i)
}
}
}