diff --git a/store/store.go b/store/store.go index abffabf74..14d480b9c 100644 --- a/store/store.go +++ b/store/store.go @@ -451,6 +451,8 @@ func (s *store) deleteExpiredKeys(cutoff time.Time) { s.Stats.Inc(ExpireCount) s.WatcherHub.notify(newEvent(Expire, node.Path, s.Index, s.Term)) } + + s.WatcherHub.clearPendingWatchers() } // checkDir function will check whether the component is a directory under parent node. diff --git a/store/watcher_hub.go b/store/watcher_hub.go index 4c4bfd29d..9e0285b47 100644 --- a/store/watcher_hub.go +++ b/store/watcher_hub.go @@ -16,9 +16,10 @@ import ( // event happens between the end of the first watch command and the start // of the second command. type watcherHub struct { - watchers map[string]*list.List - count int64 // current number of watchers. - EventHistory *EventHistory + watchers map[string]*list.List + count int64 // current number of watchers. + EventHistory *EventHistory + pendingWatchers *list.List } // newWatchHub creates a watchHub. The capacity determines how many events we will @@ -27,8 +28,9 @@ type watcherHub struct { // Ideally, it should smaller than 20K/s[max throughput] * 2 * 50ms[RTT] = 2000 func newWatchHub(capacity int) *watcherHub { return &watcherHub{ - watchers: make(map[string]*list.List), - EventHistory: newEventHistory(capacity), + watchers: make(map[string]*list.List), + EventHistory: newEventHistory(capacity), + pendingWatchers: list.New(), } } @@ -117,9 +119,13 @@ func (wh *watcherHub) notifyWatchers(e *Event, path string, deleted bool) { // if we successfully notify a watcher // we need to remove the watcher from the list // and decrease the counter - l.Remove(curr) atomic.AddInt64(&wh.count, -1) + + if e.Action == Expire { + wh.pendingWatchers.PushBack(w) + } + } else { // once there is a watcher in the list is not interested // in the event, we should keep the list in the map @@ -131,6 +137,14 @@ func (wh *watcherHub) notifyWatchers(e *Event, path string, deleted bool) { } } +func (wh *watcherHub) clearPendingWatchers() { + for e := wh.pendingWatchers.Front(); e != nil; e = e.Next() { + w, _ := e.Value.(*watcher) + w.eventChan <- nil + } + wh.pendingWatchers = list.New() +} + // clone function clones the watcherHub and return the cloned one. // only clone the static content. do not clone the current watchers. func (wh *watcherHub) clone() *watcherHub {