From 8bed1e1f15ecd3444aff932ec7aae3412905f616 Mon Sep 17 00:00:00 2001 From: Cenk Alti Date: Fri, 14 Feb 2014 16:16:55 -0800 Subject: [PATCH] fix(store/watch): fix the slow consumer bug --- store/watcher.go | 18 +++++++++--------- store/watcher_hub.go | 5 +---- 2 files changed, 10 insertions(+), 13 deletions(-) diff --git a/store/watcher.go b/store/watcher.go index 7a11656c6..6d583b6a7 100644 --- a/store/watcher.go +++ b/store/watcher.go @@ -21,6 +21,7 @@ type Watcher struct { stream bool recursive bool sinceIndex uint64 + hub *watcherHub removed bool remove func() } @@ -51,8 +52,9 @@ func (w *Watcher) notify(e *Event, originalPath bool, deleted bool) bool { select { case w.EventChan <- e: default: - // We have missed a notification. Close the channel to indicate this situation. - close(w.EventChan) + // We have missed a notification. Remove the watcher. + // Removing the watcher also closes the EventChan. + w.remove() } return true } @@ -62,11 +64,9 @@ func (w *Watcher) notify(e *Event, originalPath bool, deleted bool) bool { // Remove removes the watcher from watcherHub // The actual remove function is guaranteed to only be executed once func (w *Watcher) Remove() { - if w.remove != nil { - w.remove() - } else { - // We attached a remove function to watcher - // Other pkg cannot change it, so this should not happen - panic("missing Watcher remove function") - } + w.hub.mutex.Lock() + defer w.hub.mutex.Unlock() + + close(w.EventChan) + w.remove() } diff --git a/store/watcher_hub.go b/store/watcher_hub.go index 03211a470..45b98cfd2 100644 --- a/store/watcher_hub.go +++ b/store/watcher_hub.go @@ -50,6 +50,7 @@ func (wh *watcherHub) watch(key string, recursive, stream bool, index uint64) (* recursive: recursive, stream: stream, sinceIndex: index, + hub: wh, } if event != nil { @@ -77,10 +78,6 @@ func (wh *watcherHub) watch(key string, recursive, stream bool, index uint64) (* if w.removed { // avoid remove it twice return } - - wh.mutex.Lock() - defer wh.mutex.Unlock() - w.removed = true l.Remove(elem) atomic.AddInt64(&wh.count, -1)