diff --git a/store/event.go b/store/event.go index f4085358d..53d4568bd 100644 --- a/store/event.go +++ b/store/event.go @@ -15,6 +15,9 @@ const ( Update = "update" Delete = "delete" TestAndSet = "testAndSet" + Expire = "expire" + UndefIndex = 0 + UndefTerm = 0 ) type Event struct { @@ -73,7 +76,6 @@ func (eq *eventQueue) back() int { } func (eq *eventQueue) insert(e *Event) { - index := (eq.back() + 1) % eq.Capacity eq.Events[index] = e @@ -89,6 +91,9 @@ func (eq *eventQueue) insert(e *Event) { type EventHistory struct { Queue eventQueue StartIndex uint64 + LastIndex uint64 + LastTerm uint64 + DupCnt uint64 // help to compute the watching point with duplicated indexes in the queue rwl sync.RWMutex } @@ -102,13 +107,31 @@ func newEventHistory(capacity int) *EventHistory { } // addEvent function adds event into the eventHistory -func (eh *EventHistory) addEvent(e *Event) { +func (eh *EventHistory) addEvent(e *Event) *Event { eh.rwl.Lock() defer eh.rwl.Unlock() + var duped uint64 + + if e.Index == UndefIndex { + e.Index = eh.LastIndex + duped = 1 + } + + if e.Term == UndefTerm { + e.Term = eh.LastTerm + duped = 1 + } + eh.Queue.insert(e) eh.StartIndex = eh.Queue.Events[eh.Queue.Front].Index + + eh.LastIndex = e.Index + eh.LastTerm = e.Term + eh.DupCnt += duped + + return e } // scan function is enumerating events from the index in history and @@ -117,7 +140,7 @@ func (eh *EventHistory) scan(prefix string, index uint64) (*Event, error) { eh.rwl.RLock() defer eh.rwl.RUnlock() - start := index - eh.StartIndex + start := index - eh.StartIndex + eh.DupCnt // the index should locate after the event history's StartIndex // and before its size @@ -126,8 +149,7 @@ func (eh *EventHistory) scan(prefix string, index uint64) (*Event, error) { // TODO: Add error type return nil, etcdErr.NewError(etcdErr.EcodeEventIndexCleared, - fmt.Sprintf("prefix:%v index:%v", prefix, index), - ) + fmt.Sprintf("prefix:%v index:%v", prefix, index)) } if start >= uint64(eh.Queue.Size) { @@ -149,13 +171,11 @@ func (eh *EventHistory) scan(prefix string, index uint64) (*Event, error) { return nil, nil } } - } // clone will be protected by a stop-world lock // do not need to obtain internal lock func (eh *EventHistory) clone() *EventHistory { - clonedQueue := eventQueue{ Capacity: eh.Queue.Capacity, Events: make([]*Event, eh.Queue.Capacity), diff --git a/store/event_test.go b/store/event_test.go index 0d19dd52a..c02a4d70e 100644 --- a/store/event_test.go +++ b/store/event_test.go @@ -13,7 +13,7 @@ func TestEventQueue(t *testing.T) { // Add for i := 0; i < 200; i++ { - e := newEvent(Create, "/foo", uint64(i), 0) + e := newEvent(Create, "/foo", uint64(i), 1) eh.addEvent(e) } @@ -28,20 +28,18 @@ func TestEventQueue(t *testing.T) { } j++ i = (i + 1) % eh.Queue.Capacity - } - } func TestScanHistory(t *testing.T) { eh := newEventHistory(100) // Add - eh.addEvent(newEvent(Create, "/foo", 1, 0)) - eh.addEvent(newEvent(Create, "/foo/bar", 2, 0)) - eh.addEvent(newEvent(Create, "/foo/foo", 3, 0)) - eh.addEvent(newEvent(Create, "/foo/bar/bar", 4, 0)) - eh.addEvent(newEvent(Create, "/foo/foo/foo", 5, 0)) + eh.addEvent(newEvent(Create, "/foo", 1, 1)) + eh.addEvent(newEvent(Create, "/foo/bar", 2, 1)) + eh.addEvent(newEvent(Create, "/foo/foo", 3, 1)) + eh.addEvent(newEvent(Create, "/foo/bar/bar", 4, 1)) + eh.addEvent(newEvent(Create, "/foo/foo/foo", 5, 1)) e, err := eh.scan("/foo", 1) if err != nil || e.Index != 1 { @@ -65,5 +63,4 @@ func TestScanHistory(t *testing.T) { if e != nil { t.Fatalf("bad index shoud reuturn nil") } - } diff --git a/store/node.go b/store/node.go index 8502f70d1..f5545fa60 100644 --- a/store/node.go +++ b/store/node.go @@ -87,6 +87,7 @@ func (n *Node) Remove(recursive bool, callback func(path string)) error { n.stopExpire <- true n.status = removed + } return nil @@ -185,7 +186,6 @@ func (n *Node) GetFile(name string) (*Node, error) { } return nil, nil - } // Add function adds a node to the receiver node. @@ -214,7 +214,6 @@ func (n *Node) Add(child *Node) error { n.Children[name] = child return nil - } // Clone function clone the node recursively and return the new node. @@ -236,24 +235,36 @@ func (n *Node) Clone() *Node { return clone } -func (n *Node) recoverAndclean() { +func (n *Node) recoverAndclean(s *Store) { if n.IsDir() { for _, child := range n.Children { child.Parent = n - child.recoverAndclean() + child.recoverAndclean(s) } } n.stopExpire = make(chan bool, 1) - n.Expire() + n.Expire(s) } -func (n *Node) Expire() { +// Expire function will test if the node is expired. +// if the node is already expired, delete the node and return. +// if the node is permemant (this shouldn't happen), return at once. +// else wait for a period time, then remove the node. and notify the watchhub. +func (n *Node) Expire(s *Store) { expired, duration := n.IsExpired() if expired { // has been expired + + // since the parent function of Expire() runs serially, + // there is no need for lock here + e := newEvent(Expire, n.Path, UndefIndex, UndefTerm) + s.WatcherHub.notify(e) + n.Remove(true, nil) + s.Stats.Inc(ExpireCount) + return } @@ -265,13 +276,24 @@ func (n *Node) Expire() { select { // if timeout, delete the node case <-time.After(duration): + + // Lock the worldLock to avoid race on s.WatchHub, + // and the race with other slibling nodes on their common parent. + s.worldLock.Lock() + + e := newEvent(Expire, n.Path, UndefIndex, UndefTerm) + s.WatcherHub.notify(e) + n.Remove(true, nil) + s.Stats.Inc(ExpireCount) + + s.worldLock.Unlock() + return // if stopped, return case <-n.stopExpire: return - } }() } @@ -285,7 +307,6 @@ func (n *Node) IsHidden() bool { _, name := path.Split(n.Path) return name[0] == '_' - } func (n *Node) IsPermanent() bool { @@ -346,6 +367,7 @@ func (n *Node) Pair(recurisive, sorted bool) KeyValuePair { if sorted { sort.Sort(pair) } + return pair } @@ -354,3 +376,18 @@ func (n *Node) Pair(recurisive, sorted bool) KeyValuePair { Value: n.Value, } } + +func (n *Node) UpdateTTL(expireTime time.Time, s *Store) { + if !n.IsPermanent() { + expired, _ := n.IsExpired() + + if !expired { + n.stopExpire <- true // suspend it to modify the expiration + } + } + + if expireTime.Sub(Permanent) != 0 { + n.ExpireTime = expireTime + n.Expire(s) + } +} diff --git a/store/stats.go b/store/stats.go index b276d5279..e2053ed42 100644 --- a/store/stats.go +++ b/store/stats.go @@ -16,6 +16,7 @@ const ( TestAndSetFail = 107 GetSuccess = 110 GetFail = 111 + ExpireCount = 112 ) type Stats struct { @@ -39,6 +40,7 @@ type Stats struct { // Number of testAndSet requests TestAndSetSuccess uint64 `json:"testAndSetSuccess"` TestAndSetFail uint64 `json:"testAndSetFail"` + ExpireCount uint64 `json:"expireCount"` Watchers uint64 `json:"watchers"` } @@ -51,7 +53,7 @@ func newStats() *Stats { func (s *Stats) clone() *Stats { return &Stats{s.GetSuccess, s.GetFail, s.SetSuccess, s.SetFail, s.DeleteSuccess, s.DeleteFail, s.UpdateSuccess, s.UpdateFail, - s.TestAndSetSuccess, s.TestAndSetFail, s.Watchers} + s.TestAndSetSuccess, s.TestAndSetFail, s.Watchers, s.ExpireCount} } // Status() return the statistics info of etcd storage its recent start @@ -93,5 +95,7 @@ func (s *Stats) Inc(field int) { atomic.AddUint64(&s.TestAndSetSuccess, 1) case TestAndSetFail: atomic.AddUint64(&s.TestAndSetFail, 1) + case ExpireCount: + atomic.AddUint64(&s.ExpireCount, 1) } } diff --git a/store/stats_test.go b/store/stats_test.go index ff67f328a..207df825f 100644 --- a/store/stats_test.go +++ b/store/stats_test.go @@ -16,7 +16,7 @@ func TestBasicStats(t *testing.T) { for _, k := range keys { i++ - _, err := s.Create(k, "bar", time.Now().Add(time.Second*time.Duration(rand.Intn(10))), i, 1) + _, err := s.Create(k, "bar", time.Now().Add(time.Second*time.Duration(rand.Intn(6))), i, 1) if err != nil { SetFail++ } else { @@ -24,6 +24,8 @@ func TestBasicStats(t *testing.T) { } } + time.Sleep(time.Second * 3) + for _, k := range keys { _, err := s.Get(k, false, false, i, 1) if err != nil { @@ -35,7 +37,7 @@ func TestBasicStats(t *testing.T) { for _, k := range keys { i++ - _, err := s.Update(k, "foo", time.Now().Add(time.Second*time.Duration(rand.Intn(5))), i, 1) + _, err := s.Update(k, "foo", time.Now().Add(time.Second*time.Duration(rand.Intn(6))), i, 1) if err != nil { UpdateFail++ } else { @@ -43,6 +45,8 @@ func TestBasicStats(t *testing.T) { } } + time.Sleep(time.Second * 3) + for _, k := range keys { _, err := s.Get(k, false, false, i, 1) if err != nil { @@ -136,4 +140,26 @@ func TestBasicStats(t *testing.T) { t.Fatalf("TestAndSetFail [%d] != Stats.TestAndSetFail [%d]", TestAndSetFail, s.Stats.TestAndSetFail) } + s = New() + SetSuccess = 0 + SetFail = 0 + + for _, k := range keys { + i++ + _, err := s.Create(k, "bar", time.Now().Add(time.Second*3), i, 1) + if err != nil { + SetFail++ + } else { + SetSuccess++ + } + } + + time.Sleep(6 * time.Second) + + ExpireCount := SetSuccess + + if ExpireCount != s.Stats.ExpireCount { + t.Fatalf("ExpireCount [%d] != Stats.ExpireCount [%d]", ExpireCount, s.Stats.ExpireCount) + } + } diff --git a/store/store.go b/store/store.go index 6265d291c..f77189172 100644 --- a/store/store.go +++ b/store/store.go @@ -150,7 +150,7 @@ func (s *Store) Create(nodePath string, value string, expireTime time.Time, inde // Node with TTL if expireTime.Sub(Permanent) != 0 { - n.Expire() + n.Expire(s) e.Expiration = &n.ExpireTime e.TTL = int64(expireTime.Sub(time.Now())/time.Second) + 1 } @@ -171,15 +171,16 @@ func (s *Store) Update(nodePath string, value string, expireTime time.Time, inde if err != nil { // if the node does not exist, return error s.Stats.Inc(UpdateFail) + return nil, err } e := newEvent(Update, nodePath, s.Index, s.Term) if n.IsDir() { // if the node is a directory, we can only update ttl - if len(value) != 0 { s.Stats.Inc(UpdateFail) + return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath) } @@ -194,19 +195,14 @@ func (s *Store) Update(nodePath string, value string, expireTime time.Time, inde } // update ttl - if !n.IsPermanent() { - n.stopExpire <- true - } - - if expireTime.Sub(Permanent) != 0 { - n.ExpireTime = expireTime - n.Expire() - e.Expiration = &n.ExpireTime - e.TTL = int64(expireTime.Sub(time.Now())/time.Second) + 1 - } + n.UpdateTTL(expireTime, s) + e.Expiration = &n.ExpireTime + e.TTL = int64(expireTime.Sub(time.Now())/time.Second) + 1 s.WatcherHub.notify(e) + s.Stats.Inc(UpdateSuccess) + return e, nil } @@ -216,31 +212,33 @@ func (s *Store) TestAndSet(nodePath string, prevValue string, prevIndex uint64, s.worldLock.RLock() defer s.worldLock.RUnlock() - f, err := s.internalGet(nodePath, index, term) + n, err := s.internalGet(nodePath, index, term) if err != nil { s.Stats.Inc(TestAndSetFail) return nil, err } - if f.IsDir() { // can only test and set file + if n.IsDir() { // can only test and set file s.Stats.Inc(TestAndSetFail) return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath) } - if f.Value == prevValue || f.ModifiedIndex == prevIndex { + if n.Value == prevValue || n.ModifiedIndex == prevIndex { // if test succeed, write the value e := newEvent(TestAndSet, nodePath, index, term) - e.PrevValue = f.Value + e.PrevValue = n.Value e.Value = value - f.Write(value, index, term) + n.Write(value, index, term) + + n.UpdateTTL(expireTime, s) s.WatcherHub.notify(e) s.Stats.Inc(TestAndSetSuccess) return e, nil } - cause := fmt.Sprintf("[%v != %v] [%v != %v]", prevValue, f.Value, prevIndex, f.ModifiedIndex) + cause := fmt.Sprintf("[%v != %v] [%v != %v]", prevValue, n.Value, prevIndex, n.ModifiedIndex) s.Stats.Inc(TestAndSetFail) return nil, etcdErr.NewError(etcdErr.EcodeTestFailed, cause) } @@ -404,7 +402,7 @@ func (s *Store) Recovery(state []byte) error { return err } - s.Root.recoverAndclean() + s.Root.recoverAndclean(s) return nil } diff --git a/store/store_test.go b/store/store_test.go index 253063a2a..87010b1ee 100644 --- a/store/store_test.go +++ b/store/store_test.go @@ -52,7 +52,6 @@ func TestCreateAndGet(t *testing.T) { if err != nil { t.Fatal("Cannot create /fooDir/bar = bar") } - } func TestUpdateFile(t *testing.T) { @@ -81,7 +80,6 @@ func TestUpdateFile(t *testing.T) { } // create a directory, update its ttl, to see if it will be deleted - _, err = s.Create("/foo/foo", "", Permanent, 3, 1) if err != nil { @@ -237,7 +235,6 @@ func TestRemove(t *testing.T) { if err == nil || err.Error() != "Key Not Found" { t.Fatalf("can get the node after deletion ") } - } func TestExpire(t *testing.T) { @@ -280,7 +277,6 @@ func TestExpire(t *testing.T) { if err != nil { t.Fatalf("cannot delete the node before expiration", err.Error()) } - } func TestTestAndSet(t *testing.T) { // TODO prevValue == nil ? @@ -314,70 +310,95 @@ func TestTestAndSet(t *testing.T) { // TODO prevValue == nil ? if e.PrevValue != "car" || e.Value != "bar" { t.Fatalf("[%v/%v] [%v/%v]", e.PrevValue, "car", e.Value, "bar") } - } func TestWatch(t *testing.T) { s := New() // watch at a deeper path - c, _ := s.WatcherHub.watch("/foo/foo/foo", false, 0) + c, _ := s.Watch("/foo/foo/foo", false, 0, 0, 1) s.Create("/foo/foo/foo", "bar", Permanent, 1, 1) e := nonblockingRetrive(c) - if e.Key != "/foo/foo/foo" { - t.Fatal("watch for Create node fails") + if e.Key != "/foo/foo/foo" || e.Action != Create { + t.Fatal("watch for Create node fails ", e) } - c, _ = s.WatcherHub.watch("/foo/foo/foo", false, 0) + c, _ = s.Watch("/foo/foo/foo", false, 0, 1, 1) s.Update("/foo/foo/foo", "car", Permanent, 2, 1) e = nonblockingRetrive(c) - if e.Key != "/foo/foo/foo" { - t.Fatal("watch for Update node fails") + if e.Key != "/foo/foo/foo" || e.Action != Update { + t.Fatal("watch for Update node fails ", e) } - c, _ = s.WatcherHub.watch("/foo/foo/foo", false, 0) + c, _ = s.Watch("/foo/foo/foo", false, 0, 2, 1) s.TestAndSet("/foo/foo/foo", "car", 0, "bar", Permanent, 3, 1) e = nonblockingRetrive(c) - if e.Key != "/foo/foo/foo" { + if e.Key != "/foo/foo/foo" || e.Action != TestAndSet { t.Fatal("watch for TestAndSet node fails") } - c, _ = s.WatcherHub.watch("/foo/foo/foo", false, 0) + c, _ = s.Watch("/foo/foo/foo", false, 0, 3, 1) s.Delete("/foo", true, 4, 1) //recursively delete e = nonblockingRetrive(c) - if e.Key != "/foo" { - t.Fatal("watch for Delete node fails") + if e.Key != "/foo" || e.Action != Delete { + t.Fatal("watch for Delete node fails ", e) } // watch at a prefix - c, _ = s.WatcherHub.watch("/foo", true, 0) + c, _ = s.Watch("/foo", true, 0, 4, 1) s.Create("/foo/foo/boo", "bar", Permanent, 5, 1) e = nonblockingRetrive(c) - if e.Key != "/foo/foo/boo" { + if e.Key != "/foo/foo/boo" || e.Action != Create { t.Fatal("watch for Create subdirectory fails") } - c, _ = s.WatcherHub.watch("/foo", true, 0) + c, _ = s.Watch("/foo", true, 0, 5, 1) s.Update("/foo/foo/boo", "foo", Permanent, 6, 1) e = nonblockingRetrive(c) - if e.Key != "/foo/foo/boo" { + if e.Key != "/foo/foo/boo" || e.Action != Update { t.Fatal("watch for Update subdirectory fails") } - c, _ = s.WatcherHub.watch("/foo", true, 0) + c, _ = s.Watch("/foo", true, 0, 6, 1) s.TestAndSet("/foo/foo/boo", "foo", 0, "bar", Permanent, 7, 1) e = nonblockingRetrive(c) - if e.Key != "/foo/foo/boo" { + if e.Key != "/foo/foo/boo" || e.Action != TestAndSet { t.Fatal("watch for TestAndSet subdirectory fails") } - c, _ = s.WatcherHub.watch("/foo", true, 0) + c, _ = s.Watch("/foo", true, 0, 7, 1) s.Delete("/foo/foo/boo", false, 8, 1) e = nonblockingRetrive(c) - if e.Key != "/foo/foo/boo" { + if e.Key != "/foo/foo/boo" || e.Action != Delete { t.Fatal("watch for Delete subdirectory fails") } + // watch expire + s.Create("/foo/foo/boo", "foo", time.Now().Add(time.Second*1), 9, 1) + c, _ = s.Watch("/foo", true, 0, 9, 1) + time.Sleep(time.Second * 2) + e = nonblockingRetrive(c) + if e.Key != "/foo/foo/boo" || e.Action != Expire || e.Index != 9 { + t.Fatal("watch for Expiration of Create() subdirectory fails ", e) + } + + s.Create("/foo/foo/boo", "foo", Permanent, 10, 1) + s.Update("/foo/foo/boo", "bar", time.Now().Add(time.Second*1), 11, 1) + c, _ = s.Watch("/foo", true, 0, 11, 1) + time.Sleep(time.Second * 2) + e = nonblockingRetrive(c) + if e.Key != "/foo/foo/boo" || e.Action != Expire || e.Index != 11 { + t.Fatal("watch for Expiration of Update() subdirectory fails ", e) + } + + s.Create("/foo/foo/boo", "foo", Permanent, 12, 1) + s.TestAndSet("/foo/foo/boo", "foo", 0, "bar", time.Now().Add(time.Second*1), 13, 1) + c, _ = s.Watch("/foo", true, 0, 13, 1) + time.Sleep(time.Second * 2) + e = nonblockingRetrive(c) + if e.Key != "/foo/foo/boo" || e.Action != Expire || e.Index != 13 { + t.Fatal("watch for Expiration of TestAndSet() subdirectory fails ", e) + } } func TestSort(t *testing.T) { @@ -442,7 +463,7 @@ func TestSaveAndRecover(t *testing.T) { b, err := s.Save() cloneFs := New() - time.Sleep(time.Second) + time.Sleep(2 * time.Second) cloneFs.Recovery(b) @@ -453,11 +474,15 @@ func TestSaveAndRecover(t *testing.T) { } } + // lock to avoid racing with Expire() + s.worldLock.RLock() + defer s.worldLock.RUnlock() + if s.WatcherHub.EventHistory.StartIndex != cloneFs.WatcherHub.EventHistory.StartIndex { t.Fatal("Error recovered event history start index") } - for i = 0; int(i) < s.WatcherHub.EventHistory.Queue.Size; i++ { + for i = 0; int(i) < cloneFs.WatcherHub.EventHistory.Queue.Size; i++ { if s.WatcherHub.EventHistory.Queue.Events[i].Key != cloneFs.WatcherHub.EventHistory.Queue.Events[i].Key { t.Fatal("Error recovered event history") @@ -469,7 +494,6 @@ func TestSaveAndRecover(t *testing.T) { if err == nil || err.Error() != "Key Not Found" { t.Fatalf("can get the node after deletion ") } - } // GenKeys randomly generate num of keys with max depth @@ -485,6 +509,7 @@ func GenKeys(num int, depth int) []string { keys[i] += "/" + strconv.Itoa(rand.Int()) } } + return keys } @@ -504,11 +529,9 @@ func createAndGet(s *Store, path string, t *testing.T) { if e.Value != "bar" { t.Fatalf("expect value of %s is bar [%s]", path, e.Value) } - } func recursiveTestSort(k KeyValuePair, t *testing.T) { - for i, v := range k.KVPairs[:len(k.KVPairs)-1] { if v.Key >= k.KVPairs[i+1].Key { t.Fatalf("sort failed, [%s] should be placed after [%s]", v.Key, k.KVPairs[i+1].Key) @@ -517,7 +540,6 @@ func recursiveTestSort(k KeyValuePair, t *testing.T) { if v.Dir { recursiveTestSort(v, t) } - } if v := k.KVPairs[len(k.KVPairs)-1]; v.Dir { diff --git a/store/watcher.go b/store/watcher.go index 3b3e43478..5d96e45cd 100644 --- a/store/watcher.go +++ b/store/watcher.go @@ -47,7 +47,7 @@ func (wh *watcherHub) watch(prefix string, recursive bool, index uint64) (<-chan w := &watcher{ eventChan: eventChan, recursive: recursive, - sinceIndex: index, + sinceIndex: index - 1, // to catch Expire() } l, ok := wh.watchers[prefix] @@ -74,21 +74,19 @@ func (wh *watcherHub) notifyWithPath(e *Event, path string, force bool) { notifiedAll := true for { - if curr == nil { // we have reached the end of the list - if notifiedAll { // if we have notified all watcher in the list // we can delete the list delete(wh.watchers, path) } + break } next := curr.Next() // save the next w, _ := curr.Value.(*watcher) - if (w.recursive || force || e.Key == path) && e.Index >= w.sinceIndex { w.eventChan <- e l.Remove(curr) @@ -98,12 +96,13 @@ func (wh *watcherHub) notifyWithPath(e *Event, path string, force bool) { } curr = next // go to the next one - } } } func (wh *watcherHub) notify(e *Event) { + e = wh.EventHistory.addEvent(e) + segments := strings.Split(e.Key, "/") currPath := "/" @@ -113,8 +112,6 @@ func (wh *watcherHub) notify(e *Event) { currPath = path.Join(currPath, segment) wh.notifyWithPath(e, currPath, false) } - - wh.EventHistory.addEvent(e) } func (wh *watcherHub) clone() *watcherHub { diff --git a/store/watcher_test.go b/store/watcher_test.go index 90c23c59e..e437422ad 100644 --- a/store/watcher_test.go +++ b/store/watcher_test.go @@ -7,8 +7,7 @@ import ( func TestWatcher(t *testing.T) { s := New() wh := s.WatcherHub - c, err := wh.watch("/foo", true, 0) - + c, err := wh.watch("/foo", true, 1) if err != nil { t.Fatal("%v", err) } @@ -20,7 +19,7 @@ func TestWatcher(t *testing.T) { // do nothing } - e := newEvent(Create, "/foo/bar", 1, 0) + e := newEvent(Create, "/foo/bar", 1, 1) wh.notify(e) @@ -30,20 +29,20 @@ func TestWatcher(t *testing.T) { t.Fatal("recv != send") } - c, _ = wh.watch("/foo", false, 0) + c, _ = wh.watch("/foo", false, 2) - e = newEvent(Create, "/foo/bar", 1, 0) + e = newEvent(Create, "/foo/bar", 2, 1) wh.notify(e) select { - case <-c: - t.Fatal("should not receive from channel if not recursive") + case re = <-c: + t.Fatal("should not receive from channel if not recursive ", re) default: // do nothing } - e = newEvent(Create, "/foo", 1, 0) + e = newEvent(Create, "/foo", 3, 1) wh.notify(e)