From 5097a2adee0ca27d3af6a2225e62b1d2aba93626 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Sun, 1 Dec 2013 00:47:23 -0500 Subject: [PATCH] fix(event_history.go) should not scan prefix --- store/event_history.go | 13 ++++++++++--- store/event_test.go | 8 ++++---- store/store.go | 9 ++++----- store/watcher_hub.go | 12 ++++++------ 4 files changed, 24 insertions(+), 18 deletions(-) diff --git a/store/event_history.go b/store/event_history.go index 4fd077184..d61d54d60 100644 --- a/store/event_history.go +++ b/store/event_history.go @@ -2,6 +2,7 @@ package store import ( "fmt" + "path" "strings" "sync" @@ -39,8 +40,8 @@ func (eh *EventHistory) addEvent(e *Event) *Event { } // scan function is enumerating events from the index in history and -// stops till the first point where the key has identified prefix -func (eh *EventHistory) scan(prefix string, index uint64) (*Event, *etcdErr.Error) { +// stops till the first point where the key has identified key +func (eh *EventHistory) scan(key string, recursive bool, index uint64) (*Event, *etcdErr.Error) { eh.rwl.RLock() defer eh.rwl.RUnlock() @@ -62,7 +63,13 @@ func (eh *EventHistory) scan(prefix string, index uint64) (*Event, *etcdErr.Erro for { e := eh.Queue.Events[i] - if strings.HasPrefix(e.Key, prefix) && index <= e.Index() { // make sure we bypass the smaller one + ok := (e.Key == key) + + if recursive { + ok = ok || strings.HasPrefix(e.Key, path.Join(key, "/")) + } + + if ok && index <= e.Index() { // make sure we bypass the smaller one return e, nil } diff --git a/store/event_test.go b/store/event_test.go index dc30ce44d..1d82ebf74 100644 --- a/store/event_test.go +++ b/store/event_test.go @@ -41,24 +41,24 @@ func TestScanHistory(t *testing.T) { eh.addEvent(newEvent(Create, "/foo/bar/bar", 4)) eh.addEvent(newEvent(Create, "/foo/foo/foo", 5)) - e, err := eh.scan("/foo", 1) + e, err := eh.scan("/foo", false, 1) if err != nil || e.Index() != 1 { t.Fatalf("scan error [/foo] [1] %v", e.Index) } - e, err = eh.scan("/foo/bar", 1) + e, err = eh.scan("/foo/bar", false, 1) if err != nil || e.Index() != 2 { t.Fatalf("scan error [/foo/bar] [2] %v", e.Index) } - e, err = eh.scan("/foo/bar", 3) + e, err = eh.scan("/foo/bar", true, 3) if err != nil || e.Index() != 4 { t.Fatalf("scan error [/foo/bar/bar] [4] %v", e.Index) } - e, err = eh.scan("/foo/bar", 6) + e, err = eh.scan("/foo/bar", true, 6) if e != nil { t.Fatalf("bad index shoud reuturn nil") diff --git a/store/store.go b/store/store.go index 80f9622af..1edb02f29 100644 --- a/store/store.go +++ b/store/store.go @@ -280,8 +280,8 @@ func (s *store) Delete(nodePath string, recursive bool) (*Event, error) { return e, nil } -func (s *store) Watch(prefix string, recursive bool, sinceIndex uint64) (<-chan *Event, error) { - prefix = path.Clean(path.Join("/", prefix)) +func (s *store) Watch(key string, recursive bool, sinceIndex uint64) (<-chan *Event, error) { + key = path.Clean(path.Join("/", key)) nextIndex := s.CurrentIndex + 1 @@ -292,10 +292,10 @@ func (s *store) Watch(prefix string, recursive bool, sinceIndex uint64) (<-chan var err *etcdErr.Error if sinceIndex == 0 { - c, err = s.WatcherHub.watch(prefix, recursive, nextIndex) + c, err = s.WatcherHub.watch(key, recursive, nextIndex) } else { - c, err = s.WatcherHub.watch(prefix, recursive, sinceIndex) + c, err = s.WatcherHub.watch(key, recursive, sinceIndex) } if err != nil { @@ -396,7 +396,6 @@ func (s *store) internalCreate(nodePath string, value string, unique bool, repla expireTime = Permanent } - dir, newNodeName := path.Split(nodePath) // walk through the nodePath, create dirs and get the last directory node diff --git a/store/watcher_hub.go b/store/watcher_hub.go index b952aec6d..19eef7a57 100644 --- a/store/watcher_hub.go +++ b/store/watcher_hub.go @@ -33,11 +33,11 @@ func newWatchHub(capacity int) *watcherHub { } // watch function returns an Event channel. -// If recursive is true, the first change after index under prefix will be sent to the event channel. -// If recursive is false, the first change after index at prefix will be sent to the event channel. +// If recursive is true, the first change after index under key will be sent to the event channel. +// If recursive is false, the first change after index at key will be sent to the event channel. // If index is zero, watch will start from the current index + 1. -func (wh *watcherHub) watch(prefix string, recursive bool, index uint64) (<-chan *Event, *etcdErr.Error) { - event, err := wh.EventHistory.scan(prefix, index) +func (wh *watcherHub) watch(key string, recursive bool, index uint64) (<-chan *Event, *etcdErr.Error) { + event, err := wh.EventHistory.scan(key, recursive, index) if err != nil { return nil, err @@ -57,7 +57,7 @@ func (wh *watcherHub) watch(prefix string, recursive bool, index uint64) (<-chan sinceIndex: index, } - l, ok := wh.watchers[prefix] + l, ok := wh.watchers[key] if ok { // add the new watcher to the back of the list l.PushBack(w) @@ -65,7 +65,7 @@ func (wh *watcherHub) watch(prefix string, recursive bool, index uint64) (<-chan } else { // create a new list and add the new watcher l := list.New() l.PushBack(w) - wh.watchers[prefix] = l + wh.watchers[key] = l } atomic.AddInt64(&wh.count, 1)