From 580c563ed626aa85a37cc84239e23337e954e690 Mon Sep 17 00:00:00 2001
From: Anthony Romano
Date: Fri, 29 Jan 2016 16:11:42 -0800
Subject: [PATCH] clientv3: watcher implementation

---
 clientv3/integration/watch_test.go | 262 +++++++++++++++++
 clientv3/watch.go                  | 439 +++++++++++++++++++++++++++++
 2 files changed, 701 insertions(+)
 create mode 100644 clientv3/integration/watch_test.go

diff --git a/clientv3/integration/watch_test.go b/clientv3/integration/watch_test.go
new file mode 100644
index 000000000..93e298786
--- /dev/null
+++ b/clientv3/integration/watch_test.go
@@ -0,0 +1,262 @@
+// Copyright 2016 CoreOS, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package integration
+
+import (
+	"fmt"
+	"reflect"
+	"sort"
+	"testing"
+	"time"
+
+	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
+	"github.com/coreos/etcd/clientv3"
+	"github.com/coreos/etcd/integration"
+	"github.com/coreos/etcd/pkg/testutil"
+	storagepb "github.com/coreos/etcd/storage/storagepb"
+)
+
+type watcherTest func(*testing.T, *watchctx)
+
+type watchctx struct {
+	clus    *integration.ClusterV3
+	w       clientv3.Watcher
+	wclient *clientv3.Client
+	kv      clientv3.KV
+	ch      <-chan clientv3.WatchResponse
+}
+
+func runWatchTest(t *testing.T, f watcherTest) {
+	defer testutil.AfterTest(t)
+
+	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
+	defer clus.Terminate(t)
+
+	wclient := clus.RandClient()
+	w := clientv3.NewWatcher(wclient)
+	defer w.Close()
+	// select a different client from wclient so puts succeed if
+	// a test knocks out the watcher client
+	kvclient := clus.RandClient()
+	for kvclient == wclient {
+		kvclient = clus.RandClient()
+	}
+	kv := clientv3.NewKV(kvclient)
+
+	wctx := &watchctx{clus, w, wclient, kv, nil}
+	f(t, wctx)
+}
+
+// TestWatchMultiWatcher modifies multiple keys and observes the changes.
+func TestWatchMultiWatcher(t *testing.T) {
+	runWatchTest(t, testWatchMultiWatcher)
+}
+
+func testWatchMultiWatcher(t *testing.T, wctx *watchctx) {
+	numKeyUpdates := 4
+	keys := []string{"foo", "bar", "baz"}
+
+	donec := make(chan struct{})
+	readyc := make(chan struct{})
+	for _, k := range keys {
+		// key watcher
+		go func(key string) {
+			ch := wctx.w.Watch(context.TODO(), key, 0)
+			if ch == nil {
+				t.Fatalf("expected watcher channel, got nil")
+			}
+			readyc <- struct{}{}
+			for i := 0; i < numKeyUpdates; i++ {
+				resp, ok := <-ch
+				if !ok {
+					t.Fatalf("watcher unexpectedly closed")
+				}
+				v := fmt.Sprintf("%s-%d", key, i)
+				gotv := string(resp.Events[0].Kv.Value)
+				if gotv != v {
+					t.Errorf("#%d: got %s, wanted %s", i, gotv, v)
+				}
+			}
+			donec <- struct{}{}
+		}(k)
+	}
+	// prefix watcher on "b" (bar and baz)
+	go func() {
+		prefixc := wctx.w.WatchPrefix(context.TODO(), "b", 0)
+		if prefixc == nil {
+			t.Fatalf("expected watcher channel, got nil")
+		}
+		readyc <- struct{}{}
+		evs := []*storagepb.Event{}
+		for i := 0; i < numKeyUpdates*2; i++ {
+			resp, ok := <-prefixc
+			if !ok {
+				t.Fatalf("watcher unexpectedly closed")
+			}
+			evs = append(evs, resp.Events...)
+		}
+
+		// check response
+		expected := []string{}
+		bkeys := []string{"bar", "baz"}
+		for _, k := range bkeys {
+			for i := 0; i < numKeyUpdates; i++ {
+				expected = append(expected, fmt.Sprintf("%s-%d", k, i))
+			}
+		}
+		got := []string{}
+		for _, ev := range evs {
+			got = append(got, string(ev.Kv.Value))
+		}
+		sort.Strings(got)
+		if !reflect.DeepEqual(expected, got) {
+			t.Errorf("got %v, expected %v", got, expected)
+		}
+
+		// ensure no extra data
+		select {
+		case resp, ok := <-prefixc:
+			if !ok {
+				t.Fatalf("watcher unexpectedly closed")
+			}
+			t.Fatalf("unexpected event %+v", resp)
+		case <-time.After(time.Second):
+		}
+		donec <- struct{}{}
+	}()
+
+	// wait for watchers to come up
+	for i := 0; i < len(keys)+1; i++ {
+		<-readyc
+	}
+	// generate events
+	for i := 0; i < numKeyUpdates; i++ {
+		for _, k := range keys {
+			v := fmt.Sprintf("%s-%d", k, i)
+			if _, err := wctx.kv.Put(k, v, 0); err != nil {
+				t.Fatal(err)
+			}
+		}
+	}
+	// wait for watchers to shut down
+	for i := 0; i < len(keys)+1; i++ {
+		<-donec
+	}
+}
+
+// TestWatchReconnInit tests watcher resumes correctly if connection lost
+// before any data was sent.
+func TestWatchReconnInit(t *testing.T) {
+	runWatchTest(t, testWatchReconnInit)
+}
+
+func testWatchReconnInit(t *testing.T, wctx *watchctx) {
+	if wctx.ch = wctx.w.Watch(context.TODO(), "a", 0); wctx.ch == nil {
+		t.Fatalf("expected non-nil channel")
+	}
+	// take down watcher connection
+	wctx.wclient.ActiveConnection().Close()
+	// watcher should recover
+	putAndWatch(t, wctx, "a", "a")
+}
+
+// TestWatchReconnRunning tests watcher resumes correctly if connection lost
+// after data was sent.
+func TestWatchReconnRunning(t *testing.T) {
+	runWatchTest(t, testWatchReconnRunning)
+}
+
+func testWatchReconnRunning(t *testing.T, wctx *watchctx) {
+	if wctx.ch = wctx.w.Watch(context.TODO(), "a", 0); wctx.ch == nil {
+		t.Fatalf("expected non-nil channel")
+	}
+	putAndWatch(t, wctx, "a", "a")
+	// take down watcher connection
+	wctx.wclient.ActiveConnection().Close()
+	// watcher should recover
+	putAndWatch(t, wctx, "a", "b")
+}
+
+// TestWatchCancelInit tests watcher closes correctly after no events.
+func TestWatchCancelInit(t *testing.T) {
+	runWatchTest(t, testWatchCancelInit)
+}
+
+func testWatchCancelInit(t *testing.T, wctx *watchctx) {
+	ctx, cancel := context.WithCancel(context.Background())
+	if wctx.ch = wctx.w.Watch(ctx, "a", 0); wctx.ch == nil {
+		t.Fatalf("expected non-nil watcher channel")
+	}
+	cancel()
+	select {
+	case <-time.After(time.Second):
+		t.Fatalf("took too long to cancel")
+	case _, ok := <-wctx.ch:
+		if ok {
+			t.Fatalf("expected watcher channel to close")
+		}
+	}
+}
+
+// TestWatchCancelRunning tests watcher closes correctly after events.
+func TestWatchCancelRunning(t *testing.T) {
+	runWatchTest(t, testWatchCancelRunning)
+}
+
+func testWatchCancelRunning(t *testing.T, wctx *watchctx) {
+	ctx, cancel := context.WithCancel(context.Background())
+	if wctx.ch = wctx.w.Watch(ctx, "a", 0); wctx.ch == nil {
+		t.Fatalf("expected non-nil watcher channel")
+	}
+	if _, err := wctx.kv.Put("a", "a", 0); err != nil {
+		t.Fatal(err)
+	}
+	cancel()
+	select {
+	case <-time.After(time.Second):
+		t.Fatalf("took too long to cancel")
+	case v, ok := <-wctx.ch:
+		if !ok {
+			// closed before getting put; OK
+			break
+		}
+		// got the PUT; should close next
+		select {
+		case <-time.After(time.Second):
+			t.Fatalf("took too long to close")
+		case v, ok = <-wctx.ch:
+			if ok {
+				t.Fatalf("expected watcher channel to close, got %v", v)
+			}
+		}
+	}
+}
+
+func putAndWatch(t *testing.T, wctx *watchctx, key, val string) {
+	if _, err := wctx.kv.Put(key, val, 0); err != nil {
+		t.Fatal(err)
+	}
+	select {
+	case <-time.After(5 * time.Second):
+		t.Fatalf("watch timed out")
+	case v, ok := <-wctx.ch:
+		if !ok {
+			t.Fatalf("unexpected watch close")
+		}
+		if string(v.Events[0].Kv.Value) != val {
+			t.Fatalf("bad value got %s, wanted %s", v.Events[0].Kv.Value, val)
+		}
+	}
+}
diff --git a/clientv3/watch.go b/clientv3/watch.go
index 87de6cc18..80ecca4e8 100644
--- a/clientv3/watch.go
+++ b/clientv3/watch.go
@@ -15,7 +15,11 @@
 package clientv3
 
 import (
+	"fmt"
+	"sync"
+
 	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
+	"github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc"
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 	storagepb "github.com/coreos/etcd/storage/storagepb"
 )
@@ -41,3 +45,438 @@ type WatchResponse struct {
 	Header pb.ResponseHeader
 	Events []*storagepb.Event
 }
+
+// watcher implements the Watcher interface
+type watcher struct {
+	c      *Client
+	conn   *grpc.ClientConn
+	remote pb.WatchClient
+
+	// ctx controls internal remote.Watch requests
+	ctx    context.Context
+	cancel context.CancelFunc
+
+	// streams holds all active watchers
+	streams map[int64]*watcherStream
+	// mu protects the streams map
+	mu sync.RWMutex
+
+	// reqc sends a watch request from Watch() to the main goroutine
+	reqc chan *watchRequest
+	// respc receives data from the watch client
+	respc chan *pb.WatchResponse
+	// stopc is sent to the main goroutine to stop all processing
+	stopc chan struct{}
+	// donec closes to broadcast shutdown
+	donec chan struct{}
+	// errc transmits errors from grpc Recv
+	errc chan error
+}
+
+// watchRequest is issued by the subscriber to start a new watcher
+type watchRequest struct {
+	ctx    context.Context
+	key    string
+	prefix string
+	rev    int64
+	// retc receives a chan WatchResponse once the watcher is established
+	retc chan chan WatchResponse
+}
+
+// watcherStream represents a registered watcher
+type watcherStream struct {
+	initReq watchRequest
+
+	// outc publishes watch responses to subscriber
+	outc chan<- WatchResponse
+	// recvc buffers watch responses before publishing
+	recvc chan *WatchResponse
+	id    int64
+
+	// lastRev is the revision last successfully sent over outc
+	lastRev int64
+	// resumec indicates the stream must recover at a given revision
+	resumec chan int64
+}
+
+func NewWatcher(c *Client) Watcher {
+	ctx, cancel := context.WithCancel(context.Background())
+	conn := c.ActiveConnection()
+
+	w := &watcher{
+		c:      c,
+		conn:   conn,
+		remote: pb.NewWatchClient(conn),
+
+		ctx:     ctx,
+		cancel:  cancel,
+		streams: make(map[int64]*watcherStream),
+
+		respc: make(chan *pb.WatchResponse),
+		reqc:  make(chan *watchRequest),
+		stopc: make(chan struct{}),
+		donec: make(chan struct{}),
+		errc:  make(chan error, 1),
+	}
+	go w.run()
+	return w
+}
+
+func (w *watcher) Watch(ctx context.Context, key string, rev int64) <-chan WatchResponse {
+	return w.watch(ctx, key, "", rev)
+}
+
+func (w *watcher) WatchPrefix(ctx context.Context, prefix string, rev int64) <-chan WatchResponse {
+	return w.watch(ctx, "", prefix, rev)
+}
+
+func (w *watcher) Close() error {
+	select {
+	case w.stopc <- struct{}{}:
+	case <-w.donec:
+	}
+	<-w.donec
+	return <-w.errc
+}
+
+// watch posts a watch request to run() and waits for a new watcher channel
+func (w *watcher) watch(ctx context.Context, key, prefix string, rev int64) <-chan WatchResponse {
+	retc := make(chan chan WatchResponse, 1)
+	wr := &watchRequest{ctx: ctx, key: key, prefix: prefix, rev: rev, retc: retc}
+	// submit request
+	select {
+	case w.reqc <- wr:
+	case <-wr.ctx.Done():
+		return nil
+	case <-w.donec:
+		return nil
+	}
+	// receive channel
+	select {
+	case ret := <-retc:
+		return ret
+	case <-ctx.Done():
+		return nil
+	case <-w.donec:
+		return nil
+	}
+}
+
+func (w *watcher) addStream(resp *pb.WatchResponse, pendingReq *watchRequest) {
+	if pendingReq == nil {
+		// no pending request; ignore
+		return
+	} else if resp.WatchId == -1 || resp.Compacted {
+		// failed; no channel
+		pendingReq.retc <- nil
+		return
+	}
+
+	ret := make(chan WatchResponse)
+	ws := &watcherStream{
+		initReq: *pendingReq,
+		id:      resp.WatchId,
+		outc:    ret,
+		// buffered so unlikely to block on sending while holding mu
+		recvc:   make(chan *WatchResponse, 4),
+		resumec: make(chan int64),
+	}
+
+	w.mu.Lock()
+	w.streams[ws.id] = ws
+	w.mu.Unlock()
+
+	// send messages to subscriber
+	go w.serveStream(ws)
+
+	// pass back the subscriber channel for the watcher
+	pendingReq.retc <- ret
+}
+
+// closeStream closes the watcher resources and removes it
+func (w *watcher) closeStream(ws *watcherStream) {
+	// cancels request stream; subscriber receives nil channel
+	close(ws.initReq.retc)
+	// close subscriber's channel
+	close(ws.outc)
+	// shutdown serveStream
+	close(ws.recvc)
+	delete(w.streams, ws.id)
+}
+
+// run is the root of the goroutines for managing a watcher client
+func (w *watcher) run() {
+	defer func() {
+		close(w.donec)
+		w.cancel()
+	}()
+
+	// start a stream with the etcd grpc server
+	wc, wcerr := w.newWatchClient()
+	if wcerr != nil {
+		w.errc <- wcerr
+		return
+	}
+
+	var pendingReq, failedReq *watchRequest
+	curReqC := w.reqc
+	cancelSet := make(map[int64]struct{})
+
+	for {
+		select {
+		// Watch() requested
+		case pendingReq = <-curReqC:
+			// no more watch requests until there's a response
+			curReqC = nil
+			if err := wc.Send(pendingReq.toPB()); err == nil {
+				// pendingReq now waits on w.respc
+				break
+			}
+			failedReq = pendingReq
+		// new events from the watch client
+		case pbresp := <-w.respc:
+			switch {
+			case pbresp.Canceled:
+				delete(cancelSet, pbresp.WatchId)
+			case pbresp.Compacted:
+				w.mu.Lock()
+				if ws, ok := w.streams[pbresp.WatchId]; ok {
+					w.closeStream(ws)
+				}
+				w.mu.Unlock()
+			case pbresp.Created:
+				// response to pending req, try to add
+				w.addStream(pbresp, pendingReq)
+				pendingReq = nil
+				curReqC = w.reqc
+			default:
+				// dispatch to appropriate watch stream
+				if ok := w.dispatchEvent(pbresp); ok {
+					break
+				}
+				// watch response on unexpected watch id; cancel id
+				if _, ok := cancelSet[pbresp.WatchId]; ok {
+					break
+				}
+				cancelSet[pbresp.WatchId] = struct{}{}
+				cr := &pb.WatchRequest_CancelRequest{
+					CancelRequest: &pb.WatchCancelRequest{
+						WatchId: pbresp.WatchId,
+					},
+				}
+				req := &pb.WatchRequest{RequestUnion: cr}
+				wc.Send(req)
+			}
+		// watch client failed to recv; spawn another if possible
+		// TODO report watch client errors from errc?
+		case <-w.errc:
+			if wc, wcerr = w.newWatchClient(); wcerr != nil {
+				w.errc <- wcerr
+				return
+			}
+			curReqC = w.reqc
+			if pendingReq != nil {
+				failedReq = pendingReq
+			}
+			cancelSet = make(map[int64]struct{})
+		case <-w.stopc:
+			w.errc <- nil
+			return
+		}
+
+		// send failed; queue for retry
+		if failedReq != nil {
+			// pass the request as an argument so the goroutine does not
+			// race with pendingReq being cleared below
+			go func(wr *watchRequest) {
+				select {
+				case w.reqc <- wr:
+				case <-wr.ctx.Done():
+				case <-w.donec:
+				}
+			}(failedReq)
+			failedReq = nil
+			pendingReq = nil
+		}
+	}
+}
+
+// dispatchEvent sends a WatchResponse to the appropriate watcher stream
+func (w *watcher) dispatchEvent(pbresp *pb.WatchResponse) bool {
+	w.mu.RLock()
+	defer w.mu.RUnlock()
+	ws, ok := w.streams[pbresp.WatchId]
+	if ok {
+		wr := &WatchResponse{*pbresp.Header, pbresp.Events}
+		ws.recvc <- wr
+	}
+	return ok
+}
+
+// serveWatchClient forwards messages from the grpc stream to run()
+func (w *watcher) serveWatchClient(wc pb.Watch_WatchClient) {
+	for {
+		resp, err := wc.Recv()
+		if err != nil {
+			select {
+			case w.errc <- err:
+			case <-w.donec:
+			}
+			return
+		}
+		select {
+		case w.respc <- resp:
+		case <-w.donec:
+			return
+		}
+	}
+}
+
+// serveStream forwards watch responses from run() to the subscriber
+func (w *watcher) serveStream(ws *watcherStream) {
+	emptyWr := &WatchResponse{}
+	wrs := []*WatchResponse{}
+	resuming := false
+	closing := false
+	for !closing {
+		curWr := emptyWr
+		outc := ws.outc
+		if len(wrs) > 0 {
+			curWr = wrs[0]
+		} else {
+			outc = nil
+		}
+		select {
+		case outc <- *curWr:
+			newRev := wrs[0].Events[len(wrs[0].Events)-1].Kv.ModRevision
+			if newRev != ws.lastRev {
+				ws.lastRev = newRev
+			}
+			wrs[0] = nil
+			wrs = wrs[1:]
+		case wr, ok := <-ws.recvc:
+			if !ok {
+				// shutdown from closeStream
+				return
+			}
+			// resume up to last seen event if disconnected
+			if resuming {
+				resuming = false
+				// trim events already seen
+				for i := 0; i < len(wr.Events); i++ {
+					if wr.Events[i].Kv.ModRevision > ws.lastRev {
+						wr.Events = wr.Events[i:]
+						break
+					}
+				}
+				// only forward new events
+				if wr.Events[0].Kv.ModRevision == ws.lastRev {
+					break
+				}
+			}
+			// TODO don't keep buffering if subscriber stops reading
+			wrs = append(wrs, wr)
+		case resumeRev := <-ws.resumec:
+			if resumeRev != ws.lastRev {
+				panic("unexpected resume revision")
+			}
+			wrs = nil
+			resuming = true
+		case <-w.donec:
+			closing = true
+		case <-ws.initReq.ctx.Done():
+			closing = true
+		}
+	}
+	w.mu.Lock()
+	w.closeStream(ws)
+	w.mu.Unlock()
+	// lazily send cancel message if events on missing id
+}
+
+func (w *watcher) newWatchClient() (pb.Watch_WatchClient, error) {
+	ws, rerr := w.resume()
+	if rerr != nil {
+		return nil, rerr
+	}
+	go w.serveWatchClient(ws)
+	return ws, nil
+}
+
+// resume creates a new WatchClient with all current watchers reestablished
+func (w *watcher) resume() (ws pb.Watch_WatchClient, err error) {
+	for {
+		if ws, err = w.openWatchClient(); err != nil {
+			break
+		} else if err = w.resumeWatchers(ws); err == nil {
+			break
+		}
+	}
+	return ws, err
+}
+
+// openWatchClient retries opening a watch client until retryConnection fails
+func (w *watcher) openWatchClient() (ws pb.Watch_WatchClient, err error) {
+	for {
+		if ws, err = w.remote.Watch(w.ctx); ws != nil {
+			break
+		} else if isRPCError(err) {
+			return nil, err
+		}
+		newConn, nerr := w.c.retryConnection(w.conn, nil)
+		if nerr != nil {
+			return nil, nerr
+		}
+		w.conn = newConn
+		w.remote = pb.NewWatchClient(w.conn)
+	}
+	return ws, nil
+}
+
+// resumeWatchers rebuilds every registered watcher on a new client
+func (w *watcher) resumeWatchers(wc pb.Watch_WatchClient) error {
+	streams := []*watcherStream{}
+	w.mu.RLock()
+	for _, ws := range w.streams {
+		streams = append(streams, ws)
+	}
+	w.mu.RUnlock()
+
+	for _, ws := range streams {
+		// reconstruct watcher from initial request
+		if ws.lastRev != 0 {
+			ws.initReq.rev = ws.lastRev
+		}
+		if err := wc.Send(ws.initReq.toPB()); err != nil {
+			return err
+		}
+
+		// wait for request ack
+		resp, err := wc.Recv()
+		if err != nil {
+			return err
+		} else if len(resp.Events) != 0 || !resp.Created {
+			return fmt.Errorf("watcher: unexpected response (%+v)", resp)
+		}
+
+		// id may be different since new remote watcher; update map
+		w.mu.Lock()
+		delete(w.streams, ws.id)
+		ws.id = resp.WatchId
+		w.streams[ws.id] = ws
+		w.mu.Unlock()
+
+		ws.resumec <- ws.lastRev
+	}
+	return nil
+}
+
+// toPB converts an internal watch request structure to its protobuf message
+func (wr *watchRequest) toPB() *pb.WatchRequest {
+	req := &pb.WatchCreateRequest{StartRevision: wr.rev}
+	if wr.key != "" {
+		req.Key = []byte(wr.key)
+	} else {
+		req.Prefix = []byte(wr.prefix)
+	}
+	cr := &pb.WatchRequest_CreateRequest{CreateRequest: req}
+	return &pb.WatchRequest{RequestUnion: cr}
+}
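
Usage sketch (not part of the patch): a minimal subscriber built only on the API this change introduces (NewWatcher, Watch, Close, WatchResponse). The package name, the printUpdates function, and the already-constructed *clientv3.Client are assumptions for illustration; client construction is outside this patch. Per the tests above, a start revision of 0 watches for changes from the current state, Watch returns a nil channel if the request cannot be posted, and the channel closes when the context is canceled or the watcher shuts down.

    package example

    import (
    	"fmt"

    	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
    	"github.com/coreos/etcd/clientv3"
    )

    // printUpdates streams changes to key until ctx is canceled.
    func printUpdates(ctx context.Context, c *clientv3.Client, key string) {
    	w := clientv3.NewWatcher(c)
    	defer w.Close()

    	// rev 0 watches from the current state, as in the tests above;
    	// a nil channel means the request could not be submitted
    	ch := w.Watch(ctx, key, 0)
    	if ch == nil {
    		return
    	}
    	// the channel closes when ctx is canceled or the watcher shuts down
    	for resp := range ch {
    		for _, ev := range resp.Events {
    			fmt.Printf("%s = %s (mod rev %d)\n",
    				ev.Kv.Key, ev.Kv.Value, ev.Kv.ModRevision)
    		}
    	}
    }

Note the channel-per-watcher design: the subscriber needs no explicit cancel call, since canceling the context both tears down the remote watcher and closes the subscriber channel, as exercised by TestWatchCancelInit and TestWatchCancelRunning above.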