diff --git a/store/store.go b/store/store.go index 8593c5178..eb8c62123 100644 --- a/store/store.go +++ b/store/store.go @@ -289,7 +289,10 @@ func (s *store) Delete(nodePath string, dir, recursive bool) (*Event, error) { // update etcd index s.CurrentIndex++ - s.WatcherHub.notify(e) + if !n.IsHidden() { + s.WatcherHub.notify(e) + } + s.Stats.Inc(DeleteSuccess) return e, nil @@ -429,7 +432,9 @@ func (s *store) Update(nodePath string, newValue string, expireTime time.Time) ( eNode.Expiration, eNode.TTL = n.ExpirationAndTTL() - s.WatcherHub.notify(e) + if !n.IsHidden() { + s.WatcherHub.notify(e) + } s.Stats.Inc(UpdateSuccess) @@ -513,7 +518,10 @@ func (s *store) internalCreate(nodePath string, dir bool, value string, unique, s.CurrentIndex = nextIndex - s.WatcherHub.notify(e) + if !n.IsHidden() { + s.WatcherHub.notify(e) + } + return e, nil } @@ -568,7 +576,10 @@ func (s *store) DeleteExpiredKeys(cutoff time.Time) { node.Remove(true, true, callback) s.Stats.Inc(ExpireCount) - s.WatcherHub.notify(e) + + if !node.IsHidden() { + s.WatcherHub.notify(e) + } } } diff --git a/store/store_test.go b/store/store_test.go index 5eae1b46d..c08314250 100644 --- a/store/store_test.go +++ b/store/store_test.go @@ -499,6 +499,15 @@ func TestStoreWatchCreate(t *testing.T) { assert.Nil(t, e, "") } +// Ensure that the store doesn't see hidden key creations. +func TestStoreWatchCreateWithHiddenKey(t *testing.T) { + s := newStore() + w, _ := s.Watch("/_foo", false, false, 0) + s.Create("/_foo", false, "bar", false, Permanent) + e := nbselect(w.EventChan) + assert.Nil(t, e, "") +} + // Ensure that the store can watch for recursive key creation. func TestStoreWatchRecursiveCreate(t *testing.T) { s := newStore() @@ -509,6 +518,15 @@ func TestStoreWatchRecursiveCreate(t *testing.T) { assert.Equal(t, e.Node.Key, "/foo/bar", "") } +// Ensure that the store can watch for recursive key creation. +func TestStoreWatchRecursiveCreateWithHiddenKey(t *testing.T) { + s := newStore() + w, _ := s.Watch("/foo", true, false, 0) + s.Create("/foo/_bar", false, "baz", false, Permanent) + e := nbselect(w.EventChan) + assert.Nil(t, e, "") +} + // Ensure that the store can watch for key updates. func TestStoreWatchUpdate(t *testing.T) { s := newStore() @@ -520,6 +538,16 @@ func TestStoreWatchUpdate(t *testing.T) { assert.Equal(t, e.Node.Key, "/foo", "") } +// Ensure that the store doesn't see hidden key updates. +func TestStoreWatchUpdateWithHiddenKey(t *testing.T) { + s := newStore() + s.Create("/_foo", false, "bar", false, Permanent) + w, _ := s.Watch("/_foo", false, false, 0) + s.Update("/_foo", "baz", Permanent) + e := nbselect(w.EventChan) + assert.Nil(t, e, "") +} + // Ensure that the store can watch for recursive key updates. func TestStoreWatchRecursiveUpdate(t *testing.T) { s := newStore() @@ -531,6 +559,16 @@ func TestStoreWatchRecursiveUpdate(t *testing.T) { assert.Equal(t, e.Node.Key, "/foo/bar", "") } +// Ensure that the store doesn't get recursive key updates for hidden keys. +func TestStoreWatchRecursiveUpdateWithHiddenKey(t *testing.T) { + s := newStore() + s.Create("/foo/_bar", false, "baz", false, Permanent) + w, _ := s.Watch("/foo", true, false, 0) + s.Update("/foo/_bar", "baz", Permanent) + e := nbselect(w.EventChan) + assert.Nil(t, e, "") +} + // Ensure that the store can watch for key deletions. func TestStoreWatchDelete(t *testing.T) { s := newStore() @@ -542,6 +580,16 @@ func TestStoreWatchDelete(t *testing.T) { assert.Equal(t, e.Node.Key, "/foo", "") } +// Ensure that the store doesn't see hidden key deletions. +func TestStoreWatchDeleteWithHiddenKey(t *testing.T) { + s := newStore() + s.Create("/_foo", false, "bar", false, Permanent) + w, _ := s.Watch("/foo", false, false, 0) + s.Delete("/_foo", false, false) + e := nbselect(w.EventChan) + assert.Nil(t, e, "") +} + // Ensure that the store can watch for recursive key deletions. func TestStoreWatchRecursiveDelete(t *testing.T) { s := newStore() @@ -553,6 +601,16 @@ func TestStoreWatchRecursiveDelete(t *testing.T) { assert.Equal(t, e.Node.Key, "/foo/bar", "") } +// Ensure that the store can watch for recursive key deletions. +func TestStoreWatchRecursiveDeleteWithHiddenKey(t *testing.T) { + s := newStore() + s.Create("/foo/_bar", false, "baz", false, Permanent) + w, _ := s.Watch("/foo", true, false, 0) + s.Delete("/foo/_bar", false, false) + e := nbselect(w.EventChan) + assert.Nil(t, e, "") +} + // Ensure that the store can watch for CAS updates. func TestStoreWatchCompareAndSwap(t *testing.T) { s := newStore() @@ -602,6 +660,32 @@ func TestStoreWatchExpire(t *testing.T) { assert.Equal(t, e.Node.Key, "/foofoo", "") } +// Ensure that the store doesn't see expirations of hidden keys. +func TestStoreWatchExpireWithHiddenKey(t *testing.T) { + s := newStore() + + stopChan := make(chan bool) + defer func() { + stopChan <- true + }() + go mockSyncService(s.DeleteExpiredKeys, stopChan) + + s.Create("/_foo", false, "bar", false, time.Now().Add(500*time.Millisecond)) + s.Create("/foofoo", false, "barbarbar", false, time.Now().Add(1000*time.Millisecond)) + + w, _ := s.Watch("/", true, false, 0) + c := w.EventChan + e := nbselect(c) + assert.Nil(t, e, "") + time.Sleep(600 * time.Millisecond) + e = nbselect(c) + assert.Nil(t, e, "") + time.Sleep(600 * time.Millisecond) + e = nbselect(c) + assert.Equal(t, e.Action, "expire", "") + assert.Equal(t, e.Node.Key, "/foofoo", "") +} + // Ensure that the store can watch in streaming mode. func TestStoreWatchStream(t *testing.T) { s := newStore() diff --git a/store/watcher_hub.go b/store/watcher_hub.go index aeda171a9..8c2487ab6 100644 --- a/store/watcher_hub.go +++ b/store/watcher_hub.go @@ -113,11 +113,11 @@ func (wh *watcherHub) notify(e *Event) { } } -func (wh *watcherHub) notifyWatchers(e *Event, path string, deleted bool) { +func (wh *watcherHub) notifyWatchers(e *Event, nodePath string, deleted bool) { wh.mutex.Lock() defer wh.mutex.Unlock() - l, ok := wh.watchers[path] + l, ok := wh.watchers[nodePath] if ok { curr := l.Front() @@ -126,7 +126,7 @@ func (wh *watcherHub) notifyWatchers(e *Event, path string, deleted bool) { w, _ := curr.Value.(*Watcher) - if w.notify(e, e.Node.Key == path, deleted) { + if !isHidden(nodePath) && w.notify(e, e.Node.Key == nodePath, deleted) { if !w.stream { // do not remove the stream watcher // if we successfully notify a watcher // we need to remove the watcher from the list @@ -142,7 +142,7 @@ func (wh *watcherHub) notifyWatchers(e *Event, path string, deleted bool) { if l.Len() == 0 { // if we have notified all watcher in the list // we can delete the list - delete(wh.watchers, path) + delete(wh.watchers, nodePath) } } } @@ -156,3 +156,14 @@ func (wh *watcherHub) clone() *watcherHub { EventHistory: clonedHistory, } } + +// isHidden checks if a path has a hidden key. since we don't get the Node +// object for notifyWatchers, we have to duplicate it here. consolidate me? +func isHidden(nodePath string) bool { + _, name := path.Split(nodePath) + if name == "" { + return false + } + + return name[0] == '_' +}