fix save and recovery

This commit is contained in:
Xiang Li 2013-11-04 21:51:14 -08:00
parent 0d8510df33
commit 07b52ee24c
4 changed files with 43 additions and 38 deletions

View File

@ -1,14 +1,13 @@
package store package store
import ( import (
"container/heap"
"fmt" "fmt"
"testing" "testing"
"time" "time"
) )
func TestHeapPushPop(t *testing.T) { func TestHeapPushPop(t *testing.T) {
h := newTTLKeyHeap() h := newTtlKeyHeap()
// add from older expire time to earlier expire time // add from older expire time to earlier expire time
// the path is equal to ttl from now // the path is equal to ttl from now
@ -32,8 +31,7 @@ func TestHeapPushPop(t *testing.T) {
} }
func TestHeapUpdate(t *testing.T) { func TestHeapUpdate(t *testing.T) {
h := &TTLKeyHeap{Map: make(map[*Node]int)} h := newTtlKeyHeap()
heap.Init(h)
kvs := make([]*Node, 10) kvs := make([]*Node, 10)

View File

@ -240,7 +240,7 @@ func (n *Node) internalRemove(recursive bool, callback func(path string)) {
} }
if !n.IsPermanent() { if !n.IsPermanent() {
n.store.TTLKeyHeap.remove(n) n.store.ttlKeyHeap.remove(n)
} }
// the stop channel has a buffer. just send to it! // the stop channel has a buffer. just send to it!
@ -262,7 +262,7 @@ func (n *Node) internalRemove(recursive bool, callback func(path string)) {
} }
if !n.IsPermanent() { if !n.IsPermanent() {
n.store.TTLKeyHeap.remove(n) n.store.ttlKeyHeap.remove(n)
} }
n.stopExpire <- true n.stopExpire <- true
@ -375,18 +375,18 @@ func (n *Node) UpdateTTL(expireTime time.Time) {
if expireTime.IsZero() { if expireTime.IsZero() {
// from ttl to permanent // from ttl to permanent
// remove from ttl heap // remove from ttl heap
n.store.TTLKeyHeap.remove(n) n.store.ttlKeyHeap.remove(n)
} else { } else {
// update ttl // update ttl
// update ttl heap // update ttl heap
n.store.TTLKeyHeap.update(n) n.store.ttlKeyHeap.update(n)
} }
} else { } else {
if !expireTime.IsZero() { if !expireTime.IsZero() {
// from permanent to ttl // from permanent to ttl
// push into ttl heap // push into ttl heap
n.store.TTLKeyHeap.push(n) n.store.ttlKeyHeap.push(n)
} }
} }
@ -442,5 +442,9 @@ func (n *Node) recoverAndclean() {
n.stopExpire = make(chan bool, 1) n.stopExpire = make(chan bool, 1)
if !n.ExpireTime.IsZero() {
n.store.ttlKeyHeap.push(n)
}
n.Expire() n.Expire()
} }

View File

