From c307b6abcad43b022a368659d901a3669c5d9601 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Wed, 6 Nov 2013 21:19:37 -0800 Subject: [PATCH] fix watcher_hub --- store/event_history.go | 30 ++++++++++++------- store/event_test.go | 12 ++++---- store/store.go | 5 ++-- store/store_test.go | 11 +++++-- store/watcher.go | 1 - store/watcher_hub.go | 65 ++++++++++++++++++++++++++---------------- 6 files changed, 77 insertions(+), 47 deletions(-) diff --git a/store/event_history.go b/store/event_history.go index 3ddd38206..ce21ec1fc 100644 --- a/store/event_history.go +++ b/store/event_history.go @@ -31,19 +31,14 @@ func (eh *EventHistory) addEvent(e *Event) *Event { eh.rwl.Lock() defer eh.rwl.Unlock() - var duped uint64 - - if e.Index == UndefIndex { - e.Index = eh.LastIndex - e.Term = eh.LastTerm - duped = 1 + if e.Index == eh.LastIndex { + eh.DupCnt += 1 } eh.Queue.insert(e) eh.LastIndex = e.Index eh.LastTerm = e.Term - eh.DupCnt += duped eh.StartIndex = eh.Queue.Events[eh.Queue.Front].Index @@ -52,7 +47,7 @@ func (eh *EventHistory) addEvent(e *Event) *Event { // scan function is enumerating events from the index in history and // stops till the first point where the key has identified prefix -func (eh *EventHistory) scan(prefix string, index uint64) (*Event, *etcdErr.Error) { +func (eh *EventHistory) scan(prefix string, index uint64) ([]*Event, *etcdErr.Error) { eh.rwl.RLock() defer eh.rwl.RUnlock() @@ -73,16 +68,29 @@ func (eh *EventHistory) scan(prefix string, index uint64) (*Event, *etcdErr.Erro i := int((start + uint64(eh.Queue.Front)) % uint64(eh.Queue.Capacity)) + events := make([]*Event, 0) + var eventIndex uint64 + for { e := eh.Queue.Events[i] + + if eventIndex != 0 && eventIndex != e.Index { + return events, nil + } + if strings.HasPrefix(e.Key, prefix) && index <= e.Index { // make sure we bypass the smaller one - return e, nil + eventIndex = e.Index + events = append(events, e) } i = (i + 1) % eh.Queue.Capacity - if i == eh.Queue.back() { // find nothing, return and watch from current index - return nil, nil + if i == eh.Queue.back() { + if eventIndex == 0 { // find nothing, return and watch from current index + return nil, nil + } + + return events, nil } } } diff --git a/store/event_test.go b/store/event_test.go index c02a4d70e..aedf7f7da 100644 --- a/store/event_test.go +++ b/store/event_test.go @@ -42,20 +42,20 @@ func TestScanHistory(t *testing.T) { eh.addEvent(newEvent(Create, "/foo/foo/foo", 5, 1)) e, err := eh.scan("/foo", 1) - if err != nil || e.Index != 1 { - t.Fatalf("scan error [/foo] [1] %v", e.Index) + if err != nil || e[0].Index != 1 { + t.Fatalf("scan error [/foo] [1] %v", e[0].Index) } e, err = eh.scan("/foo/bar", 1) - if err != nil || e.Index != 2 { - t.Fatalf("scan error [/foo/bar] [2] %v", e.Index) + if err != nil || e[0].Index != 2 { + t.Fatalf("scan error [/foo/bar] [2] %v", e[0].Index) } e, err = eh.scan("/foo/bar", 3) - if err != nil || e.Index != 4 { - t.Fatalf("scan error [/foo/bar/bar] [4] %v", e.Index) + if err != nil || e[0].Index != 4 { + t.Fatalf("scan error [/foo/bar/bar] [4] %v", e[0].Index) } e, err = eh.scan("/foo/bar", 6) diff --git a/store/store.go b/store/store.go index 14d480b9c..6190e354d 100644 --- a/store/store.go +++ b/store/store.go @@ -435,10 +435,12 @@ func (s *store) internalGet(nodePath string, index uint64, term uint64) (*Node, } // deleteExpiredKyes will delete all -func (s *store) deleteExpiredKeys(cutoff time.Time) { +func (s *store) deleteExpiredKeys(cutoff time.Time, index uint64, term uint64) { s.worldLock.Lock() defer s.worldLock.Unlock() + s.Index, s.Term = index, term + for { node := s.ttlKeyHeap.top() if node == nil || node.ExpireTime.After(cutoff) { @@ -497,7 +499,6 @@ func (s *store) Save() ([]byte, error) { b, err := json.Marshal(clonedStore) if err != nil { - fmt.Println(err) return nil, err } diff --git a/store/store_test.go b/store/store_test.go index 013656c76..1fc242d91 100644 --- a/store/store_test.go +++ b/store/store_test.go @@ -344,13 +344,18 @@ func TestStoreWatchExpire(t *testing.T) { s := newStore() go mockSyncService(s.deleteExpiredKeys) s.Create("/foo", "bar", false, time.Now().Add(500*time.Millisecond), 2, 1) - c, _ := s.Watch("/foo", false, 0, 0, 1) + s.Create("/foofoo", "barbarbar", false, time.Now().Add(500*time.Millisecond), 2, 1) + + c, _ := s.Watch("/", true, 0, 0, 1) e := nbselect(c) assert.Nil(t, e, "") time.Sleep(600 * time.Millisecond) e = nbselect(c) assert.Equal(t, e.Action, "expire", "") assert.Equal(t, e.Key, "/foo", "") + e = nbselect(c) + assert.Equal(t, e.Action, "expire", "") + assert.Equal(t, e.Key, "/foofoo", "") } // Ensure that the store can recover from a previously saved state. @@ -409,9 +414,9 @@ func nbselect(c <-chan *Event) *Event { } } -func mockSyncService(f func(now time.Time)) { +func mockSyncService(f func(now time.Time, index uint64, term uint64)) { ticker := time.Tick(time.Millisecond * 500) for now := range ticker { - f(now) + f(now, 2, 1) } } diff --git a/store/watcher.go b/store/watcher.go index b9cb5499c..2015d0072 100644 --- a/store/watcher.go +++ b/store/watcher.go @@ -24,7 +24,6 @@ func (w *watcher) notify(e *Event, originalPath bool, deleted bool) bool { // at the file we need to delete. // For example a watcher is watching at "/foo/bar". And we deletes "/foo". The watcher // should get notified even if "/foo" is not the path it is watching. - if (w.recursive || originalPath || deleted) && e.Index >= w.sinceIndex { w.eventChan <- e return true diff --git a/store/watcher_hub.go b/store/watcher_hub.go index 9e0285b47..33eda248e 100644 --- a/store/watcher_hub.go +++ b/store/watcher_hub.go @@ -19,7 +19,8 @@ type watcherHub struct { watchers map[string]*list.List count int64 // current number of watchers. EventHistory *EventHistory - pendingWatchers *list.List + pendingWatchers map[*list.Element]*list.List + pendingList map[*list.List]string } // newWatchHub creates a watchHub. The capacity determines how many events we will @@ -30,7 +31,8 @@ func newWatchHub(capacity int) *watcherHub { return &watcherHub{ watchers: make(map[string]*list.List), EventHistory: newEventHistory(capacity), - pendingWatchers: list.New(), + pendingWatchers: make(map[*list.Element]*list.List), + pendingList: make(map[*list.List]string), } } @@ -39,23 +41,30 @@ func newWatchHub(capacity int) *watcherHub { // If recursive is false, the first change after index at prefix will be sent to the event channel. // If index is zero, watch will start from the current index + 1. func (wh *watcherHub) watch(prefix string, recursive bool, index uint64) (<-chan *Event, *etcdErr.Error) { - eventChan := make(chan *Event, 1) - - e, err := wh.EventHistory.scan(prefix, index) + events, err := wh.EventHistory.scan(prefix, index) if err != nil { return nil, err } - if e != nil { - eventChan <- e + eventChan := make(chan *Event, len(events)+5) // use a buffered channel + + if events != nil { + for _, e := range events { + eventChan <- e + } + + if len(events) > 1 { + eventChan <- nil + } + return eventChan, nil } w := &watcher{ eventChan: eventChan, recursive: recursive, - sinceIndex: index - 1, // to catch Expire() + sinceIndex: index, } l, ok := wh.watchers[prefix] @@ -95,19 +104,16 @@ func (wh *watcherHub) notify(e *Event) { func (wh *watcherHub) notifyWatchers(e *Event, path string, deleted bool) { l, ok := wh.watchers[path] - if ok { curr := l.Front() - notifiedAll := true for { if curr == nil { // we have reached the end of the list - if notifiedAll { + if l.Len() == 0 { // if we have notified all watcher in the list // we can delete the list delete(wh.watchers, path) } - break } @@ -116,20 +122,18 @@ func (wh *watcherHub) notifyWatchers(e *Event, path string, deleted bool) { w, _ := curr.Value.(*watcher) if w.notify(e, e.Key == path, deleted) { - // if we successfully notify a watcher - // we need to remove the watcher from the list - // and decrease the counter - l.Remove(curr) - atomic.AddInt64(&wh.count, -1) if e.Action == Expire { - wh.pendingWatchers.PushBack(w) + wh.pendingWatchers[curr] = l + wh.pendingList[l] = path + } else { + // if we successfully notify a watcher + // we need to remove the watcher from the list + // and decrease the counter + l.Remove(curr) + atomic.AddInt64(&wh.count, -1) } - } else { - // once there is a watcher in the list is not interested - // in the event, we should keep the list in the map - notifiedAll = false } curr = next // update current to the next @@ -138,11 +142,24 @@ func (wh *watcherHub) notifyWatchers(e *Event, path string, deleted bool) { } func (wh *watcherHub) clearPendingWatchers() { - for e := wh.pendingWatchers.Front(); e != nil; e = e.Next() { + if len(wh.pendingWatchers) == 0 { // avoid making new maps + return + } + + for e, l := range wh.pendingWatchers { + l.Remove(e) + + if l.Len() == 0 { + path := wh.pendingList[l] + delete(wh.watchers, path) + } + w, _ := e.Value.(*watcher) w.eventChan <- nil } - wh.pendingWatchers = list.New() + + wh.pendingWatchers = make(map[*list.Element]*list.List) + wh.pendingList = make(map[*list.List]string) } // clone function clones the watcherHub and return the cloned one.