mirror of
				https://github.com/etcd-io/etcd.git
				synced 2024-09-27 06:25:44 +00:00 
			
		
		
		
	 5f9a1394db
			
		
	
	
		5f9a1394db
		
	
	
	
	
		
			
			Fixes: go test -tags cluster_proxy ./clientv3/integration -v -run TestWatchRequestProgress Does not fail the grpc-server (completely) by a not implemented RPC. Failing whole server by remote request is anti-pattern and security risk. Prior to the fix, the command line above was failing with: ``` === RUN TestWatchRequestProgress/0-watcher panic: not implemented goroutine 602 [running]: go.etcd.io/etcd/v3/proxy/grpcproxy.(*watchProxyStream).recvLoop(0xc0004779d0, 0x0, 0x0) /home/ptab/corp/etcd/proxy/grpcproxy/watch.go:275 +0xac5 go.etcd.io/etcd/v3/proxy/grpcproxy.(*watchProxy).Watch.func1(0xc0034f94a0, 0xc0004779d0) /home/ptab/corp/etcd/proxy/grpcproxy/watch.go:129 +0x53 created by go.etcd.io/etcd/v3/proxy/grpcproxy.(*watchProxy).Watch /home/ptab/corp/etcd/proxy/grpcproxy/watch.go:127 +0x3c8 FAIL go.etcd.io/etcd/v3/clientv3/integration 0.215s FAIL ```
		
			
				
	
	
		
			1202 lines
		
	
	
		
			33 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			1202 lines
		
	
	
		
			33 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| // Copyright 2016 The etcd Authors
 | |
| //
 | |
| // Licensed under the Apache License, Version 2.0 (the "License");
 | |
| // you may not use this file except in compliance with the License.
 | |
| // You may obtain a copy of the License at
 | |
| //
 | |
| //     http://www.apache.org/licenses/LICENSE-2.0
 | |
| //
 | |
| // Unless required by applicable law or agreed to in writing, software
 | |
| // distributed under the License is distributed on an "AS IS" BASIS,
 | |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
| // See the License for the specific language governing permissions and
 | |
| // limitations under the License.
 | |
| 
 | |
| package integration
 | |
| 
 | |
| import (
 | |
| 	"context"
 | |
| 	"fmt"
 | |
| 	"math/rand"
 | |
| 	"reflect"
 | |
| 	"sort"
 | |
| 	"strconv"
 | |
| 	"testing"
 | |
| 	"time"
 | |
| 
 | |
| 	"go.etcd.io/etcd/v3/clientv3"
 | |
| 	"go.etcd.io/etcd/v3/etcdserver/api/v3rpc"
 | |
| 	"go.etcd.io/etcd/v3/etcdserver/api/v3rpc/rpctypes"
 | |
| 	"go.etcd.io/etcd/v3/integration"
 | |
| 	mvccpb "go.etcd.io/etcd/v3/mvcc/mvccpb"
 | |
| 	"go.etcd.io/etcd/v3/pkg/testutil"
 | |
| 	"go.etcd.io/etcd/v3/version"
 | |
| 
 | |
| 	"google.golang.org/grpc/metadata"
 | |
| )
 | |
| 
 | |
| type watcherTest func(*testing.T, *watchctx)
 | |
| 
 | |
| type watchctx struct {
 | |
| 	clus          *integration.ClusterV3
 | |
| 	w             clientv3.Watcher
 | |
| 	kv            clientv3.KV
 | |
| 	wclientMember int
 | |
| 	kvMember      int
 | |
| 	ch            clientv3.WatchChan
 | |
| }
 | |
| 
 | |
| func runWatchTest(t *testing.T, f watcherTest) {
 | |
| 	defer testutil.AfterTest(t)
 | |
| 
 | |
| 	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
 | |
| 	defer clus.Terminate(t)
 | |
| 
 | |
| 	wclientMember := rand.Intn(3)
 | |
| 	w := clus.Client(wclientMember).Watcher
 | |
| 	// select a different client for KV operations so puts succeed if
 | |
| 	// a test knocks out the watcher client.
 | |
| 	kvMember := rand.Intn(3)
 | |
| 	for kvMember == wclientMember {
 | |
| 		kvMember = rand.Intn(3)
 | |
| 	}
 | |
| 	kv := clus.Client(kvMember).KV
 | |
| 
 | |
| 	wctx := &watchctx{clus, w, kv, wclientMember, kvMember, nil}
 | |
| 	f(t, wctx)
 | |
| }
 | |
| 
 | |
| // TestWatchMultiWatcher modifies multiple keys and observes the changes.
 | |
| func TestWatchMultiWatcher(t *testing.T) {
 | |
| 	runWatchTest(t, testWatchMultiWatcher)
 | |
| }
 | |
| 
 | |
