From ec459c218539391dddfe97a35f190f6cfd2f7eab Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Wed, 2 Nov 2016 16:57:57 -0700 Subject: [PATCH] grpcproxy: rework watcher organization The single watcher / group watcher distinction limited and complicated watcher coalescing more than necessary. Reworked: Each server watcher is represented by a WatchBroadcast, each client "Watcher" attaches to some WatchBroadcast. WatchBroadcasts hold all WatchBroadcast instances for a range. WatchRanges holds all WatchBroadcasts for the proxy. WatchProxyStreams represent a grpc watch stream between the proxy and a client. When a client requests a new watcher through its grpc stream, the ProxyStream will allocate a Watcher and WatchRanges assigns it to some WatchBroadcast based on its range. Coalescing is done by WatchBroadcasts when it receives an update notification from a WatchBroadcast. Supports leader failure detection so watches on a bad member can migrate to other members. Coincidentally, Fixes #6303. --- proxy/grpcproxy/watch.go | 329 ++++++++++++------------ proxy/grpcproxy/watch_broadcast.go | 135 ++++++++++ proxy/grpcproxy/watch_broadcasts.go | 132 ++++++++++ proxy/grpcproxy/watch_client_adapter.go | 38 ++- proxy/grpcproxy/watch_ranges.go | 70 +++++ proxy/grpcproxy/watcher.go | 54 ++-- proxy/grpcproxy/watcher_group.go | 106 -------- proxy/grpcproxy/watcher_group_test.go | 50 ---- proxy/grpcproxy/watcher_groups.go | 128 --------- proxy/grpcproxy/watcher_single.go | 73 ------ 10 files changed, 566 insertions(+), 549 deletions(-) create mode 100644 proxy/grpcproxy/watch_broadcast.go create mode 100644 proxy/grpcproxy/watch_broadcasts.go create mode 100644 proxy/grpcproxy/watch_ranges.go delete mode 100644 proxy/grpcproxy/watcher_group.go delete mode 100644 proxy/grpcproxy/watcher_group_test.go delete mode 100644 proxy/grpcproxy/watcher_groups.go delete mode 100644 proxy/grpcproxy/watcher_single.go diff --git a/proxy/grpcproxy/watch.go b/proxy/grpcproxy/watch.go index db3c3fc37..3a1c26b33 100644 --- a/proxy/grpcproxy/watch.go +++ b/proxy/grpcproxy/watch.go @@ -15,253 +15,246 @@ package grpcproxy import ( - "io" "sync" "golang.org/x/net/context" + "golang.org/x/time/rate" + "google.golang.org/grpc/metadata" "github.com/coreos/etcd/clientv3" "github.com/coreos/etcd/etcdserver/api/v3rpc" + "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" ) type watchProxy struct { cw clientv3.Watcher - wgs watchergroups - - mu sync.Mutex - nextStreamID int64 - ctx context.Context + + ranges *watchRanges + + // retryLimiter controls the create watch retry rate on lost leaders. + retryLimiter *rate.Limiter + + // mu protects leaderc updates. + mu sync.RWMutex + leaderc chan struct{} + + // wg waits until all outstanding watch servers quit. + wg sync.WaitGroup } +const ( + lostLeaderKey = "__lostleader" // watched to detect leader l oss + retryPerSecond = 10 +) + func NewWatchProxy(c *clientv3.Client) pb.WatchServer { wp := &watchProxy{ - cw: c.Watcher, - wgs: watchergroups{ - cw: c.Watcher, - groups: make(map[watchRange]*watcherGroup), - idToGroup: make(map[receiverID]*watcherGroup), - proxyCtx: c.Ctx(), - }, - ctx: c.Ctx(), + cw: c.Watcher, + ctx: clientv3.WithRequireLeader(c.Ctx()), + retryLimiter: rate.NewLimiter(rate.Limit(retryPerSecond), retryPerSecond), + leaderc: make(chan struct{}), } + wp.ranges = newWatchRanges(wp) go func() { + // a new streams without opening any watchers won't catch + // a lost leader event, so have a special watch to monitor it + rev := int64((uint64(1) << 63) - 2) + for wp.ctx.Err() == nil { + wch := wp.cw.Watch(wp.ctx, lostLeaderKey, clientv3.WithRev(rev)) + for range wch { + } + wp.mu.Lock() + close(wp.leaderc) + wp.leaderc = make(chan struct{}) + wp.mu.Unlock() + wp.retryLimiter.Wait(wp.ctx) + } + wp.mu.Lock() <-wp.ctx.Done() - wp.wgs.stop() + wp.mu.Unlock() + wp.wg.Wait() + wp.ranges.stop() }() return wp } func (wp *watchProxy) Watch(stream pb.Watch_WatchServer) (err error) { wp.mu.Lock() - wp.nextStreamID++ - sid := wp.nextStreamID + select { + case <-wp.ctx.Done(): + wp.mu.Unlock() + return + default: + wp.wg.Add(1) + } wp.mu.Unlock() - ctx, cancel := context.WithCancel(wp.ctx) - sws := serverWatchStream{ - cw: wp.cw, - groups: &wp.wgs, - singles: make(map[int64]*watcherSingle), - inGroups: make(map[int64]struct{}), - - id: sid, - gRPCStream: stream, - - watchCh: make(chan *pb.WatchResponse, 1024), - - ctx: ctx, - cancel: cancel, + ctx, cancel := context.WithCancel(stream.Context()) + wps := &watchProxyStream{ + ranges: wp.ranges, + watchers: make(map[int64]*watcher), + stream: stream, + watchCh: make(chan *pb.WatchResponse, 1024), + ctx: ctx, + cancel: cancel, } - go sws.recvLoop() - sws.sendLoop() - return wp.ctx.Err() + var leaderc <-chan struct{} + if md, ok := metadata.FromContext(stream.Context()); ok { + v := md[rpctypes.MetadataRequireLeaderKey] + if len(v) > 0 && v[0] == rpctypes.MetadataHasLeader { + leaderc = wp.lostLeaderNotify() + } + } + + // post to stopc => terminate server stream; can't use a waitgroup + // since all goroutines will only terminate after Watch() exits. + stopc := make(chan struct{}, 3) + go func() { + defer func() { stopc <- struct{}{} }() + wps.recvLoop() + }() + go func() { + defer func() { stopc <- struct{}{} }() + wps.sendLoop() + }() + if leaderc != nil { + go func() { + defer func() { stopc <- struct{}{} }() + select { + case <-leaderc: + case <-ctx.Done(): + } + }() + } + + <-stopc + // recv/send may only shutdown after function exits; + // goroutine notifies proxy that stream is through + go func() { + if leaderc != nil { + <-stopc + } + <-stopc + wps.close() + wp.wg.Done() + }() + + select { + case <-leaderc: + return rpctypes.ErrNoLeader + default: + return wps.ctx.Err() + } } -type serverWatchStream struct { - id int64 - cw clientv3.Watcher +func (wp *watchProxy) lostLeaderNotify() <-chan struct{} { + wp.mu.RLock() + defer wp.mu.RUnlock() + return wp.leaderc +} - mu sync.Mutex // make sure any access of groups and singles is atomic - groups *watchergroups - singles map[int64]*watcherSingle - inGroups map[int64]struct{} - - gRPCStream pb.Watch_WatchServer - - watchCh chan *pb.WatchResponse +// watchProxyStream forwards etcd watch events to a proxied client stream. +type watchProxyStream struct { + ranges *watchRanges + // mu protects watchers and nextWatcherID + mu sync.Mutex + // watchers receive events from watch broadcast. + watchers map[int64]*watcher + // nextWatcherID is the id to assign the next watcher on this stream. nextWatcherID int64 + stream pb.Watch_WatchServer + + // watchCh receives watch responses from the watchers. + watchCh chan *pb.WatchResponse + ctx context.Context cancel context.CancelFunc } -func (sws *serverWatchStream) close() { +func (wps *watchProxyStream) close() { var wg sync.WaitGroup - sws.cancel() - sws.mu.Lock() - wg.Add(len(sws.singles) + len(sws.inGroups)) - for _, ws := range sws.singles { - // copy the range variable to avoid race - copyws := ws - go func() { - copyws.stop() + wps.cancel() + wps.mu.Lock() + wg.Add(len(wps.watchers)) + for _, wpsw := range wps.watchers { + go func(w *watcher) { + wps.ranges.delete(w) wg.Done() - }() + }(wpsw) } - for id := range sws.inGroups { - // copy the range variable to avoid race - wid := id - go func() { - sws.groups.removeWatcher(receiverID{streamID: sws.id, watcherID: wid}) - wg.Done() - }() - } - sws.inGroups = nil - sws.mu.Unlock() + wps.watchers = nil + wps.mu.Unlock() wg.Wait() - close(sws.watchCh) + close(wps.watchCh) } -func (sws *serverWatchStream) recvLoop() error { - defer sws.close() - +func (wps *watchProxyStream) recvLoop() error { for { - req, err := sws.gRPCStream.Recv() - if err == io.EOF { - return nil - } + req, err := wps.stream.Recv() if err != nil { return err } - switch uv := req.RequestUnion.(type) { case *pb.WatchRequest_CreateRequest: cr := uv.CreateRequest + w := &watcher{ + wr: watchRange{string(cr.Key), string(cr.RangeEnd)}, + id: wps.nextWatcherID, + wps: wps, - watcher := watcher{ - wr: watchRange{ - key: string(cr.Key), - end: string(cr.RangeEnd), - }, - id: sws.nextWatcherID, - sws: sws, - + nextrev: cr.StartRevision, progress: cr.ProgressNotify, filters: v3rpc.FiltersFromRequest(cr), } - if cr.StartRevision != 0 { - sws.addDedicatedWatcher(watcher, cr.StartRevision) - } else { - sws.addCoalescedWatcher(watcher) - } - sws.nextWatcherID++ - + wps.nextWatcherID++ + w.nextrev = cr.StartRevision + wps.watchers[w.id] = w + wps.ranges.add(w) case *pb.WatchRequest_CancelRequest: - sws.removeWatcher(uv.CancelRequest.WatchId) + wps.delete(uv.CancelRequest.WatchId) default: panic("not implemented") } } } -func (sws *serverWatchStream) sendLoop() { +func (wps *watchProxyStream) sendLoop() { for { select { - case wresp, ok := <-sws.watchCh: + case wresp, ok := <-wps.watchCh: if !ok { return } - if err := sws.gRPCStream.Send(wresp); err != nil { + if err := wps.stream.Send(wresp); err != nil { return } - case <-sws.ctx.Done(): + case <-wps.ctx.Done(): return } } } -func (sws *serverWatchStream) addCoalescedWatcher(w watcher) { - sws.mu.Lock() - defer sws.mu.Unlock() +func (wps *watchProxyStream) delete(id int64) { + wps.mu.Lock() + defer wps.mu.Unlock() - rid := receiverID{streamID: sws.id, watcherID: w.id} - sws.groups.addWatcher(rid, w) - sws.inGroups[w.id] = struct{}{} -} - -func (sws *serverWatchStream) addDedicatedWatcher(w watcher, rev int64) { - ctx, cancel := context.WithCancel(sws.ctx) - wch := sws.cw.Watch(ctx, - w.wr.key, clientv3.WithRange(w.wr.end), - clientv3.WithRev(rev), - clientv3.WithProgressNotify(), - clientv3.WithCreatedNotify(), - ) - sws.mu.Lock() - defer sws.mu.Unlock() - ws := newWatcherSingle(wch, cancel, w, sws) - sws.singles[w.id] = ws - go ws.run() -} - -func (sws *serverWatchStream) maybeCoalesceWatcher(ws watcherSingle) bool { - sws.mu.Lock() - defer sws.mu.Unlock() - - // do not add new watchers when stream is closing - if sws.inGroups == nil { - return false - } - if sws.groups.maybeJoinWatcherSingle(ws) { - delete(sws.singles, ws.w.id) - sws.inGroups[ws.w.id] = struct{}{} - return true - } - return false -} - -func (sws *serverWatchStream) removeWatcher(id int64) { - sws.mu.Lock() - defer sws.mu.Unlock() - - var ( - rev int64 - ok bool - ) - - defer func() { - if !ok { - return - } - resp := &pb.WatchResponse{ - Header: &pb.ResponseHeader{ - // todo: fill in ClusterId - // todo: fill in MemberId: - Revision: rev, - // todo: fill in RaftTerm: - }, - WatchId: id, - Canceled: true, - } - sws.watchCh <- resp - }() - - rev, ok = sws.groups.removeWatcher(receiverID{streamID: sws.id, watcherID: id}) - if ok { - delete(sws.inGroups, id) + w, ok := wps.watchers[id] + if !ok { return } - - var ws *watcherSingle - if ws, ok = sws.singles[id]; ok { - delete(sws.singles, id) - ws.stop() - rev = ws.lastStoreRev + wps.ranges.delete(w) + delete(wps.watchers, id) + resp := &pb.WatchResponse{ + Header: &w.lastHeader, + WatchId: id, + Canceled: true, } + wps.watchCh <- resp } diff --git a/proxy/grpcproxy/watch_broadcast.go b/proxy/grpcproxy/watch_broadcast.go new file mode 100644 index 000000000..c22bf08b9 --- /dev/null +++ b/proxy/grpcproxy/watch_broadcast.go @@ -0,0 +1,135 @@ +// Copyright 2016 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package grpcproxy + +import ( + "sync" + + "golang.org/x/net/context" + + "github.com/coreos/etcd/clientv3" + pb "github.com/coreos/etcd/etcdserver/etcdserverpb" +) + +// watchBroadcast broadcasts a server watcher to many client watchers. +type watchBroadcast struct { + // wbs is the backpointer to all broadcasts on this range + wbs *watchBroadcasts + // cancel stops the underlying etcd server watcher and closes ch. + cancel context.CancelFunc + donec chan struct{} + + // mu protects rev and receivers. + mu sync.RWMutex + // nextrev is the minimum expected next revision of the watcher on ch. + nextrev int64 + // receivers contains all the client-side watchers to serve. + receivers map[*watcher]struct{} + // responses counts the number of responses + responses int +} + +func newWatchBroadcast(wp *watchProxy, w *watcher, update func(*watchBroadcast)) *watchBroadcast { + cctx, cancel := context.WithCancel(wp.ctx) + wb := &watchBroadcast{ + cancel: cancel, + nextrev: w.nextrev, + receivers: make(map[*watcher]struct{}), + donec: make(chan struct{}), + } + wb.add(w) + go func() { + defer close(wb.donec) + // loop because leader loss will close channel + for cctx.Err() == nil { + wch := wp.cw.Watch(cctx, w.wr.key, + clientv3.WithRange(w.wr.end), + clientv3.WithProgressNotify(), + clientv3.WithCreatedNotify(), + clientv3.WithRev(wb.nextrev), + ) + for wr := range wch { + wb.bcast(wr) + update(wb) + } + wp.retryLimiter.Wait(cctx) + } + }() + return wb +} + +func (wb *watchBroadcast) bcast(wr clientv3.WatchResponse) { + wb.mu.Lock() + defer wb.mu.Unlock() + wb.nextrev = wr.Header.Revision + 1 + wb.responses++ + for r := range wb.receivers { + r.send(wr) + } +} + +// add puts a watcher into receiving a broadcast if its revision at least +// meets the broadcast revision. Returns true if added. +func (wb *watchBroadcast) add(w *watcher) bool { + wb.mu.Lock() + defer wb.mu.Unlock() + if wb.nextrev > w.nextrev || (wb.nextrev == 0 && w.nextrev != 0) { + // wb is too far ahead, w will miss events + // or wb is being established with a current watcher + return false + } + if wb.responses == 0 { + // Newly created; create event will be sent by etcd. + wb.receivers[w] = struct{}{} + return true + } + // already sent by etcd; emulate create event + ok := w.post(&pb.WatchResponse{ + Header: &pb.ResponseHeader{ + // todo: fill in ClusterId + // todo: fill in MemberId: + Revision: w.nextrev, + // todo: fill in RaftTerm: + }, + WatchId: w.id, + Created: true, + }) + if !ok { + return false + } + wb.receivers[w] = struct{}{} + return true +} +func (wb *watchBroadcast) delete(w *watcher) { + wb.mu.Lock() + defer wb.mu.Unlock() + if _, ok := wb.receivers[w]; !ok { + panic("deleting missing watcher from broadcast") + } + delete(wb.receivers, w) +} + +func (wb *watchBroadcast) size() int { + wb.mu.RLock() + defer wb.mu.RUnlock() + return len(wb.receivers) +} + +func (wb *watchBroadcast) empty() bool { return wb.size() == 0 } + +func (wb *watchBroadcast) stop() { + wb.cancel() + <-wb.donec +} diff --git a/proxy/grpcproxy/watch_broadcasts.go b/proxy/grpcproxy/watch_broadcasts.go new file mode 100644 index 000000000..38421a448 --- /dev/null +++ b/proxy/grpcproxy/watch_broadcasts.go @@ -0,0 +1,132 @@ +// Copyright 2016 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package grpcproxy + +import ( + "sync" +) + +type watchBroadcasts struct { + wp *watchProxy + + // mu protects bcasts and watchers from the coalesce loop. + mu sync.Mutex + bcasts map[*watchBroadcast]struct{} + watchers map[*watcher]*watchBroadcast + + updatec chan *watchBroadcast + donec chan struct{} +} + +// maxCoalesceRecievers prevents a popular watchBroadcast from being coalseced. +const maxCoalesceReceivers = 5 + +func newWatchBroadcasts(wp *watchProxy) *watchBroadcasts { + wbs := &watchBroadcasts{ + wp: wp, + bcasts: make(map[*watchBroadcast]struct{}), + watchers: make(map[*watcher]*watchBroadcast), + updatec: make(chan *watchBroadcast, 1), + donec: make(chan struct{}), + } + go func() { + defer close(wbs.donec) + for wb := range wbs.updatec { + wbs.coalesce(wb) + } + }() + return wbs +} + +func (wbs *watchBroadcasts) coalesce(wb *watchBroadcast) { + if wb.size() >= maxCoalesceReceivers { + return + } + wbs.mu.Lock() + for wbswb := range wbs.bcasts { + if wbswb == wb { + continue + } + wbswb.mu.Lock() + // NB: victim lock already held + if wb.nextrev >= wbswb.nextrev && wbswb.nextrev != 0 { + for w := range wb.receivers { + wbswb.receivers[w] = struct{}{} + wbs.watchers[w] = wbswb + } + wb.receivers = nil + } + wbswb.mu.Unlock() + if wb.empty() { + delete(wbs.bcasts, wb) + wb.stop() + break + } + } + wbs.mu.Unlock() +} + +func (wbs *watchBroadcasts) add(w *watcher) { + wbs.mu.Lock() + defer wbs.mu.Unlock() + // find fitting bcast + for wb := range wbs.bcasts { + if wb.add(w) { + wbs.watchers[w] = wb + return + } + } + // no fit; create a bcast + wb := newWatchBroadcast(wbs.wp, w, wbs.update) + wbs.watchers[w] = wb + wbs.bcasts[wb] = struct{}{} +} + +func (wbs *watchBroadcasts) delete(w *watcher) { + wbs.mu.Lock() + defer wbs.mu.Unlock() + + wb, ok := wbs.watchers[w] + if !ok { + panic("deleting missing watcher from broadcasts") + } + delete(wbs.watchers, w) + wb.delete(w) + if wb.empty() { + delete(wbs.bcasts, wb) + wb.stop() + } +} + +func (wbs *watchBroadcasts) empty() bool { return len(wbs.bcasts) == 0 } + +func (wbs *watchBroadcasts) stop() { + wbs.mu.Lock() + defer wbs.mu.Unlock() + + for wb := range wbs.bcasts { + wb.stop() + } + wbs.bcasts = nil + close(wbs.updatec) + <-wbs.donec +} + +func (wbs *watchBroadcasts) update(wb *watchBroadcast) { + select { + case wbs.updatec <- wb: + default: + } +} diff --git a/proxy/grpcproxy/watch_client_adapter.go b/proxy/grpcproxy/watch_client_adapter.go index d6c9bdb46..283c2ed07 100644 --- a/proxy/grpcproxy/watch_client_adapter.go +++ b/proxy/grpcproxy/watch_client_adapter.go @@ -32,14 +32,27 @@ func WatchServerToWatchClient(wserv pb.WatchServer) pb.WatchClient { } func (s *ws2wc) Watch(ctx context.Context, opts ...grpc.CallOption) (pb.Watch_WatchClient, error) { - ch1, ch2 := make(chan interface{}), make(chan interface{}) + // ch1 is buffered so server can send error on close + ch1, ch2 := make(chan interface{}, 1), make(chan interface{}) headerc, trailerc := make(chan metadata.MD, 1), make(chan metadata.MD, 1) - wclient := &ws2wcClientStream{chanClientStream{headerc, trailerc, &chanStream{ch1, ch2, ctx}}} - wserver := &ws2wcServerStream{chanServerStream{headerc, trailerc, &chanStream{ch2, ch1, ctx}, nil}} + + cctx, ccancel := context.WithCancel(ctx) + cli := &chanStream{recvc: ch1, sendc: ch2, ctx: cctx, cancel: ccancel} + wclient := &ws2wcClientStream{chanClientStream{headerc, trailerc, cli}} + + sctx, scancel := context.WithCancel(ctx) + srv := &chanStream{recvc: ch2, sendc: ch1, ctx: sctx, cancel: scancel} + wserver := &ws2wcServerStream{chanServerStream{headerc, trailerc, srv, nil}} go func() { - s.wserv.Watch(wserver) - // close the server side sender - close(ch1) + if err := s.wserv.Watch(wserver); err != nil { + select { + case srv.sendc <- err: + case <-sctx.Done(): + case <-cctx.Done(): + } + } + scancel() + ccancel() }() return wclient, nil } @@ -145,9 +158,10 @@ func (s *chanClientStream) CloseSend() error { // chanStream implements grpc.Stream using channels type chanStream struct { - recvc <-chan interface{} - sendc chan<- interface{} - ctx context.Context + recvc <-chan interface{} + sendc chan<- interface{} + ctx context.Context + cancel context.CancelFunc } func (s *chanStream) Context() context.Context { return s.ctx } @@ -155,6 +169,9 @@ func (s *chanStream) Context() context.Context { return s.ctx } func (s *chanStream) SendMsg(m interface{}) error { select { case s.sendc <- m: + if err, ok := m.(error); ok { + return err + } return nil case <-s.ctx.Done(): } @@ -168,6 +185,9 @@ func (s *chanStream) RecvMsg(m interface{}) error { if !ok { return grpc.ErrClientConnClosing } + if err, ok := msg.(error); ok { + return err + } *v = msg return nil case <-s.ctx.Done(): diff --git a/proxy/grpcproxy/watch_ranges.go b/proxy/grpcproxy/watch_ranges.go new file mode 100644 index 000000000..27464453a --- /dev/null +++ b/proxy/grpcproxy/watch_ranges.go @@ -0,0 +1,70 @@ +// Copyright 2016 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package grpcproxy + +import ( + "sync" +) + +// watchRanges tracks all open watches for the proxy. +type watchRanges struct { + wp *watchProxy + + mu sync.Mutex + bcasts map[watchRange]*watchBroadcasts +} + +func newWatchRanges(wp *watchProxy) *watchRanges { + return &watchRanges{ + wp: wp, + bcasts: make(map[watchRange]*watchBroadcasts), + } +} + +func (wrs *watchRanges) add(w *watcher) { + wrs.mu.Lock() + defer wrs.mu.Unlock() + + if wbs := wrs.bcasts[w.wr]; wbs != nil { + wbs.add(w) + return + } + wbs := newWatchBroadcasts(wrs.wp) + wrs.bcasts[w.wr] = wbs + wbs.add(w) +} + +func (wrs *watchRanges) delete(w *watcher) { + wrs.mu.Lock() + defer wrs.mu.Unlock() + wbs, ok := wrs.bcasts[w.wr] + if !ok { + panic("deleting missing range") + } + wbs.delete(w) + if wbs.empty() { + wbs.stop() + delete(wrs.bcasts, w.wr) + } +} + +func (wrs *watchRanges) stop() { + wrs.mu.Lock() + defer wrs.mu.Unlock() + for _, wb := range wrs.bcasts { + wb.stop() + } + wrs.bcasts = nil +} diff --git a/proxy/grpcproxy/watcher.go b/proxy/grpcproxy/watcher.go index 2d25433d9..761daa9f4 100644 --- a/proxy/grpcproxy/watcher.go +++ b/proxy/grpcproxy/watcher.go @@ -28,31 +28,48 @@ type watchRange struct { } type watcher struct { - id int64 - wr watchRange - sws *serverWatchStream + // user configuration - rev int64 + wr watchRange filters []mvcc.FilterFunc progress bool + + // id is the id returned to the client on its watch stream. + id int64 + // nextrev is the minimum expected next event revision. + nextrev int64 + // lastHeader has the last header sent over the stream. + lastHeader pb.ResponseHeader + + // wps is the parent. + wps *watchProxyStream } +// send filters out repeated events by discarding revisions older +// than the last one sent over the watch channel. func (w *watcher) send(wr clientv3.WatchResponse) { if wr.IsProgressNotify() && !w.progress { return } + if w.nextrev > wr.Header.Revision && len(wr.Events) > 0 { + return + } + if w.nextrev == 0 { + // current watch; expect updates following this revision + w.nextrev = wr.Header.Revision + 1 + } events := make([]*mvccpb.Event, 0, len(wr.Events)) var lastRev int64 for i := range wr.Events { ev := (*mvccpb.Event)(wr.Events[i]) - if ev.Kv.ModRevision <= w.rev { + if ev.Kv.ModRevision < w.nextrev { continue } else { // We cannot update w.rev here. // txn can have multiple events with the same rev. - // If we update w.rev here, we would skip some events in the same txn. + // If w.nextrev updates here, it would skip events in the same txn. lastRev = ev.Kv.ModRevision } @@ -71,8 +88,8 @@ func (w *watcher) send(wr clientv3.WatchResponse) { } } - if lastRev > w.rev { - w.rev = lastRev + if lastRev >= w.nextrev { + w.nextrev = lastRev + 1 } // all events are filtered out? @@ -80,15 +97,22 @@ func (w *watcher) send(wr clientv3.WatchResponse) { return } - pbwr := &pb.WatchResponse{ + w.lastHeader = wr.Header + w.post(&pb.WatchResponse{ Header: &wr.Header, Created: wr.Created, WatchId: w.id, Events: events, - } - select { - case w.sws.watchCh <- pbwr: - case <-time.After(50 * time.Millisecond): - w.sws.cancel() - } + }) +} + +// post puts a watch response on the watcher's proxy stream channel +func (w *watcher) post(wr *pb.WatchResponse) bool { + select { + case w.wps.watchCh <- wr: + case <-time.After(50 * time.Millisecond): + w.wps.cancel() + return false + } + return true } diff --git a/proxy/grpcproxy/watcher_group.go b/proxy/grpcproxy/watcher_group.go deleted file mode 100644 index 4bce9ca46..000000000 --- a/proxy/grpcproxy/watcher_group.go +++ /dev/null @@ -1,106 +0,0 @@ -// Copyright 2016 The etcd Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package grpcproxy - -import ( - "sync" - - "golang.org/x/net/context" - - "github.com/coreos/etcd/clientv3" -) - -type watcherGroup struct { - // ch delievers events received from the etcd server - ch clientv3.WatchChan - // cancel is used to cancel the underlying etcd server watcher - // It should also close the ch. - cancel context.CancelFunc - - mu sync.Mutex - rev int64 // current revision of the watchergroup - receivers map[receiverID]watcher - - donec chan struct{} -} - -type receiverID struct { - streamID, watcherID int64 -} - -func newWatchergroup(wch clientv3.WatchChan, c context.CancelFunc) *watcherGroup { - return &watcherGroup{ - ch: wch, - cancel: c, - - receivers: make(map[receiverID]watcher), - donec: make(chan struct{}), - } -} - -func (wg *watcherGroup) run() { - defer close(wg.donec) - for wr := range wg.ch { - wg.broadcast(wr) - } -} - -func (wg *watcherGroup) broadcast(wr clientv3.WatchResponse) { - wg.mu.Lock() - defer wg.mu.Unlock() - - wg.rev = wr.Header.Revision - for _, r := range wg.receivers { - r.send(wr) - } -} - -// add adds the watcher into the group with given ID. -// The current revision of the watcherGroup is returned or -1 -// if the watcher is at a revision prior to the watcher group. -func (wg *watcherGroup) add(rid receiverID, w watcher) int64 { - wg.mu.Lock() - defer wg.mu.Unlock() - if wg.rev > w.rev { - return -1 - } - wg.receivers[rid] = w - return wg.rev -} - -func (wg *watcherGroup) delete(rid receiverID) { - wg.mu.Lock() - defer wg.mu.Unlock() - - delete(wg.receivers, rid) -} - -func (wg *watcherGroup) isEmpty() bool { - wg.mu.Lock() - defer wg.mu.Unlock() - - return len(wg.receivers) == 0 -} - -func (wg *watcherGroup) stop() { - wg.cancel() - <-wg.donec -} - -func (wg *watcherGroup) revision() int64 { - wg.mu.Lock() - defer wg.mu.Unlock() - return wg.rev -} diff --git a/proxy/grpcproxy/watcher_group_test.go b/proxy/grpcproxy/watcher_group_test.go deleted file mode 100644 index 3436bbb38..000000000 --- a/proxy/grpcproxy/watcher_group_test.go +++ /dev/null @@ -1,50 +0,0 @@ -// Copyright 2016 The etcd Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package grpcproxy - -import ( - "testing" - - "golang.org/x/net/context" - - "github.com/coreos/etcd/clientv3" - pb "github.com/coreos/etcd/etcdserver/etcdserverpb" -) - -func TestWatchgroupBroadcast(t *testing.T) { - wch := make(chan clientv3.WatchResponse, 0) - wg := newWatchergroup(wch, nil) - go wg.run() - - chs := make([]chan *pb.WatchResponse, 10) - for i := range chs { - chs[i] = make(chan *pb.WatchResponse, 1) - w := watcher{ - id: int64(i), - sws: &serverWatchStream{watchCh: chs[i], ctx: context.TODO()}, - - progress: true, - } - rid := receiverID{streamID: 1, watcherID: w.id} - wg.add(rid, w) - } - - // send a progress response - wch <- clientv3.WatchResponse{Header: pb.ResponseHeader{Revision: 1}} - - for _, ch := range chs { - <-ch - } -} diff --git a/proxy/grpcproxy/watcher_groups.go b/proxy/grpcproxy/watcher_groups.go deleted file mode 100644 index a81e6a2a1..000000000 --- a/proxy/grpcproxy/watcher_groups.go +++ /dev/null @@ -1,128 +0,0 @@ -// Copyright 2016 The etcd Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package grpcproxy - -import ( - "sync" - - "github.com/coreos/etcd/clientv3" - pb "github.com/coreos/etcd/etcdserver/etcdserverpb" - - "golang.org/x/net/context" -) - -type watchergroups struct { - cw clientv3.Watcher - - mu sync.Mutex - groups map[watchRange]*watcherGroup - idToGroup map[receiverID]*watcherGroup - - proxyCtx context.Context -} - -func (wgs *watchergroups) addWatcher(rid receiverID, w watcher) { - wgs.mu.Lock() - defer wgs.mu.Unlock() - - groups := wgs.groups - - if wg, ok := groups[w.wr]; ok { - rev := wg.add(rid, w) - wgs.idToGroup[rid] = wg - - if rev == 0 { - // The group is newly created, the create event has not been delivered - // to this group yet. - // We can rely on etcd server to deliver the create event. - // Or we might end up sending created event twice. - return - } - - resp := &pb.WatchResponse{ - Header: &pb.ResponseHeader{ - // todo: fill in ClusterId - // todo: fill in MemberId: - Revision: rev, - // todo: fill in RaftTerm: - }, - WatchId: rid.watcherID, - Created: true, - } - select { - case w.sws.watchCh <- resp: - case <-w.sws.ctx.Done(): - } - return - } - - ctx, cancel := context.WithCancel(wgs.proxyCtx) - - wch := wgs.cw.Watch(ctx, w.wr.key, - clientv3.WithRange(w.wr.end), - clientv3.WithProgressNotify(), - clientv3.WithCreatedNotify(), - ) - - watchg := newWatchergroup(wch, cancel) - watchg.add(rid, w) - go watchg.run() - groups[w.wr] = watchg - wgs.idToGroup[rid] = watchg -} - -func (wgs *watchergroups) removeWatcher(rid receiverID) (int64, bool) { - wgs.mu.Lock() - defer wgs.mu.Unlock() - - if g, ok := wgs.idToGroup[rid]; ok { - g.delete(rid) - delete(wgs.idToGroup, rid) - if g.isEmpty() { - g.stop() - } - return g.revision(), true - } - return -1, false -} - -func (wgs *watchergroups) maybeJoinWatcherSingle(ws watcherSingle) bool { - wgs.mu.Lock() - defer wgs.mu.Unlock() - - rid := receiverID{streamID: ws.sws.id, watcherID: ws.w.id} - group, ok := wgs.groups[ws.w.wr] - if ok { - return group.add(rid, ws.w) != -1 - } - if !ws.canPromote() { - return false - } - wg := newWatchergroup(ws.ch, ws.cancel) - wgs.groups[ws.w.wr] = wg - wgs.idToGroup[rid] = wg - wg.add(rid, ws.w) - go wg.run() - return true -} - -func (wgs *watchergroups) stop() { - wgs.mu.Lock() - defer wgs.mu.Unlock() - for _, wg := range wgs.groups { - wg.stop() - } - wgs.groups = nil -} diff --git a/proxy/grpcproxy/watcher_single.go b/proxy/grpcproxy/watcher_single.go deleted file mode 100644 index 99df1c0f1..000000000 --- a/proxy/grpcproxy/watcher_single.go +++ /dev/null @@ -1,73 +0,0 @@ -// Copyright 2016 The etcd Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package grpcproxy - -import ( - "github.com/coreos/etcd/clientv3" - "golang.org/x/net/context" -) - -type watcherSingle struct { - // ch delievers events received from the etcd server - ch clientv3.WatchChan - // cancel is used to cancel the underlying etcd server watcher - // It should also close the ch. - cancel context.CancelFunc - - // sws is the stream this watcherSingle attached to - sws *serverWatchStream - - w watcher - - lastStoreRev int64 // last seen revision of the remote mvcc store - - donec chan struct{} -} - -func newWatcherSingle(wch clientv3.WatchChan, c context.CancelFunc, w watcher, sws *serverWatchStream) *watcherSingle { - return &watcherSingle{ - sws: sws, - - ch: wch, - cancel: c, - - w: w, - donec: make(chan struct{}), - } -} - -func (ws watcherSingle) run() { - defer close(ws.donec) - - for wr := range ws.ch { - ws.lastStoreRev = wr.Header.Revision - ws.w.send(wr) - if ws.sws.maybeCoalesceWatcher(ws) { - return - } - } -} - -// canPromote returns true if a watcherSingle can promote itself to a watchergroup -// when it already caught up with the last seen revision from the response header -// of an etcd server. -func (ws watcherSingle) canPromote() bool { - return ws.w.rev == ws.lastStoreRev -} - -func (ws watcherSingle) stop() { - ws.cancel() - <-ws.donec -}