Merge pull request #6822 from heyitsanthony/watch-bcast

grpcproxy: rework watcher organization
Anthony Romano 2016-11-11 10:50:27 -08:00 committed by GitHub
commit b4726a9501
10 changed files with 577 additions and 552 deletions
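At a high level, this rework replaces the per-stream watchergroups/watcherSingle machinery with a layered fan-out keyed by key range. A rough containment sketch of the new types, condensed from the files below (arrows point from owner to owned):

// watchProxy.ranges                  -> *watchRanges        one table per proxy
//   watchRanges.bcasts[watchRange]   -> *watchBroadcasts    one per watched key range
//     watchBroadcasts.bcasts         -> *watchBroadcast     one upstream etcd watch each
//       watchBroadcast.receivers     -> *watcher            client watchers fanned out to
//         watcher.wps                -> *watchProxyStream   one per client gRPC Watch stream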


@@ -15,253 +15,250 @@
package grpcproxy
import (
"io"
"sync"
"golang.org/x/net/context"
"golang.org/x/time/rate"
"google.golang.org/grpc/metadata"
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/etcdserver/api/v3rpc"
"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
)
type watchProxy struct {
cw clientv3.Watcher
wgs watchergroups
mu sync.Mutex
nextStreamID int64
ctx context.Context
ranges *watchRanges
// retryLimiter controls the create watch retry rate on lost leaders.
retryLimiter *rate.Limiter
// mu protects leaderc updates.
mu sync.RWMutex
leaderc chan struct{}
// wg waits until all outstanding watch servers quit.
wg sync.WaitGroup
}
const (
lostLeaderKey = "__lostleader" // watched to detect leader loss
retryPerSecond = 10
)
func NewWatchProxy(c *clientv3.Client) pb.WatchServer {
wp := &watchProxy{
cw: c.Watcher,
wgs: watchergroups{
cw: c.Watcher,
groups: make(map[watchRange]*watcherGroup),
idToGroup: make(map[receiverID]*watcherGroup),
proxyCtx: c.Ctx(),
},
ctx: c.Ctx(),
cw: c.Watcher,
ctx: clientv3.WithRequireLeader(c.Ctx()),
retryLimiter: rate.NewLimiter(rate.Limit(retryPerSecond), retryPerSecond),
leaderc: make(chan struct{}),
}
wp.ranges = newWatchRanges(wp)
go func() {
// a new stream without opening any watchers won't catch
// a lost leader event, so have a special watch to monitor it
rev := int64((uint64(1) << 63) - 2)
for wp.ctx.Err() == nil {
wch := wp.cw.Watch(wp.ctx, lostLeaderKey, clientv3.WithRev(rev))
for range wch {
}
wp.mu.Lock()
close(wp.leaderc)
wp.leaderc = make(chan struct{})
wp.mu.Unlock()
wp.retryLimiter.Wait(wp.ctx)
}
wp.mu.Lock()
<-wp.ctx.Done()
wp.wgs.stop()
wp.mu.Unlock()
wp.wg.Wait()
wp.ranges.stop()
}()
return wp
}
func (wp *watchProxy) Watch(stream pb.Watch_WatchServer) (err error) {
wp.mu.Lock()
wp.nextStreamID++
sid := wp.nextStreamID
select {
case <-wp.ctx.Done():
wp.mu.Unlock()
return
default:
wp.wg.Add(1)
}
wp.mu.Unlock()
ctx, cancel := context.WithCancel(wp.ctx)
sws := serverWatchStream{
cw: wp.cw,
groups: &wp.wgs,
singles: make(map[int64]*watcherSingle),
inGroups: make(map[int64]struct{}),
id: sid,
gRPCStream: stream,
watchCh: make(chan *pb.WatchResponse, 1024),
ctx: ctx,
cancel: cancel,
ctx, cancel := context.WithCancel(stream.Context())
wps := &watchProxyStream{
ranges: wp.ranges,
watchers: make(map[int64]*watcher),
stream: stream,
watchCh: make(chan *pb.WatchResponse, 1024),
ctx: ctx,
cancel: cancel,
}
go sws.recvLoop()
sws.sendLoop()
return wp.ctx.Err()
var leaderc <-chan struct{}
if md, ok := metadata.FromContext(stream.Context()); ok {
v := md[rpctypes.MetadataRequireLeaderKey]
if len(v) > 0 && v[0] == rpctypes.MetadataHasLeader {
leaderc = wp.lostLeaderNotify()
}
}
// post to stopc => terminate server stream; can't use a waitgroup
// since all goroutines will only terminate after Watch() exits.
stopc := make(chan struct{}, 3)
go func() {
defer func() { stopc <- struct{}{} }()
wps.recvLoop()
}()
go func() {
defer func() { stopc <- struct{}{} }()
wps.sendLoop()
}()
if leaderc != nil {
go func() {
defer func() { stopc <- struct{}{} }()
select {
case <-leaderc:
case <-ctx.Done():
}
}()
}
<-stopc
// recv/send may only shutdown after function exits;
// goroutine notifies proxy that stream is through
go func() {
if leaderc != nil {
<-stopc
}
<-stopc
wps.close()
wp.wg.Done()
}()
select {
case <-leaderc:
return rpctypes.ErrNoLeader
default:
return wps.ctx.Err()
}
}
type serverWatchStream struct {
id int64
cw clientv3.Watcher
func (wp *watchProxy) lostLeaderNotify() <-chan struct{} {
wp.mu.RLock()
defer wp.mu.RUnlock()
return wp.leaderc
}
mu sync.Mutex // make sure any access of groups and singles is atomic
groups *watchergroups
singles map[int64]*watcherSingle
inGroups map[int64]struct{}
gRPCStream pb.Watch_WatchServer
watchCh chan *pb.WatchResponse
// watchProxyStream forwards etcd watch events to a proxied client stream.
type watchProxyStream struct {
ranges *watchRanges
// mu protects watchers and nextWatcherID
mu sync.Mutex
// watchers receive events from watch broadcast.
watchers map[int64]*watcher
// nextWatcherID is the id to assign the next watcher on this stream.
nextWatcherID int64
stream pb.Watch_WatchServer
// watchCh receives watch responses from the watchers.
watchCh chan *pb.WatchResponse
ctx context.Context
cancel context.CancelFunc
}
func (sws *serverWatchStream) close() {
func (wps *watchProxyStream) close() {
var wg sync.WaitGroup
sws.cancel()
sws.mu.Lock()
wg.Add(len(sws.singles) + len(sws.inGroups))
for _, ws := range sws.singles {
// copy the range variable to avoid race
copyws := ws
go func() {
copyws.stop()
wps.cancel()
wps.mu.Lock()
wg.Add(len(wps.watchers))
for _, wpsw := range wps.watchers {
go func(w *watcher) {
wps.ranges.delete(w)
wg.Done()
}()
}(wpsw)
}
for id := range sws.inGroups {
// copy the range variable to avoid race
wid := id
go func() {
sws.groups.removeWatcher(receiverID{streamID: sws.id, watcherID: wid})
wg.Done()
}()
}
sws.inGroups = nil
sws.mu.Unlock()
wps.watchers = nil
wps.mu.Unlock()
wg.Wait()
close(sws.watchCh)
close(wps.watchCh)
}
func (sws *serverWatchStream) recvLoop() error {
defer sws.close()
func (wps *watchProxyStream) recvLoop() error {
for {
req, err := sws.gRPCStream.Recv()
if err == io.EOF {
return nil
}
req, err := wps.stream.Recv()
if err != nil {
return err
}
switch uv := req.RequestUnion.(type) {
case *pb.WatchRequest_CreateRequest:
cr := uv.CreateRequest
w := &watcher{
wr: watchRange{string(cr.Key), string(cr.RangeEnd)},
id: wps.nextWatcherID,
wps: wps,
watcher := watcher{
wr: watchRange{
key: string(cr.Key),
end: string(cr.RangeEnd),
},
id: sws.nextWatcherID,
sws: sws,
nextrev: cr.StartRevision,
progress: cr.ProgressNotify,
filters: v3rpc.FiltersFromRequest(cr),
}
if cr.StartRevision != 0 {
sws.addDedicatedWatcher(watcher, cr.StartRevision)
} else {
sws.addCoalescedWatcher(watcher)
if !w.wr.valid() {
w.post(&pb.WatchResponse{WatchId: -1, Created: true, Canceled: true})
continue
}
sws.nextWatcherID++
wps.nextWatcherID++
w.nextrev = cr.StartRevision
wps.watchers[w.id] = w
wps.ranges.add(w)
case *pb.WatchRequest_CancelRequest:
sws.removeWatcher(uv.CancelRequest.WatchId)
wps.delete(uv.CancelRequest.WatchId)
default:
panic("not implemented")
}
}
}
func (sws *serverWatchStream) sendLoop() {
func (wps *watchProxyStream) sendLoop() {
for {
select {
case wresp, ok := <-sws.watchCh:
case wresp, ok := <-wps.watchCh:
if !ok {
return
}
if err := sws.gRPCStream.Send(wresp); err != nil {
if err := wps.stream.Send(wresp); err != nil {
return
}
case <-sws.ctx.Done():
case <-wps.ctx.Done():
return
}
}
}
func (sws *serverWatchStream) addCoalescedWatcher(w watcher) {
sws.mu.Lock()
defer sws.mu.Unlock()
func (wps *watchProxyStream) delete(id int64) {
wps.mu.Lock()
defer wps.mu.Unlock()
rid := receiverID{streamID: sws.id, watcherID: w.id}
sws.groups.addWatcher(rid, w)
sws.inGroups[w.id] = struct{}{}
}
func (sws *serverWatchStream) addDedicatedWatcher(w watcher, rev int64) {
ctx, cancel := context.WithCancel(sws.ctx)
wch := sws.cw.Watch(ctx,
w.wr.key, clientv3.WithRange(w.wr.end),
clientv3.WithRev(rev),
clientv3.WithProgressNotify(),
clientv3.WithCreatedNotify(),
)
sws.mu.Lock()
defer sws.mu.Unlock()
ws := newWatcherSingle(wch, cancel, w, sws)
sws.singles[w.id] = ws
go ws.run()
}
func (sws *serverWatchStream) maybeCoalesceWatcher(ws watcherSingle) bool {
sws.mu.Lock()
defer sws.mu.Unlock()
// do not add new watchers when stream is closing
if sws.inGroups == nil {
return false
}
if sws.groups.maybeJoinWatcherSingle(ws) {
delete(sws.singles, ws.w.id)
sws.inGroups[ws.w.id] = struct{}{}
return true
}
return false
}
func (sws *serverWatchStream) removeWatcher(id int64) {
sws.mu.Lock()
defer sws.mu.Unlock()
var (
rev int64
ok bool
)
defer func() {
if !ok {
return
}
resp := &pb.WatchResponse{
Header: &pb.ResponseHeader{
// todo: fill in ClusterId
// todo: fill in MemberId:
Revision: rev,
// todo: fill in RaftTerm:
},
WatchId: id,
Canceled: true,
}
sws.watchCh <- resp
}()
rev, ok = sws.groups.removeWatcher(receiverID{streamID: sws.id, watcherID: id})
if ok {
delete(sws.inGroups, id)
w, ok := wps.watchers[id]
if !ok {
return
}
var ws *watcherSingle
if ws, ok = sws.singles[id]; ok {
delete(sws.singles, id)
ws.stop()
rev = ws.lastStoreRev
wps.ranges.delete(w)
delete(wps.watchers, id)
resp := &pb.WatchResponse{
Header: &w.lastHeader,
WatchId: id,
Canceled: true,
}
wps.watchCh <- resp
}
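The proxy now watches a bogus __lostleader key to detect leader loss and only propagates it to streams that request leadership via the hasleader metadata. A minimal client-side sketch of opting in through the proxy, assuming a hypothetical proxy endpoint at 127.0.0.1:23790:

package main

import (
	"golang.org/x/net/context"

	"github.com/coreos/etcd/clientv3"
)

func watchThroughProxy() error {
	// connect to the watch proxy rather than directly to etcd (endpoint is illustrative)
	cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"127.0.0.1:23790"}})
	if err != nil {
		return err
	}
	defer cli.Close()
	// WithRequireLeader attaches the "hasleader" metadata that Watch() checks for;
	// when the proxy loses its leader it tears down this stream (rpctypes.ErrNoLeader)
	// instead of silently serving a stale watch.
	ctx := clientv3.WithRequireLeader(context.Background())
	for wresp := range cli.Watch(ctx, "foo", clientv3.WithPrefix()) {
		if err := wresp.Err(); err != nil {
			return err
		}
		for _, ev := range wresp.Events {
			_ = ev // handle event
		}
	}
	return ctx.Err()
}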


@@ -0,0 +1,135 @@
// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package grpcproxy
import (
"sync"
"golang.org/x/net/context"
"github.com/coreos/etcd/clientv3"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
)
// watchBroadcast broadcasts a server watcher to many client watchers.
type watchBroadcast struct {
// wbs is the backpointer to all broadcasts on this range
wbs *watchBroadcasts
// cancel stops the underlying etcd server watcher and closes ch.
cancel context.CancelFunc
donec chan struct{}
// mu protects rev and receivers.
mu sync.RWMutex
// nextrev is the minimum expected next revision of the watcher on ch.
nextrev int64
// receivers contains all the client-side watchers to serve.
receivers map[*watcher]struct{}
// responses counts the number of responses
responses int
}
func newWatchBroadcast(wp *watchProxy, w *watcher, update func(*watchBroadcast)) *watchBroadcast {
cctx, cancel := context.WithCancel(wp.ctx)
wb := &watchBroadcast{
cancel: cancel,
nextrev: w.nextrev,
receivers: make(map[*watcher]struct{}),
donec: make(chan struct{}),
}
wb.add(w)
go func() {
defer close(wb.donec)
// loop because leader loss will close channel
for cctx.Err() == nil {
wch := wp.cw.Watch(cctx, w.wr.key,
clientv3.WithRange(w.wr.end),
clientv3.WithProgressNotify(),
clientv3.WithCreatedNotify(),
clientv3.WithRev(wb.nextrev),
)
for wr := range wch {
wb.bcast(wr)
update(wb)
}
wp.retryLimiter.Wait(cctx)
}
}()
return wb
}
func (wb *watchBroadcast) bcast(wr clientv3.WatchResponse) {
wb.mu.Lock()
defer wb.mu.Unlock()
wb.nextrev = wr.Header.Revision + 1
wb.responses++
for r := range wb.receivers {
r.send(wr)
}
}
// add puts a watcher into receiving a broadcast if its revision at least
// meets the broadcast revision. Returns true if added.
func (wb *watchBroadcast) add(w *watcher) bool {
wb.mu.Lock()
defer wb.mu.Unlock()
if wb.nextrev > w.nextrev || (wb.nextrev == 0 && w.nextrev != 0) {
// wb is too far ahead, w will miss events
// or wb is being established with a current watcher
return false
}
if wb.responses == 0 {
// Newly created; create event will be sent by etcd.
wb.receivers[w] = struct{}{}
return true
}
// already sent by etcd; emulate create event
ok := w.post(&pb.WatchResponse{
Header: &pb.ResponseHeader{
// todo: fill in ClusterId
// todo: fill in MemberId:
Revision: w.nextrev,
// todo: fill in RaftTerm:
},
WatchId: w.id,
Created: true,
})
if !ok {
return false
}
wb.receivers[w] = struct{}{}
return true
}
func (wb *watchBroadcast) delete(w *watcher) {
wb.mu.Lock()
defer wb.mu.Unlock()
if _, ok := wb.receivers[w]; !ok {
panic("deleting missing watcher from broadcast")
}
delete(wb.receivers, w)
}
func (wb *watchBroadcast) size() int {
wb.mu.RLock()
defer wb.mu.RUnlock()
return len(wb.receivers)
}
func (wb *watchBroadcast) empty() bool { return wb.size() == 0 }
func (wb *watchBroadcast) stop() {
wb.cancel()
<-wb.donec
}
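The revision check in add above decides when a watcher can share an existing broadcast. A standalone mirror of that condition with worked examples (the function name and comments are mine; the condition is copied from add):

// compatible reports whether a broadcast whose next expected revision is
// bcastNextRev can serve a watcher wanting events from watcherNextRev onward.
// Zero means "current revision" on either side, as in watchBroadcast.add.
func compatible(bcastNextRev, watcherNextRev int64) bool {
	if bcastNextRev > watcherNextRev || (bcastNextRev == 0 && watcherNextRev != 0) {
		// the broadcast is already past the watcher's start revision, or the
		// broadcast tracks "current" while the watcher needs history
		return false
	}
	return true
}

// compatible(11, 5) == false  // the watcher would miss revisions 5..10
// compatible(5, 11) == true   // the watcher joins; watcher.send drops events below 11
// compatible(0, 7)  == false  // broadcast not yet pinned to a concrete revision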


@@ -0,0 +1,132 @@
// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package grpcproxy
import (
"sync"
)
type watchBroadcasts struct {
wp *watchProxy
// mu protects bcasts and watchers from the coalesce loop.
mu sync.Mutex
bcasts map[*watchBroadcast]struct{}
watchers map[*watcher]*watchBroadcast
updatec chan *watchBroadcast
donec chan struct{}
}
// maxCoalesceReceivers prevents a popular watchBroadcast from being coalesced.
const maxCoalesceReceivers = 5
func newWatchBroadcasts(wp *watchProxy) *watchBroadcasts {
wbs := &watchBroadcasts{
wp: wp,
bcasts: make(map[*watchBroadcast]struct{}),
watchers: make(map[*watcher]*watchBroadcast),
updatec: make(chan *watchBroadcast, 1),
donec: make(chan struct{}),
}
go func() {
defer close(wbs.donec)
for wb := range wbs.updatec {
wbs.coalesce(wb)
}
}()
return wbs
}
func (wbs *watchBroadcasts) coalesce(wb *watchBroadcast) {
if wb.size() >= maxCoalesceReceivers {
return
}
wbs.mu.Lock()
for wbswb := range wbs.bcasts {
if wbswb == wb {
continue
}
wbswb.mu.Lock()
// NB: victim lock already held
if wb.nextrev >= wbswb.nextrev && wbswb.nextrev != 0 {
for w := range wb.receivers {
wbswb.receivers[w] = struct{}{}
wbs.watchers[w] = wbswb
}
wb.receivers = nil
}
wbswb.mu.Unlock()
if wb.empty() {
delete(wbs.bcasts, wb)
wb.stop()
break
}
}
wbs.mu.Unlock()
}
func (wbs *watchBroadcasts) add(w *watcher) {
wbs.mu.Lock()
defer wbs.mu.Unlock()
// find fitting bcast
for wb := range wbs.bcasts {
if wb.add(w) {
wbs.watchers[w] = wb
return
}
}
// no fit; create a bcast
wb := newWatchBroadcast(wbs.wp, w, wbs.update)
wbs.watchers[w] = wb
wbs.bcasts[wb] = struct{}{}
}
func (wbs *watchBroadcasts) delete(w *watcher) {
wbs.mu.Lock()
defer wbs.mu.Unlock()
wb, ok := wbs.watchers[w]
if !ok {
panic("deleting missing watcher from broadcasts")
}
delete(wbs.watchers, w)
wb.delete(w)
if wb.empty() {
delete(wbs.bcasts, wb)
wb.stop()
}
}
func (wbs *watchBroadcasts) empty() bool { return len(wbs.bcasts) == 0 }
func (wbs *watchBroadcasts) stop() {
wbs.mu.Lock()
defer wbs.mu.Unlock()
for wb := range wbs.bcasts {
wb.stop()
}
wbs.bcasts = nil
close(wbs.updatec)
<-wbs.donec
}
func (wbs *watchBroadcasts) update(wb *watchBroadcast) {
select {
case wbs.updatec <- wb:
default:
}
}
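The update method above pairs a size-1 buffered channel with a non-blocking send, a common Go idiom for collapsing bursts of notifications so a broadcast never blocks on the coalesce loop. A generic sketch of the same idiom, separate from the diff (names here are placeholders):

// notify wakes the consumer at most once per pending wake-up; if a wake-up
// is already queued, this notification is dropped instead of blocking.
func notify(updatec chan struct{}) {
	select {
	case updatec <- struct{}{}:
	default:
	}
}

// consumer: one pass of work per wake-up, however many notifications were collapsed
//
//	for range updatec {
//		coalesceAll() // placeholder for the batched work
//	}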


@@ -32,14 +32,27 @@ func WatchServerToWatchClient(wserv pb.WatchServer) pb.WatchClient {
}
func (s *ws2wc) Watch(ctx context.Context, opts ...grpc.CallOption) (pb.Watch_WatchClient, error) {
ch1, ch2 := make(chan interface{}), make(chan interface{})
// ch1 is buffered so server can send error on close
ch1, ch2 := make(chan interface{}, 1), make(chan interface{})
headerc, trailerc := make(chan metadata.MD, 1), make(chan metadata.MD, 1)
wclient := &ws2wcClientStream{chanClientStream{headerc, trailerc, &chanStream{ch1, ch2, ctx}}}
wserver := &ws2wcServerStream{chanServerStream{headerc, trailerc, &chanStream{ch2, ch1, ctx}, nil}}
cctx, ccancel := context.WithCancel(ctx)
cli := &chanStream{recvc: ch1, sendc: ch2, ctx: cctx, cancel: ccancel}
wclient := &ws2wcClientStream{chanClientStream{headerc, trailerc, cli}}
sctx, scancel := context.WithCancel(ctx)
srv := &chanStream{recvc: ch2, sendc: ch1, ctx: sctx, cancel: scancel}
wserver := &ws2wcServerStream{chanServerStream{headerc, trailerc, srv, nil}}
go func() {
s.wserv.Watch(wserver)
// close the server side sender
close(ch1)
if err := s.wserv.Watch(wserver); err != nil {
select {
case srv.sendc <- err:
case <-sctx.Done():
case <-cctx.Done():
}
}
scancel()
ccancel()
}()
return wclient, nil
}
@@ -145,9 +158,10 @@ func (s *chanClientStream) CloseSend() error {
// chanStream implements grpc.Stream using channels
type chanStream struct {
recvc <-chan interface{}
sendc chan<- interface{}
ctx context.Context
recvc <-chan interface{}
sendc chan<- interface{}
ctx context.Context
cancel context.CancelFunc
}
func (s *chanStream) Context() context.Context { return s.ctx }
@@ -155,6 +169,9 @@ func (s *chanStream) Context() context.Context { return s.ctx }
func (s *chanStream) SendMsg(m interface{}) error {
select {
case s.sendc <- m:
if err, ok := m.(error); ok {
return err
}
return nil
case <-s.ctx.Done():
}
@@ -168,6 +185,9 @@ func (s *chanStream) RecvMsg(m interface{}) error {
if !ok {
return grpc.ErrClientConnClosing
}
if err, ok := msg.(error); ok {
return err
}
*v = msg
return nil
case <-s.ctx.Done():

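The adapter now forwards the server's Watch() error through the same interface{} channel that carries responses, and SendMsg/RecvMsg type-assert to recover it. A condensed sketch of this in-band error passing, not the adapter's actual API:

import (
	"golang.org/x/net/context"

	"google.golang.org/grpc"
)

// recvMsg returns the next value from recvc, converting an error pushed
// in-band by the sending side back into a normal error return.
func recvMsg(ctx context.Context, recvc <-chan interface{}) (interface{}, error) {
	select {
	case m, ok := <-recvc:
		if !ok {
			return nil, grpc.ErrClientConnClosing
		}
		if err, ok := m.(error); ok {
			return nil, err
		}
		return m, nil
	case <-ctx.Done():
		return nil, ctx.Err()
	}
}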

@@ -0,0 +1,70 @@
// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package grpcproxy
import (
"sync"
)
// watchRanges tracks all open watches for the proxy.
type watchRanges struct {
wp *watchProxy
mu sync.Mutex
bcasts map[watchRange]*watchBroadcasts
}
func newWatchRanges(wp *watchProxy) *watchRanges {
return &watchRanges{
wp: wp,
bcasts: make(map[watchRange]*watchBroadcasts),
}
}
func (wrs *watchRanges) add(w *watcher) {
wrs.mu.Lock()
defer wrs.mu.Unlock()
if wbs := wrs.bcasts[w.wr]; wbs != nil {
wbs.add(w)
return
}
wbs := newWatchBroadcasts(wrs.wp)
wrs.bcasts[w.wr] = wbs
wbs.add(w)
}
func (wrs *watchRanges) delete(w *watcher) {
wrs.mu.Lock()
defer wrs.mu.Unlock()
wbs, ok := wrs.bcasts[w.wr]
if !ok {
panic("deleting missing range")
}
wbs.delete(w)
if wbs.empty() {
wbs.stop()
delete(wrs.bcasts, w.wr)
}
}
func (wrs *watchRanges) stop() {
wrs.mu.Lock()
defer wrs.mu.Unlock()
for _, wb := range wrs.bcasts {
wb.stop()
}
wrs.bcasts = nil
}


@@ -27,32 +27,53 @@ type watchRange struct {
key, end string
}
type watcher struct {
id int64
wr watchRange
sws *serverWatchStream
rev int64
filters []mvcc.FilterFunc
progress bool
func (wr *watchRange) valid() bool {
return len(wr.end) == 0 || wr.end > wr.key || (wr.end[0] == 0 && len(wr.end) == 1)
}
type watcher struct {
// user configuration
wr watchRange
filters []mvcc.FilterFunc
progress bool
// id is the id returned to the client on its watch stream.
id int64
// nextrev is the minimum expected next event revision.
nextrev int64
// lastHeader has the last header sent over the stream.
lastHeader pb.ResponseHeader
// wps is the parent.
wps *watchProxyStream
}
// send filters out repeated events by discarding revisions older
// than the last one sent over the watch channel.
func (w *watcher) send(wr clientv3.WatchResponse) {
if wr.IsProgressNotify() && !w.progress {
return
}
if w.nextrev > wr.Header.Revision && len(wr.Events) > 0 {
return
}
if w.nextrev == 0 {
// current watch; expect updates following this revision
w.nextrev = wr.Header.Revision + 1
}
events := make([]*mvccpb.Event, 0, len(wr.Events))
var lastRev int64
for i := range wr.Events {
ev := (*mvccpb.Event)(wr.Events[i])
if ev.Kv.ModRevision <= w.rev {
if ev.Kv.ModRevision < w.nextrev {
continue
} else {
// We cannot update w.rev here.
// txn can have multiple events with the same rev.
// If we update w.rev here, we would skip some events in the same txn.
// If w.nextrev updates here, it would skip events in the same txn.
lastRev = ev.Kv.ModRevision
}
@@ -71,8 +92,8 @@ func (w *watcher) send(wr clientv3.WatchResponse) {
}
}
if lastRev > w.rev {
w.rev = lastRev
if lastRev >= w.nextrev {
w.nextrev = lastRev + 1
}
// all events are filtered out?
@@ -80,15 +101,22 @@ func (w *watcher) send(wr clientv3.WatchResponse) {
return
}
pbwr := &pb.WatchResponse{
w.lastHeader = wr.Header
w.post(&pb.WatchResponse{
Header: &wr.Header,
Created: wr.Created,
WatchId: w.id,
Events: events,
}
select {
case w.sws.watchCh <- pbwr:
case <-time.After(50 * time.Millisecond):
w.sws.cancel()
}
})
}
// post puts a watch response on the watcher's proxy stream channel
func (w *watcher) post(wr *pb.WatchResponse) bool {
select {
case w.wps.watchCh <- wr:
case <-time.After(50 * time.Millisecond):
w.wps.cancel()
return false
}
return true
}
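The new valid() check above rejects malformed ranges before any broadcast is created; an invalid create gets an immediate WatchId -1 created+canceled response in recvLoop. A standalone mirror with worked examples (the function name is mine; the expression matches valid above):

func validRange(key, end string) bool {
	return len(end) == 0 || end > key || (end[0] == 0 && len(end) == 1)
}

// validRange("a", "")     == true   // single-key watch
// validRange("a", "b")    == true   // half-open range ["a", "b")
// validRange("a", "\x00") == true   // from "a" to the end of the keyspace
// validRange("b", "a")    == false  // inverted range is rejected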


@@ -1,106 +0,0 @@
// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package grpcproxy
import (
"sync"
"golang.org/x/net/context"
"github.com/coreos/etcd/clientv3"
)
type watcherGroup struct {
// ch delivers events received from the etcd server
ch clientv3.WatchChan
// cancel is used to cancel the underlying etcd server watcher
// It should also close the ch.
cancel context.CancelFunc
mu sync.Mutex
rev int64 // current revision of the watchergroup
receivers map[receiverID]watcher
donec chan struct{}
}
type receiverID struct {
streamID, watcherID int64
}
func newWatchergroup(wch clientv3.WatchChan, c context.CancelFunc) *watcherGroup {
return &watcherGroup{
ch: wch,
cancel: c,
receivers: make(map[receiverID]watcher),
donec: make(chan struct{}),
}
}
func (wg *watcherGroup) run() {
defer close(wg.donec)
for wr := range wg.ch {
wg.broadcast(wr)
}
}
func (wg *watcherGroup) broadcast(wr clientv3.WatchResponse) {
wg.mu.Lock()
defer wg.mu.Unlock()
wg.rev = wr.Header.Revision
for _, r := range wg.receivers {
r.send(wr)
}
}
// add adds the watcher into the group with given ID.
// The current revision of the watcherGroup is returned or -1
// if the watcher is at a revision prior to the watcher group.
func (wg *watcherGroup) add(rid receiverID, w watcher) int64 {
wg.mu.Lock()
defer wg.mu.Unlock()
if wg.rev > w.rev {
return -1
}
wg.receivers[rid] = w
return wg.rev
}
func (wg *watcherGroup) delete(rid receiverID) {
wg.mu.Lock()
defer wg.mu.Unlock()
delete(wg.receivers, rid)
}
func (wg *watcherGroup) isEmpty() bool {
wg.mu.Lock()
defer wg.mu.Unlock()
return len(wg.receivers) == 0
}
func (wg *watcherGroup) stop() {
wg.cancel()
<-wg.donec
}
func (wg *watcherGroup) revision() int64 {
wg.mu.Lock()
defer wg.mu.Unlock()
return wg.rev
}


@@ -1,50 +0,0 @@
// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package grpcproxy
import (
"testing"
"golang.org/x/net/context"
"github.com/coreos/etcd/clientv3"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
)
func TestWatchgroupBroadcast(t *testing.T) {
wch := make(chan clientv3.WatchResponse, 0)
wg := newWatchergroup(wch, nil)
go wg.run()
chs := make([]chan *pb.WatchResponse, 10)
for i := range chs {
chs[i] = make(chan *pb.WatchResponse, 1)
w := watcher{
id: int64(i),
sws: &serverWatchStream{watchCh: chs[i], ctx: context.TODO()},
progress: true,
}
rid := receiverID{streamID: 1, watcherID: w.id}
wg.add(rid, w)
}
// send a progress response
wch <- clientv3.WatchResponse{Header: pb.ResponseHeader{Revision: 1}}
for _, ch := range chs {
<-ch
}
}


@@ -1,128 +0,0 @@
// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package grpcproxy
import (
"sync"
"github.com/coreos/etcd/clientv3"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"golang.org/x/net/context"
)
type watchergroups struct {
cw clientv3.Watcher
mu sync.Mutex
groups map[watchRange]*watcherGroup
idToGroup map[receiverID]*watcherGroup
proxyCtx context.Context
}
func (wgs *watchergroups) addWatcher(rid receiverID, w watcher) {
wgs.mu.Lock()
defer wgs.mu.Unlock()
groups := wgs.groups
if wg, ok := groups[w.wr]; ok {
rev := wg.add(rid, w)
wgs.idToGroup[rid] = wg
if rev == 0 {
// The group is newly created; the create event has not been delivered
// to this group yet.
// We can rely on the etcd server to deliver the create event;
// otherwise we might end up sending the create event twice.
return
}
resp := &pb.WatchResponse{
Header: &pb.ResponseHeader{
// todo: fill in ClusterId
// todo: fill in MemberId:
Revision: rev,
// todo: fill in RaftTerm:
},
WatchId: rid.watcherID,
Created: true,
}
select {
case w.sws.watchCh <- resp:
case <-w.sws.ctx.Done():
}
return
}
ctx, cancel := context.WithCancel(wgs.proxyCtx)
wch := wgs.cw.Watch(ctx, w.wr.key,
clientv3.WithRange(w.wr.end),
clientv3.WithProgressNotify(),
clientv3.WithCreatedNotify(),
)
watchg := newWatchergroup(wch, cancel)
watchg.add(rid, w)
go watchg.run()
groups[w.wr] = watchg
wgs.idToGroup[rid] = watchg
}
func (wgs *watchergroups) removeWatcher(rid receiverID) (int64, bool) {
wgs.mu.Lock()
defer wgs.mu.Unlock()
if g, ok := wgs.idToGroup[rid]; ok {
g.delete(rid)
delete(wgs.idToGroup, rid)
if g.isEmpty() {
g.stop()
}
return g.revision(), true
}
return -1, false
}
func (wgs *watchergroups) maybeJoinWatcherSingle(ws watcherSingle) bool {
wgs.mu.Lock()
defer wgs.mu.Unlock()
rid := receiverID{streamID: ws.sws.id, watcherID: ws.w.id}
group, ok := wgs.groups[ws.w.wr]
if ok {
return group.add(rid, ws.w) != -1
}
if !ws.canPromote() {
return false
}
wg := newWatchergroup(ws.ch, ws.cancel)
wgs.groups[ws.w.wr] = wg
wgs.idToGroup[rid] = wg
wg.add(rid, ws.w)
go wg.run()
return true
}
func (wgs *watchergroups) stop() {
wgs.mu.Lock()
defer wgs.mu.Unlock()
for _, wg := range wgs.groups {
wg.stop()
}
wgs.groups = nil
}


@@ -1,73 +0,0 @@
// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package grpcproxy
import (
"github.com/coreos/etcd/clientv3"
"golang.org/x/net/context"
)
type watcherSingle struct {
// ch delivers events received from the etcd server
ch clientv3.WatchChan
// cancel is used to cancel the underlying etcd server watcher
// It should also close the ch.
cancel context.CancelFunc
// sws is the stream this watcherSingle attached to
sws *serverWatchStream
w watcher
lastStoreRev int64 // last seen revision of the remote mvcc store
donec chan struct{}
}
func newWatcherSingle(wch clientv3.WatchChan, c context.CancelFunc, w watcher, sws *serverWatchStream) *watcherSingle {
return &watcherSingle{
sws: sws,
ch: wch,
cancel: c,
w: w,
donec: make(chan struct{}),
}
}
func (ws watcherSingle) run() {
defer close(ws.donec)
for wr := range ws.ch {
ws.lastStoreRev = wr.Header.Revision
ws.w.send(wr)
if ws.sws.maybeCoalesceWatcher(ws) {
return
}
}
}
// canPromote returns true if a watcherSingle can promote itself to a watcherGroup
// once it has caught up with the last revision seen in the response header
// from the etcd server.
func (ws watcherSingle) canPromote() bool {
return ws.w.rev == ws.lastStoreRev
}
func (ws watcherSingle) stop() {
ws.cancel()
<-ws.donec
}