From 722f5b2a8c41846ae4658383b683b648b6837909 Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Tue, 14 Jun 2016 21:26:36 -0700 Subject: [PATCH] clientv3: watch with arbitrary ctx values Sets up a new watch stream for every unique set of ctx values. Fixes #5354 --- clientv3/watch.go | 190 +++++++++++++++++++++++++++++++++++++--------- 1 file changed, 154 insertions(+), 36 deletions(-) diff --git a/clientv3/watch.go b/clientv3/watch.go index 64656578b..bc6ddc037 100644 --- a/clientv3/watch.go +++ b/clientv3/watch.go @@ -17,6 +17,7 @@ package clientv3 import ( "fmt" "sync" + "time" v3rpc "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" @@ -27,6 +28,8 @@ import ( const ( EventTypeDelete = mvccpb.DELETE EventTypePut = mvccpb.PUT + + closeSendErrTimeout = 250 * time.Millisecond ) type Event mvccpb.Event @@ -56,6 +59,8 @@ type WatchResponse struct { // If the watch failed and the stream was about to close, before the channel is closed, // the channel sends a final response that has Canceled set to true with a non-nil Err(). Canceled bool + + closeErr error } // IsCreate returns true if the event tells that the key is newly created. @@ -70,10 +75,12 @@ func (e *Event) IsModify() bool { // Err is the error value if this WatchResponse holds an error. func (wr *WatchResponse) Err() error { - if wr.CompactRevision != 0 { + switch { + case wr.closeErr != nil: + return v3rpc.Error(wr.closeErr) + case wr.CompactRevision != 0: return v3rpc.ErrCompacted - } - if wr.Canceled { + case wr.Canceled: return v3rpc.ErrFutureRev } return nil @@ -88,14 +95,26 @@ func (wr *WatchResponse) IsProgressNotify() bool { type watcher struct { remote pb.WatchClient + // mu protects the grpc streams map + mu sync.RWMutex + // streams holds all the active grpc streams keyed by ctx value. + streams map[string]*watchGrpcStream +} + +type watchGrpcStream struct { + owner *watcher + remote pb.WatchClient + // ctx controls internal remote.Watch requests - ctx context.Context + ctx context.Context + // ctxKey is the key used when looking up this stream's context + ctxKey string cancel context.CancelFunc - // streams holds all active watchers - streams map[int64]*watcherStream // mu protects the streams map mu sync.RWMutex + // streams holds all active watchers + streams map[int64]*watcherStream // reqc sends a watch request from Watch() to the main goroutine reqc chan *watchRequest @@ -105,8 +124,11 @@ type watcher struct { stopc chan struct{} // donec closes to broadcast shutdown donec chan struct{} - // errc transmits errors from grpc Recv + // errc transmits errors from grpc Recv to the watch stream reconn logic errc chan error + + // the error that closed the watch stream + closeErr error } // watchRequest is issued by the subscriber to start a new watcher @@ -123,6 +145,7 @@ type watchRequest struct { // watcherStream represents a registered watcher type watcherStream struct { + // initReq is the request that initiated this request initReq watchRequest // outc publishes watch responses to subscriber @@ -138,10 +161,30 @@ type watcherStream struct { } func NewWatcher(c *Client) Watcher { - ctx, cancel := context.WithCancel(context.Background()) - w := &watcher{ + return &watcher{ remote: pb.NewWatchClient(c.conn), + streams: make(map[string]*watchGrpcStream), + } +} + +// never closes +var valCtxCh = make(chan struct{}) +var zeroTime = time.Unix(0, 0) + +// ctx with only the values; never Done +type valCtx struct{ context.Context } + +func (vc *valCtx) Deadline() (time.Time, bool) { return zeroTime, false } +func (vc *valCtx) Done() <-chan struct{} { return valCtxCh } +func (vc *valCtx) Err() error { return nil } + +func (w *watcher) newWatcherGrpcStream(inctx context.Context) *watchGrpcStream { + ctx, cancel := context.WithCancel(&valCtx{inctx}) + wgs := &watchGrpcStream{ + owner: w, + remote: w.remote, ctx: ctx, + ctxKey: fmt.Sprintf("%v", inctx), cancel: cancel, streams: make(map[int64]*watcherStream), @@ -151,8 +194,8 @@ func NewWatcher(c *Client) Watcher { donec: make(chan struct{}), errc: make(chan error, 1), } - go w.run() - return w + go wgs.run() + return wgs } // Watch posts a watch request to run() and waits for a new watcher channel @@ -170,13 +213,41 @@ func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) Watch } ok := false + ctxKey := fmt.Sprintf("%v", ctx) + + // find or allocate appropriate grpc watch stream + w.mu.Lock() + if w.streams == nil { + // closed + w.mu.Unlock() + ch := make(chan WatchResponse) + close(ch) + return ch + } + wgs := w.streams[ctxKey] + if wgs == nil { + wgs = w.newWatcherGrpcStream(ctx) + w.streams[ctxKey] = wgs + } + donec := wgs.donec + reqc := wgs.reqc + w.mu.Unlock() + + // couldn't create channel; return closed channel + closeCh := make(chan WatchResponse, 1) // submit request select { - case w.reqc <- wr: + case reqc <- wr: ok = true case <-wr.ctx.Done(): - case <-w.donec: + case <-donec: + if wgs.closeErr != nil { + closeCh <- WatchResponse{closeErr: wgs.closeErr} + break + } + // retry; may have dropped stream from no ctxs + return w.Watch(ctx, key, opts...) } // receive channel @@ -185,23 +256,44 @@ func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) Watch case ret := <-retc: return ret case <-ctx.Done(): - case <-w.donec: + case <-donec: + if wgs.closeErr != nil { + closeCh <- WatchResponse{closeErr: wgs.closeErr} + break + } + // retry; may have dropped stream from no ctxs + return w.Watch(ctx, key, opts...) } } - // couldn't create channel; return closed channel - ch := make(chan WatchResponse) - close(ch) - return ch + close(closeCh) + return closeCh } -func (w *watcher) Close() error { +func (w *watcher) Close() (err error) { + w.mu.Lock() + streams := w.streams + w.streams = nil + w.mu.Unlock() + for _, wgs := range streams { + if werr := wgs.Close(); werr != nil { + err = werr + } + } + return err +} + +func (w *watchGrpcStream) Close() (err error) { close(w.stopc) <-w.donec - return toErr(w.ctx, <-w.errc) + select { + case err = <-w.errc: + default: + } + return toErr(w.ctx, err) } -func (w *watcher) addStream(resp *pb.WatchResponse, pendingReq *watchRequest) { +func (w *watchGrpcStream) addStream(resp *pb.WatchResponse, pendingReq *watchRequest) { if pendingReq == nil { // no pending request; ignore return @@ -254,27 +346,27 @@ func (w *watcher) addStream(resp *pb.WatchResponse, pendingReq *watchRequest) { } // closeStream closes the watcher resources and removes it -func (w *watcher) closeStream(ws *watcherStream) { +func (w *watchGrpcStream) closeStream(ws *watcherStream) { // cancels request stream; subscriber receives nil channel close(ws.initReq.retc) // close subscriber's channel close(ws.outc) - // shutdown serveStream - close(ws.recvc) delete(w.streams, ws.id) } // run is the root of the goroutines for managing a watcher client -func (w *watcher) run() { +func (w *watchGrpcStream) run() { var wc pb.Watch_WatchClient var closeErr error defer func() { - select { - case w.errc <- closeErr: - default: + w.owner.mu.Lock() + w.closeErr = closeErr + if w.owner.streams != nil { + delete(w.owner.streams, w.ctxKey) } close(w.donec) + w.owner.mu.Unlock() w.cancel() }() @@ -308,6 +400,18 @@ func (w *watcher) run() { curReqC = w.reqc case pbresp.Canceled: delete(cancelSet, pbresp.WatchId) + // shutdown serveStream, if any + w.mu.Lock() + if ws, ok := w.streams[pbresp.WatchId]; ok { + close(ws.recvc) + delete(w.streams, ws.id) + } + numStreams := len(w.streams) + w.mu.Unlock() + if numStreams == 0 { + // don't leak watcher streams + return + } default: // dispatch to appropriate watch stream if ok := w.dispatchEvent(pbresp); ok { @@ -328,7 +432,11 @@ func (w *watcher) run() { } // watch client failed to recv; spawn another if possible // TODO report watch client errors from errc? - case <-w.errc: + case err := <-w.errc: + if toErr(w.ctx, err) == v3rpc.ErrNoLeader { + closeErr = err + return + } if wc, closeErr = w.newWatchClient(); closeErr != nil { return } @@ -357,7 +465,7 @@ func (w *watcher) run() { } // dispatchEvent sends a WatchResponse to the appropriate watcher stream -func (w *watcher) dispatchEvent(pbresp *pb.WatchResponse) bool { +func (w *watchGrpcStream) dispatchEvent(pbresp *pb.WatchResponse) bool { w.mu.RLock() defer w.mu.RUnlock() ws, ok := w.streams[pbresp.WatchId] @@ -377,7 +485,7 @@ func (w *watcher) dispatchEvent(pbresp *pb.WatchResponse) bool { } // serveWatchClient forwards messages from the grpc stream to run() -func (w *watcher) serveWatchClient(wc pb.Watch_WatchClient) { +func (w *watchGrpcStream) serveWatchClient(wc pb.Watch_WatchClient) { for { resp, err := wc.Recv() if err != nil { @@ -396,7 +504,7 @@ func (w *watcher) serveWatchClient(wc pb.Watch_WatchClient) { } // serveStream forwards watch responses from run() to the subscriber -func (w *watcher) serveStream(ws *watcherStream) { +func (w *watchGrpcStream) serveStream(ws *watcherStream) { emptyWr := &WatchResponse{} wrs := []*WatchResponse{} resuming := false @@ -465,13 +573,23 @@ func (w *watcher) serveStream(ws *watcherStream) { closing = true } } + + // try to send off close error + if w.closeErr != nil { + select { + case ws.outc <- WatchResponse{closeErr: w.closeErr}: + case <-w.donec: + case <-time.After(closeSendErrTimeout): + } + } + w.mu.Lock() w.closeStream(ws) w.mu.Unlock() // lazily send cancel message if events on missing id } -func (w *watcher) newWatchClient() (pb.Watch_WatchClient, error) { +func (w *watchGrpcStream) newWatchClient() (pb.Watch_WatchClient, error) { ws, rerr := w.resume() if rerr != nil { return nil, rerr @@ -481,7 +599,7 @@ func (w *watcher) newWatchClient() (pb.Watch_WatchClient, error) { } // resume creates a new WatchClient with all current watchers reestablished -func (w *watcher) resume() (ws pb.Watch_WatchClient, err error) { +func (w *watchGrpcStream) resume() (ws pb.Watch_WatchClient, err error) { for { if ws, err = w.openWatchClient(); err != nil { break @@ -493,7 +611,7 @@ func (w *watcher) resume() (ws pb.Watch_WatchClient, err error) { } // openWatchClient retries opening a watchclient until retryConnection fails -func (w *watcher) openWatchClient() (ws pb.Watch_WatchClient, err error) { +func (w *watchGrpcStream) openWatchClient() (ws pb.Watch_WatchClient, err error) { for { select { case <-w.stopc: @@ -514,7 +632,7 @@ func (w *watcher) openWatchClient() (ws pb.Watch_WatchClient, err error) { } // resumeWatchers rebuilds every registered watcher on a new client -func (w *watcher) resumeWatchers(wc pb.Watch_WatchClient) error { +func (w *watchGrpcStream) resumeWatchers(wc pb.Watch_WatchClient) error { w.mu.RLock() streams := make([]*watcherStream, 0, len(w.streams)) for _, ws := range w.streams {