@ -37,11 +37,11 @@ type Store interface {
type store struct { type store struct {
Root *Node Root *Node
WatcherHub *watcherHub WatcherHub *watcherHub
TTLKeyHeap *TTLKeyHeap
Index uint64 Index uint64
Term uint64 Term uint64
Stats *Stats Stats *Stats
CurrentVersion int CurrentVersion int
ttlKeyHeap *ttlKeyHeap // need to recovery manually
worldLock sync.RWMutex // stop the world lock worldLock sync.RWMutex // stop the world lock
} }
@ -55,7 +55,7 @@ func newStore() *store {
s.Root = newDir(s, "/", UndefIndex, UndefTerm, nil, "", Permanent) s.Root = newDir(s, "/", UndefIndex, UndefTerm, nil, "", Permanent)
s.Stats = newStats() s.Stats = newStats()
s.WatcherHub = newWatchHub(1000) s.WatcherHub = newWatchHub(1000)
s.TTLKeyHeap = newTTLKeyHeap() s.ttlKeyHeap = newTtlKeyHeap()
return s return s
} }
@ -393,7 +393,7 @@ func (s *store) internalCreate(nodePath string, value string, unique bool, repla
// Node with TTL // Node with TTL
if !n.IsPermanent() { if !n.IsPermanent() {
s.TTLKeyHeap.push(n) s.ttlKeyHeap.push(n)
n.Expire() n.Expire()
e.Expiration, e.TTL = n.ExpirationAndTTL() e.Expiration, e.TTL = n.ExpirationAndTTL()
@ -477,6 +477,7 @@ func (s *store) Save() ([]byte, error) {
b, err := json.Marshal(clonedStore) b, err := json.Marshal(clonedStore)
if err != nil { if err != nil {
fmt.Println(err)
return nil, err return nil, err
} }
@ -496,6 +497,8 @@ func (s *store) Recovery(state []byte) error {
return err return err
} }
s.ttlKeyHeap = newTtlKeyHeap()
s.Root.recoverAndclean() s.Root.recoverAndclean()
return nil return nil
} }

View File

@ -5,66 +5,66 @@ import (
) )
// An TTLKeyHeap is a min-heap of TTLKeys order by expiration time // An TTLKeyHeap is a min-heap of TTLKeys order by expiration time
type TTLKeyHeap struct { type ttlKeyHeap struct {
Array []*Node array []*Node
Map map[*Node]int keyMap map[*Node]int
} }
func newTTLKeyHeap() *TTLKeyHeap { func newTtlKeyHeap() *ttlKeyHeap {
h := &TTLKeyHeap{Map: make(map[*Node]int)} h := &ttlKeyHeap{keyMap: make(map[*Node]int)}
heap.Init(h) heap.Init(h)
return h return h
} }
func (h TTLKeyHeap) Len() int { func (h ttlKeyHeap) Len() int {
return len(h.Array) return len(h.array)
} }
func (h TTLKeyHeap) Less(i, j int) bool { func (h ttlKeyHeap) Less(i, j int) bool {
return h.Array[i].ExpireTime.Before(h.Array[j].ExpireTime) return h.array[i].ExpireTime.Before(h.array[j].ExpireTime)
} }
func (h TTLKeyHeap) Swap(i, j int) { func (h ttlKeyHeap) Swap(i, j int) {
// swap node // swap node
h.Array[i], h.Array[j] = h.Array[j], h.Array[i] h.array[i], h.array[j] = h.array[j], h.array[i]
// update map // update map
h.Map[h.Array[i]] = i h.keyMap[h.array[i]] = i
h.Map[h.Array[j]] = j h.keyMap[h.array[j]] = j
} }
func (h *TTLKeyHeap) Push(x interface{}) { func (h *ttlKeyHeap) Push(x interface{}) {
n, _ := x.(*Node) n, _ := x.(*Node)
h.Map[n] = len(h.Array) h.keyMap[n] = len(h.array)
h.Array = append(h.Array, n) h.array = append(h.array, n)
} }
func (h *TTLKeyHeap) Pop() interface{} { func (h *ttlKeyHeap) Pop() interface{} {
old := h.Array old := h.array
n := len(old) n := len(old)
x := old[n-1] x := old[n-1]
h.Array = old[0 : n-1] h.array = old[0 : n-1]
delete(h.Map, x) delete(h.keyMap, x)
return x return x
} }
func (h *TTLKeyHeap) pop() *Node { func (h *ttlKeyHeap) pop() *Node {
x := heap.Pop(h) x := heap.Pop(h)
n, _ := x.(*Node) n, _ := x.(*Node)
return n return n
} }
func (h *TTLKeyHeap) push(x interface{}) { func (h *ttlKeyHeap) push(x interface{}) {
heap.Push(h, x) heap.Push(h, x)
} }
func (h *TTLKeyHeap) update(n *Node) { func (h *ttlKeyHeap) update(n *Node) {
index := h.Map[n] index := h.keyMap[n]
heap.Remove(h, index) heap.Remove(h, index)
heap.Push(h, n) heap.Push(h, n)
} }
func (h *TTLKeyHeap) remove(n *Node) { func (h *ttlKeyHeap) remove(n *Node) {
index := h.Map[n] index := h.keyMap[n]
heap.Remove(h, index) heap.Remove(h, index)
} }