From 63ba16f51adcf6e7426abb34c49c6defc5ecc37f Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Mon, 5 Aug 2013 16:05:49 -0700 Subject: [PATCH] add clone func to help snapshot. add benchmarks --- store/keyword_test.go | 8 +-- store/store.go | 99 +++++++++++++++++++++++++++----- store/store_test.go | 22 +++++-- store/test.go | 21 +++++++ store/tree.go | 48 ++++++++++++++-- store/tree_store_test.go | 120 ++++++++++++++++++++++++++++++++------- 6 files changed, 270 insertions(+), 48 deletions(-) create mode 100644 store/test.go diff --git a/store/keyword_test.go b/store/keyword_test.go index f2d15565d..7c54a9fde 100644 --- a/store/keyword_test.go +++ b/store/keyword_test.go @@ -7,25 +7,25 @@ import ( func TestKeywords(t *testing.T) { keyword := CheckKeyword("_etcd") if !keyword { - t.Fatal("machines should be keyword") + t.Fatal("_etcd should be keyword") } keyword = CheckKeyword("/_etcd") if !keyword { - t.Fatal("/machines should be keyword") + t.Fatal("/_etcd should be keyword") } keyword = CheckKeyword("/_etcd/") if !keyword { - t.Fatal("/machines/ contains keyword prefix") + t.Fatal("/_etcd/ contains keyword prefix") } keyword = CheckKeyword("/_etcd/node1") if !keyword { - t.Fatal("/machines/* contains keyword prefix") + t.Fatal("/_etcd/* contains keyword prefix") } keyword = CheckKeyword("/nokeyword/_etcd/node1") diff --git a/store/store.go b/store/store.go index 8132019ce..e850ff592 100644 --- a/store/store.go +++ b/store/store.go @@ -6,6 +6,7 @@ import ( "path" "strconv" "time" + "sync" ) //------------------------------------------------------------------------------ @@ -20,6 +21,8 @@ type Store struct { // key-value store structure Tree *tree + mutex sync.Mutex + // WatcherHub is where we register all the clients // who issue a watch request watcher *WatcherHub @@ -136,9 +139,16 @@ func (s *Store) SetMessager(messager *chan string) { s.messager = messager } -// Set the key to value with expiration time func (s *Store) Set(key string, value string, expireTime time.Time, index uint64) ([]byte, error) { + s.mutex.Lock() + defer s.mutex.Unlock() + return s.internalSet(key, value, expireTime, index) + +} + +// Set the key to value with expiration time +func (s *Store) internalSet(key string, value string, expireTime time.Time, index uint64) ([]byte, error) { //Update index s.Index = index @@ -161,7 +171,7 @@ func (s *Store) Set(key string, value string, expireTime time.Time, index uint64 // the key may be expired, we should not add the node // also if the node exist, we need to delete the node if isExpire && expireTime.Sub(time.Now()) < 0 { - return s.Delete(key, index) + return s.internalDelete(key, index) } var TTL int64 @@ -290,6 +300,9 @@ func (s *Store) internalGet(key string) *Response { // If key is a file return the file // If key is a directory reuturn an array of files func (s *Store) Get(key string) ([]byte, error) { + s.mutex.Lock() + defer s.mutex.Unlock() + resps, err := s.RawGet(key) if err != nil { @@ -312,28 +325,49 @@ func (s *Store) RawGet(key string) ([]*Response, error) { nodes, keys, dirs, ok := s.Tree.list(key) if ok { + + node, ok := nodes.(*Node) + + if ok { + resps := make([]*Response, 1) + + isExpire := !node.ExpireTime.Equal(PERMANENT) + + resps[0] = &Response{ + Action: "GET", + Index: s.Index, + Key: key, + Value: node.Value, + } + + // Update ttl + if isExpire { + TTL := int64(node.ExpireTime.Sub(time.Now()) / time.Second) + resps[0].Expiration = &node.ExpireTime + resps[0].TTL = TTL + } + + return resps, nil + } + + + nodes, _ := nodes.([]*Node) + resps := make([]*Response, len(nodes)) for i := 0; i < len(nodes); i++ { var TTL int64 var isExpire bool = false - var thisKey string isExpire = !nodes[i].ExpireTime.Equal(PERMANENT) - if keys != nil { - thisKey = path.Join(key, keys[i]) - } else { - thisKey = key - } - resps[i] = &Response{ Action: "GET", Index: s.Index, - Key: thisKey, + Key: path.Join(key, keys[i]), } - if dirs == nil || !dirs[i] { + if !dirs[i] { resps[i].Value = nodes[i].Value } else { resps[i].Dir = true @@ -355,8 +389,14 @@ func (s *Store) RawGet(key string) ([]*Response, error) { return nil, err } -// Delete the key func (s *Store) Delete(key string, index uint64) ([]byte, error) { + s.mutex.Lock() + defer s.mutex.Unlock() + return s.internalDelete(key, index) +} + +// Delete the key +func (s *Store) internalDelete(key string, index uint64) ([]byte, error) { // Update stats s.BasicStats.Deletes++ @@ -411,6 +451,9 @@ func (s *Store) Delete(key string, index uint64) ([]byte, error) { // Set the value of the key to the value if the given prevValue is equal to the value of the key func (s *Store) TestAndSet(key string, prevValue string, value string, expireTime time.Time, index uint64) ([]byte, error) { + s.mutex.Lock() + defer s.mutex.Unlock() + // Update stats s.BasicStats.TestAndSets++ @@ -424,7 +467,7 @@ func (s *Store) TestAndSet(key string, prevValue string, value string, expireTim if resp.Value == prevValue { // If test success, do set - return s.Set(key, value, expireTime, index) + return s.internalSet(key, value, expireTime, index) } else { // If fails, return err @@ -459,6 +502,7 @@ func (s *Store) monitorExpiration(key string, update chan time.Time, expireTime return } else { + s.mutex.Lock() s.Tree.delete(key) @@ -469,6 +513,7 @@ func (s *Store) monitorExpiration(key string, update chan time.Time, expireTime Expiration: &node.ExpireTime, Index: s.Index, } + s.mutex.Unlock() msg, err := json.Marshal(resp) @@ -527,9 +572,32 @@ func (s *Store) addToResponseMap(index uint64, resp *Response) { } } +func (s *Store) clone() *Store { + newStore := & Store{ + ResponseMaxSize: s.ResponseMaxSize, + ResponseCurrSize: s.ResponseCurrSize, + ResponseStartIndex: s.ResponseStartIndex, + Index: s.Index, + BasicStats: s.BasicStats, + } + + newStore.Tree = s.Tree.clone() + newStore.ResponseMap = make(map[string]*Response) + + for index, response := range s.ResponseMap { + newStore.ResponseMap[index] = response + } + + return newStore +} + // Save the current state of the storage system func (s *Store) Save() ([]byte, error) { - b, err := json.Marshal(s) + s.mutex.Lock() + cloneStore := s.clone() + s.mutex.Unlock() + + b, err := json.Marshal(cloneStore) if err != nil { fmt.Println(err) return nil, err @@ -539,7 +607,8 @@ func (s *Store) Save() ([]byte, error) { // Recovery the state of the stroage system from a previous state func (s *Store) Recovery(state []byte) error { - + s.mutex.Lock() + defer s.mutex.Unlock() // we need to stop all the current watchers // recovery will clear watcherHub s.watcher.stopWatchers() diff --git a/store/store_test.go b/store/store_test.go index d28c57406..875750c82 100644 --- a/store/store_test.go +++ b/store/store_test.go @@ -131,7 +131,7 @@ func TestExpire(t *testing.T) { } -func BenchmarkSet(b *testing.B) { +func BenchmarkStoreSet(b *testing.B) { s := CreateStore(100) keys := GenKeys(10000, 5) @@ -147,7 +147,7 @@ func BenchmarkSet(b *testing.B) { } } -func BenchmarkGet(b *testing.B) { +func BenchmarkStoreGet(b *testing.B) { s := CreateStore(100) keys := GenKeys(100, 5) @@ -166,11 +166,25 @@ func BenchmarkGet(b *testing.B) { } } -func BenchmarkSetAndGet(b *testing.B) { +func BenchmarkStoreSnapshotCopy(b *testing.B) { + s := CreateStore(100) + keys := GenKeys(10000, 5) + + for i, key := range keys { + s.Set(key, "barbarbarbarbar", time.Unix(0, 0), uint64(i)) + } + + var state []byte + + b.ResetTimer() + for i := 0; i < b.N; i++ { + s.clone() + } + b.SetBytes(int64(len(state))) } -func BenchmarkSnapshotSave(b *testing.B) { +func BenchmarkSnapshotSaveJson(b *testing.B) { s := CreateStore(100) keys := GenKeys(10000, 5) diff --git a/store/test.go b/store/test.go new file mode 100644 index 000000000..a725e87db --- /dev/null +++ b/store/test.go @@ -0,0 +1,21 @@ +package store + +import ( + "math/rand" + "strconv" +) + +// GenKeys randomly generate num of keys with max depth +func GenKeys(num int, depth int) []string { + keys := make([]string, num) + for i := 0; i < num; i++ { + + keys[i] = "/foo/" + depth := rand.Intn(depth) + + for j := 0; j < depth; j++ { + keys[i] += "/" + strconv.Itoa(rand.Int()) + } + } + return keys +} diff --git a/store/tree.go b/store/tree.go index ef47d3dde..5d136bdfa 100644 --- a/store/tree.go +++ b/store/tree.go @@ -4,6 +4,7 @@ import ( "path" "sort" "strings" + "time" ) //------------------------------------------------------------------------------ @@ -158,25 +159,23 @@ func (t *tree) get(key string) (Node, bool) { } // get the internalNode of the key -func (t *tree) list(directory string) ([]Node, []string, []bool, bool) { +func (t *tree) list(directory string) (interface{}, []string, []bool, bool) { treeNode, ok := t.internalGet(directory) if !ok { return nil, nil, nil, ok } else { if !treeNode.Dir { - nodes := make([]Node, 1) - nodes[0] = treeNode.InternalNode - return nodes, nil, nil, true + return &treeNode.InternalNode, nil, nil, true } length := len(treeNode.NodeMap) - nodes := make([]Node, length) + nodes := make([]*Node, length) keys := make([]string, length) dirs := make([]bool, length) i := 0 for key, node := range treeNode.NodeMap { - nodes[i] = node.InternalNode + nodes[i] = &node.InternalNode keys[i] = key if node.Dir { dirs[i] = true @@ -223,6 +222,42 @@ func (t *tree) traverse(f func(string, *Node), sort bool) { } } +// clone() will return a deep cloned tree +func (t *tree) clone() *tree { + newTree := new(tree) + newTree.Root = &treeNode{ + Node{ + "/", + time.Unix(0, 0), + nil, + }, + true, + make(map[string]*treeNode), + } + recursiveClone(t.Root, newTree.Root) + return newTree +} + +// recursiveClone is a helper function for clone() +func recursiveClone(tnSrc *treeNode, tnDes *treeNode) { + if !tnSrc.Dir { + tnDes.InternalNode = tnSrc.InternalNode + return + + } else { + tnDes.InternalNode = tnSrc.InternalNode + tnDes.Dir = true + tnDes.NodeMap = make(map[string]*treeNode) + + for key, tn := range tnSrc.NodeMap { + newTn := new(treeNode) + recursiveClone(tn, newTn) + tnDes.NodeMap[key] = newTn + } + + } +} + // deep first search to traverse the tree // apply the func f to each internal node func dfs(key string, t *treeNode, f func(string, *Node)) { @@ -281,3 +316,4 @@ func split(key string) []string { nodesName = nodesName[1:] return nodesName } + diff --git a/store/tree_store_test.go b/store/tree_store_test.go index f23bf3845..fe5a71589 100644 --- a/store/tree_store_test.go +++ b/store/tree_store_test.go @@ -12,18 +12,18 @@ func TestStoreGet(t *testing.T) { ts := &tree{ &treeNode{ - CreateTestNode("/"), + NewTestNode("/"), true, make(map[string]*treeNode), }, } // create key - ts.set("/foo", CreateTestNode("bar")) + ts.set("/foo", NewTestNode("bar")) // change value - ts.set("/foo", CreateTestNode("barbar")) + ts.set("/foo", NewTestNode("barbar")) // create key - ts.set("/hello/foo", CreateTestNode("barbarbar")) + ts.set("/hello/foo", NewTestNode("barbarbar")) treeNode, ok := ts.get("/foo") if !ok { @@ -43,7 +43,7 @@ func TestStoreGet(t *testing.T) { } // create a key under other key - ok = ts.set("/foo/foo", CreateTestNode("bar")) + ok = ts.set("/foo/foo", NewTestNode("bar")) if ok { t.Fatalf("shoud not add key under a exisiting key") } @@ -61,14 +61,15 @@ func TestStoreGet(t *testing.T) { } // test list - ts.set("/hello/fooo", CreateTestNode("barbarbar")) - ts.set("/hello/foooo/foo", CreateTestNode("barbarbar")) + ts.set("/hello/fooo", NewTestNode("barbarbar")) + ts.set("/hello/foooo/foo", NewTestNode("barbarbar")) nodes, keys, dirs, ok := ts.list("/hello") if !ok { t.Fatalf("cannot list!") } else { + nodes, _ := nodes.([]*Node) length := len(nodes) for i := 0; i < length; i++ { @@ -80,7 +81,7 @@ func TestStoreGet(t *testing.T) { for i := 0; i < 100; i++ { value := strconv.Itoa(rand.Int()) - ts.set(keys[i], CreateTestNode(value)) + ts.set(keys[i], NewTestNode(value)) treeNode, ok := ts.get(keys[i]) if !ok { @@ -94,6 +95,48 @@ func TestStoreGet(t *testing.T) { ts.traverse(f, true) } +func TestTreeClone(t *testing.T) { + keys := GenKeys(10000, 10) + + ts := &tree{ + &treeNode{ + NewTestNode("/"), + true, + make(map[string]*treeNode), + }, + } + + backTs := &tree{ + &treeNode{ + NewTestNode("/"), + true, + make(map[string]*treeNode), + }, + } + + // generate the first tree + for _, key := range keys { + value := strconv.Itoa(rand.Int()) + ts.set(key, NewTestNode(value)) + backTs.set(key, NewTestNode(value)) + } + + copyTs := ts.clone() + + // test if they are identical + copyTs.traverse(ts.contain, false) + + // remove all the keys from first tree + for _, key := range keys { + ts.delete(key) + } + + // test if they are identical + // make sure changes in the first tree will affect the copy one + copyTs.traverse(backTs.contain, false) + +} + func BenchmarkTreeStoreSet(b *testing.B) { keys := GenKeys(10000, 10) @@ -103,7 +146,7 @@ func BenchmarkTreeStoreSet(b *testing.B) { ts := &tree{ &treeNode{ - CreateTestNode("/"), + NewTestNode("/"), true, make(map[string]*treeNode), }, @@ -111,7 +154,7 @@ func BenchmarkTreeStoreSet(b *testing.B) { for _, key := range keys { value := strconv.Itoa(rand.Int()) - ts.set(key, CreateTestNode(value)) + ts.set(key, NewTestNode(value)) } } } @@ -122,7 +165,7 @@ func BenchmarkTreeStoreGet(b *testing.B) { ts := &tree{ &treeNode{ - CreateTestNode("/"), + NewTestNode("/"), true, make(map[string]*treeNode), }, @@ -130,7 +173,7 @@ func BenchmarkTreeStoreGet(b *testing.B) { for _, key := range keys { value := strconv.Itoa(rand.Int()) - ts.set(key, CreateTestNode(value)) + ts.set(key, NewTestNode(value)) } b.ResetTimer() @@ -141,13 +184,12 @@ func BenchmarkTreeStoreGet(b *testing.B) { } } -func BenchmarkTreeStoreList(b *testing.B) { - +func BenchmarkTreeStoreCopy(b *testing.B) { keys := GenKeys(10000, 10) ts := &tree{ &treeNode{ - CreateTestNode("/"), + NewTestNode("/"), true, make(map[string]*treeNode), }, @@ -155,7 +197,40 @@ func BenchmarkTreeStoreList(b *testing.B) { for _, key := range keys { value := strconv.Itoa(rand.Int()) - ts.set(key, CreateTestNode(value)) + ts.set(key, NewTestNode(value)) + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + ts.clone() + } +} + + +func BenchmarkMakeSlice(b *testing.B) { + + for i := 0; i < b.N; i++ { + for i:=0; i < 100000; i++ { + _ = make([]Node, 1) + } + } +} + +func BenchmarkTreeStoreList(b *testing.B) { + + keys := GenKeys(10000, 10) + + ts := &tree{ + &treeNode{ + NewTestNode("/"), + true, + make(map[string]*treeNode), + }, + } + + for _, key := range keys { + value := strconv.Itoa(rand.Int()) + ts.set(key, NewTestNode(value)) } b.ResetTimer() @@ -166,10 +241,17 @@ func BenchmarkTreeStoreList(b *testing.B) { } } -func f(key string, n *Node) { - fmt.Println(key, "=", n.Value) +func (t *tree) contain(key string, node *Node) { + _, ok := t.get(key) + if !ok { + panic("tree do not contain the given key") + } } -func CreateTestNode(value string) Node { +func f(key string, n *Node) { + return +} + +func NewTestNode(value string) Node { return Node{value, time.Unix(0, 0), nil} }