| func testWatchMultiWatcher(t *testing.T, wctx *watchctx) {
 | |
| 	numKeyUpdates := 4
 | |
| 	keys := []string{"foo", "bar", "baz"}
 | |
| 
 | |
| 	donec := make(chan struct{})
 | |
| 	readyc := make(chan struct{})
 | |
| 	for _, k := range keys {
 | |
| 		// key watcher
 | |
| 		go func(key string) {
 | |
| 			ch := wctx.w.Watch(context.TODO(), key)
 | |
| 			if ch == nil {
 | |
| 				t.Errorf("expected watcher channel, got nil")
 | |
| 			}
 | |
| 			readyc <- struct{}{}
 | |
| 			for i := 0; i < numKeyUpdates; i++ {
 | |
| 				resp, ok := <-ch
 | |
| 				if !ok {
 | |
| 					t.Errorf("watcher unexpectedly closed")
 | |
| 				}
 | |
| 				v := fmt.Sprintf("%s-%d", key, i)
 | |
| 				gotv := string(resp.Events[0].Kv.Value)
 | |
| 				if gotv != v {
 | |
| 					t.Errorf("#%d: got %s, wanted %s", i, gotv, v)
 | |
| 				}
 | |
| 			}
 | |
| 			donec <- struct{}{}
 | |
| 		}(k)
 | |
| 	}
 | |
| 	// prefix watcher on "b" (bar and baz)
 | |
| 	go func() {
 | |
| 		prefixc := wctx.w.Watch(context.TODO(), "b", clientv3.WithPrefix())
 | |
| 		if prefixc == nil {
 | |
| 			t.Errorf("expected watcher channel, got nil")
 | |
| 		}
 | |
| 		readyc <- struct{}{}
 | |
| 		evs := []*clientv3.Event{}
 | |
| 		for i := 0; i < numKeyUpdates*2; i++ {
 | |
| 			resp, ok := <-prefixc
 | |
| 			if !ok {
 | |
| 				t.Errorf("watcher unexpectedly closed")
 | |
| 			}
 | |
| 			evs = append(evs, resp.Events...)
 | |
| 		}
 | |
| 
 | |
| 		// check response
 | |
| 		expected := []string{}
 | |
| 		bkeys := []string{"bar", "baz"}
 | |
| 		for _, k := range bkeys {
 | |
| 			for i := 0; i < numKeyUpdates; i++ {
 | |
| 				expected = append(expected, fmt.Sprintf("%s-%d", k, i))
 | |
| 			}
 | |
| 		}
 | |
| 		got := []string{}
 | |
| 		for _, ev := range evs {
 | |
| 			got = append(got, string(ev.Kv.Value))
 | |
| 		}
 | |
| 		sort.Strings(got)
 | |
| 		if !reflect.DeepEqual(expected, got) {
 | |
| 			t.Errorf("got %v, expected %v", got, expected)
 | |
| 		}
 | |
| 
 | |
| 		// ensure no extra data
 | |
| 		select {
 | |
| 		case resp, ok := <-prefixc:
 | |
| 			if !ok {
 | |
| 				t.Errorf("watcher unexpectedly closed")
 | |
| 			}
 | |
| 			t.Errorf("unexpected event %+v", resp)
 | |
| 		case <-time.After(time.Second):
 | |
| 		}
 | |
| 		donec <- struct{}{}
 | |
| 	}()
 | |
| 
 | |
| 	// wait for watcher bring up
 | |
| 	for i := 0; i < len(keys)+1; i++ {
 | |
| 		<-readyc
 | |
| 	}
 | |
| 	// generate events
 | |
| 	ctx := context.TODO()
 | |
| 	for i := 0; i < numKeyUpdates; i++ {
 | |
| 		for _, k := range keys {
 | |
| 			v := fmt.Sprintf("%s-%d", k, i)
 | |
| 			if _, err := wctx.kv.Put(ctx, k, v); err != nil {
 | |
| 				t.Fatal(err)
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| 	// wait for watcher shutdown
 | |
| 	for i := 0; i < len(keys)+1; i++ {
 | |
| 		<-donec
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // TestWatchRange tests watcher creates ranges
 | |
| func TestWatchRange(t *testing.T) {
 | |
| 	runWatchTest(t, testWatchRange)
 | |
| }
 | |
| 
 | |
| func testWatchRange(t *testing.T, wctx *watchctx) {
 | |
| 	if wctx.ch = wctx.w.Watch(context.TODO(), "a", clientv3.WithRange("c")); wctx.ch == nil {
 | |
| 		t.Fatalf("expected non-nil channel")
 | |
| 	}
 | |
| 	putAndWatch(t, wctx, "a", "a")
 | |
| 	putAndWatch(t, wctx, "b", "b")
 | |
| 	putAndWatch(t, wctx, "bar", "bar")
 | |
| }
 | |
| 
 | |
| // TestWatchReconnRequest tests the send failure path when requesting a watcher.
 | |
| func TestWatchReconnRequest(t *testing.T) {
 | |
| 	runWatchTest(t, testWatchReconnRequest)
 | |
| }
 | |
| 
 | |
| func testWatchReconnRequest(t *testing.T, wctx *watchctx) {
 | |
| 	donec, stopc := make(chan struct{}), make(chan struct{}, 1)
 | |
| 	go func() {
 | |
| 		timer := time.After(2 * time.Second)
 | |
| 		defer close(donec)
 | |
| 		// take down watcher connection
 | |
| 		for {
 | |
| 			wctx.clus.Members[wctx.wclientMember].DropConnections()
 | |
| 			select {
 | |
| 			case <-timer:
 | |
| 				// spinning on close may live lock reconnection
 | |
| 				return
 | |
| 			case <-stopc:
 | |
| 				return
 | |
| 			default:
 | |
| 			}
 | |
| 		}
 | |
| 	}()
 | |
| 	// should reconnect when requesting watch
 | |
| 	if wctx.ch = wctx.w.Watch(context.TODO(), "a"); wctx.ch == nil {
 | |
| 		t.Fatalf("expected non-nil channel")
 | |
| 	}
 | |
| 
 | |
| 	// wait for disconnections to stop
 | |
| 	stopc <- struct{}{}
 | |
| 	<-donec
 | |
| 
 | |
| 	// spinning on dropping connections may trigger a leader election
 | |
| 	// due to resource starvation; l-read to ensure the cluster is stable
 | |
| 	ctx, cancel := context.WithTimeout(context.TODO(), 30*time.Second)
 | |
| 	if _, err := wctx.kv.Get(ctx, "_"); err != nil {
 | |
| 		t.Fatal(err)
 | |
| 	}
 | |
| 	cancel()
 | |
| 
 | |
| 	// ensure watcher works
 | |
| 	putAndWatch(t, wctx, "a", "a")
 | |
| }
 | |
| 
 | |
| // TestWatchReconnInit tests watcher resumes correctly if connection lost
 | |
| // before any data was sent.
 | |
| func TestWatchReconnInit(t *testing.T) {
 | |
| 	runWatchTest(t, testWatchReconnInit)
 | |
| }
 | |
| 
 | |
| func testWatchReconnInit(t *testing.T, wctx *watchctx) {
 | |
| 	if wctx.ch = wctx.w.Watch(context.TODO(), "a"); wctx.ch == nil {
 | |
| 		t.Fatalf("expected non-nil channel")
 | |
| 	}
 | |
| 	wctx.clus.Members[wctx.wclientMember].DropConnections()
 | |
| 	// watcher should recover
 | |
| 	putAndWatch(t, wctx, "a", "a")
 | |
| }
 | |
| 
 | |
| // TestWatchReconnRunning tests watcher resumes correctly if connection lost
 | |
| // after data was sent.
 | |
| func TestWatchReconnRunning(t *testing.T) {
 | |
| 	runWatchTest(t, testWatchReconnRunning)
 | |
| }
 | |
| 
 | |
| func testWatchReconnRunning(t *testing.T, wctx *watchctx) {
 | |
| 	if wctx.ch = wctx.w.Watch(context.TODO(), "a"); wctx.ch == nil {
 | |
| 		t.Fatalf("expected non-nil channel")
 | |
| 	}
 | |
| 	putAndWatch(t, wctx, "a", "a")
 | |
| 	// take down watcher connection
 | |
| 	wctx.clus.Members[wctx.wclientMember].DropConnections()
 | |
| 	// watcher should recover
 | |
| 	putAndWatch(t, wctx, "a", "b")
 | |
| }
 | |
| 
 | |
| // TestWatchCancelImmediate ensures a closed channel is returned
 | |
| // if the context is cancelled.
 | |
| func TestWatchCancelImmediate(t *testing.T) {
 | |
| 	runWatchTest(t, testWatchCancelImmediate)
 | |
| }
 | |
| 
 | |
| func testWatchCancelImmediate(t *testing.T, wctx *watchctx) {
 | |
| 	ctx, cancel := context.WithCancel(context.Background())
 | |
| 	cancel()
 | |
| 	wch := wctx.w.Watch(ctx, "a")
 | |
| 	select {
 | |
| 	case wresp, ok := <-wch:
 | |
| 		if ok {
 | |
| 			t.Fatalf("read wch got %v; expected closed channel", wresp)
 | |
| 		}
 | |
| 	default:
 | |
| 		t.Fatalf("closed watcher channel should not block")
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // TestWatchCancelInit tests watcher closes correctly after no events.
 | |
| func TestWatchCancelInit(t *testing.T) {
 | |
| 	runWatchTest(t, testWatchCancelInit)
 | |
| }
 | |
| 
 | |
| func testWatchCancelInit(t *testing.T, wctx *watchctx) {
 | |
| 	ctx, cancel := context.WithCancel(context.Background())
 | |
| 	if wctx.ch = wctx.w.Watch(ctx, "a"); wctx.ch == nil {
 | |
| 		t.Fatalf("expected non-nil watcher channel")
 | |
| 	}
 | |
| 	cancel()
 | |
| 	select {
 | |
| 	case <-time.After(time.Second):
 | |
| 		t.Fatalf("took too long to cancel")
 | |
| 	case _, ok := <-wctx.ch:
 | |
| 		if ok {
 | |
| 			t.Fatalf("expected watcher channel to close")
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // TestWatchCancelRunning tests watcher closes correctly after events.
 | |
| func TestWatchCancelRunning(t *testing.T) {
 | |
| 	runWatchTest(t, testWatchCancelRunning)
 | |
| }
 | |
| 
 | |
| func testWatchCancelRunning(t *testing.T, wctx *watchctx) {
 | |
| 	ctx, cancel := context.WithCancel(context.Background())
 | |
| 	if wctx.ch = wctx.w.Watch(ctx, "a"); wctx.ch == nil {
 | |
| 		t.Fatalf("expected non-nil watcher channel")
 | |
| 	}
 | |
| 	if _, err := wctx.kv.Put(ctx, "a", "a"); err != nil {
 | |
| 		t.Fatal(err)
 | |
| 	}
 | |
| 	cancel()
 | |
| 	select {
 | |
| 	case <-time.After(time.Second):
 | |
| 		t.Fatalf("took too long to cancel")
 | |
| 	case _, ok := <-wctx.ch:
 | |
| 		if !ok {
 | |
| 			// closed before getting put; OK
 | |
| 			break
 | |
| 		}
 | |
| 		// got the PUT; should close next
 | |
| 		select {
 | |
| 		case <-time.After(time.Second):
 | |
| 			t.Fatalf("took too long to close")
 | |
| 		case v, ok2 := <-wctx.ch:
 | |
| 			if ok2 {
 | |
| 				t.Fatalf("expected watcher channel to close, got %v", v)
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func putAndWatch(t *testing.T, wctx *watchctx, key, val string) {
 | |
| 	if _, err := wctx.kv.Put(context.TODO(), key, val); err != nil {
 | |
| 		t.Fatal(err)
 | |
| 	}
 | |
| 	select {
 | |
| 	case <-time.After(5 * time.Second):
 | |
| 		t.Fatalf("watch timed out")
 | |
| 	case v, ok := <-wctx.ch:
 | |
| 		if !ok {
 | |
| 			t.Fatalf("unexpected watch close")
 | |
| 		}
 | |
| 		if string(v.Events[0].Kv.Value) != val {
 | |
| 			t.Fatalf("bad value got %v, wanted %v", v.Events[0].Kv.Value, val)
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func TestWatchResumeInitRev(t *testing.T) {
 | |
| 	defer testutil.AfterTest(t)
 | |
| 	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
 | |
| 	defer clus.Terminate(t)
 | |
| 
 | |
| 	cli := clus.Client(0)
 | |
| 	if _, err := cli.Put(context.TODO(), "b", "2"); err != nil {
 | |
| 		t.Fatal(err)
 | |
| 	}
 | |
| 	if _, err := cli.Put(context.TODO(), "a", "3"); err != nil {
 | |
| 		t.Fatal(err)
 | |
| 	}
 | |
| 	// if resume is broken, it'll pick up this key first instead of a=3
 | |
| 	if _, err := cli.Put(context.TODO(), "a", "4"); err != nil {
 | |
| 		t.Fatal(err)
 | |
| 	}
 | |
| 
 | |
| 	wch := clus.Client(0).Watch(context.Background(), "a", clientv3.WithRev(1), clientv3.WithCreatedNotify())
 | |
| 	if resp, ok := <-wch; !ok || resp.Header.Revision != 4 {
 | |
| 		t.Fatalf("got (%v, %v), expected create notification rev=4", resp, ok)
 | |
| 	}
 | |
| 	// pause wch
 | |
| 	clus.Members[0].DropConnections()
 | |
| 	clus.Members[0].PauseConnections()
 | |
| 
 | |
| 	select {
 | |
| 	case resp, ok := <-wch:
 | |
| 		t.Skipf("wch should block, got (%+v, %v); drop not fast enough", resp, ok)
 | |
| 	case <-time.After(100 * time.Millisecond):
 | |
| 	}
 | |
| 
 | |
| 	// resume wch
 | |
| 	clus.Members[0].UnpauseConnections()
 | |
| 
 | |
| 	select {
 | |
| 	case resp, ok := <-wch:
 | |
| 		if !ok {
 | |
| 			t.Fatal("unexpected watch close")
 | |
| 		}
 | |
| 		if len(resp.Events) == 0 {
 | |
| 			t.Fatal("expected event on watch")
 | |
| 		}
 | |
| 		if string(resp.Events[0].Kv.Value) != "3" {
 | |
| 			t.Fatalf("expected value=3, got event %+v", resp.Events[0])
 | |
| 		}
 | |
| 	case <-time.After(5 * time.Second):
 | |
| 		t.Fatal("watch timed out")
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // TestWatchResumeCompacted checks that the watcher gracefully closes in case
 | |
| // that it tries to resume to a revision that's been compacted out of the store.
 | |
| // Since the watcher's server restarts with stale data, the watcher will receive
 | |
| // either a compaction error or all keys by staying in sync before the compaction
 | |
| // is finally applied.
 | |
| func TestWatchResumeCompacted(t *testing.T) {
 | |
| 	defer testutil.AfterTest(t)
 | |
| 
 | |
| 	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
 | |
| 	defer clus.Terminate(t)
 | |
| 
 | |
| 	// create a waiting watcher at rev 1
 | |
| 	w := clus.Client(0)
 | |
| 	wch := w.Watch(context.Background(), "foo", clientv3.WithRev(1))
 | |
| 	select {
 | |
| 	case w := <-wch:
 | |
| 		t.Errorf("unexpected message from wch %v", w)
 | |
| 	default:
 | |
| 	}
 | |
| 	clus.Members[0].Stop(t)
 | |
| 
 | |
| 	ticker := time.After(time.Second * 10)
 | |
| 	for clus.WaitLeader(t) <= 0 {
 | |
| 		select {
 | |
| 		case <-ticker:
 | |
| 			t.Fatalf("failed to wait for new leader")
 | |
| 		default:
 | |
| 			time.Sleep(10 * time.Millisecond)
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	// put some data and compact away
 | |
| 	numPuts := 5
 | |
| 	kv := clus.Client(1)
 | |
| 	for i := 0; i < numPuts; i++ {
 | |
| 		if _, err := kv.Put(context.TODO(), "foo", "bar"); err != nil {
 | |
| 			t.Fatal(err)
 | |
| 		}
 | |
| 	}
 | |
| 	if _, err := kv.Compact(context.TODO(), 3); err != nil {
 | |
| 		t.Fatal(err)
 | |
| 	}
 | |
| 
 | |
| 	clus.Members[0].Restart(t)
 | |
| 
 | |
| 	// since watch's server isn't guaranteed to be synced with the cluster when
 | |
| 	// the watch resumes, there is a window where the watch can stay synced and
 | |
| 	// read off all events; if the watcher misses the window, it will go out of
 | |
| 	// sync and get a compaction error.
 | |
| 	wRev := int64(2)
 | |
| 	for int(wRev) <= numPuts+1 {
 | |
| 		var wresp clientv3.WatchResponse
 | |
| 		var ok bool
 | |
| 		select {
 | |
| 		case wresp, ok = <-wch:
 | |
| 			if !ok {
 | |
| 				t.Fatalf("expected wresp, but got closed channel")
 | |
| 			}
 | |
| 		case <-time.After(5 * time.Second):
 | |
| 			t.Fatalf("compacted watch timed out")
 | |
| 		}
 | |
| 		for _, ev := range wresp.Events {
 | |
| 			if ev.Kv.ModRevision != wRev {
 | |
| 				t.Fatalf("expected modRev %v, got %+v", wRev, ev)
 | |
| 			}
 | |
| 			wRev++
 | |
| 		}
 | |
| 		if wresp.Err() == nil {
 | |
| 			continue
 | |
| 		}
 | |
| 		if wresp.Err() != rpctypes.ErrCompacted {
 | |
| 			t.Fatalf("wresp.Err() expected %v, got %+v", rpctypes.ErrCompacted, wresp.Err())
 | |
| 		}
 | |
| 		break
 | |
| 	}
 | |
| 	if int(wRev) > numPuts+1 {
 | |
| 		// got data faster than the compaction
 | |
| 		return
 | |
| 	}
 | |
| 	// received compaction error; ensure the channel closes
 | |
| 	select {
 | |
| 	case wresp, ok := <-wch:
 | |
| 		if ok {
 | |
| 			t.Fatalf("expected closed channel, but got %v", wresp)
 | |
| 		}
 | |
| 	case <-time.After(5 * time.Second):
 | |
| 		t.Fatalf("timed out waiting for channel close")
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // TestWatchCompactRevision ensures the CompactRevision error is given on a
 | |
| // compaction event ahead of a watcher.
 | |
| func TestWatchCompactRevision(t *testing.T) {
 | |
| 	defer testutil.AfterTest(t)
 | |
| 
 | |
| 	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
 | |
| 	defer clus.Terminate(t)
 | |
| 
 | |
| 	// set some keys
 | |
| 	kv := clus.RandClient()
 | |
| 	for i := 0; i < 5; i++ {
 | |
| 		if _, err := kv.Put(context.TODO(), "foo", "bar"); err != nil {
 | |
| 			t.Fatal(err)
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	w := clus.RandClient()
 | |
| 
 | |
| 	if _, err := kv.Compact(context.TODO(), 4); err != nil {
 | |
| 		t.Fatal(err)
 | |
| 	}
 | |
| 	wch := w.Watch(context.Background(), "foo", clientv3.WithRev(2))
 | |
| 
 | |
| 	// get compacted error message
 | |
| 	wresp, ok := <-wch
 | |
| 	if !ok {
 | |
| 		t.Fatalf("expected wresp, but got closed channel")
 | |
| 	}
 | |
| 	if wresp.Err() != rpctypes.ErrCompacted {
 | |
| 		t.Fatalf("wresp.Err() expected %v, but got %v", rpctypes.ErrCompacted, wresp.Err())
 | |
| 	}
 | |
| 	if !wresp.Canceled {
 | |
| 		t.Fatalf("wresp.Canceled expected true, got %+v", wresp)
 | |
| 	}
 | |
| 
 | |
| 	// ensure the channel is closed
 | |
| 	if wresp, ok = <-wch; ok {
 | |
| 		t.Fatalf("expected closed channel, but got %v", wresp)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func TestWatchWithProgressNotify(t *testing.T)        { testWatchWithProgressNotify(t, true) }
 | |
| func TestWatchWithProgressNotifyNoEvent(t *testing.T) { testWatchWithProgressNotify(t, false) }
 | |
| 
 | |
| func testWatchWithProgressNotify(t *testing.T, watchOnPut bool) {
 | |
| 	defer testutil.AfterTest(t)
 | |
| 
 | |
| 	// accelerate report interval so test terminates quickly
 | |
| 	oldpi := v3rpc.GetProgressReportInterval()
 | |
| 	// using atomics to avoid race warnings
 | |
| 	v3rpc.SetProgressReportInterval(3 * time.Second)
 | |
| 	pi := 3 * time.Second
 | |
| 	defer func() { v3rpc.SetProgressReportInterval(oldpi) }()
 | |
| 
 | |
| 	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
 | |
| 	defer clus.Terminate(t)
 | |
| 
 | |
| 	wc := clus.RandClient()
 | |
| 
 | |
| 	opts := []clientv3.OpOption{clientv3.WithProgressNotify()}
 | |
| 	if watchOnPut {
 | |
| 		opts = append(opts, clientv3.WithPrefix())
 | |
| 	}
 | |
| 	rch := wc.Watch(context.Background(), "foo", opts...)
 | |
| 
 | |
| 	select {
 | |
| 	case resp := <-rch: // wait for notification
 | |
| 		if len(resp.Events) != 0 {
 | |
| 			t.Fatalf("resp.Events expected none, got %+v", resp.Events)
 | |
| 		}
 | |
| 	case <-time.After(2 * pi):
 | |
| 		t.Fatalf("watch response expected in %v, but timed out", pi)
 | |
| 	}
 | |
| 
 | |
| 	kvc := clus.RandClient()
 | |
| 	if _, err := kvc.Put(context.TODO(), "foox", "bar"); err != nil {
 | |
| 		t.Fatal(err)
 | |
| 	}
 | |
| 
 | |
| 	select {
 | |
| 	case resp := <-rch:
 | |
| 		if resp.Header.Revision != 2 {
 | |
| 			t.Fatalf("resp.Header.Revision expected 2, got %d", resp.Header.Revision)
 | |
| 		}
 | |
| 		if watchOnPut { // wait for put if watch on the put key
 | |
| 			ev := []*clientv3.Event{{Type: clientv3.EventTypePut,
 | |
| 				Kv: &mvccpb.KeyValue{Key: []byte("foox"), Value: []byte("bar"), CreateRevision: 2, ModRevision: 2, Version: 1}}}
 | |
| 			if !reflect.DeepEqual(ev, resp.Events) {
 | |
| 				t.Fatalf("expected %+v, got %+v", ev, resp.Events)
 | |
| 			}
 | |
| 		} else if len(resp.Events) != 0 { // wait for notification otherwise
 | |
| 			t.Fatalf("expected no events, but got %+v", resp.Events)
 | |
| 		}
 | |
| 	case <-time.After(time.Duration(1.5 * float64(pi))):
 | |
| 		t.Fatalf("watch response expected in %v, but timed out", pi)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func TestConfigurableWatchProgressNotifyInterval(t *testing.T) {
 | |
| 	progressInterval := 200 * time.Millisecond
 | |
| 	clus := integration.NewClusterV3(t,
 | |
| 		&integration.ClusterConfig{
 | |
| 			Size:                        3,
 | |
| 			WatchProgressNotifyInterval: progressInterval,
 | |
| 		})
 | |
| 	defer clus.Terminate(t)
 | |
| 
 | |
| 	opts := []clientv3.OpOption{clientv3.WithProgressNotify()}
 | |
| 	rch := clus.RandClient().Watch(context.Background(), "foo", opts...)
 | |
| 
 | |
| 	timeout := 1 * time.Second // we expect to receive watch progress notify in 2 * progressInterval,
 | |
| 	// but for CPU-starved situation it may take longer. So we use 1 second here for timeout.
 | |
| 	select {
 | |
| 	case resp := <-rch: // waiting for a watch progress notify response
 | |
| 		if !resp.IsProgressNotify() {
 | |
| 			t.Fatalf("expected resp.IsProgressNotify() == true")
 | |
| 		}
 | |
| 	case <-time.After(timeout):
 | |
| 		t.Fatalf("timed out waiting for watch progress notify response in %v", timeout)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func TestWatchRequestProgress(t *testing.T) {
 | |
| 	if integration.ThroughProxy {
 | |
| 		t.Skipf("grpc-proxy does not support WatchProgress yet")
 | |
| 	}
 | |
| 	testCases := []struct {
 | |
| 		name     string
 | |
| 		watchers []string
 | |
| 	}{
 | |
| 		{"0-watcher", []string{}},
 | |
| 		{"1-watcher", []string{"/"}},
 | |
| 		{"2-watcher", []string{"/", "/"}},
 | |
| 	}
 | |
| 
 | |
| 	for _, c := range testCases {
 | |
| 		t.Run(c.name, func(t *testing.T) {
 | |
| 			defer testutil.AfterTest(t)
 | |
| 
 | |
| 			watchTimeout := 3 * time.Second
 | |
| 
 | |
| 			clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
 | |
| 			defer clus.Terminate(t)
 | |
| 
 | |
| 			wc := clus.RandClient()
 | |
| 
 | |
| 			var watchChans []clientv3.WatchChan
 | |
| 
 | |
| 			for _, prefix := range c.watchers {
 | |
| 				watchChans = append(watchChans, wc.Watch(context.Background(), prefix, clientv3.WithPrefix()))
 | |
| 			}
 | |
| 
 | |
| 			_, err := wc.Put(context.Background(), "/a", "1")
 | |
| 			if err != nil {
 | |
| 				t.Fatal(err)
 | |
| 			}
 | |
| 
 | |
| 			for _, rch := range watchChans {
 | |
| 				select {
 | |
| 				case resp := <-rch: // wait for notification
 | |
| 					if len(resp.Events) != 1 {
 | |
| 						t.Fatalf("resp.Events expected 1, got %d", len(resp.Events))
 | |
| 					}
 | |
| 				case <-time.After(watchTimeout):
 | |
| 					t.Fatalf("watch response expected in %v, but timed out", watchTimeout)
 | |
| 				}
 | |
| 			}
 | |
| 
 | |
| 			// put a value not being watched to increment revision
 | |
| 			_, err = wc.Put(context.Background(), "x", "1")
 | |
| 			if err != nil {
 | |
| 				t.Fatal(err)
 | |
| 			}
 | |
| 
 | |
| 			err = wc.RequestProgress(context.Background())
 | |
| 			if err != nil {
 | |
| 				t.Fatal(err)
 | |
| 			}
 | |
| 
 | |
| 			// verify all watch channels receive a progress notify
 | |
| 			for _, rch := range watchChans {
 | |
| 				select {
 | |
| 				case resp := <-rch:
 | |
| 					if !resp.IsProgressNotify() {
 | |
| 						t.Fatalf("expected resp.IsProgressNotify() == true")
 | |
| 					}
 | |
| 					if resp.Header.Revision != 3 {
 | |
| 						t.Fatalf("resp.Header.Revision expected 3, got %d", resp.Header.Revision)
 | |
| 					}
 | |
| 				case <-time.After(watchTimeout):
 | |
| 					t.Fatalf("progress response expected in %v, but timed out", watchTimeout)
 | |
| 				}
 | |
| 			}
 | |
| 		})
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func TestWatchEventType(t *testing.T) {
 | |
| 	cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
 | |
| 	defer cluster.Terminate(t)
 | |
| 
 | |
| 	client := cluster.RandClient()
 | |
| 	ctx := context.Background()
 | |
| 	watchChan := client.Watch(ctx, "/", clientv3.WithPrefix())
 | |
| 
 | |
| 	if _, err := client.Put(ctx, "/toDelete", "foo"); err != nil {
 | |
| 		t.Fatalf("Put failed: %v", err)
 | |
| 	}
 | |
| 	if _, err := client.Put(ctx, "/toDelete", "bar"); err != nil {
 | |
| 		t.Fatalf("Put failed: %v", err)
 | |
| 	}
 | |
| 	if _, err := client.Delete(ctx, "/toDelete"); err != nil {
 | |
| 		t.Fatalf("Delete failed: %v", err)
 | |
| 	}
 | |
| 	lcr, err := client.Lease.Grant(ctx, 1)
 | |
| 	if err != nil {
 | |
| 		t.Fatalf("lease create failed: %v", err)
 | |
| 	}
 | |
| 	if _, err := client.Put(ctx, "/toExpire", "foo", clientv3.WithLease(lcr.ID)); err != nil {
 | |
| 		t.Fatalf("Put failed: %v", err)
 | |
| 	}
 | |
| 
 | |
| 	tests := []struct {
 | |
| 		et       mvccpb.Event_EventType
 | |
| 		isCreate bool
 | |
| 		isModify bool
 | |
| 	}{{
 | |
| 		et:       clientv3.EventTypePut,
 | |
| 		isCreate: true,
 | |
| 	}, {
 | |
| 		et:       clientv3.EventTypePut,
 | |
| 		isModify: true,
 | |
| 	}, {
 | |
| 		et: clientv3.EventTypeDelete,
 | |
| 	}, {
 | |
| 		et:       clientv3.EventTypePut,
 | |
| 		isCreate: true,
 | |
| 	}, {
 | |
| 		et: clientv3.EventTypeDelete,
 | |
| 	}}
 | |
| 
 | |
| 	var res []*clientv3.Event
 | |
| 
 | |
| 	for {
 | |
| 		select {
 | |
| 		case wres := <-watchChan:
 | |
| 			res = append(res, wres.Events...)
 | |
| 		case <-time.After(10 * time.Second):
 | |
| 			t.Fatalf("Should receive %d events and then break out loop", len(tests))
 | |
| 		}
 | |
| 		if len(res) == len(tests) {
 | |
| 			break
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	for i, tt := range tests {
 | |
| 		ev := res[i]
 | |
| 		if tt.et != ev.Type {
 | |
| 			t.Errorf("#%d: event type want=%s, get=%s", i, tt.et, ev.Type)
 | |
| 		}
 | |
| 		if tt.isCreate && !ev.IsCreate() {
 | |
| 			t.Errorf("#%d: event should be CreateEvent", i)
 | |
| 		}
 | |
| 		if tt.isModify && !ev.IsModify() {
 | |
| 			t.Errorf("#%d: event should be ModifyEvent", i)
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func TestWatchErrConnClosed(t *testing.T) {
 | |
| 	defer testutil.AfterTest(t)
 | |
| 
 | |
| 	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
 | |
| 	defer clus.Terminate(t)
 | |
| 
 | |
| 	cli := clus.Client(0)
 | |
| 
 | |
| 	donec := make(chan struct{})
 | |
| 	go func() {
 | |
| 		defer close(donec)
 | |
| 		ch := cli.Watch(context.TODO(), "foo")
 | |
| 
 | |
| 		if wr := <-ch; !isCanceled(wr.Err()) {
 | |
| 			t.Errorf("expected context canceled, got %v", wr.Err())
 | |
| 		}
 | |
| 	}()
 | |
| 
 | |
| 	if err := cli.ActiveConnection().Close(); err != nil {
 | |
| 		t.Fatal(err)
 | |
| 	}
 | |
| 	clus.TakeClient(0)
 | |
| 
 | |
| 	select {
 | |
| 	case <-time.After(integration.RequestWaitTimeout):
 | |
| 		t.Fatal("wc.Watch took too long")
 | |
| 	case <-donec:
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func TestWatchAfterClose(t *testing.T) {
 | |
| 	defer testutil.AfterTest(t)
 | |
| 
 | |
| 	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
 | |
| 	defer clus.Terminate(t)
 | |
| 
 | |
| 	cli := clus.Client(0)
 | |
| 	clus.TakeClient(0)
 | |
| 	if err := cli.Close(); err != nil {
 | |
| 		t.Fatal(err)
 | |
| 	}
 | |
| 
 | |
| 	donec := make(chan struct{})
 | |
| 	go func() {
 | |
| 		cli.Watch(context.TODO(), "foo")
 | |
| 		if err := cli.Close(); err != nil && err != context.Canceled {
 | |
| 			t.Errorf("expected %v, got %v", context.Canceled, err)
 | |
| 		}
 | |
| 		close(donec)
 | |
| 	}()
 | |
| 	select {
 | |
| 	case <-time.After(integration.RequestWaitTimeout):
 | |
| 		t.Fatal("wc.Watch took too long")
 | |
| 	case <-donec:
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // TestWatchWithRequireLeader checks the watch channel closes when no leader.
 | |
| func TestWatchWithRequireLeader(t *testing.T) {
 | |
| 	defer testutil.AfterTest(t)
 | |
| 
 | |
| 	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
 | |
| 	defer clus.Terminate(t)
 | |
| 
 | |
| 	// Put a key for the non-require leader watch to read as an event.
 | |
| 	// The watchers will be on member[0]; put key through member[0] to
 | |
| 	// ensure that it receives the update so watching after killing quorum
 | |
| 	// is guaranteed to have the key.
 | |
| 	liveClient := clus.Client(0)
 | |
| 	if _, err := liveClient.Put(context.TODO(), "foo", "bar"); err != nil {
 | |
| 		t.Fatal(err)
 | |
| 	}
 | |
| 
 | |
| 	clus.Members[1].Stop(t)
 | |
| 	clus.Members[2].Stop(t)
 | |
| 	clus.Client(1).Close()
 | |
| 	clus.Client(2).Close()
 | |
| 	clus.TakeClient(1)
 | |
| 	clus.TakeClient(2)
 | |
| 
 | |
| 	// wait for election timeout, then member[0] will not have a leader.
 | |
| 	tickDuration := 10 * time.Millisecond
 | |
| 	// existing streams need three elections before they're torn down; wait until 5 elections cycle
 | |
| 	// so proxy tests receive a leader loss event on its existing watch before creating a new watch.
 | |
| 	time.Sleep(time.Duration(5*clus.Members[0].ElectionTicks) * tickDuration)
 | |
| 
 | |
| 	chLeader := liveClient.Watch(clientv3.WithRequireLeader(context.TODO()), "foo", clientv3.WithRev(1))
 | |
| 	chNoLeader := liveClient.Watch(context.TODO(), "foo", clientv3.WithRev(1))
 | |
| 
 | |
| 	select {
 | |
| 	case resp, ok := <-chLeader:
 | |
| 		if !ok {
 | |
| 			t.Fatalf("expected %v watch channel, got closed channel", rpctypes.ErrNoLeader)
 | |
| 		}
 | |
| 		if resp.Err() != rpctypes.ErrNoLeader {
 | |
| 			t.Fatalf("expected %v watch response error, got %+v", rpctypes.ErrNoLeader, resp)
 | |
| 		}
 | |
| 	case <-time.After(integration.RequestWaitTimeout):
 | |
| 		t.Fatal("watch without leader took too long to close")
 | |
| 	}
 | |
| 
 | |
| 	select {
 | |
| 	case resp, ok := <-chLeader:
 | |
| 		if ok {
 | |
| 			t.Fatalf("expected closed channel, got response %v", resp)
 | |
| 		}
 | |
| 	case <-time.After(integration.RequestWaitTimeout):
 | |
| 		t.Fatal("waited too long for channel to close")
 | |
| 	}
 | |
| 
 | |
| 	if _, ok := <-chNoLeader; !ok {
 | |
| 		t.Fatalf("expected response, got closed channel")
 | |
| 	}
 | |
| 
 | |
| 	cnt, err := clus.Members[0].Metric(
 | |
| 		"etcd_server_client_requests_total",
 | |
| 		`type="stream"`,
 | |
| 		fmt.Sprintf(`client_api_version="%v"`, version.APIVersion),
 | |
| 	)
 | |
| 	if err != nil {
 | |
| 		t.Fatal(err)
 | |
| 	}
 | |
| 	cv, err := strconv.ParseInt(cnt, 10, 32)
 | |
| 	if err != nil {
 | |
| 		t.Fatal(err)
 | |
| 	}
 | |
| 	if cv < 2 { // >2 when retried
 | |
| 		t.Fatalf("expected at least 2, got %q", cnt)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // TestWatchWithFilter checks that watch filtering works.
 | |
| func TestWatchWithFilter(t *testing.T) {
 | |
| 	cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
 | |
| 	defer cluster.Terminate(t)
 | |
| 
 | |
| 	client := cluster.RandClient()
 | |
| 	ctx := context.Background()
 | |
| 
 | |
| 	wcNoPut := client.Watch(ctx, "a", clientv3.WithFilterPut())
 | |
| 	wcNoDel := client.Watch(ctx, "a", clientv3.WithFilterDelete())
 | |
| 
 | |
| 	if _, err := client.Put(ctx, "a", "abc"); err != nil {
 | |
| 		t.Fatal(err)
 | |
| 	}
 | |
| 	if _, err := client.Delete(ctx, "a"); err != nil {
 | |
| 		t.Fatal(err)
 | |
| 	}
 | |
| 
 | |
| 	npResp := <-wcNoPut
 | |
| 	if len(npResp.Events) != 1 || npResp.Events[0].Type != clientv3.EventTypeDelete {
 | |
| 		t.Fatalf("expected delete event, got %+v", npResp.Events)
 | |
| 	}
 | |
| 	ndResp := <-wcNoDel
 | |
| 	if len(ndResp.Events) != 1 || ndResp.Events[0].Type != clientv3.EventTypePut {
 | |
| 		t.Fatalf("expected put event, got %+v", ndResp.Events)
 | |
| 	}
 | |
| 
 | |
| 	select {
 | |
| 	case resp := <-wcNoPut:
 | |
| 		t.Fatalf("unexpected event on filtered put (%+v)", resp)
 | |
| 	case resp := <-wcNoDel:
 | |
| 		t.Fatalf("unexpected event on filtered delete (%+v)", resp)
 | |
| 	case <-time.After(100 * time.Millisecond):
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // TestWatchWithCreatedNotification checks that WithCreatedNotify returns a
 | |
| // Created watch response.
 | |
| func TestWatchWithCreatedNotification(t *testing.T) {
 | |
| 	cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
 | |
| 	defer cluster.Terminate(t)
 | |
| 
 | |
| 	client := cluster.RandClient()
 | |
| 
 | |
| 	ctx := context.Background()
 | |
| 
 | |
| 	createC := client.Watch(ctx, "a", clientv3.WithCreatedNotify())
 | |
| 
 | |
| 	resp := <-createC
 | |
| 
 | |
| 	if !resp.Created {
 | |
| 		t.Fatalf("expected created event, got %v", resp)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // TestWatchWithCreatedNotificationDropConn ensures that
 | |
| // a watcher with created notify does not post duplicate
 | |
| // created events from disconnect.
 | |
| func TestWatchWithCreatedNotificationDropConn(t *testing.T) {
 | |
| 	cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
 | |
| 	defer cluster.Terminate(t)
 | |
| 
 | |
| 	client := cluster.RandClient()
 | |
| 
 | |
| 	wch := client.Watch(context.Background(), "a", clientv3.WithCreatedNotify())
 | |
| 
 | |
| 	resp := <-wch
 | |
| 
 | |
| 	if !resp.Created {
 | |
| 		t.Fatalf("expected created event, got %v", resp)
 | |
| 	}
 | |
| 
 | |
| 	cluster.Members[0].DropConnections()
 | |
| 
 | |
| 	// check watch channel doesn't post another watch response.
 | |
| 	select {
 | |
| 	case wresp := <-wch:
 | |
| 		t.Fatalf("got unexpected watch response: %+v\n", wresp)
 | |
| 	case <-time.After(time.Second):
 | |
| 		// watcher may not reconnect by the time it hits the select,
 | |
| 		// so it wouldn't have a chance to filter out the second create event
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // TestWatchCancelOnServer ensures client watcher cancels propagate back to the server.
 | |
| func TestWatchCancelOnServer(t *testing.T) {
 | |
| 	cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
 | |
| 	defer cluster.Terminate(t)
 | |
| 
 | |
| 	client := cluster.RandClient()
 | |
| 	numWatches := 10
 | |
| 
 | |
| 	// The grpc proxy starts watches to detect leadership after the proxy server
 | |
| 	// returns as started; to avoid racing on the proxy's internal watches, wait
 | |
| 	// until require leader watches get create responses to ensure the leadership
 | |
| 	// watches have started.
 | |
| 	for {
 | |
| 		ctx, cancel := context.WithCancel(clientv3.WithRequireLeader(context.TODO()))
 | |
| 		ww := client.Watch(ctx, "a", clientv3.WithCreatedNotify())
 | |
| 		wresp := <-ww
 | |
| 		cancel()
 | |
| 		if wresp.Err() == nil {
 | |
| 			break
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	cancels := make([]context.CancelFunc, numWatches)
 | |
| 	for i := 0; i < numWatches; i++ {
 | |
| 		// force separate streams in client
 | |
| 		md := metadata.Pairs("some-key", fmt.Sprintf("%d", i))
 | |
| 		mctx := metadata.NewOutgoingContext(context.Background(), md)
 | |
| 		ctx, cancel := context.WithCancel(mctx)
 | |
| 		cancels[i] = cancel
 | |
| 		w := client.Watch(ctx, fmt.Sprintf("%d", i), clientv3.WithCreatedNotify())
 | |
| 		<-w
 | |
| 	}
 | |
| 
 | |
| 	// get max watches; proxy tests have leadership watches, so total may be >numWatches
 | |
| 	maxWatches, _ := cluster.Members[0].Metric("etcd_debugging_mvcc_watcher_total")
 | |
| 
 | |
| 	// cancel all and wait for cancels to propagate to etcd server
 | |
| 	for i := 0; i < numWatches; i++ {
 | |
| 		cancels[i]()
 | |
| 	}
 | |
| 	time.Sleep(time.Second)
 | |
| 
 | |
| 	minWatches, err := cluster.Members[0].Metric("etcd_debugging_mvcc_watcher_total")
 | |
| 	if err != nil {
 | |
| 		t.Fatal(err)
 | |
| 	}
 | |
| 
 | |
| 	maxWatchV, minWatchV := 0, 0
 | |
| 	n, serr := fmt.Sscanf(maxWatches+" "+minWatches, "%d %d", &maxWatchV, &minWatchV)
 | |
| 	if n != 2 || serr != nil {
 | |
| 		t.Fatalf("expected n=2 and err=nil, got n=%d and err=%v", n, serr)
 | |
| 	}
 | |
| 
 | |
| 	if maxWatchV-minWatchV < numWatches {
 | |
| 		t.Fatalf("expected %d canceled watchers, got %d", numWatches, maxWatchV-minWatchV)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // TestWatchOverlapContextCancel stresses the watcher stream teardown path by
 | |
| // creating/canceling watchers to ensure that new watchers are not taken down
 | |
| // by a torn down watch stream. The sort of race that's being detected:
 | |
| //     1. create w1 using a cancelable ctx with %v as "ctx"
 | |
| //     2. cancel ctx
 | |
| //     3. watcher client begins tearing down watcher grpc stream since no more watchers
 | |
| //     3. start creating watcher w2 using a new "ctx" (not canceled), attaches to old grpc stream
 | |
| //     4. watcher client finishes tearing down stream on "ctx"
 | |
| //     5. w2 comes back canceled
 | |
| func TestWatchOverlapContextCancel(t *testing.T) {
 | |
| 	f := func(clus *integration.ClusterV3) {}
 | |
| 	testWatchOverlapContextCancel(t, f)
 | |
| }
 | |
| 
 | |
| func TestWatchOverlapDropConnContextCancel(t *testing.T) {
 | |
| 	f := func(clus *integration.ClusterV3) {
 | |
| 		clus.Members[0].DropConnections()
 | |
| 	}
 | |
| 	testWatchOverlapContextCancel(t, f)
 | |
| }
 | |
| 
 | |
| func testWatchOverlapContextCancel(t *testing.T, f func(*integration.ClusterV3)) {
 | |
| 	defer testutil.AfterTest(t)
 | |
| 	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
 | |
| 	defer clus.Terminate(t)
 | |
| 
 | |
| 	n := 100
 | |
| 	ctxs, ctxc := make([]context.Context, 5), make([]chan struct{}, 5)
 | |
| 	for i := range ctxs {
 | |
| 		// make unique stream
 | |
| 		md := metadata.Pairs("some-key", fmt.Sprintf("%d", i))
 | |
| 		ctxs[i] = metadata.NewOutgoingContext(context.Background(), md)
 | |
| 		// limits the maximum number of outstanding watchers per stream
 | |
| 		ctxc[i] = make(chan struct{}, 2)
 | |
| 	}
 | |
| 
 | |
| 	// issue concurrent watches on "abc" with cancel
 | |
| 	cli := clus.RandClient()
 | |
| 	if _, err := cli.Put(context.TODO(), "abc", "def"); err != nil {
 | |
| 		t.Fatal(err)
 | |
| 	}
 | |
| 	ch := make(chan struct{}, n)
 | |
| 	for i := 0; i < n; i++ {
 | |
| 		go func() {
 | |
| 			defer func() { ch <- struct{}{} }()
 | |
| 			idx := rand.Intn(len(ctxs))
 | |
| 			ctx, cancel := context.WithCancel(ctxs[idx])
 | |
| 			ctxc[idx] <- struct{}{}
 | |
| 			wch := cli.Watch(ctx, "abc", clientv3.WithRev(1))
 | |
| 			f(clus)
 | |
| 			select {
 | |
| 			case _, ok := <-wch:
 | |
| 				if !ok {
 | |
| 					t.Errorf("unexpected closed channel %p", wch)
 | |
| 				}
 | |
| 			// may take a second or two to reestablish a watcher because of
 | |
| 			// grpc back off policies for disconnects
 | |
| 			case <-time.After(5 * time.Second):
 | |
| 				t.Errorf("timed out waiting for watch on %p", wch)
 | |
| 			}
 | |
| 			// randomize how cancel overlaps with watch creation
 | |
| 			if rand.Intn(2) == 0 {
 | |
| 				<-ctxc[idx]
 | |
| 				cancel()
 | |
| 			} else {
 | |
| 				cancel()
 | |
| 				<-ctxc[idx]
 | |
| 			}
 | |
| 		}()
 | |
| 	}
 | |
| 	// join on watches
 | |
| 	for i := 0; i < n; i++ {
 | |
| 		select {
 | |
| 		case <-ch:
 | |
| 		case <-time.After(5 * time.Second):
 | |
| 			t.Fatalf("timed out waiting for completed watch")
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // TestWatchCancelAndCloseClient ensures that canceling a watcher then immediately
 | |
| // closing the client does not return a client closing error.
 | |
| func TestWatchCancelAndCloseClient(t *testing.T) {
 | |
| 	defer testutil.AfterTest(t)
 | |
| 	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
 | |
| 	defer clus.Terminate(t)
 | |
| 	cli := clus.Client(0)
 | |
| 	ctx, cancel := context.WithCancel(context.Background())
 | |
| 	wch := cli.Watch(ctx, "abc")
 | |
| 	donec := make(chan struct{})
 | |
| 	go func() {
 | |
| 		defer close(donec)
 | |
| 		select {
 | |
| 		case wr, ok := <-wch:
 | |
| 			if ok {
 | |
| 				t.Errorf("expected closed watch after cancel(), got resp=%+v err=%v", wr, wr.Err())
 | |
| 			}
 | |
| 		case <-time.After(5 * time.Second):
 | |
| 			t.Error("timed out waiting for closed channel")
 | |
| 		}
 | |
| 	}()
 | |
| 	cancel()
 | |
| 	if err := cli.Close(); err != nil {
 | |
| 		t.Fatal(err)
 | |
| 	}
 | |
| 	<-donec
 | |
| 	clus.TakeClient(0)
 | |
| }
 | |
| 
 | |
| // TestWatchStressResumeClose establishes a bunch of watchers, disconnects
 | |
| // to put them in resuming mode, cancels them so some resumes by cancel fail,
 | |
| // then closes the watcher interface to ensure correct clean up.
 | |
| func TestWatchStressResumeClose(t *testing.T) {
 | |
| 	defer testutil.AfterTest(t)
 | |
| 	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
 | |
| 	defer clus.Terminate(t)
 | |
| 	cli := clus.Client(0)
 | |
| 
 | |
| 	ctx, cancel := context.WithCancel(context.Background())
 | |
| 	// add more watches than can be resumed before the cancel
 | |
| 	wchs := make([]clientv3.WatchChan, 2000)
 | |
| 	for i := range wchs {
 | |
| 		wchs[i] = cli.Watch(ctx, "abc")
 | |
| 	}
 | |
| 	clus.Members[0].DropConnections()
 | |
| 	cancel()
 | |
| 	if err := cli.Close(); err != nil {
 | |
| 		t.Fatal(err)
 | |
| 	}
 | |
| 	clus.TakeClient(0)
 | |
| }
 | |
| 
 | |
| // TestWatchCancelDisconnected ensures canceling a watcher works when
 | |
| // its grpc stream is disconnected / reconnecting.
 | |
| func TestWatchCancelDisconnected(t *testing.T) {
 | |
| 	defer testutil.AfterTest(t)
 | |
| 	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
 | |
| 	defer clus.Terminate(t)
 | |
| 	cli := clus.Client(0)
 | |
| 	ctx, cancel := context.WithCancel(context.Background())
 | |
| 	// add more watches than can be resumed before the cancel
 | |
| 	wch := cli.Watch(ctx, "abc")
 | |
| 	clus.Members[0].Stop(t)
 | |
| 	cancel()
 | |
| 	select {
 | |
| 	case <-wch:
 | |
| 	case <-time.After(time.Second):
 | |
| 		t.Fatal("took too long to cancel disconnected watcher")
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // TestWatchClose ensures that close does not return error
 | |
| func TestWatchClose(t *testing.T) {
 | |
| 	runWatchTest(t, testWatchClose)
 | |
| }
 | |
| 
 | |
| func testWatchClose(t *testing.T, wctx *watchctx) {
 | |
| 	ctx, cancel := context.WithCancel(context.Background())
 | |
| 	wch := wctx.w.Watch(ctx, "a")
 | |
| 	cancel()
 | |
| 	if wch == nil {
 | |
| 		t.Fatalf("expected watcher channel, got nil")
 | |
| 	}
 | |
| 	if wctx.w.Close() != nil {
 | |
| 		t.Fatalf("watch did not close successfully")
 | |
| 	}
 | |
| 	wresp, ok := <-wch
 | |
| 	if ok {
 | |
| 		t.Fatalf("read wch got %v; expected closed channel", wresp)
 | |
| 	}
 | |
| }
 |