diff --git a/store/store.go b/store/store.go index 8593c5178..05f6626cc 100644 --- a/store/store.go +++ b/store/store.go @@ -290,6 +290,7 @@ func (s *store) Delete(nodePath string, dir, recursive bool) (*Event, error) { s.CurrentIndex++ s.WatcherHub.notify(e) + s.Stats.Inc(DeleteSuccess) return e, nil @@ -514,6 +515,7 @@ func (s *store) internalCreate(nodePath string, dir bool, value string, unique, s.CurrentIndex = nextIndex s.WatcherHub.notify(e) + return e, nil } @@ -568,6 +570,7 @@ func (s *store) DeleteExpiredKeys(cutoff time.Time) { node.Remove(true, true, callback) s.Stats.Inc(ExpireCount) + s.WatcherHub.notify(e) } diff --git a/store/store_test.go b/store/store_test.go index 5eae1b46d..0a7e49e4f 100644 --- a/store/store_test.go +++ b/store/store_test.go @@ -682,6 +682,106 @@ func TestStoreRecoverWithExpiration(t *testing.T) { assert.Nil(t, e, "") } +// Ensure that the store can watch for hidden keys as long as it's an exact path match. +func TestStoreWatchCreateWithHiddenKey(t *testing.T) { + s := newStore() + w, _ := s.Watch("/_foo", false, false, 0) + s.Create("/_foo", false, "bar", false, Permanent) + e := nbselect(w.EventChan) + assert.Equal(t, e.Action, "create", "") + assert.Equal(t, e.Node.Key, "/_foo", "") + e = nbselect(w.EventChan) + assert.Nil(t, e, "") +} + +// Ensure that the store doesn't see hidden key creates without an exact path match in recursive mode. +func TestStoreWatchRecursiveCreateWithHiddenKey(t *testing.T) { + s := newStore() + w, _ := s.Watch("/foo", true, false, 0) + s.Create("/foo/_bar", false, "baz", false, Permanent) + e := nbselect(w.EventChan) + assert.Nil(t, e, "") + w, _ = s.Watch("/foo", true, false, 0) + s.Create("/foo/_baz", true, "", false, Permanent) + e = nbselect(w.EventChan) + assert.Nil(t, e, "") + s.Create("/foo/_baz/quux", false, "quux", false, Permanent) + e = nbselect(w.EventChan) + assert.Nil(t, e, "") +} + +// Ensure that the store doesn't see hidden key updates. +func TestStoreWatchUpdateWithHiddenKey(t *testing.T) { + s := newStore() + s.Create("/_foo", false, "bar", false, Permanent) + w, _ := s.Watch("/_foo", false, false, 0) + s.Update("/_foo", "baz", Permanent) + e := nbselect(w.EventChan) + assert.Equal(t, e.Action, "update", "") + assert.Equal(t, e.Node.Key, "/_foo", "") + e = nbselect(w.EventChan) + assert.Nil(t, e, "") +} + +// Ensure that the store doesn't see hidden key updates without an exact path match in recursive mode. +func TestStoreWatchRecursiveUpdateWithHiddenKey(t *testing.T) { + s := newStore() + s.Create("/foo/_bar", false, "baz", false, Permanent) + w, _ := s.Watch("/foo", true, false, 0) + s.Update("/foo/_bar", "baz", Permanent) + e := nbselect(w.EventChan) + assert.Nil(t, e, "") +} + +// Ensure that the store can watch for key deletions. +func TestStoreWatchDeleteWithHiddenKey(t *testing.T) { + s := newStore() + s.Create("/_foo", false, "bar", false, Permanent) + w, _ := s.Watch("/_foo", false, false, 0) + s.Delete("/_foo", false, false) + e := nbselect(w.EventChan) + assert.Equal(t, e.Action, "delete", "") + assert.Equal(t, e.Node.Key, "/_foo", "") + e = nbselect(w.EventChan) + assert.Nil(t, e, "") +} + +// Ensure that the store doesn't see hidden key deletes without an exact path match in recursive mode. +func TestStoreWatchRecursiveDeleteWithHiddenKey(t *testing.T) { + s := newStore() + s.Create("/foo/_bar", false, "baz", false, Permanent) + w, _ := s.Watch("/foo", true, false, 0) + s.Delete("/foo/_bar", false, false) + e := nbselect(w.EventChan) + assert.Nil(t, e, "") +} + +// Ensure that the store doesn't see expirations of hidden keys. +func TestStoreWatchExpireWithHiddenKey(t *testing.T) { + s := newStore() + + stopChan := make(chan bool) + defer func() { + stopChan <- true + }() + go mockSyncService(s.DeleteExpiredKeys, stopChan) + + s.Create("/_foo", false, "bar", false, time.Now().Add(500*time.Millisecond)) + s.Create("/foofoo", false, "barbarbar", false, time.Now().Add(1000*time.Millisecond)) + + w, _ := s.Watch("/", true, false, 0) + c := w.EventChan + e := nbselect(c) + assert.Nil(t, e, "") + time.Sleep(600 * time.Millisecond) + e = nbselect(c) + assert.Nil(t, e, "") + time.Sleep(600 * time.Millisecond) + e = nbselect(c) + assert.Equal(t, e.Action, "expire", "") + assert.Equal(t, e.Node.Key, "/foofoo", "") +} + // Performs a non-blocking select on an event channel. func nbselect(c <-chan *Event) *Event { select { diff --git a/store/watcher_hub.go b/store/watcher_hub.go index aeda171a9..3ed8d2697 100644 --- a/store/watcher_hub.go +++ b/store/watcher_hub.go @@ -113,11 +113,11 @@ func (wh *watcherHub) notify(e *Event) { } } -func (wh *watcherHub) notifyWatchers(e *Event, path string, deleted bool) { +func (wh *watcherHub) notifyWatchers(e *Event, nodePath string, deleted bool) { wh.mutex.Lock() defer wh.mutex.Unlock() - l, ok := wh.watchers[path] + l, ok := wh.watchers[nodePath] if ok { curr := l.Front() @@ -126,7 +126,8 @@ func (wh *watcherHub) notifyWatchers(e *Event, path string, deleted bool) { w, _ := curr.Value.(*Watcher) - if w.notify(e, e.Node.Key == path, deleted) { + originalPath := (e.Node.Key == nodePath) + if (originalPath || !isHidden(e.Node.Key)) && w.notify(e, originalPath, deleted) { if !w.stream { // do not remove the stream watcher // if we successfully notify a watcher // we need to remove the watcher from the list @@ -142,7 +143,7 @@ func (wh *watcherHub) notifyWatchers(e *Event, path string, deleted bool) { if l.Len() == 0 { // if we have notified all watcher in the list // we can delete the list - delete(wh.watchers, path) + delete(wh.watchers, nodePath) } } } @@ -156,3 +157,9 @@ func (wh *watcherHub) clone() *watcherHub { EventHistory: clonedHistory, } } + +// isHidden checks to see if this path is considered hidden i.e. the +// last element is hidden or it's within a hidden directory +func isHidden(nodePath string) bool { + return strings.Contains(nodePath, "/_") +}