From 35724319c9c12a68cbb163e8496185bb6306428d Mon Sep 17 00:00:00 2001 From: evan-gu Date: Mon, 30 Sep 2013 01:06:18 -0400 Subject: [PATCH 1/7] add watcher for expiration, add expiration for TestAndSet, add related test case --- store/event.go | 19 +++++++++++++++++-- store/node.go | 22 ++++++++++++++++++---- store/store.go | 31 ++++++++++++++----------------- store/store_test.go | 41 +++++++++++++++++++++++++++++++++++++++-- store/watcher.go | 15 +++++++++++++++ 5 files changed, 103 insertions(+), 25 deletions(-) diff --git a/store/event.go b/store/event.go index f4085358d..29cec3f12 100644 --- a/store/event.go +++ b/store/event.go @@ -15,6 +15,7 @@ const ( Update = "update" Delete = "delete" TestAndSet = "testAndSet" + Expire = "expire" ) type Event struct { @@ -111,6 +112,21 @@ func (eh *EventHistory) addEvent(e *Event) { eh.StartIndex = eh.Queue.Events[eh.Queue.Front].Index } +// addEvent with the last event's index and term +func (eh *EventHistory) addEventWithouIndex(action, key string) (e *Event) { + eh.rwl.Lock() + defer eh.rwl.Unlock() + + LastEvent := eh.Queue.Events[eh.Queue.back()] + e = newEvent(action, key, LastEvent.Index, LastEvent.Term) + + eh.Queue.insert(e) + + eh.StartIndex = eh.Queue.Events[eh.Queue.Front].Index + + return e +} + // scan function is enumerating events from the index in history and // stops till the first point where the key has identified prefix func (eh *EventHistory) scan(prefix string, index uint64) (*Event, error) { @@ -126,8 +142,7 @@ func (eh *EventHistory) scan(prefix string, index uint64) (*Event, error) { // TODO: Add error type return nil, etcdErr.NewError(etcdErr.EcodeEventIndexCleared, - fmt.Sprintf("prefix:%v index:%v", prefix, index), - ) + fmt.Sprintf("prefix:%v index:%v", prefix, index)) } if start >= uint64(eh.Queue.Size) { diff --git a/store/node.go b/store/node.go index 8502f70d1..051b622a2 100644 --- a/store/node.go +++ b/store/node.go @@ -236,20 +236,20 @@ func (n *Node) Clone() *Node { return clone } -func (n *Node) recoverAndclean() { +func (n *Node) recoverAndclean(WatcherHub *watcherHub) { if n.IsDir() { for _, child := range n.Children { child.Parent = n - child.recoverAndclean() + child.recoverAndclean(WatcherHub) } } n.stopExpire = make(chan bool, 1) - n.Expire() + n.Expire(WatcherHub) } -func (n *Node) Expire() { +func (n *Node) Expire(WatcherHub *watcherHub) { expired, duration := n.IsExpired() if expired { // has been expired @@ -266,6 +266,9 @@ func (n *Node) Expire() { // if timeout, delete the node case <-time.After(duration): n.Remove(true, nil) + + WatcherHub.notifyWithoutIndex(Expire, n.Path) + return // if stopped, return @@ -354,3 +357,14 @@ func (n *Node) Pair(recurisive, sorted bool) KeyValuePair { Value: n.Value, } } + +func (n *Node) UpdateTTL(expireTime time.Time, WatcherHub *watcherHub) { + if !n.IsPermanent() { + n.stopExpire <- true // suspend it to modify the expiration + } + + if expireTime.Sub(Permanent) != 0 { + n.ExpireTime = expireTime + n.Expire(WatcherHub) + } +} diff --git a/store/store.go b/store/store.go index 6265d291c..497ffbd88 100644 --- a/store/store.go +++ b/store/store.go @@ -150,7 +150,7 @@ func (s *Store) Create(nodePath string, value string, expireTime time.Time, inde // Node with TTL if expireTime.Sub(Permanent) != 0 { - n.Expire() + n.Expire(s.WatcherHub) e.Expiration = &n.ExpireTime e.TTL = int64(expireTime.Sub(time.Now())/time.Second) + 1 } @@ -194,19 +194,14 @@ func (s *Store) Update(nodePath string, value string, expireTime time.Time, inde } // update ttl - if !n.IsPermanent() { - n.stopExpire <- true - } + n.UpdateTTL(expireTime, s.WatcherHub) - if expireTime.Sub(Permanent) != 0 { - n.ExpireTime = expireTime - n.Expire() - e.Expiration = &n.ExpireTime - e.TTL = int64(expireTime.Sub(time.Now())/time.Second) + 1 - } + e.Expiration = &n.ExpireTime + e.TTL = int64(expireTime.Sub(time.Now())/time.Second) + 1 s.WatcherHub.notify(e) s.Stats.Inc(UpdateSuccess) + return e, nil } @@ -216,31 +211,33 @@ func (s *Store) TestAndSet(nodePath string, prevValue string, prevIndex uint64, s.worldLock.RLock() defer s.worldLock.RUnlock() - f, err := s.internalGet(nodePath, index, term) + n, err := s.internalGet(nodePath, index, term) if err != nil { s.Stats.Inc(TestAndSetFail) return nil, err } - if f.IsDir() { // can only test and set file + if n.IsDir() { // can only test and set file s.Stats.Inc(TestAndSetFail) return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath) } - if f.Value == prevValue || f.ModifiedIndex == prevIndex { + if n.Value == prevValue || n.ModifiedIndex == prevIndex { // if test succeed, write the value e := newEvent(TestAndSet, nodePath, index, term) - e.PrevValue = f.Value + e.PrevValue = n.Value e.Value = value - f.Write(value, index, term) + n.Write(value, index, term) + + n.UpdateTTL(expireTime, s.WatcherHub) s.WatcherHub.notify(e) s.Stats.Inc(TestAndSetSuccess) return e, nil } - cause := fmt.Sprintf("[%v != %v] [%v != %v]", prevValue, f.Value, prevIndex, f.ModifiedIndex) + cause := fmt.Sprintf("[%v != %v] [%v != %v]", prevValue, n.Value, prevIndex, n.ModifiedIndex) s.Stats.Inc(TestAndSetFail) return nil, etcdErr.NewError(etcdErr.EcodeTestFailed, cause) } @@ -404,7 +401,7 @@ func (s *Store) Recovery(state []byte) error { return err } - s.Root.recoverAndclean() + s.Root.recoverAndclean(s.WatcherHub) return nil } diff --git a/store/store_test.go b/store/store_test.go index 253063a2a..01a934d4a 100644 --- a/store/store_test.go +++ b/store/store_test.go @@ -378,6 +378,33 @@ func TestWatch(t *testing.T) { t.Fatal("watch for Delete subdirectory fails") } + // watch expire + s.Create("/foo/foo/boo", "foo", time.Now().Add(time.Second*1), 9, 1) + c, _ = s.WatcherHub.watch("/foo", true, 0) + time.Sleep(time.Second * 2) + e = nonblockingRetrive(c) + if e.Key != "/foo/foo/boo" || e.Index != 9 { + t.Fatal("watch for Expiration of Create() subdirectory fails ", e) + } + + s.Create("/foo/foo/boo", "foo", Permanent, 10, 1) + s.Update("/foo/foo/boo", "bar", time.Now().Add(time.Second*1), 11, 1) + c, _ = s.WatcherHub.watch("/foo", true, 0) + time.Sleep(time.Second * 2) + e = nonblockingRetrive(c) + if e.Key != "/foo/foo/boo" || e.Index != 11 { + t.Fatal("watch for Expiration of Update() subdirectory fails ", e) + } + + s.Create("/foo/foo/boo", "foo", Permanent, 12, 1) + s.TestAndSet("/foo/foo/boo", "foo", 0, "bar", time.Now().Add(time.Second*1), 13, 1) + c, _ = s.WatcherHub.watch("/foo", true, 0) + time.Sleep(time.Second * 2) + e = nonblockingRetrive(c) + if e.Key != "/foo/foo/boo" || e.Index != 13 { + t.Fatal("watch for Expiration of TestAndSet() subdirectory fails ", e) + } + } func TestSort(t *testing.T) { @@ -442,7 +469,7 @@ func TestSaveAndRecover(t *testing.T) { b, err := s.Save() cloneFs := New() - time.Sleep(time.Second) + time.Sleep(2 * time.Second) cloneFs.Recovery(b) @@ -457,7 +484,17 @@ func TestSaveAndRecover(t *testing.T) { t.Fatal("Error recovered event history start index") } - for i = 0; int(i) < s.WatcherHub.EventHistory.Queue.Size; i++ { + //t.Log("watcherhub.size: ", s.WatcherHub.EventHistory.Queue.Size) + //for i = 0; int(i) < s.WatcherHub.EventHistory.Queue.Size; i++ { + // t.Log(s.WatcherHub.EventHistory.Queue.Events[i]) + //} + // + //t.Log("ClonedWatcherhub.size: ", cloneFs.WatcherHub.EventHistory.Queue.Size) + //for i = 0; int(i) < cloneFs.WatcherHub.EventHistory.Queue.Size; i++ { + // t.Log(cloneFs.WatcherHub.EventHistory.Queue.Events[i]) + //} + + for i = 0; int(i) < cloneFs.WatcherHub.EventHistory.Queue.Size; i++ { if s.WatcherHub.EventHistory.Queue.Events[i].Key != cloneFs.WatcherHub.EventHistory.Queue.Events[i].Key { t.Fatal("Error recovered event history") diff --git a/store/watcher.go b/store/watcher.go index 3b3e43478..b039cd9a9 100644 --- a/store/watcher.go +++ b/store/watcher.go @@ -117,6 +117,21 @@ func (wh *watcherHub) notify(e *Event) { wh.EventHistory.addEvent(e) } +// notify with last event's index and term +func (wh *watcherHub) notifyWithoutIndex(action, key string) { + e := wh.EventHistory.addEventWithouIndex(action, key) + + segments := strings.Split(e.Key, "/") + + currPath := "/" + + // walk through all the paths + for _, segment := range segments { + currPath = path.Join(currPath, segment) + wh.notifyWithPath(e, currPath, false) + } +} + func (wh *watcherHub) clone() *watcherHub { clonedHistory := wh.EventHistory.clone() From 3ae316ac380fc617d7cfd0a1063abdb9b99d3dac Mon Sep 17 00:00:00 2001 From: evan-gu Date: Mon, 30 Sep 2013 02:39:40 -0400 Subject: [PATCH 2/7] add ExpireCount and some test case --- store/node.go | 15 ++++++++------- store/stats.go | 6 +++++- store/stats_test.go | 30 ++++++++++++++++++++++++++++-- store/store.go | 8 ++++---- 4 files changed, 45 insertions(+), 14 deletions(-) diff --git a/store/node.go b/store/node.go index 051b622a2..f09c742c3 100644 --- a/store/node.go +++ b/store/node.go @@ -236,20 +236,20 @@ func (n *Node) Clone() *Node { return clone } -func (n *Node) recoverAndclean(WatcherHub *watcherHub) { +func (n *Node) recoverAndclean(s *Store) { if n.IsDir() { for _, child := range n.Children { child.Parent = n - child.recoverAndclean(WatcherHub) + child.recoverAndclean(s) } } n.stopExpire = make(chan bool, 1) - n.Expire(WatcherHub) + n.Expire(s) } -func (n *Node) Expire(WatcherHub *watcherHub) { +func (n *Node) Expire(s *Store) { expired, duration := n.IsExpired() if expired { // has been expired @@ -267,7 +267,8 @@ func (n *Node) Expire(WatcherHub *watcherHub) { case <-time.After(duration): n.Remove(true, nil) - WatcherHub.notifyWithoutIndex(Expire, n.Path) + s.Stats.Inc(ExpireCount) + s.WatcherHub.notifyWithoutIndex(Expire, n.Path) return @@ -358,13 +359,13 @@ func (n *Node) Pair(recurisive, sorted bool) KeyValuePair { } } -func (n *Node) UpdateTTL(expireTime time.Time, WatcherHub *watcherHub) { +func (n *Node) UpdateTTL(expireTime time.Time, s *Store) { if !n.IsPermanent() { n.stopExpire <- true // suspend it to modify the expiration } if expireTime.Sub(Permanent) != 0 { n.ExpireTime = expireTime - n.Expire(WatcherHub) + n.Expire(s) } } diff --git a/store/stats.go b/store/stats.go index b276d5279..e2053ed42 100644 --- a/store/stats.go +++ b/store/stats.go @@ -16,6 +16,7 @@ const ( TestAndSetFail = 107 GetSuccess = 110 GetFail = 111 + ExpireCount = 112 ) type Stats struct { @@ -39,6 +40,7 @@ type Stats struct { // Number of testAndSet requests TestAndSetSuccess uint64 `json:"testAndSetSuccess"` TestAndSetFail uint64 `json:"testAndSetFail"` + ExpireCount uint64 `json:"expireCount"` Watchers uint64 `json:"watchers"` } @@ -51,7 +53,7 @@ func newStats() *Stats { func (s *Stats) clone() *Stats { return &Stats{s.GetSuccess, s.GetFail, s.SetSuccess, s.SetFail, s.DeleteSuccess, s.DeleteFail, s.UpdateSuccess, s.UpdateFail, - s.TestAndSetSuccess, s.TestAndSetFail, s.Watchers} + s.TestAndSetSuccess, s.TestAndSetFail, s.Watchers, s.ExpireCount} } // Status() return the statistics info of etcd storage its recent start @@ -93,5 +95,7 @@ func (s *Stats) Inc(field int) { atomic.AddUint64(&s.TestAndSetSuccess, 1) case TestAndSetFail: atomic.AddUint64(&s.TestAndSetFail, 1) + case ExpireCount: + atomic.AddUint64(&s.ExpireCount, 1) } } diff --git a/store/stats_test.go b/store/stats_test.go index ff67f328a..207df825f 100644 --- a/store/stats_test.go +++ b/store/stats_test.go @@ -16,7 +16,7 @@ func TestBasicStats(t *testing.T) { for _, k := range keys { i++ - _, err := s.Create(k, "bar", time.Now().Add(time.Second*time.Duration(rand.Intn(10))), i, 1) + _, err := s.Create(k, "bar", time.Now().Add(time.Second*time.Duration(rand.Intn(6))), i, 1) if err != nil { SetFail++ } else { @@ -24,6 +24,8 @@ func TestBasicStats(t *testing.T) { } } + time.Sleep(time.Second * 3) + for _, k := range keys { _, err := s.Get(k, false, false, i, 1) if err != nil { @@ -35,7 +37,7 @@ func TestBasicStats(t *testing.T) { for _, k := range keys { i++ - _, err := s.Update(k, "foo", time.Now().Add(time.Second*time.Duration(rand.Intn(5))), i, 1) + _, err := s.Update(k, "foo", time.Now().Add(time.Second*time.Duration(rand.Intn(6))), i, 1) if err != nil { UpdateFail++ } else { @@ -43,6 +45,8 @@ func TestBasicStats(t *testing.T) { } } + time.Sleep(time.Second * 3) + for _, k := range keys { _, err := s.Get(k, false, false, i, 1) if err != nil { @@ -136,4 +140,26 @@ func TestBasicStats(t *testing.T) { t.Fatalf("TestAndSetFail [%d] != Stats.TestAndSetFail [%d]", TestAndSetFail, s.Stats.TestAndSetFail) } + s = New() + SetSuccess = 0 + SetFail = 0 + + for _, k := range keys { + i++ + _, err := s.Create(k, "bar", time.Now().Add(time.Second*3), i, 1) + if err != nil { + SetFail++ + } else { + SetSuccess++ + } + } + + time.Sleep(6 * time.Second) + + ExpireCount := SetSuccess + + if ExpireCount != s.Stats.ExpireCount { + t.Fatalf("ExpireCount [%d] != Stats.ExpireCount [%d]", ExpireCount, s.Stats.ExpireCount) + } + } diff --git a/store/store.go b/store/store.go index 497ffbd88..51f49f51f 100644 --- a/store/store.go +++ b/store/store.go @@ -150,7 +150,7 @@ func (s *Store) Create(nodePath string, value string, expireTime time.Time, inde // Node with TTL if expireTime.Sub(Permanent) != 0 { - n.Expire(s.WatcherHub) + n.Expire(s) e.Expiration = &n.ExpireTime e.TTL = int64(expireTime.Sub(time.Now())/time.Second) + 1 } @@ -194,7 +194,7 @@ func (s *Store) Update(nodePath string, value string, expireTime time.Time, inde } // update ttl - n.UpdateTTL(expireTime, s.WatcherHub) + n.UpdateTTL(expireTime, s) e.Expiration = &n.ExpireTime e.TTL = int64(expireTime.Sub(time.Now())/time.Second) + 1 @@ -230,7 +230,7 @@ func (s *Store) TestAndSet(nodePath string, prevValue string, prevIndex uint64, e.Value = value n.Write(value, index, term) - n.UpdateTTL(expireTime, s.WatcherHub) + n.UpdateTTL(expireTime, s) s.WatcherHub.notify(e) s.Stats.Inc(TestAndSetSuccess) @@ -401,7 +401,7 @@ func (s *Store) Recovery(state []byte) error { return err } - s.Root.recoverAndclean(s.WatcherHub) + s.Root.recoverAndclean(s) return nil } From 0959448855580976939704cd6f72686cc7c3ebd0 Mon Sep 17 00:00:00 2001 From: evan-gu Date: Mon, 30 Sep 2013 12:18:28 -0400 Subject: [PATCH 3/7] add LastIndex and LastTerm in EventHistory --- store/event.go | 25 ++++++++++++++++++++----- store/node.go | 5 ++++- store/watcher.go | 17 +---------------- 3 files changed, 25 insertions(+), 22 deletions(-) diff --git a/store/event.go b/store/event.go index 29cec3f12..4ca6f5e58 100644 --- a/store/event.go +++ b/store/event.go @@ -90,6 +90,8 @@ func (eq *eventQueue) insert(e *Event) { type EventHistory struct { Queue eventQueue StartIndex uint64 + LastIndex uint64 + LastTerm uint64 rwl sync.RWMutex } @@ -103,29 +105,42 @@ func newEventHistory(capacity int) *EventHistory { } // addEvent function adds event into the eventHistory -func (eh *EventHistory) addEvent(e *Event) { +func (eh *EventHistory) addEvent(e *Event) *Event { eh.rwl.Lock() defer eh.rwl.Unlock() + if e.Index == 0 { + e.Index = eh.LastIndex + } + + if e.Term == 0 { + e.Term = eh.LastTerm + } + eh.Queue.insert(e) eh.StartIndex = eh.Queue.Events[eh.Queue.Front].Index + + eh.LastIndex = e.Index + eh.LastTerm = e.Term + + return e } // addEvent with the last event's index and term -func (eh *EventHistory) addEventWithouIndex(action, key string) (e *Event) { +/*func (eh *EventHistory) addEventWithouIndex(action, key string) (e *Event) { eh.rwl.Lock() defer eh.rwl.Unlock() LastEvent := eh.Queue.Events[eh.Queue.back()] - e = newEvent(action, key, LastEvent.Index, LastEvent.Term) + e = newEvent(action, key, LastEvent.Index, LastEvent.Term); eh.Queue.insert(e) eh.StartIndex = eh.Queue.Events[eh.Queue.Front].Index - return e -} + return e; +}*/ // scan function is enumerating events from the index in history and // stops till the first point where the key has identified prefix diff --git a/store/node.go b/store/node.go index f09c742c3..214a33fa0 100644 --- a/store/node.go +++ b/store/node.go @@ -265,10 +265,13 @@ func (n *Node) Expire(s *Store) { select { // if timeout, delete the node case <-time.After(duration): + e := newEvent(Expire, n.Path, 0, 0) + n.Remove(true, nil) s.Stats.Inc(ExpireCount) - s.WatcherHub.notifyWithoutIndex(Expire, n.Path) + + s.WatcherHub.notify(e) return diff --git a/store/watcher.go b/store/watcher.go index b039cd9a9..d67c713df 100644 --- a/store/watcher.go +++ b/store/watcher.go @@ -104,22 +104,7 @@ func (wh *watcherHub) notifyWithPath(e *Event, path string, force bool) { } func (wh *watcherHub) notify(e *Event) { - segments := strings.Split(e.Key, "/") - - currPath := "/" - - // walk through all the paths - for _, segment := range segments { - currPath = path.Join(currPath, segment) - wh.notifyWithPath(e, currPath, false) - } - - wh.EventHistory.addEvent(e) -} - -// notify with last event's index and term -func (wh *watcherHub) notifyWithoutIndex(action, key string) { - e := wh.EventHistory.addEventWithouIndex(action, key) + e = wh.EventHistory.addEvent(e) segments := strings.Split(e.Key, "/") From b8ac1d082b7045aeffcfcba5825dbebc3e27a12a Mon Sep 17 00:00:00 2001 From: evan-gu Date: Mon, 30 Sep 2013 22:10:40 -0400 Subject: [PATCH 4/7] fix race between Expire() and others, fix UpdateTTL(), modified watcher to catch Expire() --- store/event.go | 29 ++++++++------------ store/event_test.go | 12 ++++----- store/node.go | 16 +++++++---- store/stats_test.go | 12 +++++++++ store/store.go | 5 ++-- store/store_test.go | 63 +++++++++++++++++++------------------------ store/watcher.go | 2 +- store/watcher_test.go | 15 +++++------ 8 files changed, 78 insertions(+), 76 deletions(-) diff --git a/store/event.go b/store/event.go index 4ca6f5e58..e95d50f1a 100644 --- a/store/event.go +++ b/store/event.go @@ -16,6 +16,8 @@ const ( Delete = "delete" TestAndSet = "testAndSet" Expire = "expire" + UndefIndex = 0 + UndefTerm = 0 ) type Event struct { @@ -92,6 +94,7 @@ type EventHistory struct { StartIndex uint64 LastIndex uint64 LastTerm uint64 + DupIndex uint64 rwl sync.RWMutex } @@ -109,12 +112,16 @@ func (eh *EventHistory) addEvent(e *Event) *Event { eh.rwl.Lock() defer eh.rwl.Unlock() - if e.Index == 0 { + DupIndex := uint64(0) + + if e.Index == UndefIndex { e.Index = eh.LastIndex + DupIndex = 1 } - if e.Term == 0 { + if e.Term == UndefTerm { e.Term = eh.LastTerm + DupIndex = 1 } eh.Queue.insert(e) @@ -123,32 +130,18 @@ func (eh *EventHistory) addEvent(e *Event) *Event { eh.LastIndex = e.Index eh.LastTerm = e.Term + eh.DupIndex += DupIndex return e } -// addEvent with the last event's index and term -/*func (eh *EventHistory) addEventWithouIndex(action, key string) (e *Event) { - eh.rwl.Lock() - defer eh.rwl.Unlock() - - LastEvent := eh.Queue.Events[eh.Queue.back()] - e = newEvent(action, key, LastEvent.Index, LastEvent.Term); - - eh.Queue.insert(e) - - eh.StartIndex = eh.Queue.Events[eh.Queue.Front].Index - - return e; -}*/ - // scan function is enumerating events from the index in history and // stops till the first point where the key has identified prefix func (eh *EventHistory) scan(prefix string, index uint64) (*Event, error) { eh.rwl.RLock() defer eh.rwl.RUnlock() - start := index - eh.StartIndex + start := index - eh.StartIndex + eh.DupIndex // the index should locate after the event history's StartIndex // and before its size diff --git a/store/event_test.go b/store/event_test.go index 0d19dd52a..e5b35061c 100644 --- a/store/event_test.go +++ b/store/event_test.go @@ -13,7 +13,7 @@ func TestEventQueue(t *testing.T) { // Add for i := 0; i < 200; i++ { - e := newEvent(Create, "/foo", uint64(i), 0) + e := newEvent(Create, "/foo", uint64(i), 1) eh.addEvent(e) } @@ -37,11 +37,11 @@ func TestScanHistory(t *testing.T) { eh := newEventHistory(100) // Add - eh.addEvent(newEvent(Create, "/foo", 1, 0)) - eh.addEvent(newEvent(Create, "/foo/bar", 2, 0)) - eh.addEvent(newEvent(Create, "/foo/foo", 3, 0)) - eh.addEvent(newEvent(Create, "/foo/bar/bar", 4, 0)) - eh.addEvent(newEvent(Create, "/foo/foo/foo", 5, 0)) + eh.addEvent(newEvent(Create, "/foo", 1, 1)) + eh.addEvent(newEvent(Create, "/foo/bar", 2, 1)) + eh.addEvent(newEvent(Create, "/foo/foo", 3, 1)) + eh.addEvent(newEvent(Create, "/foo/bar/bar", 4, 1)) + eh.addEvent(newEvent(Create, "/foo/foo/foo", 5, 1)) e, err := eh.scan("/foo", 1) if err != nil || e.Index != 1 { diff --git a/store/node.go b/store/node.go index 214a33fa0..f82f01cd6 100644 --- a/store/node.go +++ b/store/node.go @@ -66,6 +66,7 @@ func newDir(nodePath string, createIndex uint64, createTerm uint64, parent *Node // If the node is a directory and recursive is true, the function will recursively remove // add nodes under the receiver node. func (n *Node) Remove(recursive bool, callback func(path string)) error { + n.mu.Lock() defer n.mu.Unlock() @@ -87,6 +88,7 @@ func (n *Node) Remove(recursive bool, callback func(path string)) error { n.stopExpire <- true n.status = removed + } return nil @@ -265,14 +267,14 @@ func (n *Node) Expire(s *Store) { select { // if timeout, delete the node case <-time.After(duration): - e := newEvent(Expire, n.Path, 0, 0) + s.worldLock.Lock() + e := newEvent(Expire, n.Path, UndefIndex, UndefTerm) + s.WatcherHub.notify(e) n.Remove(true, nil) - s.Stats.Inc(ExpireCount) - s.WatcherHub.notify(e) - + s.worldLock.Unlock() return // if stopped, return @@ -364,7 +366,11 @@ func (n *Node) Pair(recurisive, sorted bool) KeyValuePair { func (n *Node) UpdateTTL(expireTime time.Time, s *Store) { if !n.IsPermanent() { - n.stopExpire <- true // suspend it to modify the expiration + expired, _ := n.IsExpired() + + if !expired { + n.stopExpire <- true // suspend it to modify the expiration + } } if expireTime.Sub(Permanent) != 0 { diff --git a/store/stats_test.go b/store/stats_test.go index 207df825f..52cd1c8f8 100644 --- a/store/stats_test.go +++ b/store/stats_test.go @@ -24,6 +24,8 @@ func TestBasicStats(t *testing.T) { } } + //fmt.Println("create") + time.Sleep(time.Second * 3) for _, k := range keys { @@ -35,6 +37,8 @@ func TestBasicStats(t *testing.T) { } } + //fmt.Println("get") + for _, k := range keys { i++ _, err := s.Update(k, "foo", time.Now().Add(time.Second*time.Duration(rand.Intn(6))), i, 1) @@ -45,6 +49,8 @@ func TestBasicStats(t *testing.T) { } } + //fmt.Println("update") + time.Sleep(time.Second * 3) for _, k := range keys { @@ -66,11 +72,15 @@ func TestBasicStats(t *testing.T) { } } + //fmt.Println("get testAndSet") + for _, k := range keys { s.Watch(k, false, 0, i, 1) watcher_number++ } + //fmt.Println("watch") + for _, k := range keys { _, err := s.Get(k, false, false, i, 1) if err != nil { @@ -91,6 +101,8 @@ func TestBasicStats(t *testing.T) { } } + //fmt.Println("get delete") + for _, k := range keys { _, err := s.Get(k, false, false, i, 1) if err != nil { diff --git a/store/store.go b/store/store.go index 51f49f51f..e6b8cf1b9 100644 --- a/store/store.go +++ b/store/store.go @@ -164,6 +164,7 @@ func (s *Store) Create(nodePath string, value string, expireTime time.Time, inde // If the node is a file, the value and the ttl can be updated. // If the node is a directory, only the ttl can be updated. func (s *Store) Update(nodePath string, value string, expireTime time.Time, index uint64, term uint64) (*Event, error) { + s.worldLock.RLock() defer s.worldLock.RUnlock() @@ -171,15 +172,16 @@ func (s *Store) Update(nodePath string, value string, expireTime time.Time, inde if err != nil { // if the node does not exist, return error s.Stats.Inc(UpdateFail) + return nil, err } e := newEvent(Update, nodePath, s.Index, s.Term) if n.IsDir() { // if the node is a directory, we can only update ttl - if len(value) != 0 { s.Stats.Inc(UpdateFail) + return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath) } @@ -195,7 +197,6 @@ func (s *Store) Update(nodePath string, value string, expireTime time.Time, inde // update ttl n.UpdateTTL(expireTime, s) - e.Expiration = &n.ExpireTime e.TTL = int64(expireTime.Sub(time.Now())/time.Second) + 1 diff --git a/store/store_test.go b/store/store_test.go index 01a934d4a..a16e57032 100644 --- a/store/store_test.go +++ b/store/store_test.go @@ -320,88 +320,88 @@ func TestTestAndSet(t *testing.T) { // TODO prevValue == nil ? func TestWatch(t *testing.T) { s := New() // watch at a deeper path - c, _ := s.WatcherHub.watch("/foo/foo/foo", false, 0) + c, _ := s.Watch("/foo/foo/foo", false, 0, 0, 1) s.Create("/foo/foo/foo", "bar", Permanent, 1, 1) e := nonblockingRetrive(c) - if e.Key != "/foo/foo/foo" { - t.Fatal("watch for Create node fails") + if e.Key != "/foo/foo/foo" || e.Action != Create { + t.Fatal("watch for Create node fails ", e) } - c, _ = s.WatcherHub.watch("/foo/foo/foo", false, 0) + c, _ = s.Watch("/foo/foo/foo", false, 0, 1, 1) s.Update("/foo/foo/foo", "car", Permanent, 2, 1) e = nonblockingRetrive(c) - if e.Key != "/foo/foo/foo" { - t.Fatal("watch for Update node fails") + if e.Key != "/foo/foo/foo" || e.Action != Update { + t.Fatal("watch for Update node fails ", e) } - c, _ = s.WatcherHub.watch("/foo/foo/foo", false, 0) + c, _ = s.Watch("/foo/foo/foo", false, 0, 2, 1) s.TestAndSet("/foo/foo/foo", "car", 0, "bar", Permanent, 3, 1) e = nonblockingRetrive(c) - if e.Key != "/foo/foo/foo" { + if e.Key != "/foo/foo/foo" || e.Action != TestAndSet { t.Fatal("watch for TestAndSet node fails") } - c, _ = s.WatcherHub.watch("/foo/foo/foo", false, 0) + c, _ = s.Watch("/foo/foo/foo", false, 0, 3, 1) s.Delete("/foo", true, 4, 1) //recursively delete e = nonblockingRetrive(c) - if e.Key != "/foo" { - t.Fatal("watch for Delete node fails") + if e.Key != "/foo" || e.Action != Delete { + t.Fatal("watch for Delete node fails ", e) } // watch at a prefix - c, _ = s.WatcherHub.watch("/foo", true, 0) + c, _ = s.Watch("/foo", true, 0, 4, 1) s.Create("/foo/foo/boo", "bar", Permanent, 5, 1) e = nonblockingRetrive(c) - if e.Key != "/foo/foo/boo" { + if e.Key != "/foo/foo/boo" || e.Action != Create { t.Fatal("watch for Create subdirectory fails") } - c, _ = s.WatcherHub.watch("/foo", true, 0) + c, _ = s.Watch("/foo", true, 0, 5, 1) s.Update("/foo/foo/boo", "foo", Permanent, 6, 1) e = nonblockingRetrive(c) - if e.Key != "/foo/foo/boo" { + if e.Key != "/foo/foo/boo" || e.Action != Update { t.Fatal("watch for Update subdirectory fails") } - c, _ = s.WatcherHub.watch("/foo", true, 0) + c, _ = s.Watch("/foo", true, 0, 6, 1) s.TestAndSet("/foo/foo/boo", "foo", 0, "bar", Permanent, 7, 1) e = nonblockingRetrive(c) - if e.Key != "/foo/foo/boo" { + if e.Key != "/foo/foo/boo" || e.Action != TestAndSet { t.Fatal("watch for TestAndSet subdirectory fails") } - c, _ = s.WatcherHub.watch("/foo", true, 0) + c, _ = s.Watch("/foo", true, 0, 7, 1) s.Delete("/foo/foo/boo", false, 8, 1) e = nonblockingRetrive(c) - if e.Key != "/foo/foo/boo" { + if e.Key != "/foo/foo/boo" || e.Action != Delete { t.Fatal("watch for Delete subdirectory fails") } // watch expire s.Create("/foo/foo/boo", "foo", time.Now().Add(time.Second*1), 9, 1) - c, _ = s.WatcherHub.watch("/foo", true, 0) + c, _ = s.Watch("/foo", true, 0, 9, 1) time.Sleep(time.Second * 2) e = nonblockingRetrive(c) - if e.Key != "/foo/foo/boo" || e.Index != 9 { + if e.Key != "/foo/foo/boo" || e.Action != Expire || e.Index != 9 { t.Fatal("watch for Expiration of Create() subdirectory fails ", e) } s.Create("/foo/foo/boo", "foo", Permanent, 10, 1) s.Update("/foo/foo/boo", "bar", time.Now().Add(time.Second*1), 11, 1) - c, _ = s.WatcherHub.watch("/foo", true, 0) + c, _ = s.Watch("/foo", true, 0, 11, 1) time.Sleep(time.Second * 2) e = nonblockingRetrive(c) - if e.Key != "/foo/foo/boo" || e.Index != 11 { + if e.Key != "/foo/foo/boo" || e.Action != Expire || e.Index != 11 { t.Fatal("watch for Expiration of Update() subdirectory fails ", e) } s.Create("/foo/foo/boo", "foo", Permanent, 12, 1) s.TestAndSet("/foo/foo/boo", "foo", 0, "bar", time.Now().Add(time.Second*1), 13, 1) - c, _ = s.WatcherHub.watch("/foo", true, 0) + c, _ = s.Watch("/foo", true, 0, 13, 1) time.Sleep(time.Second * 2) e = nonblockingRetrive(c) - if e.Key != "/foo/foo/boo" || e.Index != 13 { + if e.Key != "/foo/foo/boo" || e.Action != Expire || e.Index != 13 { t.Fatal("watch for Expiration of TestAndSet() subdirectory fails ", e) } @@ -479,21 +479,12 @@ func TestSaveAndRecover(t *testing.T) { panic(err) } } - + s.worldLock.RLock() + defer s.worldLock.RUnlock() if s.WatcherHub.EventHistory.StartIndex != cloneFs.WatcherHub.EventHistory.StartIndex { t.Fatal("Error recovered event history start index") } - //t.Log("watcherhub.size: ", s.WatcherHub.EventHistory.Queue.Size) - //for i = 0; int(i) < s.WatcherHub.EventHistory.Queue.Size; i++ { - // t.Log(s.WatcherHub.EventHistory.Queue.Events[i]) - //} - // - //t.Log("ClonedWatcherhub.size: ", cloneFs.WatcherHub.EventHistory.Queue.Size) - //for i = 0; int(i) < cloneFs.WatcherHub.EventHistory.Queue.Size; i++ { - // t.Log(cloneFs.WatcherHub.EventHistory.Queue.Events[i]) - //} - for i = 0; int(i) < cloneFs.WatcherHub.EventHistory.Queue.Size; i++ { if s.WatcherHub.EventHistory.Queue.Events[i].Key != cloneFs.WatcherHub.EventHistory.Queue.Events[i].Key { diff --git a/store/watcher.go b/store/watcher.go index d67c713df..7576a866b 100644 --- a/store/watcher.go +++ b/store/watcher.go @@ -47,7 +47,7 @@ func (wh *watcherHub) watch(prefix string, recursive bool, index uint64) (<-chan w := &watcher{ eventChan: eventChan, recursive: recursive, - sinceIndex: index, + sinceIndex: index - 1, // to catch Expire() } l, ok := wh.watchers[prefix] diff --git a/store/watcher_test.go b/store/watcher_test.go index 90c23c59e..e437422ad 100644 --- a/store/watcher_test.go +++ b/store/watcher_test.go @@ -7,8 +7,7 @@ import ( func TestWatcher(t *testing.T) { s := New() wh := s.WatcherHub - c, err := wh.watch("/foo", true, 0) - + c, err := wh.watch("/foo", true, 1) if err != nil { t.Fatal("%v", err) } @@ -20,7 +19,7 @@ func TestWatcher(t *testing.T) { // do nothing } - e := newEvent(Create, "/foo/bar", 1, 0) + e := newEvent(Create, "/foo/bar", 1, 1) wh.notify(e) @@ -30,20 +29,20 @@ func TestWatcher(t *testing.T) { t.Fatal("recv != send") } - c, _ = wh.watch("/foo", false, 0) + c, _ = wh.watch("/foo", false, 2) - e = newEvent(Create, "/foo/bar", 1, 0) + e = newEvent(Create, "/foo/bar", 2, 1) wh.notify(e) select { - case <-c: - t.Fatal("should not receive from channel if not recursive") + case re = <-c: + t.Fatal("should not receive from channel if not recursive ", re) default: // do nothing } - e = newEvent(Create, "/foo", 1, 0) + e = newEvent(Create, "/foo", 3, 1) wh.notify(e) From 6fdffbcc851aa1db0c50289e05ec9d163c3189ab Mon Sep 17 00:00:00 2001 From: evan-gu Date: Mon, 30 Sep 2013 22:17:17 -0400 Subject: [PATCH 5/7] delete some debug comments in stats_test.go --- store/stats_test.go | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/store/stats_test.go b/store/stats_test.go index 52cd1c8f8..207df825f 100644 --- a/store/stats_test.go +++ b/store/stats_test.go @@ -24,8 +24,6 @@ func TestBasicStats(t *testing.T) { } } - //fmt.Println("create") - time.Sleep(time.Second * 3) for _, k := range keys { @@ -37,8 +35,6 @@ func TestBasicStats(t *testing.T) { } } - //fmt.Println("get") - for _, k := range keys { i++ _, err := s.Update(k, "foo", time.Now().Add(time.Second*time.Duration(rand.Intn(6))), i, 1) @@ -49,8 +45,6 @@ func TestBasicStats(t *testing.T) { } } - //fmt.Println("update") - time.Sleep(time.Second * 3) for _, k := range keys { @@ -72,15 +66,11 @@ func TestBasicStats(t *testing.T) { } } - //fmt.Println("get testAndSet") - for _, k := range keys { s.Watch(k, false, 0, i, 1) watcher_number++ } - //fmt.Println("watch") - for _, k := range keys { _, err := s.Get(k, false, false, i, 1) if err != nil { @@ -101,8 +91,6 @@ func TestBasicStats(t *testing.T) { } } - //fmt.Println("get delete") - for _, k := range keys { _, err := s.Get(k, false, false, i, 1) if err != nil { From 6f591032ef33b6e81311fb10957c3956974cd90e Mon Sep 17 00:00:00 2001 From: evan-gu Date: Tue, 1 Oct 2013 00:35:44 -0400 Subject: [PATCH 6/7] rename to DupCnt, duped; add some comments, maintained some format, add notification for immediate expiration --- store/event.go | 15 ++++++--------- store/event_test.go | 3 --- store/node.go | 22 +++++++++++++++++----- store/store.go | 4 ++-- store/store_test.go | 14 ++++---------- store/watcher.go | 5 +---- 6 files changed, 30 insertions(+), 33 deletions(-) diff --git a/store/event.go b/store/event.go index e95d50f1a..34bdf7d9b 100644 --- a/store/event.go +++ b/store/event.go @@ -76,7 +76,6 @@ func (eq *eventQueue) back() int { } func (eq *eventQueue) insert(e *Event) { - index := (eq.back() + 1) % eq.Capacity eq.Events[index] = e @@ -94,7 +93,7 @@ type EventHistory struct { StartIndex uint64 LastIndex uint64 LastTerm uint64 - DupIndex uint64 + DupCnt uint64 // help to compute the watching point with duplicated indexes in the queue rwl sync.RWMutex } @@ -112,16 +111,16 @@ func (eh *EventHistory) addEvent(e *Event) *Event { eh.rwl.Lock() defer eh.rwl.Unlock() - DupIndex := uint64(0) + duped := uint64(0) if e.Index == UndefIndex { e.Index = eh.LastIndex - DupIndex = 1 + duped = 1 } if e.Term == UndefTerm { e.Term = eh.LastTerm - DupIndex = 1 + duped = 1 } eh.Queue.insert(e) @@ -130,7 +129,7 @@ func (eh *EventHistory) addEvent(e *Event) *Event { eh.LastIndex = e.Index eh.LastTerm = e.Term - eh.DupIndex += DupIndex + eh.DupCnt += duped return e } @@ -141,7 +140,7 @@ func (eh *EventHistory) scan(prefix string, index uint64) (*Event, error) { eh.rwl.RLock() defer eh.rwl.RUnlock() - start := index - eh.StartIndex + eh.DupIndex + start := index - eh.StartIndex + eh.DupCnt // the index should locate after the event history's StartIndex // and before its size @@ -172,13 +171,11 @@ func (eh *EventHistory) scan(prefix string, index uint64) (*Event, error) { return nil, nil } } - } // clone will be protected by a stop-world lock // do not need to obtain internal lock func (eh *EventHistory) clone() *EventHistory { - clonedQueue := eventQueue{ Capacity: eh.Queue.Capacity, Events: make([]*Event, eh.Queue.Capacity), diff --git a/store/event_test.go b/store/event_test.go index e5b35061c..c02a4d70e 100644 --- a/store/event_test.go +++ b/store/event_test.go @@ -28,9 +28,7 @@ func TestEventQueue(t *testing.T) { } j++ i = (i + 1) % eh.Queue.Capacity - } - } func TestScanHistory(t *testing.T) { @@ -65,5 +63,4 @@ func TestScanHistory(t *testing.T) { if e != nil { t.Fatalf("bad index shoud reuturn nil") } - } diff --git a/store/node.go b/store/node.go index f82f01cd6..cad85c75f 100644 --- a/store/node.go +++ b/store/node.go @@ -66,7 +66,6 @@ func newDir(nodePath string, createIndex uint64, createTerm uint64, parent *Node // If the node is a directory and recursive is true, the function will recursively remove // add nodes under the receiver node. func (n *Node) Remove(recursive bool, callback func(path string)) error { - n.mu.Lock() defer n.mu.Unlock() @@ -187,7 +186,6 @@ func (n *Node) GetFile(name string) (*Node, error) { } return nil, nil - } // Add function adds a node to the receiver node. @@ -216,7 +214,6 @@ func (n *Node) Add(child *Node) error { n.Children[name] = child return nil - } // Clone function clone the node recursively and return the new node. @@ -251,11 +248,23 @@ func (n *Node) recoverAndclean(s *Store) { n.Expire(s) } +// Expire function will test if the node is expired. +// if the node is already expired, delete the node and return. +// if the node is permemant (this shouldn't happen), return at once. +// else wait for a period time, then remove the node. and notify the watchhub. func (n *Node) Expire(s *Store) { expired, duration := n.IsExpired() if expired { // has been expired + + // since the parent function of Expire() runs serially, + // there is no need for lock here + e := newEvent(Expire, n.Path, UndefIndex, UndefTerm) + s.WatcherHub.notify(e) + n.Remove(true, nil) + s.Stats.Inc(ExpireCount) + return } @@ -267,20 +276,23 @@ func (n *Node) Expire(s *Store) { select { // if timeout, delete the node case <-time.After(duration): + + // Lock to avoid race s.worldLock.Lock() e := newEvent(Expire, n.Path, UndefIndex, UndefTerm) s.WatcherHub.notify(e) + n.Remove(true, nil) s.Stats.Inc(ExpireCount) s.worldLock.Unlock() + return // if stopped, return case <-n.stopExpire: return - } }() } @@ -294,7 +306,6 @@ func (n *Node) IsHidden() bool { _, name := path.Split(n.Path) return name[0] == '_' - } func (n *Node) IsPermanent() bool { @@ -355,6 +366,7 @@ func (n *Node) Pair(recurisive, sorted bool) KeyValuePair { if sorted { sort.Sort(pair) } + return pair } diff --git a/store/store.go b/store/store.go index e6b8cf1b9..f77189172 100644 --- a/store/store.go +++ b/store/store.go @@ -164,7 +164,6 @@ func (s *Store) Create(nodePath string, value string, expireTime time.Time, inde // If the node is a file, the value and the ttl can be updated. // If the node is a directory, only the ttl can be updated. func (s *Store) Update(nodePath string, value string, expireTime time.Time, index uint64, term uint64) (*Event, error) { - s.worldLock.RLock() defer s.worldLock.RUnlock() @@ -197,10 +196,11 @@ func (s *Store) Update(nodePath string, value string, expireTime time.Time, inde // update ttl n.UpdateTTL(expireTime, s) + e.Expiration = &n.ExpireTime e.TTL = int64(expireTime.Sub(time.Now())/time.Second) + 1 - s.WatcherHub.notify(e) + s.Stats.Inc(UpdateSuccess) return e, nil diff --git a/store/store_test.go b/store/store_test.go index a16e57032..87010b1ee 100644 --- a/store/store_test.go +++ b/store/store_test.go @@ -52,7 +52,6 @@ func TestCreateAndGet(t *testing.T) { if err != nil { t.Fatal("Cannot create /fooDir/bar = bar") } - } func TestUpdateFile(t *testing.T) { @@ -81,7 +80,6 @@ func TestUpdateFile(t *testing.T) { } // create a directory, update its ttl, to see if it will be deleted - _, err = s.Create("/foo/foo", "", Permanent, 3, 1) if err != nil { @@ -237,7 +235,6 @@ func TestRemove(t *testing.T) { if err == nil || err.Error() != "Key Not Found" { t.Fatalf("can get the node after deletion ") } - } func TestExpire(t *testing.T) { @@ -280,7 +277,6 @@ func TestExpire(t *testing.T) { if err != nil { t.Fatalf("cannot delete the node before expiration", err.Error()) } - } func TestTestAndSet(t *testing.T) { // TODO prevValue == nil ? @@ -314,7 +310,6 @@ func TestTestAndSet(t *testing.T) { // TODO prevValue == nil ? if e.PrevValue != "car" || e.Value != "bar" { t.Fatalf("[%v/%v] [%v/%v]", e.PrevValue, "car", e.Value, "bar") } - } func TestWatch(t *testing.T) { @@ -404,7 +399,6 @@ func TestWatch(t *testing.T) { if e.Key != "/foo/foo/boo" || e.Action != Expire || e.Index != 13 { t.Fatal("watch for Expiration of TestAndSet() subdirectory fails ", e) } - } func TestSort(t *testing.T) { @@ -479,8 +473,11 @@ func TestSaveAndRecover(t *testing.T) { panic(err) } } + + // lock to avoid racing with Expire() s.worldLock.RLock() defer s.worldLock.RUnlock() + if s.WatcherHub.EventHistory.StartIndex != cloneFs.WatcherHub.EventHistory.StartIndex { t.Fatal("Error recovered event history start index") } @@ -497,7 +494,6 @@ func TestSaveAndRecover(t *testing.T) { if err == nil || err.Error() != "Key Not Found" { t.Fatalf("can get the node after deletion ") } - } // GenKeys randomly generate num of keys with max depth @@ -513,6 +509,7 @@ func GenKeys(num int, depth int) []string { keys[i] += "/" + strconv.Itoa(rand.Int()) } } + return keys } @@ -532,11 +529,9 @@ func createAndGet(s *Store, path string, t *testing.T) { if e.Value != "bar" { t.Fatalf("expect value of %s is bar [%s]", path, e.Value) } - } func recursiveTestSort(k KeyValuePair, t *testing.T) { - for i, v := range k.KVPairs[:len(k.KVPairs)-1] { if v.Key >= k.KVPairs[i+1].Key { t.Fatalf("sort failed, [%s] should be placed after [%s]", v.Key, k.KVPairs[i+1].Key) @@ -545,7 +540,6 @@ func recursiveTestSort(k KeyValuePair, t *testing.T) { if v.Dir { recursiveTestSort(v, t) } - } if v := k.KVPairs[len(k.KVPairs)-1]; v.Dir { diff --git a/store/watcher.go b/store/watcher.go index 7576a866b..5d96e45cd 100644 --- a/store/watcher.go +++ b/store/watcher.go @@ -74,21 +74,19 @@ func (wh *watcherHub) notifyWithPath(e *Event, path string, force bool) { notifiedAll := true for { - if curr == nil { // we have reached the end of the list - if notifiedAll { // if we have notified all watcher in the list // we can delete the list delete(wh.watchers, path) } + break } next := curr.Next() // save the next w, _ := curr.Value.(*watcher) - if (w.recursive || force || e.Key == path) && e.Index >= w.sinceIndex { w.eventChan <- e l.Remove(curr) @@ -98,7 +96,6 @@ func (wh *watcherHub) notifyWithPath(e *Event, path string, force bool) { } curr = next // go to the next one - } } } From 974d74befbd3ac882f012e564a5f9a3b7b35e489 Mon Sep 17 00:00:00 2001 From: evan-gu Date: Tue, 1 Oct 2013 01:25:45 -0400 Subject: [PATCH 7/7] add some comment and change a declaration form --- store/event.go | 2 +- store/node.go | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/store/event.go b/store/event.go index 34bdf7d9b..53d4568bd 100644 --- a/store/event.go +++ b/store/event.go @@ -111,7 +111,7 @@ func (eh *EventHistory) addEvent(e *Event) *Event { eh.rwl.Lock() defer eh.rwl.Unlock() - duped := uint64(0) + var duped uint64 if e.Index == UndefIndex { e.Index = eh.LastIndex diff --git a/store/node.go b/store/node.go index cad85c75f..f5545fa60 100644 --- a/store/node.go +++ b/store/node.go @@ -277,7 +277,8 @@ func (n *Node) Expire(s *Store) { // if timeout, delete the node case <-time.After(duration): - // Lock to avoid race + // Lock the worldLock to avoid race on s.WatchHub, + // and the race with other slibling nodes on their common parent. s.worldLock.Lock() e := newEvent(Expire, n.Path, UndefIndex, UndefTerm)