diff --git a/store/event.go b/store/event.go index e95d50f1a..34bdf7d9b 100644 --- a/store/event.go +++ b/store/event.go @@ -76,7 +76,6 @@ func (eq *eventQueue) back() int { } func (eq *eventQueue) insert(e *Event) { - index := (eq.back() + 1) % eq.Capacity eq.Events[index] = e @@ -94,7 +93,7 @@ type EventHistory struct { StartIndex uint64 LastIndex uint64 LastTerm uint64 - DupIndex uint64 + DupCnt uint64 // help to compute the watching point with duplicated indexes in the queue rwl sync.RWMutex } @@ -112,16 +111,16 @@ func (eh *EventHistory) addEvent(e *Event) *Event { eh.rwl.Lock() defer eh.rwl.Unlock() - DupIndex := uint64(0) + duped := uint64(0) if e.Index == UndefIndex { e.Index = eh.LastIndex - DupIndex = 1 + duped = 1 } if e.Term == UndefTerm { e.Term = eh.LastTerm - DupIndex = 1 + duped = 1 } eh.Queue.insert(e) @@ -130,7 +129,7 @@ func (eh *EventHistory) addEvent(e *Event) *Event { eh.LastIndex = e.Index eh.LastTerm = e.Term - eh.DupIndex += DupIndex + eh.DupCnt += duped return e } @@ -141,7 +140,7 @@ func (eh *EventHistory) scan(prefix string, index uint64) (*Event, error) { eh.rwl.RLock() defer eh.rwl.RUnlock() - start := index - eh.StartIndex + eh.DupIndex + start := index - eh.StartIndex + eh.DupCnt // the index should locate after the event history's StartIndex // and before its size @@ -172,13 +171,11 @@ func (eh *EventHistory) scan(prefix string, index uint64) (*Event, error) { return nil, nil } } - } // clone will be protected by a stop-world lock // do not need to obtain internal lock func (eh *EventHistory) clone() *EventHistory { - clonedQueue := eventQueue{ Capacity: eh.Queue.Capacity, Events: make([]*Event, eh.Queue.Capacity), diff --git a/store/event_test.go b/store/event_test.go index e5b35061c..c02a4d70e 100644 --- a/store/event_test.go +++ b/store/event_test.go @@ -28,9 +28,7 @@ func TestEventQueue(t *testing.T) { } j++ i = (i + 1) % eh.Queue.Capacity - } - } func TestScanHistory(t *testing.T) { @@ -65,5 +63,4 @@ func TestScanHistory(t *testing.T) { if e != nil { t.Fatalf("bad index shoud reuturn nil") } - } diff --git a/store/node.go b/store/node.go index f82f01cd6..cad85c75f 100644 --- a/store/node.go +++ b/store/node.go @@ -66,7 +66,6 @@ func newDir(nodePath string, createIndex uint64, createTerm uint64, parent *Node // If the node is a directory and recursive is true, the function will recursively remove // add nodes under the receiver node. func (n *Node) Remove(recursive bool, callback func(path string)) error { - n.mu.Lock() defer n.mu.Unlock() @@ -187,7 +186,6 @@ func (n *Node) GetFile(name string) (*Node, error) { } return nil, nil - } // Add function adds a node to the receiver node. @@ -216,7 +214,6 @@ func (n *Node) Add(child *Node) error { n.Children[name] = child return nil - } // Clone function clone the node recursively and return the new node. @@ -251,11 +248,23 @@ func (n *Node) recoverAndclean(s *Store) { n.Expire(s) } +// Expire function will test if the node is expired. +// if the node is already expired, delete the node and return. +// if the node is permemant (this shouldn't happen), return at once. +// else wait for a period time, then remove the node. and notify the watchhub. func (n *Node) Expire(s *Store) { expired, duration := n.IsExpired() if expired { // has been expired + + // since the parent function of Expire() runs serially, + // there is no need for lock here + e := newEvent(Expire, n.Path, UndefIndex, UndefTerm) + s.WatcherHub.notify(e) + n.Remove(true, nil) + s.Stats.Inc(ExpireCount) + return } @@ -267,20 +276,23 @@ func (n *Node) Expire(s *Store) { select { // if timeout, delete the node case <-time.After(duration): + + // Lock to avoid race s.worldLock.Lock() e := newEvent(Expire, n.Path, UndefIndex, UndefTerm) s.WatcherHub.notify(e) + n.Remove(true, nil) s.Stats.Inc(ExpireCount) s.worldLock.Unlock() + return // if stopped, return case <-n.stopExpire: return - } }() } @@ -294,7 +306,6 @@ func (n *Node) IsHidden() bool { _, name := path.Split(n.Path) return name[0] == '_' - } func (n *Node) IsPermanent() bool { @@ -355,6 +366,7 @@ func (n *Node) Pair(recurisive, sorted bool) KeyValuePair { if sorted { sort.Sort(pair) } + return pair } diff --git a/store/store.go b/store/store.go index e6b8cf1b9..f77189172 100644 --- a/store/store.go +++ b/store/store.go @@ -164,7 +164,6 @@ func (s *Store) Create(nodePath string, value string, expireTime time.Time, inde // If the node is a file, the value and the ttl can be updated. // If the node is a directory, only the ttl can be updated. func (s *Store) Update(nodePath string, value string, expireTime time.Time, index uint64, term uint64) (*Event, error) { - s.worldLock.RLock() defer s.worldLock.RUnlock() @@ -197,10 +196,11 @@ func (s *Store) Update(nodePath string, value string, expireTime time.Time, inde // update ttl n.UpdateTTL(expireTime, s) + e.Expiration = &n.ExpireTime e.TTL = int64(expireTime.Sub(time.Now())/time.Second) + 1 - s.WatcherHub.notify(e) + s.Stats.Inc(UpdateSuccess) return e, nil diff --git a/store/store_test.go b/store/store_test.go index a16e57032..87010b1ee 100644 --- a/store/store_test.go +++ b/store/store_test.go @@ -52,7 +52,6 @@ func TestCreateAndGet(t *testing.T) { if err != nil { t.Fatal("Cannot create /fooDir/bar = bar") } - } func TestUpdateFile(t *testing.T) { @@ -81,7 +80,6 @@ func TestUpdateFile(t *testing.T) { } // create a directory, update its ttl, to see if it will be deleted - _, err = s.Create("/foo/foo", "", Permanent, 3, 1) if err != nil { @@ -237,7 +235,6 @@ func TestRemove(t *testing.T) { if err == nil || err.Error() != "Key Not Found" { t.Fatalf("can get the node after deletion ") } - } func TestExpire(t *testing.T) { @@ -280,7 +277,6 @@ func TestExpire(t *testing.T) { if err != nil { t.Fatalf("cannot delete the node before expiration", err.Error()) } - } func TestTestAndSet(t *testing.T) { // TODO prevValue == nil ? @@ -314,7 +310,6 @@ func TestTestAndSet(t *testing.T) { // TODO prevValue == nil ? if e.PrevValue != "car" || e.Value != "bar" { t.Fatalf("[%v/%v] [%v/%v]", e.PrevValue, "car", e.Value, "bar") } - } func TestWatch(t *testing.T) { @@ -404,7 +399,6 @@ func TestWatch(t *testing.T) { if e.Key != "/foo/foo/boo" || e.Action != Expire || e.Index != 13 { t.Fatal("watch for Expiration of TestAndSet() subdirectory fails ", e) } - } func TestSort(t *testing.T) { @@ -479,8 +473,11 @@ func TestSaveAndRecover(t *testing.T) { panic(err) } } + + // lock to avoid racing with Expire() s.worldLock.RLock() defer s.worldLock.RUnlock() + if s.WatcherHub.EventHistory.StartIndex != cloneFs.WatcherHub.EventHistory.StartIndex { t.Fatal("Error recovered event history start index") } @@ -497,7 +494,6 @@ func TestSaveAndRecover(t *testing.T) { if err == nil || err.Error() != "Key Not Found" { t.Fatalf("can get the node after deletion ") } - } // GenKeys randomly generate num of keys with max depth @@ -513,6 +509,7 @@ func GenKeys(num int, depth int) []string { keys[i] += "/" + strconv.Itoa(rand.Int()) } } + return keys } @@ -532,11 +529,9 @@ func createAndGet(s *Store, path string, t *testing.T) { if e.Value != "bar" { t.Fatalf("expect value of %s is bar [%s]", path, e.Value) } - } func recursiveTestSort(k KeyValuePair, t *testing.T) { - for i, v := range k.KVPairs[:len(k.KVPairs)-1] { if v.Key >= k.KVPairs[i+1].Key { t.Fatalf("sort failed, [%s] should be placed after [%s]", v.Key, k.KVPairs[i+1].Key) @@ -545,7 +540,6 @@ func recursiveTestSort(k KeyValuePair, t *testing.T) { if v.Dir { recursiveTestSort(v, t) } - } if v := k.KVPairs[len(k.KVPairs)-1]; v.Dir { diff --git a/store/watcher.go b/store/watcher.go index 7576a866b..5d96e45cd 100644 --- a/store/watcher.go +++ b/store/watcher.go @@ -74,21 +74,19 @@ func (wh *watcherHub) notifyWithPath(e *Event, path string, force bool) { notifiedAll := true for { - if curr == nil { // we have reached the end of the list - if notifiedAll { // if we have notified all watcher in the list // we can delete the list delete(wh.watchers, path) } + break } next := curr.Next() // save the next w, _ := curr.Value.(*watcher) - if (w.recursive || force || e.Key == path) && e.Index >= w.sinceIndex { w.eventChan <- e l.Remove(curr) @@ -98,7 +96,6 @@ func (wh *watcherHub) notifyWithPath(e *Event, path string, force bool) { } curr = next // go to the next one - } } }