mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
fix(store): properly hide hidden keys from watchers, not just gets
This commit is contained in:
parent
a417782151
commit
139f59f7d1
@ -289,7 +289,10 @@ func (s *store) Delete(nodePath string, dir, recursive bool) (*Event, error) {
|
|||||||
// update etcd index
|
// update etcd index
|
||||||
s.CurrentIndex++
|
s.CurrentIndex++
|
||||||
|
|
||||||
|
if !n.IsHidden() {
|
||||||
s.WatcherHub.notify(e)
|
s.WatcherHub.notify(e)
|
||||||
|
}
|
||||||
|
|
||||||
s.Stats.Inc(DeleteSuccess)
|
s.Stats.Inc(DeleteSuccess)
|
||||||
|
|
||||||
return e, nil
|
return e, nil
|
||||||
@ -429,7 +432,9 @@ func (s *store) Update(nodePath string, newValue string, expireTime time.Time) (
|
|||||||
|
|
||||||
eNode.Expiration, eNode.TTL = n.ExpirationAndTTL()
|
eNode.Expiration, eNode.TTL = n.ExpirationAndTTL()
|
||||||
|
|
||||||
|
if !n.IsHidden() {
|
||||||
s.WatcherHub.notify(e)
|
s.WatcherHub.notify(e)
|
||||||
|
}
|
||||||
|
|
||||||
s.Stats.Inc(UpdateSuccess)
|
s.Stats.Inc(UpdateSuccess)
|
||||||
|
|
||||||
@ -513,7 +518,10 @@ func (s *store) internalCreate(nodePath string, dir bool, value string, unique,
|
|||||||
|
|
||||||
s.CurrentIndex = nextIndex
|
s.CurrentIndex = nextIndex
|
||||||
|
|
||||||
|
if !n.IsHidden() {
|
||||||
s.WatcherHub.notify(e)
|
s.WatcherHub.notify(e)
|
||||||
|
}
|
||||||
|
|
||||||
return e, nil
|
return e, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -568,8 +576,11 @@ func (s *store) DeleteExpiredKeys(cutoff time.Time) {
|
|||||||
node.Remove(true, true, callback)
|
node.Remove(true, true, callback)
|
||||||
|
|
||||||
s.Stats.Inc(ExpireCount)
|
s.Stats.Inc(ExpireCount)
|
||||||
|
|
||||||
|
if !node.IsHidden() {
|
||||||
s.WatcherHub.notify(e)
|
s.WatcherHub.notify(e)
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -499,6 +499,15 @@ func TestStoreWatchCreate(t *testing.T) {
|
|||||||
assert.Nil(t, e, "")
|
assert.Nil(t, e, "")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Ensure that the store doesn't see hidden key creations.
|
||||||
|
func TestStoreWatchCreateWithHiddenKey(t *testing.T) {
|
||||||
|
s := newStore()
|
||||||
|
w, _ := s.Watch("/_foo", false, false, 0)
|
||||||
|
s.Create("/_foo", false, "bar", false, Permanent)
|
||||||
|
e := nbselect(w.EventChan)
|
||||||
|
assert.Nil(t, e, "")
|
||||||
|
}
|
||||||
|
|
||||||
// Ensure that the store can watch for recursive key creation.
|
// Ensure that the store can watch for recursive key creation.
|
||||||
func TestStoreWatchRecursiveCreate(t *testing.T) {
|
func TestStoreWatchRecursiveCreate(t *testing.T) {
|
||||||
s := newStore()
|
s := newStore()
|
||||||
@ -509,6 +518,15 @@ func TestStoreWatchRecursiveCreate(t *testing.T) {
|
|||||||
assert.Equal(t, e.Node.Key, "/foo/bar", "")
|
assert.Equal(t, e.Node.Key, "/foo/bar", "")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Ensure that the store can watch for recursive key creation.
|
||||||
|
func TestStoreWatchRecursiveCreateWithHiddenKey(t *testing.T) {
|
||||||
|
s := newStore()
|
||||||
|
w, _ := s.Watch("/foo", true, false, 0)
|
||||||
|
s.Create("/foo/_bar", false, "baz", false, Permanent)
|
||||||
|
e := nbselect(w.EventChan)
|
||||||
|
assert.Nil(t, e, "")
|
||||||
|
}
|
||||||
|
|
||||||
// Ensure that the store can watch for key updates.
|
// Ensure that the store can watch for key updates.
|
||||||
func TestStoreWatchUpdate(t *testing.T) {
|
func TestStoreWatchUpdate(t *testing.T) {
|
||||||
s := newStore()
|
s := newStore()
|
||||||
@ -520,6 +538,16 @@ func TestStoreWatchUpdate(t *testing.T) {
|
|||||||
assert.Equal(t, e.Node.Key, "/foo", "")
|
assert.Equal(t, e.Node.Key, "/foo", "")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Ensure that the store doesn't see hidden key updates.
|
||||||
|
func TestStoreWatchUpdateWithHiddenKey(t *testing.T) {
|
||||||
|
s := newStore()
|
||||||
|
s.Create("/_foo", false, "bar", false, Permanent)
|
||||||
|
w, _ := s.Watch("/_foo", false, false, 0)
|
||||||
|
s.Update("/_foo", "baz", Permanent)
|
||||||
|
e := nbselect(w.EventChan)
|
||||||
|
assert.Nil(t, e, "")
|
||||||
|
}
|
||||||
|
|
||||||
// Ensure that the store can watch for recursive key updates.
|
// Ensure that the store can watch for recursive key updates.
|
||||||
func TestStoreWatchRecursiveUpdate(t *testing.T) {
|
func TestStoreWatchRecursiveUpdate(t *testing.T) {
|
||||||
s := newStore()
|
s := newStore()
|
||||||
@ -531,6 +559,16 @@ func TestStoreWatchRecursiveUpdate(t *testing.T) {
|
|||||||
assert.Equal(t, e.Node.Key, "/foo/bar", "")
|
assert.Equal(t, e.Node.Key, "/foo/bar", "")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Ensure that the store doesn't get recursive key updates for hidden keys.
|
||||||
|
func TestStoreWatchRecursiveUpdateWithHiddenKey(t *testing.T) {
|
||||||
|
s := newStore()
|
||||||
|
s.Create("/foo/_bar", false, "baz", false, Permanent)
|
||||||
|
w, _ := s.Watch("/foo", true, false, 0)
|
||||||
|
s.Update("/foo/_bar", "baz", Permanent)
|
||||||
|
e := nbselect(w.EventChan)
|
||||||
|
assert.Nil(t, e, "")
|
||||||
|
}
|
||||||
|
|
||||||
// Ensure that the store can watch for key deletions.
|
// Ensure that the store can watch for key deletions.
|
||||||
func TestStoreWatchDelete(t *testing.T) {
|
func TestStoreWatchDelete(t *testing.T) {
|
||||||
s := newStore()
|
s := newStore()
|
||||||
@ -542,6 +580,16 @@ func TestStoreWatchDelete(t *testing.T) {
|
|||||||
assert.Equal(t, e.Node.Key, "/foo", "")
|
assert.Equal(t, e.Node.Key, "/foo", "")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Ensure that the store doesn't see hidden key deletions.
|
||||||
|
func TestStoreWatchDeleteWithHiddenKey(t *testing.T) {
|
||||||
|
s := newStore()
|
||||||
|
s.Create("/_foo", false, "bar", false, Permanent)
|
||||||
|
w, _ := s.Watch("/foo", false, false, 0)
|
||||||
|
s.Delete("/_foo", false, false)
|
||||||
|
e := nbselect(w.EventChan)
|
||||||
|
assert.Nil(t, e, "")
|
||||||
|
}
|
||||||
|
|
||||||
// Ensure that the store can watch for recursive key deletions.
|
// Ensure that the store can watch for recursive key deletions.
|
||||||
func TestStoreWatchRecursiveDelete(t *testing.T) {
|
func TestStoreWatchRecursiveDelete(t *testing.T) {
|
||||||
s := newStore()
|
s := newStore()
|
||||||
@ -553,6 +601,16 @@ func TestStoreWatchRecursiveDelete(t *testing.T) {
|
|||||||
assert.Equal(t, e.Node.Key, "/foo/bar", "")
|
assert.Equal(t, e.Node.Key, "/foo/bar", "")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Ensure that the store can watch for recursive key deletions.
|
||||||
|
func TestStoreWatchRecursiveDeleteWithHiddenKey(t *testing.T) {
|
||||||
|
s := newStore()
|
||||||
|
s.Create("/foo/_bar", false, "baz", false, Permanent)
|
||||||
|
w, _ := s.Watch("/foo", true, false, 0)
|
||||||
|
s.Delete("/foo/_bar", false, false)
|
||||||
|
e := nbselect(w.EventChan)
|
||||||
|
assert.Nil(t, e, "")
|
||||||
|
}
|
||||||
|
|
||||||
// Ensure that the store can watch for CAS updates.
|
// Ensure that the store can watch for CAS updates.
|
||||||
func TestStoreWatchCompareAndSwap(t *testing.T) {
|
func TestStoreWatchCompareAndSwap(t *testing.T) {
|
||||||
s := newStore()
|
s := newStore()
|
||||||
@ -602,6 +660,32 @@ func TestStoreWatchExpire(t *testing.T) {
|
|||||||
assert.Equal(t, e.Node.Key, "/foofoo", "")
|
assert.Equal(t, e.Node.Key, "/foofoo", "")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Ensure that the store doesn't see expirations of hidden keys.
|
||||||
|
func TestStoreWatchExpireWithHiddenKey(t *testing.T) {
|
||||||
|
s := newStore()
|
||||||
|
|
||||||
|
stopChan := make(chan bool)
|
||||||
|
defer func() {
|
||||||
|
stopChan <- true
|
||||||
|
}()
|
||||||
|
go mockSyncService(s.DeleteExpiredKeys, stopChan)
|
||||||
|
|
||||||
|
s.Create("/_foo", false, "bar", false, time.Now().Add(500*time.Millisecond))
|
||||||
|
s.Create("/foofoo", false, "barbarbar", false, time.Now().Add(1000*time.Millisecond))
|
||||||
|
|
||||||
|
w, _ := s.Watch("/", true, false, 0)
|
||||||
|
c := w.EventChan
|
||||||
|
e := nbselect(c)
|
||||||
|
assert.Nil(t, e, "")
|
||||||
|
time.Sleep(600 * time.Millisecond)
|
||||||
|
e = nbselect(c)
|
||||||
|
assert.Nil(t, e, "")
|
||||||
|
time.Sleep(600 * time.Millisecond)
|
||||||
|
e = nbselect(c)
|
||||||
|
assert.Equal(t, e.Action, "expire", "")
|
||||||
|
assert.Equal(t, e.Node.Key, "/foofoo", "")
|
||||||
|
}
|
||||||
|
|
||||||
// Ensure that the store can watch in streaming mode.
|
// Ensure that the store can watch in streaming mode.
|
||||||
func TestStoreWatchStream(t *testing.T) {
|
func TestStoreWatchStream(t *testing.T) {
|
||||||
s := newStore()
|
s := newStore()
|
||||||
|
@ -113,11 +113,11 @@ func (wh *watcherHub) notify(e *Event) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (wh *watcherHub) notifyWatchers(e *Event, path string, deleted bool) {
|
func (wh *watcherHub) notifyWatchers(e *Event, nodePath string, deleted bool) {
|
||||||
wh.mutex.Lock()
|
wh.mutex.Lock()
|
||||||
defer wh.mutex.Unlock()
|
defer wh.mutex.Unlock()
|
||||||
|
|
||||||
l, ok := wh.watchers[path]
|
l, ok := wh.watchers[nodePath]
|
||||||
if ok {
|
if ok {
|
||||||
curr := l.Front()
|
curr := l.Front()
|
||||||
|
|
||||||
@ -126,7 +126,7 @@ func (wh *watcherHub) notifyWatchers(e *Event, path string, deleted bool) {
|
|||||||
|
|
||||||
w, _ := curr.Value.(*Watcher)
|
w, _ := curr.Value.(*Watcher)
|
||||||
|
|
||||||
if w.notify(e, e.Node.Key == path, deleted) {
|
if !isHidden(nodePath) && w.notify(e, e.Node.Key == nodePath, deleted) {
|
||||||
if !w.stream { // do not remove the stream watcher
|
if !w.stream { // do not remove the stream watcher
|
||||||
// if we successfully notify a watcher
|
// if we successfully notify a watcher
|
||||||
// we need to remove the watcher from the list
|
// we need to remove the watcher from the list
|
||||||
@ -142,7 +142,7 @@ func (wh *watcherHub) notifyWatchers(e *Event, path string, deleted bool) {
|
|||||||
if l.Len() == 0 {
|
if l.Len() == 0 {
|
||||||
// if we have notified all watcher in the list
|
// if we have notified all watcher in the list
|
||||||
// we can delete the list
|
// we can delete the list
|
||||||
delete(wh.watchers, path)
|
delete(wh.watchers, nodePath)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -156,3 +156,14 @@ func (wh *watcherHub) clone() *watcherHub {
|
|||||||
EventHistory: clonedHistory,
|
EventHistory: clonedHistory,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// isHidden checks if a path has a hidden key. since we don't get the Node
|
||||||
|
// object for notifyWatchers, we have to duplicate it here. consolidate me?
|
||||||
|
func isHidden(nodePath string) bool {
|
||||||
|
_, name := path.Split(nodePath)
|
||||||
|
if name == "" {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
return name[0] == '_'
|
||||||
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user