From 07b52ee24c768b338c5b249e409c52870b8d8e7f Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Mon, 4 Nov 2013 21:51:14 -0800 Subject: [PATCH] fix save and recovery --- store/heap_test.go | 6 ++--- store/node.go | 14 +++++++----- store/store.go | 9 +++++--- store/ttl_key_heap.go | 52 +++++++++++++++++++++---------------------- 4 files changed, 43 insertions(+), 38 deletions(-) diff --git a/store/heap_test.go b/store/heap_test.go index e9cda376e..aa0b9caf6 100644 --- a/store/heap_test.go +++ b/store/heap_test.go @@ -1,14 +1,13 @@ package store import ( - "container/heap" "fmt" "testing" "time" ) func TestHeapPushPop(t *testing.T) { - h := newTTLKeyHeap() + h := newTtlKeyHeap() // add from older expire time to earlier expire time // the path is equal to ttl from now @@ -32,8 +31,7 @@ func TestHeapPushPop(t *testing.T) { } func TestHeapUpdate(t *testing.T) { - h := &TTLKeyHeap{Map: make(map[*Node]int)} - heap.Init(h) + h := newTtlKeyHeap() kvs := make([]*Node, 10) diff --git a/store/node.go b/store/node.go index 72510260d..38a969203 100644 --- a/store/node.go +++ b/store/node.go @@ -240,7 +240,7 @@ func (n *Node) internalRemove(recursive bool, callback func(path string)) { } if !n.IsPermanent() { - n.store.TTLKeyHeap.remove(n) + n.store.ttlKeyHeap.remove(n) } // the stop channel has a buffer. just send to it! @@ -262,7 +262,7 @@ func (n *Node) internalRemove(recursive bool, callback func(path string)) { } if !n.IsPermanent() { - n.store.TTLKeyHeap.remove(n) + n.store.ttlKeyHeap.remove(n) } n.stopExpire <- true @@ -375,18 +375,18 @@ func (n *Node) UpdateTTL(expireTime time.Time) { if expireTime.IsZero() { // from ttl to permanent // remove from ttl heap - n.store.TTLKeyHeap.remove(n) + n.store.ttlKeyHeap.remove(n) } else { // update ttl // update ttl heap - n.store.TTLKeyHeap.update(n) + n.store.ttlKeyHeap.update(n) } } else { if !expireTime.IsZero() { // from permanent to ttl // push into ttl heap - n.store.TTLKeyHeap.push(n) + n.store.ttlKeyHeap.push(n) } } @@ -442,5 +442,9 @@ func (n *Node) recoverAndclean() { n.stopExpire = make(chan bool, 1) + if !n.ExpireTime.IsZero() { + n.store.ttlKeyHeap.push(n) + } + n.Expire() } diff --git a/store/store.go b/store/store.go index babb5a932..f7abf50df 100644 --- a/store/store.go +++ b/store/store.go @@ -37,11 +37,11 @@ type Store interface { type store struct { Root *Node WatcherHub *watcherHub - TTLKeyHeap *TTLKeyHeap Index uint64 Term uint64 Stats *Stats CurrentVersion int + ttlKeyHeap *ttlKeyHeap // need to recovery manually worldLock sync.RWMutex // stop the world lock } @@ -55,7 +55,7 @@ func newStore() *store { s.Root = newDir(s, "/", UndefIndex, UndefTerm, nil, "", Permanent) s.Stats = newStats() s.WatcherHub = newWatchHub(1000) - s.TTLKeyHeap = newTTLKeyHeap() + s.ttlKeyHeap = newTtlKeyHeap() return s } @@ -393,7 +393,7 @@ func (s *store) internalCreate(nodePath string, value string, unique bool, repla // Node with TTL if !n.IsPermanent() { - s.TTLKeyHeap.push(n) + s.ttlKeyHeap.push(n) n.Expire() e.Expiration, e.TTL = n.ExpirationAndTTL() @@ -477,6 +477,7 @@ func (s *store) Save() ([]byte, error) { b, err := json.Marshal(clonedStore) if err != nil { + fmt.Println(err) return nil, err } @@ -496,6 +497,8 @@ func (s *store) Recovery(state []byte) error { return err } + s.ttlKeyHeap = newTtlKeyHeap() + s.Root.recoverAndclean() return nil } diff --git a/store/ttl_key_heap.go b/store/ttl_key_heap.go index 34fa4ba63..28c38ea31 100644 --- a/store/ttl_key_heap.go +++ b/store/ttl_key_heap.go @@ -5,66 +5,66 @@ import ( ) // An TTLKeyHeap is a min-heap of TTLKeys order by expiration time -type TTLKeyHeap struct { - Array []*Node - Map map[*Node]int +type ttlKeyHeap struct { + array []*Node + keyMap map[*Node]int } -func newTTLKeyHeap() *TTLKeyHeap { - h := &TTLKeyHeap{Map: make(map[*Node]int)} +func newTtlKeyHeap() *ttlKeyHeap { + h := &ttlKeyHeap{keyMap: make(map[*Node]int)} heap.Init(h) return h } -func (h TTLKeyHeap) Len() int { - return len(h.Array) +func (h ttlKeyHeap) Len() int { + return len(h.array) } -func (h TTLKeyHeap) Less(i, j int) bool { - return h.Array[i].ExpireTime.Before(h.Array[j].ExpireTime) +func (h ttlKeyHeap) Less(i, j int) bool { + return h.array[i].ExpireTime.Before(h.array[j].ExpireTime) } -func (h TTLKeyHeap) Swap(i, j int) { +func (h ttlKeyHeap) Swap(i, j int) { // swap node - h.Array[i], h.Array[j] = h.Array[j], h.Array[i] + h.array[i], h.array[j] = h.array[j], h.array[i] // update map - h.Map[h.Array[i]] = i - h.Map[h.Array[j]] = j + h.keyMap[h.array[i]] = i + h.keyMap[h.array[j]] = j } -func (h *TTLKeyHeap) Push(x interface{}) { +func (h *ttlKeyHeap) Push(x interface{}) { n, _ := x.(*Node) - h.Map[n] = len(h.Array) - h.Array = append(h.Array, n) + h.keyMap[n] = len(h.array) + h.array = append(h.array, n) } -func (h *TTLKeyHeap) Pop() interface{} { - old := h.Array +func (h *ttlKeyHeap) Pop() interface{} { + old := h.array n := len(old) x := old[n-1] - h.Array = old[0 : n-1] - delete(h.Map, x) + h.array = old[0 : n-1] + delete(h.keyMap, x) return x } -func (h *TTLKeyHeap) pop() *Node { +func (h *ttlKeyHeap) pop() *Node { x := heap.Pop(h) n, _ := x.(*Node) return n } -func (h *TTLKeyHeap) push(x interface{}) { +func (h *ttlKeyHeap) push(x interface{}) { heap.Push(h, x) } -func (h *TTLKeyHeap) update(n *Node) { - index := h.Map[n] +func (h *ttlKeyHeap) update(n *Node) { + index := h.keyMap[n] heap.Remove(h, index) heap.Push(h, n) } -func (h *TTLKeyHeap) remove(n *Node) { - index := h.Map[n] +func (h *ttlKeyHeap) remove(n *Node) { + index := h.keyMap[n] heap.Remove(h, index) }