diff --git a/clientv3/integration/watch_test.go b/clientv3/integration/watch_test.go index c4dd86ea9..4b2fa1885 100644 --- a/clientv3/integration/watch_test.go +++ b/clientv3/integration/watch_test.go @@ -343,7 +343,57 @@ func putAndWatch(t *testing.T, wctx *watchctx, key, val string) { } } -// TestWatchResumeComapcted checks that the watcher gracefully closes in case +func TestWatchResumeInitRev(t *testing.T) { + defer testutil.AfterTest(t) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + defer clus.Terminate(t) + + cli := clus.Client(0) + if _, err := cli.Put(context.TODO(), "b", "2"); err != nil { + t.Fatal(err) + } + if _, err := cli.Put(context.TODO(), "a", "3"); err != nil { + t.Fatal(err) + } + // if resume is broken, it'll pick up this key first instead of a=3 + if _, err := cli.Put(context.TODO(), "a", "4"); err != nil { + t.Fatal(err) + } + + wch := clus.Client(0).Watch(context.Background(), "a", clientv3.WithRev(1), clientv3.WithCreatedNotify()) + if resp, ok := <-wch; !ok || resp.Header.Revision != 4 { + t.Fatalf("got (%v, %v), expected create notification rev=4", resp, ok) + } + // pause wch + clus.Members[0].DropConnections() + clus.Members[0].PauseConnections() + + select { + case resp, ok := <-wch: + t.Skipf("wch should block, got (%+v, %v); drop not fast enough", resp, ok) + case <-time.After(100 * time.Millisecond): + } + + // resume wch + clus.Members[0].UnpauseConnections() + + select { + case resp, ok := <-wch: + if !ok { + t.Fatal("unexpected watch close") + } + if len(resp.Events) == 0 { + t.Fatal("expected event on watch") + } + if string(resp.Events[0].Kv.Value) != "3" { + t.Fatalf("expected value=3, got event %+v", resp.Events[0]) + } + case <-time.After(5 * time.Second): + t.Fatal("watch timed out") + } +} + +// TestWatchResumeCompacted checks that the watcher gracefully closes in case // that it tries to resume to a revision that's been compacted out of the store. // Since the watcher's server restarts with stale data, the watcher will receive // either a compaction error or all keys by staying in sync before the compaction diff --git a/clientv3/watch.go b/clientv3/watch.go index 7847b03b3..d85f3686d 100644 --- a/clientv3/watch.go +++ b/clientv3/watch.go @@ -615,11 +615,20 @@ func (w *watchGrpcStream) serveSubstream(ws *watcherStream, resumec chan struct{ // send first creation event only if requested if ws.initReq.createdNotify { ws.outc <- *wr + if ws.initReq.rev == 0 { + // current revision of store; returning the + // create response binds the current revision to + // this revision, so restart with it if there's a + // disconnect before receiving any events. + nextRev = wr.Header.Revision + } } } + } else { + // current progress of watch; <= store revision + nextRev = wr.Header.Revision } - nextRev = wr.Header.Revision if len(wr.Events) > 0 { nextRev = wr.Events[len(wr.Events)-1].Kv.ModRevision + 1 } diff --git a/integration/bridge.go b/integration/bridge.go index 6c1ec5449..09c65aa1f 100644 --- a/integration/bridge.go +++ b/integration/bridge.go @@ -31,8 +31,9 @@ type bridge struct { l net.Listener conns map[*bridgeConn]struct{} - stopc chan struct{} - wg sync.WaitGroup + stopc chan struct{} + pausec chan struct{} + wg sync.WaitGroup mu sync.Mutex } @@ -43,8 +44,11 @@ func newBridge(addr string) (*bridge, error) { inaddr: addr + "0", outaddr: addr, conns: make(map[*bridgeConn]struct{}), - stopc: make(chan struct{}, 1), + stopc: make(chan struct{}), + pausec: make(chan struct{}), } + close(b.pausec) + l, err := transport.NewUnixListener(b.inaddr) if err != nil { return nil, fmt.Errorf("listen failed on socket %s (%v)", addr, err) @@ -59,10 +63,13 @@ func (b *bridge) URL() string { return "unix://" + b.inaddr } func (b *bridge) Close() { b.l.Close() + b.mu.Lock() select { - case b.stopc <- struct{}{}: + case <-b.stopc: default: + close(b.stopc) } + b.mu.Unlock() b.wg.Wait() } @@ -75,6 +82,22 @@ func (b *bridge) Reset() { b.conns = make(map[*bridgeConn]struct{}) } +func (b *bridge) Pause() { + b.mu.Lock() + b.pausec = make(chan struct{}) + b.mu.Unlock() +} + +func (b *bridge) Unpause() { + b.mu.Lock() + select { + case <-b.pausec: + default: + close(b.pausec) + } + b.mu.Unlock() +} + func (b *bridge) serveListen() { defer func() { b.l.Close() @@ -91,13 +114,22 @@ func (b *bridge) serveListen() { if ierr != nil { return } + b.mu.Lock() + pausec := b.pausec + b.mu.Unlock() + select { + case <-b.stopc: + return + case <-pausec: + } + outc, oerr := net.Dial("unix", b.outaddr) if oerr != nil { inc.Close() return } - bc := &bridgeConn{inc, outc} + bc := &bridgeConn{inc, outc, make(chan struct{})} b.wg.Add(1) b.mu.Lock() b.conns[bc] = struct{}{} @@ -108,6 +140,7 @@ func (b *bridge) serveListen() { func (b *bridge) serveConn(bc *bridgeConn) { defer func() { + close(bc.donec) bc.Close() b.mu.Lock() delete(b.conns, bc) @@ -129,11 +162,13 @@ func (b *bridge) serveConn(bc *bridgeConn) { } type bridgeConn struct { - in net.Conn - out net.Conn + in net.Conn + out net.Conn + donec chan struct{} } func (bc *bridgeConn) Close() { bc.in.Close() bc.out.Close() + <-bc.donec } diff --git a/integration/cluster.go b/integration/cluster.go index 4e46634da..fd7330a85 100644 --- a/integration/cluster.go +++ b/integration/cluster.go @@ -556,7 +556,9 @@ func (m *member) electionTimeout() time.Duration { return time.Duration(m.s.Cfg.ElectionTicks) * time.Millisecond } -func (m *member) DropConnections() { m.grpcBridge.Reset() } +func (m *member) DropConnections() { m.grpcBridge.Reset() } +func (m *member) PauseConnections() { m.grpcBridge.Pause() } +func (m *member) UnpauseConnections() { m.grpcBridge.Unpause() } // NewClientV3 creates a new grpc client connection to the member func NewClientV3(m *member) (*clientv3.Client, error) {