From 5e499456f03cf830d4ebea8fa6f675e3c9f89722 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Thu, 26 Dec 2013 22:06:15 +0800 Subject: [PATCH 1/5] init cancel watcher --- server/v1/watch_key_handler.go | 4 ++-- server/v2/get_handler.go | 5 +++-- store/store.go | 13 ++++++------ store/store_test.go | 38 +++++++++++++++++---------------- store/watcher.go | 9 ++++---- store/watcher_hub.go | 39 ++++++++++++++++++++-------------- store/watcher_test.go | 9 +++++--- 7 files changed, 66 insertions(+), 51 deletions(-) diff --git a/server/v1/watch_key_handler.go b/server/v1/watch_key_handler.go index 67a0c5f04..a7e6b798a 100644 --- a/server/v1/watch_key_handler.go +++ b/server/v1/watch_key_handler.go @@ -25,11 +25,11 @@ func WatchKeyHandler(w http.ResponseWriter, req *http.Request, s Server) error { } // Start the watcher on the store. - c, err := s.Store().Watch(key, false, sinceIndex) + watcher, err := s.Store().NewWatcher(key, false, sinceIndex) if err != nil { return etcdErr.NewError(500, key, s.Store().Index()) } - event := <-c + event := <-watcher.EventChan // Convert event to a response and write to client. b, _ := json.Marshal(event.Response(s.Store().Index())) diff --git a/server/v2/get_handler.go b/server/v2/get_handler.go index 9a67ea2ae..031ff449e 100644 --- a/server/v2/get_handler.go +++ b/server/v2/get_handler.go @@ -55,7 +55,7 @@ func GetHandler(w http.ResponseWriter, req *http.Request, s Server) error { } // Start the watcher on the store. - eventChan, err := s.Store().Watch(key, recursive, sinceIndex) + watcher, err := s.Store().NewWatcher(key, recursive, sinceIndex) if err != nil { return etcdErr.NewError(500, key, s.Store().Index()) } @@ -65,8 +65,9 @@ func GetHandler(w http.ResponseWriter, req *http.Request, s Server) error { select { case <-closeChan: + watcher.Remove() return nil - case event = <-eventChan: + case event = <-watcher.EventChan: } } else { //get diff --git a/store/store.go b/store/store.go index 08d585056..15c045d41 100644 --- a/store/store.go +++ b/store/store.go @@ -52,7 +52,8 @@ type Store interface { value string, expireTime time.Time) (*Event, error) Delete(nodePath string, recursive, dir bool) (*Event, error) CompareAndDelete(nodePath string, prevValue string, prevIndex uint64) (*Event, error) - Watch(prefix string, recursive bool, sinceIndex uint64) (<-chan *Event, error) + + NewWatcher(prefix string, recursive bool, sinceIndex uint64) (*Watcher, error) Save() ([]byte, error) Recovery(state []byte) error @@ -340,21 +341,21 @@ func (s *store) CompareAndDelete(nodePath string, prevValue string, prevIndex ui return e, nil } -func (s *store) Watch(key string, recursive bool, sinceIndex uint64) (<-chan *Event, error) { +func (s *store) NewWatcher(key string, recursive bool, sinceIndex uint64) (*Watcher, error) { key = path.Clean(path.Join("/", key)) nextIndex := s.CurrentIndex + 1 s.worldLock.RLock() defer s.worldLock.RUnlock() - var c <-chan *Event + var w *Watcher var err *etcdErr.Error if sinceIndex == 0 { - c, err = s.WatcherHub.watch(key, recursive, nextIndex) + w, err = s.WatcherHub.watch(key, recursive, nextIndex) } else { - c, err = s.WatcherHub.watch(key, recursive, sinceIndex) + w, err = s.WatcherHub.watch(key, recursive, sinceIndex) } if err != nil { @@ -364,7 +365,7 @@ func (s *store) Watch(key string, recursive bool, sinceIndex uint64) (<-chan *Ev return nil, err } - return c, nil + return w, nil } // walk function walks all the nodePath and apply the walkFunc on each directory diff --git a/store/store_test.go b/store/store_test.go index b29fea627..4d71c91b9 100644 --- a/store/store_test.go +++ b/store/store_test.go @@ -446,7 +446,8 @@ func TestStoreCompareAndSwapPrevIndexFailsIfNotMatch(t *testing.T) { // Ensure that the store can watch for key creation. func TestStoreWatchCreate(t *testing.T) { s := newStore() - c, _ := s.Watch("/foo", false, 0) + w, _ := s.NewWatcher("/foo", false, 0) + c := w.EventChan s.Create("/foo", false, "bar", false, Permanent) e := nbselect(c) assert.Equal(t, e.Action, "create", "") @@ -458,9 +459,9 @@ func TestStoreWatchCreate(t *testing.T) { // Ensure that the store can watch for recursive key creation. func TestStoreWatchRecursiveCreate(t *testing.T) { s := newStore() - c, _ := s.Watch("/foo", true, 0) + w, _ := s.NewWatcher("/foo", true, 0) s.Create("/foo/bar", false, "baz", false, Permanent) - e := nbselect(c) + e := nbselect(w.EventChan) assert.Equal(t, e.Action, "create", "") assert.Equal(t, e.Node.Key, "/foo/bar", "") } @@ -469,9 +470,9 @@ func TestStoreWatchRecursiveCreate(t *testing.T) { func TestStoreWatchUpdate(t *testing.T) { s := newStore() s.Create("/foo", false, "bar", false, Permanent) - c, _ := s.Watch("/foo", false, 0) + w, _ := s.NewWatcher("/foo", false, 0) s.Update("/foo", "baz", Permanent) - e := nbselect(c) + e := nbselect(w.EventChan) assert.Equal(t, e.Action, "update", "") assert.Equal(t, e.Node.Key, "/foo", "") } @@ -480,9 +481,9 @@ func TestStoreWatchUpdate(t *testing.T) { func TestStoreWatchRecursiveUpdate(t *testing.T) { s := newStore() s.Create("/foo/bar", false, "baz", false, Permanent) - c, _ := s.Watch("/foo", true, 0) + w, _ := s.NewWatcher("/foo", true, 0) s.Update("/foo/bar", "baz", Permanent) - e := nbselect(c) + e := nbselect(w.EventChan) assert.Equal(t, e.Action, "update", "") assert.Equal(t, e.Node.Key, "/foo/bar", "") } @@ -491,9 +492,9 @@ func TestStoreWatchRecursiveUpdate(t *testing.T) { func TestStoreWatchDelete(t *testing.T) { s := newStore() s.Create("/foo", false, "bar", false, Permanent) - c, _ := s.Watch("/foo", false, 0) + w, _ := s.NewWatcher("/foo", false, 0) s.Delete("/foo", false, false) - e := nbselect(c) + e := nbselect(w.EventChan) assert.Equal(t, e.Action, "delete", "") assert.Equal(t, e.Node.Key, "/foo", "") } @@ -502,9 +503,9 @@ func TestStoreWatchDelete(t *testing.T) { func TestStoreWatchRecursiveDelete(t *testing.T) { s := newStore() s.Create("/foo/bar", false, "baz", false, Permanent) - c, _ := s.Watch("/foo", true, 0) + w, _ := s.NewWatcher("/foo", true, 0) s.Delete("/foo/bar", false, false) - e := nbselect(c) + e := nbselect(w.EventChan) assert.Equal(t, e.Action, "delete", "") assert.Equal(t, e.Node.Key, "/foo/bar", "") } @@ -513,9 +514,9 @@ func TestStoreWatchRecursiveDelete(t *testing.T) { func TestStoreWatchCompareAndSwap(t *testing.T) { s := newStore() s.Create("/foo", false, "bar", false, Permanent) - c, _ := s.Watch("/foo", false, 0) + w, _ := s.NewWatcher("/foo", false, 0) s.CompareAndSwap("/foo", "bar", 0, "baz", Permanent) - e := nbselect(c) + e := nbselect(w.EventChan) assert.Equal(t, e.Action, "compareAndSwap", "") assert.Equal(t, e.Node.Key, "/foo", "") } @@ -524,9 +525,9 @@ func TestStoreWatchCompareAndSwap(t *testing.T) { func TestStoreWatchRecursiveCompareAndSwap(t *testing.T) { s := newStore() s.Create("/foo/bar", false, "baz", false, Permanent) - c, _ := s.Watch("/foo", true, 0) + w, _ := s.NewWatcher("/foo", true, 0) s.CompareAndSwap("/foo/bar", "baz", 0, "bat", Permanent) - e := nbselect(c) + e := nbselect(w.EventChan) assert.Equal(t, e.Action, "compareAndSwap", "") assert.Equal(t, e.Node.Key, "/foo/bar", "") } @@ -544,15 +545,16 @@ func TestStoreWatchExpire(t *testing.T) { s.Create("/foo", false, "bar", false, time.Now().Add(500*time.Millisecond)) s.Create("/foofoo", false, "barbarbar", false, time.Now().Add(500*time.Millisecond)) - c, _ := s.Watch("/", true, 0) + w, _ := s.NewWatcher("/", true, 0) + c := w.EventChan e := nbselect(c) assert.Nil(t, e, "") time.Sleep(600 * time.Millisecond) e = nbselect(c) assert.Equal(t, e.Action, "expire", "") assert.Equal(t, e.Node.Key, "/foo", "") - c, _ = s.Watch("/", true, 4) - e = nbselect(c) + w, _ = s.NewWatcher("/", true, 4) + e = nbselect(w.EventChan) assert.Equal(t, e.Action, "expire", "") assert.Equal(t, e.Node.Key, "/foofoo", "") } diff --git a/store/watcher.go b/store/watcher.go index 004ca9222..06060c03b 100644 --- a/store/watcher.go +++ b/store/watcher.go @@ -16,15 +16,16 @@ limitations under the License. package store -type watcher struct { - eventChan chan *Event +type Watcher struct { + EventChan chan *Event recursive bool sinceIndex uint64 + Remove func() } // notify function notifies the watcher. If the watcher interests in the given path, // the function will return true. -func (w *watcher) notify(e *Event, originalPath bool, deleted bool) bool { +func (w *Watcher) notify(e *Event, originalPath bool, deleted bool) bool { // watcher is interested the path in three cases and under one condition // the condition is that the event happens after the watcher's sinceIndex @@ -41,7 +42,7 @@ func (w *watcher) notify(e *Event, originalPath bool, deleted bool) bool { // For example a watcher is watching at "/foo/bar". And we deletes "/foo". The watcher // should get notified even if "/foo" is not the path it is watching. if (w.recursive || originalPath || deleted) && e.Index() >= w.sinceIndex { - w.eventChan <- e + w.EventChan <- e return true } return false diff --git a/store/watcher_hub.go b/store/watcher_hub.go index 9721ffdb0..a8865d49d 100644 --- a/store/watcher_hub.go +++ b/store/watcher_hub.go @@ -36,41 +36,48 @@ func newWatchHub(capacity int) *watcherHub { // If recursive is true, the first change after index under key will be sent to the event channel. // If recursive is false, the first change after index at key will be sent to the event channel. // If index is zero, watch will start from the current index + 1. -func (wh *watcherHub) watch(key string, recursive bool, index uint64) (<-chan *Event, *etcdErr.Error) { +func (wh *watcherHub) watch(key string, recursive bool, index uint64) (*Watcher, *etcdErr.Error) { event, err := wh.EventHistory.scan(key, recursive, index) if err != nil { return nil, err } - eventChan := make(chan *Event, 1) // use a buffered channel - - if event != nil { - eventChan <- event - - return eventChan, nil - } - - w := &watcher{ - eventChan: eventChan, + w := &Watcher{ + EventChan: make(chan *Event, 1), // use a buffered channel recursive: recursive, sinceIndex: index, } + if event != nil { + w.EventChan <- event + + return w, nil + } + l, ok := wh.watchers[key] + var elem *list.Element + if ok { // add the new watcher to the back of the list - l.PushBack(w) + elem = l.PushBack(w) } else { // create a new list and add the new watcher - l := list.New() - l.PushBack(w) + l = list.New() + elem = l.PushBack(w) wh.watchers[key] = l } + w.Remove = func() { + l.Remove(elem) + if l.Len() == 0 { + delete(wh.watchers, key) + } + } + atomic.AddInt64(&wh.count, 1) - return eventChan, nil + return w, nil } // notify function accepts an event and notify to the watchers. @@ -109,7 +116,7 @@ func (wh *watcherHub) notifyWatchers(e *Event, path string, deleted bool) { next := curr.Next() // save reference to the next one in the list - w, _ := curr.Value.(*watcher) + w, _ := curr.Value.(*Watcher) if w.notify(e, e.Node.Key == path, deleted) { diff --git a/store/watcher_test.go b/store/watcher_test.go index 7d76d83d7..2600fa161 100644 --- a/store/watcher_test.go +++ b/store/watcher_test.go @@ -23,10 +23,11 @@ import ( func TestWatcher(t *testing.T) { s := newStore() wh := s.WatcherHub - c, err := wh.watch("/foo", true, 1) + w, err := wh.watch("/foo", true, 1) if err != nil { t.Fatalf("%v", err) } + c := w.EventChan select { case <-c: @@ -45,7 +46,8 @@ func TestWatcher(t *testing.T) { t.Fatal("recv != send") } - c, _ = wh.watch("/foo", false, 2) + w, _ = wh.watch("/foo", false, 2) + c = w.EventChan e = newEvent(Create, "/foo/bar", 2, 2) @@ -69,7 +71,8 @@ func TestWatcher(t *testing.T) { } // ensure we are doing exact matching rather than prefix matching - c, _ = wh.watch("/fo", true, 1) + w, _ = wh.watch("/fo", true, 1) + c = w.EventChan select { case re = <-c: From 59ccefee0f43d5c96b776f21d57952b129425b14 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Sat, 28 Dec 2013 14:55:50 +0800 Subject: [PATCH 2/5] fix(watchhub.go) add a lock to protect the hashmap --- store/store.go | 4 ++-- store/watcher_hub.go | 19 ++++++++++++++----- store/watcher_test.go | 6 +++--- 3 files changed, 19 insertions(+), 10 deletions(-) diff --git a/store/store.go b/store/store.go index 15c045d41..6d78e18b4 100644 --- a/store/store.go +++ b/store/store.go @@ -352,10 +352,10 @@ func (s *store) NewWatcher(key string, recursive bool, sinceIndex uint64) (*Watc var err *etcdErr.Error if sinceIndex == 0 { - w, err = s.WatcherHub.watch(key, recursive, nextIndex) + w, err = s.WatcherHub.newWatcher(key, recursive, nextIndex) } else { - w, err = s.WatcherHub.watch(key, recursive, sinceIndex) + w, err = s.WatcherHub.newWatcher(key, recursive, sinceIndex) } if err != nil { diff --git a/store/watcher_hub.go b/store/watcher_hub.go index a8865d49d..64b3bdcf8 100644 --- a/store/watcher_hub.go +++ b/store/watcher_hub.go @@ -4,6 +4,7 @@ import ( "container/list" "path" "strings" + "sync" "sync/atomic" etcdErr "github.com/coreos/etcd/error" @@ -16,6 +17,7 @@ import ( // event happens between the end of the first watch command and the start // of the second command. type watcherHub struct { + mutex sync.Mutex // protect the hash map watchers map[string]*list.List count int64 // current number of watchers. EventHistory *EventHistory @@ -32,11 +34,11 @@ func newWatchHub(capacity int) *watcherHub { } } -// watch function returns an Event channel. -// If recursive is true, the first change after index under key will be sent to the event channel. -// If recursive is false, the first change after index at key will be sent to the event channel. +// newWatcher function returns a watcher. +// If recursive is true, the first change after index under key will be sent to the event channel of the watcher. +// If recursive is false, the first change after index at key will be sent to the event channel of the watcher. // If index is zero, watch will start from the current index + 1. -func (wh *watcherHub) watch(key string, recursive bool, index uint64) (*Watcher, *etcdErr.Error) { +func (wh *watcherHub) newWatcher(key string, recursive bool, index uint64) (*Watcher, *etcdErr.Error) { event, err := wh.EventHistory.scan(key, recursive, index) if err != nil { @@ -51,10 +53,12 @@ func (wh *watcherHub) watch(key string, recursive bool, index uint64) (*Watcher, if event != nil { w.EventChan <- event - return w, nil } + wh.mutex.Lock() + defer wh.mutex.Unlock() + l, ok := wh.watchers[key] var elem *list.Element @@ -69,6 +73,8 @@ func (wh *watcherHub) watch(key string, recursive bool, index uint64) (*Watcher, } w.Remove = func() { + wh.mutex.Lock() + defer wh.mutex.Unlock() l.Remove(elem) if l.Len() == 0 { delete(wh.watchers, key) @@ -100,6 +106,9 @@ func (wh *watcherHub) notify(e *Event) { } func (wh *watcherHub) notifyWatchers(e *Event, path string, deleted bool) { + wh.mutex.Lock() + defer wh.mutex.Unlock() + l, ok := wh.watchers[path] if ok { curr := l.Front() diff --git a/store/watcher_test.go b/store/watcher_test.go index 2600fa161..aa485645a 100644 --- a/store/watcher_test.go +++ b/store/watcher_test.go @@ -23,7 +23,7 @@ import ( func TestWatcher(t *testing.T) { s := newStore() wh := s.WatcherHub - w, err := wh.watch("/foo", true, 1) + w, err := wh.newWatcher("/foo", true, 1) if err != nil { t.Fatalf("%v", err) } @@ -46,7 +46,7 @@ func TestWatcher(t *testing.T) { t.Fatal("recv != send") } - w, _ = wh.watch("/foo", false, 2) + w, _ = wh.newWatcher("/foo", false, 2) c = w.EventChan e = newEvent(Create, "/foo/bar", 2, 2) @@ -71,7 +71,7 @@ func TestWatcher(t *testing.T) { } // ensure we are doing exact matching rather than prefix matching - w, _ = wh.watch("/fo", true, 1) + w, _ = wh.newWatcher("/fo", true, 1) c = w.EventChan select { From d66dc3c1c70b872b41339e2405d6c6a7dbf0ccd4 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Sat, 28 Dec 2013 15:49:05 +0800 Subject: [PATCH 3/5] refactor(watcher_hub.go) refactor notifyWatchers() --- store/watcher_hub.go | 21 ++++++++------------- 1 file changed, 8 insertions(+), 13 deletions(-) diff --git a/store/watcher_hub.go b/store/watcher_hub.go index 64b3bdcf8..ebc7efec2 100644 --- a/store/watcher_hub.go +++ b/store/watcher_hub.go @@ -113,31 +113,26 @@ func (wh *watcherHub) notifyWatchers(e *Event, path string, deleted bool) { if ok { curr := l.Front() - for { - if curr == nil { // we have reached the end of the list - if l.Len() == 0 { - // if we have notified all watcher in the list - // we can delete the list - delete(wh.watchers, path) - } - break - } - + for curr != nil { next := curr.Next() // save reference to the next one in the list w, _ := curr.Value.(*Watcher) if w.notify(e, e.Node.Key == path, deleted) { - // if we successfully notify a watcher // we need to remove the watcher from the list // and decrease the counter l.Remove(curr) atomic.AddInt64(&wh.count, -1) - } - curr = next // update current to the next + curr = next // update current to the next element in the list + } + + if l.Len() == 0 { + // if we have notified all watcher in the list + // we can delete the list + delete(wh.watchers, path) } } } From bbbf8fd57413adaced345108afa91b484df6ef3d Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Sat, 28 Dec 2013 15:51:16 +0800 Subject: [PATCH 4/5] fix(watcher_hub.go) decrease count when remove a watcher --- store/watcher_hub.go | 1 + 1 file changed, 1 insertion(+) diff --git a/store/watcher_hub.go b/store/watcher_hub.go index ebc7efec2..f8650eb0f 100644 --- a/store/watcher_hub.go +++ b/store/watcher_hub.go @@ -76,6 +76,7 @@ func (wh *watcherHub) newWatcher(key string, recursive bool, index uint64) (*Wat wh.mutex.Lock() defer wh.mutex.Unlock() l.Remove(elem) + atomic.AddInt64(&wh.count, -1) if l.Len() == 0 { delete(wh.watchers, key) } From fa3b4a794103c323a490c3b1e983144c5b7eec2d Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Thu, 9 Jan 2014 13:29:04 +0800 Subject: [PATCH 5/5] refactor(watcher) change newWatcher to Watch --- server/v1/watch_key_handler.go | 2 +- server/v2/get_handler.go | 2 +- store/store.go | 8 ++++---- store/store_test.go | 20 ++++++++++---------- store/watcher.go | 13 ++++++++++++- store/watcher_hub.go | 6 +++--- store/watcher_test.go | 6 +++--- 7 files changed, 34 insertions(+), 23 deletions(-) diff --git a/server/v1/watch_key_handler.go b/server/v1/watch_key_handler.go index a7e6b798a..fe5b768d1 100644 --- a/server/v1/watch_key_handler.go +++ b/server/v1/watch_key_handler.go @@ -25,7 +25,7 @@ func WatchKeyHandler(w http.ResponseWriter, req *http.Request, s Server) error { } // Start the watcher on the store. - watcher, err := s.Store().NewWatcher(key, false, sinceIndex) + watcher, err := s.Store().Watch(key, false, sinceIndex) if err != nil { return etcdErr.NewError(500, key, s.Store().Index()) } diff --git a/server/v2/get_handler.go b/server/v2/get_handler.go index 031ff449e..05e5d9a31 100644 --- a/server/v2/get_handler.go +++ b/server/v2/get_handler.go @@ -55,7 +55,7 @@ func GetHandler(w http.ResponseWriter, req *http.Request, s Server) error { } // Start the watcher on the store. - watcher, err := s.Store().NewWatcher(key, recursive, sinceIndex) + watcher, err := s.Store().Watch(key, recursive, sinceIndex) if err != nil { return etcdErr.NewError(500, key, s.Store().Index()) } diff --git a/store/store.go b/store/store.go index 6d78e18b4..10e381ee0 100644 --- a/store/store.go +++ b/store/store.go @@ -53,7 +53,7 @@ type Store interface { Delete(nodePath string, recursive, dir bool) (*Event, error) CompareAndDelete(nodePath string, prevValue string, prevIndex uint64) (*Event, error) - NewWatcher(prefix string, recursive bool, sinceIndex uint64) (*Watcher, error) + Watch(prefix string, recursive bool, sinceIndex uint64) (*Watcher, error) Save() ([]byte, error) Recovery(state []byte) error @@ -341,7 +341,7 @@ func (s *store) CompareAndDelete(nodePath string, prevValue string, prevIndex ui return e, nil } -func (s *store) NewWatcher(key string, recursive bool, sinceIndex uint64) (*Watcher, error) { +func (s *store) Watch(key string, recursive bool, sinceIndex uint64) (*Watcher, error) { key = path.Clean(path.Join("/", key)) nextIndex := s.CurrentIndex + 1 @@ -352,10 +352,10 @@ func (s *store) NewWatcher(key string, recursive bool, sinceIndex uint64) (*Watc var err *etcdErr.Error if sinceIndex == 0 { - w, err = s.WatcherHub.newWatcher(key, recursive, nextIndex) + w, err = s.WatcherHub.watch(key, recursive, nextIndex) } else { - w, err = s.WatcherHub.newWatcher(key, recursive, sinceIndex) + w, err = s.WatcherHub.watch(key, recursive, sinceIndex) } if err != nil { diff --git a/store/store_test.go b/store/store_test.go index 4d71c91b9..91dc33506 100644 --- a/store/store_test.go +++ b/store/store_test.go @@ -446,7 +446,7 @@ func TestStoreCompareAndSwapPrevIndexFailsIfNotMatch(t *testing.T) { // Ensure that the store can watch for key creation. func TestStoreWatchCreate(t *testing.T) { s := newStore() - w, _ := s.NewWatcher("/foo", false, 0) + w, _ := s.Watch("/foo", false, 0) c := w.EventChan s.Create("/foo", false, "bar", false, Permanent) e := nbselect(c) @@ -459,7 +459,7 @@ func TestStoreWatchCreate(t *testing.T) { // Ensure that the store can watch for recursive key creation. func TestStoreWatchRecursiveCreate(t *testing.T) { s := newStore() - w, _ := s.NewWatcher("/foo", true, 0) + w, _ := s.Watch("/foo", true, 0) s.Create("/foo/bar", false, "baz", false, Permanent) e := nbselect(w.EventChan) assert.Equal(t, e.Action, "create", "") @@ -470,7 +470,7 @@ func TestStoreWatchRecursiveCreate(t *testing.T) { func TestStoreWatchUpdate(t *testing.T) { s := newStore() s.Create("/foo", false, "bar", false, Permanent) - w, _ := s.NewWatcher("/foo", false, 0) + w, _ := s.Watch("/foo", false, 0) s.Update("/foo", "baz", Permanent) e := nbselect(w.EventChan) assert.Equal(t, e.Action, "update", "") @@ -481,7 +481,7 @@ func TestStoreWatchUpdate(t *testing.T) { func TestStoreWatchRecursiveUpdate(t *testing.T) { s := newStore() s.Create("/foo/bar", false, "baz", false, Permanent) - w, _ := s.NewWatcher("/foo", true, 0) + w, _ := s.Watch("/foo", true, 0) s.Update("/foo/bar", "baz", Permanent) e := nbselect(w.EventChan) assert.Equal(t, e.Action, "update", "") @@ -492,7 +492,7 @@ func TestStoreWatchRecursiveUpdate(t *testing.T) { func TestStoreWatchDelete(t *testing.T) { s := newStore() s.Create("/foo", false, "bar", false, Permanent) - w, _ := s.NewWatcher("/foo", false, 0) + w, _ := s.Watch("/foo", false, 0) s.Delete("/foo", false, false) e := nbselect(w.EventChan) assert.Equal(t, e.Action, "delete", "") @@ -503,7 +503,7 @@ func TestStoreWatchDelete(t *testing.T) { func TestStoreWatchRecursiveDelete(t *testing.T) { s := newStore() s.Create("/foo/bar", false, "baz", false, Permanent) - w, _ := s.NewWatcher("/foo", true, 0) + w, _ := s.Watch("/foo", true, 0) s.Delete("/foo/bar", false, false) e := nbselect(w.EventChan) assert.Equal(t, e.Action, "delete", "") @@ -514,7 +514,7 @@ func TestStoreWatchRecursiveDelete(t *testing.T) { func TestStoreWatchCompareAndSwap(t *testing.T) { s := newStore() s.Create("/foo", false, "bar", false, Permanent) - w, _ := s.NewWatcher("/foo", false, 0) + w, _ := s.Watch("/foo", false, 0) s.CompareAndSwap("/foo", "bar", 0, "baz", Permanent) e := nbselect(w.EventChan) assert.Equal(t, e.Action, "compareAndSwap", "") @@ -525,7 +525,7 @@ func TestStoreWatchCompareAndSwap(t *testing.T) { func TestStoreWatchRecursiveCompareAndSwap(t *testing.T) { s := newStore() s.Create("/foo/bar", false, "baz", false, Permanent) - w, _ := s.NewWatcher("/foo", true, 0) + w, _ := s.Watch("/foo", true, 0) s.CompareAndSwap("/foo/bar", "baz", 0, "bat", Permanent) e := nbselect(w.EventChan) assert.Equal(t, e.Action, "compareAndSwap", "") @@ -545,7 +545,7 @@ func TestStoreWatchExpire(t *testing.T) { s.Create("/foo", false, "bar", false, time.Now().Add(500*time.Millisecond)) s.Create("/foofoo", false, "barbarbar", false, time.Now().Add(500*time.Millisecond)) - w, _ := s.NewWatcher("/", true, 0) + w, _ := s.Watch("/", true, 0) c := w.EventChan e := nbselect(c) assert.Nil(t, e, "") @@ -553,7 +553,7 @@ func TestStoreWatchExpire(t *testing.T) { e = nbselect(c) assert.Equal(t, e.Action, "expire", "") assert.Equal(t, e.Node.Key, "/foo", "") - w, _ = s.NewWatcher("/", true, 4) + w, _ = s.Watch("/", true, 4) e = nbselect(w.EventChan) assert.Equal(t, e.Action, "expire", "") assert.Equal(t, e.Node.Key, "/foofoo", "") diff --git a/store/watcher.go b/store/watcher.go index 06060c03b..6576d8baf 100644 --- a/store/watcher.go +++ b/store/watcher.go @@ -20,7 +20,7 @@ type Watcher struct { EventChan chan *Event recursive bool sinceIndex uint64 - Remove func() + remove func() } // notify function notifies the watcher. If the watcher interests in the given path, @@ -47,3 +47,14 @@ func (w *Watcher) notify(e *Event, originalPath bool, deleted bool) bool { } return false } + +// Remove removes the watcher from watcherHub +func (w *Watcher) Remove() { + if w.remove != nil { + w.remove() + } else { + // We attached a remove function to watcher + // Other pkg cannot change it, so this should not happen + panic("missing Watcher remove function") + } +} diff --git a/store/watcher_hub.go b/store/watcher_hub.go index f8650eb0f..9b7aaba3b 100644 --- a/store/watcher_hub.go +++ b/store/watcher_hub.go @@ -34,11 +34,11 @@ func newWatchHub(capacity int) *watcherHub { } } -// newWatcher function returns a watcher. +// Watch function returns a watcher. // If recursive is true, the first change after index under key will be sent to the event channel of the watcher. // If recursive is false, the first change after index at key will be sent to the event channel of the watcher. // If index is zero, watch will start from the current index + 1. -func (wh *watcherHub) newWatcher(key string, recursive bool, index uint64) (*Watcher, *etcdErr.Error) { +func (wh *watcherHub) watch(key string, recursive bool, index uint64) (*Watcher, *etcdErr.Error) { event, err := wh.EventHistory.scan(key, recursive, index) if err != nil { @@ -72,7 +72,7 @@ func (wh *watcherHub) newWatcher(key string, recursive bool, index uint64) (*Wat wh.watchers[key] = l } - w.Remove = func() { + w.remove = func() { wh.mutex.Lock() defer wh.mutex.Unlock() l.Remove(elem) diff --git a/store/watcher_test.go b/store/watcher_test.go index aa485645a..2600fa161 100644 --- a/store/watcher_test.go +++ b/store/watcher_test.go @@ -23,7 +23,7 @@ import ( func TestWatcher(t *testing.T) { s := newStore() wh := s.WatcherHub - w, err := wh.newWatcher("/foo", true, 1) + w, err := wh.watch("/foo", true, 1) if err != nil { t.Fatalf("%v", err) } @@ -46,7 +46,7 @@ func TestWatcher(t *testing.T) { t.Fatal("recv != send") } - w, _ = wh.newWatcher("/foo", false, 2) + w, _ = wh.watch("/foo", false, 2) c = w.EventChan e = newEvent(Create, "/foo/bar", 2, 2) @@ -71,7 +71,7 @@ func TestWatcher(t *testing.T) { } // ensure we are doing exact matching rather than prefix matching - w, _ = wh.newWatcher("/fo", true, 1) + w, _ = wh.watch("/fo", true, 1) c = w.EventChan select {