Merge pull request #6339 from xiang90/close

grpcproxy: stop watchers in watch groups
This commit is contained in:
Xiang Li 2016-09-02 16:03:12 -07:00 committed by GitHub
commit 81bd381048
2 changed files with 30 additions and 15 deletions

View File

@ -59,9 +59,10 @@ func (wp *watchProxy) Watch(stream pb.Watch_WatchServer) (err error) {
wp.mu.Unlock()
sws := serverWatchStream{
cw: wp.cw,
groups: &wp.wgs,
singles: make(map[int64]*watcherSingle),
cw: wp.cw,
groups: &wp.wgs,
singles: make(map[int64]*watcherSingle),
inGroups: make(map[int64]struct{}),
id: wp.nextStreamID,
gRPCStream: stream,
@ -80,9 +81,10 @@ type serverWatchStream struct {
id int64
cw clientv3.Watcher
mu sync.Mutex // make sure any access of groups and singles is atomic
groups *watchergroups
singles map[int64]*watcherSingle
mu sync.Mutex // make sure any access of groups and singles is atomic
groups *watchergroups
singles map[int64]*watcherSingle
inGroups map[int64]struct{}
gRPCStream pb.Watch_WatchServer
@ -94,22 +96,31 @@ type serverWatchStream struct {
}
func (sws *serverWatchStream) close() {
close(sws.watchCh)
var wg sync.WaitGroup
sws.mu.Lock()
wg.Add(len(sws.singles))
wg.Add(len(sws.singles) + len(sws.inGroups))
for _, ws := range sws.singles {
ws.stop()
// copy the range variable to avoid race
copyws := ws
go func() {
<-copyws.stopNotify()
copyws.stop()
wg.Done()
}()
}
for id := range sws.inGroups {
// copy the range variable to avoid race
wid := id
go func() {
sws.groups.removeWatcher(receiverID{streamID: sws.id, watcherID: wid})
wg.Done()
}()
}
sws.inGroups = nil
sws.mu.Unlock()
wg.Wait()
close(sws.watchCh)
}
func (sws *serverWatchStream) recvLoop() error {
@ -176,6 +187,7 @@ func (sws *serverWatchStream) addCoalescedWatcher(w watcher) {
rid := receiverID{streamID: sws.id, watcherID: w.id}
sws.groups.addWatcher(rid, w)
sws.inGroups[w.id] = struct{}{}
}
func (sws *serverWatchStream) addDedicatedWatcher(w watcher, rev int64) {
@ -201,8 +213,13 @@ func (sws *serverWatchStream) maybeCoalesceWatcher(ws watcherSingle) bool {
defer sws.mu.Unlock()
rid := receiverID{streamID: sws.id, watcherID: ws.w.id}
// do not add new watchers when stream is closing
if sws.inGroups == nil {
return false
}
if sws.groups.maybeJoinWatcherSingle(rid, ws) {
delete(sws.singles, ws.w.id)
sws.inGroups[ws.w.id] = struct{}{}
return true
}
return false
@ -236,6 +253,7 @@ func (sws *serverWatchStream) removeWatcher(id int64) {
rev, ok = sws.groups.removeWatcher(receiverID{streamID: sws.id, watcherID: id})
if ok {
delete(sws.inGroups, id)
return
}

View File

@ -70,8 +70,5 @@ func (ws watcherSingle) canPromote() bool {
func (ws watcherSingle) stop() {
ws.cancel()
}
func (ws watcherSingle) stopNotify() <-chan struct{} {
return ws.donec
<-ws.donec
}