From 39fa2eb4ee4cbb7b334a0754b90adcf06f2212dc Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Mon, 5 Aug 2013 13:21:08 -0700 Subject: [PATCH 1/9] add benchmarks --- store/store.go | 11 +++++- store/store_test.go | 80 +++++++++++++++++++++++++++++++++++-- store/tree.go | 2 +- store/tree_store_test.go | 85 +++++++++++++++++++++++++++++++++++----- store/watcher_test.go | 15 +------ 5 files changed, 164 insertions(+), 29 deletions(-) diff --git a/store/store.go b/store/store.go index ae6a759f2..8132019ce 100644 --- a/store/store.go +++ b/store/store.go @@ -317,16 +317,23 @@ func (s *Store) RawGet(key string) ([]*Response, error) { var TTL int64 var isExpire bool = false + var thisKey string isExpire = !nodes[i].ExpireTime.Equal(PERMANENT) + if keys != nil { + thisKey = path.Join(key, keys[i]) + } else { + thisKey = key + } + resps[i] = &Response{ Action: "GET", Index: s.Index, - Key: path.Join(key, keys[i]), + Key: thisKey, } - if !dirs[i] { + if dirs == nil || !dirs[i] { resps[i].Value = nodes[i].Value } else { resps[i].Dir = true diff --git a/store/store_test.go b/store/store_test.go index aae930ca2..d28c57406 100644 --- a/store/store_test.go +++ b/store/store_test.go @@ -2,7 +2,6 @@ package store import ( "encoding/json" - "fmt" "testing" "time" ) @@ -70,9 +69,6 @@ func TestSaveAndRecovery(t *testing.T) { } func TestExpire(t *testing.T) { - fmt.Println(time.Now()) - fmt.Println("TEST EXPIRE") - // test expire s := CreateStore(100) s.Set("foo", "bar", time.Now().Add(time.Second*1), 0) @@ -134,3 +130,79 @@ func TestExpire(t *testing.T) { } } + +func BenchmarkSet(b *testing.B) { + s := CreateStore(100) + + keys := GenKeys(10000, 5) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + + for i, key := range keys { + s.Set(key, "barbarbarbarbar", time.Unix(0, 0), uint64(i)) + } + + s = CreateStore(100) + } +} + +func BenchmarkGet(b *testing.B) { + s := CreateStore(100) + + keys := GenKeys(100, 5) + + for i, key := range keys { + s.Set(key, "barbarbarbarbar", time.Unix(0, 0), uint64(i)) + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + + for _, key := range keys { + s.Get(key) + } + + } +} + +func BenchmarkSetAndGet(b *testing.B) { + +} + +func BenchmarkSnapshotSave(b *testing.B) { + s := CreateStore(100) + + keys := GenKeys(10000, 5) + + for i, key := range keys { + s.Set(key, "barbarbarbarbar", time.Unix(0, 0), uint64(i)) + } + + var state []byte + + b.ResetTimer() + for i := 0; i < b.N; i++ { + state, _ = s.Save() + } + b.SetBytes(int64(len(state))) +} + +func BenchmarkSnapshotRecovery(b *testing.B) { + s := CreateStore(100) + + keys := GenKeys(10000, 5) + + for i, key := range keys { + s.Set(key, "barbarbarbarbar", time.Unix(0, 0), uint64(i)) + } + + state, _ := s.Save() + + b.ResetTimer() + for i := 0; i < b.N; i++ { + newStore := CreateStore(100) + newStore.Recovery(state) + } + b.SetBytes(int64(len(state))) +} diff --git a/store/tree.go b/store/tree.go index 3192d55bc..ef47d3dde 100644 --- a/store/tree.go +++ b/store/tree.go @@ -167,7 +167,7 @@ func (t *tree) list(directory string) ([]Node, []string, []bool, bool) { if !treeNode.Dir { nodes := make([]Node, 1) nodes[0] = treeNode.InternalNode - return nodes, make([]string, 1), make([]bool, 1), true + return nodes, nil, nil, true } length := len(treeNode.NodeMap) nodes := make([]Node, length) diff --git a/store/tree_store_test.go b/store/tree_store_test.go index 6faa5c11f..f23bf3845 100644 --- a/store/tree_store_test.go +++ b/store/tree_store_test.go @@ -76,20 +76,15 @@ func TestStoreGet(t *testing.T) { } } - // speed test + keys = GenKeys(100, 10) + for i := 0; i < 100; i++ { - key := "/" - depth := rand.Intn(10) - for j := 0; j < depth; j++ { - key += "/" + strconv.Itoa(rand.Int()%10) - } value := strconv.Itoa(rand.Int()) - ts.set(key, CreateTestNode(value)) - treeNode, ok := ts.get(key) + ts.set(keys[i], CreateTestNode(value)) + treeNode, ok := ts.get(keys[i]) if !ok { continue - //t.Fatalf("Expect to get node, but not") } if treeNode.Value != value { t.Fatalf("Expect value %s, but got %s", value, treeNode.Value) @@ -99,6 +94,78 @@ func TestStoreGet(t *testing.T) { ts.traverse(f, true) } +func BenchmarkTreeStoreSet(b *testing.B) { + + keys := GenKeys(10000, 10) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + + ts := &tree{ + &treeNode{ + CreateTestNode("/"), + true, + make(map[string]*treeNode), + }, + } + + for _, key := range keys { + value := strconv.Itoa(rand.Int()) + ts.set(key, CreateTestNode(value)) + } + } +} + +func BenchmarkTreeStoreGet(b *testing.B) { + + keys := GenKeys(10000, 10) + + ts := &tree{ + &treeNode{ + CreateTestNode("/"), + true, + make(map[string]*treeNode), + }, + } + + for _, key := range keys { + value := strconv.Itoa(rand.Int()) + ts.set(key, CreateTestNode(value)) + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + for _, key := range keys { + ts.get(key) + } + } +} + +func BenchmarkTreeStoreList(b *testing.B) { + + keys := GenKeys(10000, 10) + + ts := &tree{ + &treeNode{ + CreateTestNode("/"), + true, + make(map[string]*treeNode), + }, + } + + for _, key := range keys { + value := strconv.Itoa(rand.Int()) + ts.set(key, CreateTestNode(value)) + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + for _, key := range keys { + ts.list(key) + } + } +} + func f(key string, n *Node) { fmt.Println(key, "=", n.Value) } diff --git a/store/watcher_test.go b/store/watcher_test.go index 389d4643f..08e64d1ab 100644 --- a/store/watcher_test.go +++ b/store/watcher_test.go @@ -1,8 +1,6 @@ package store import ( - "math/rand" - "strconv" "testing" "time" ) @@ -62,16 +60,7 @@ func TestWatch(t *testing.T) { func BenchmarkWatch(b *testing.B) { s := CreateStore(100) - key := make([]string, 10000) - for i := 0; i < 10000; i++ { - - key[i] = "/foo/" - depth := rand.Intn(10) - - for j := 0; j < depth; j++ { - key[i] += "/" + strconv.Itoa(rand.Int()%10) - } - } + keys := GenKeys(10000, 10) b.ResetTimer() for i := 0; i < b.N; i++ { @@ -80,7 +69,7 @@ func BenchmarkWatch(b *testing.B) { // create a new watcher watchers[i] = NewWatcher() // add to the watchers list - s.AddWatcher(key[i], watchers[i], 0) + s.AddWatcher(keys[i], watchers[i], 0) } s.watcher.stopWatchers() From 63ba16f51adcf6e7426abb34c49c6defc5ecc37f Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Mon, 5 Aug 2013 16:05:49 -0700 Subject: [PATCH 2/9] add clone func to help snapshot. add benchmarks --- store/keyword_test.go | 8 +-- store/store.go | 99 +++++++++++++++++++++++++++----- store/store_test.go | 22 +++++-- store/test.go | 21 +++++++ store/tree.go | 48 ++++++++++++++-- store/tree_store_test.go | 120 ++++++++++++++++++++++++++++++++------- 6 files changed, 270 insertions(+), 48 deletions(-) create mode 100644 store/test.go diff --git a/store/keyword_test.go b/store/keyword_test.go index f2d15565d..7c54a9fde 100644 --- a/store/keyword_test.go +++ b/store/keyword_test.go @@ -7,25 +7,25 @@ import ( func TestKeywords(t *testing.T) { keyword := CheckKeyword("_etcd") if !keyword { - t.Fatal("machines should be keyword") + t.Fatal("_etcd should be keyword") } keyword = CheckKeyword("/_etcd") if !keyword { - t.Fatal("/machines should be keyword") + t.Fatal("/_etcd should be keyword") } keyword = CheckKeyword("/_etcd/") if !keyword { - t.Fatal("/machines/ contains keyword prefix") + t.Fatal("/_etcd/ contains keyword prefix") } keyword = CheckKeyword("/_etcd/node1") if !keyword { - t.Fatal("/machines/* contains keyword prefix") + t.Fatal("/_etcd/* contains keyword prefix") } keyword = CheckKeyword("/nokeyword/_etcd/node1") diff --git a/store/store.go b/store/store.go index 8132019ce..e850ff592 100644 --- a/store/store.go +++ b/store/store.go @@ -6,6 +6,7 @@ import ( "path" "strconv" "time" + "sync" ) //------------------------------------------------------------------------------ @@ -20,6 +21,8 @@ type Store struct { // key-value store structure Tree *tree + mutex sync.Mutex + // WatcherHub is where we register all the clients // who issue a watch request watcher *WatcherHub @@ -136,9 +139,16 @@ func (s *Store) SetMessager(messager *chan string) { s.messager = messager } -// Set the key to value with expiration time func (s *Store) Set(key string, value string, expireTime time.Time, index uint64) ([]byte, error) { + s.mutex.Lock() + defer s.mutex.Unlock() + return s.internalSet(key, value, expireTime, index) + +} + +// Set the key to value with expiration time +func (s *Store) internalSet(key string, value string, expireTime time.Time, index uint64) ([]byte, error) { //Update index s.Index = index @@ -161,7 +171,7 @@ func (s *Store) Set(key string, value string, expireTime time.Time, index uint64 // the key may be expired, we should not add the node // also if the node exist, we need to delete the node if isExpire && expireTime.Sub(time.Now()) < 0 { - return s.Delete(key, index) + return s.internalDelete(key, index) } var TTL int64 @@ -290,6 +300,9 @@ func (s *Store) internalGet(key string) *Response { // If key is a file return the file // If key is a directory reuturn an array of files func (s *Store) Get(key string) ([]byte, error) { + s.mutex.Lock() + defer s.mutex.Unlock() + resps, err := s.RawGet(key) if err != nil { @@ -312,28 +325,49 @@ func (s *Store) RawGet(key string) ([]*Response, error) { nodes, keys, dirs, ok := s.Tree.list(key) if ok { + + node, ok := nodes.(*Node) + + if ok { + resps := make([]*Response, 1) + + isExpire := !node.ExpireTime.Equal(PERMANENT) + + resps[0] = &Response{ + Action: "GET", + Index: s.Index, + Key: key, + Value: node.Value, + } + + // Update ttl + if isExpire { + TTL := int64(node.ExpireTime.Sub(time.Now()) / time.Second) + resps[0].Expiration = &node.ExpireTime + resps[0].TTL = TTL + } + + return resps, nil + } + + + nodes, _ := nodes.([]*Node) + resps := make([]*Response, len(nodes)) for i := 0; i < len(nodes); i++ { var TTL int64 var isExpire bool = false - var thisKey string isExpire = !nodes[i].ExpireTime.Equal(PERMANENT) - if keys != nil { - thisKey = path.Join(key, keys[i]) - } else { - thisKey = key - } - resps[i] = &Response{ Action: "GET", Index: s.Index, - Key: thisKey, + Key: path.Join(key, keys[i]), } - if dirs == nil || !dirs[i] { + if !dirs[i] { resps[i].Value = nodes[i].Value } else { resps[i].Dir = true @@ -355,8 +389,14 @@ func (s *Store) RawGet(key string) ([]*Response, error) { return nil, err } -// Delete the key func (s *Store) Delete(key string, index uint64) ([]byte, error) { + s.mutex.Lock() + defer s.mutex.Unlock() + return s.internalDelete(key, index) +} + +// Delete the key +func (s *Store) internalDelete(key string, index uint64) ([]byte, error) { // Update stats s.BasicStats.Deletes++ @@ -411,6 +451,9 @@ func (s *Store) Delete(key string, index uint64) ([]byte, error) { // Set the value of the key to the value if the given prevValue is equal to the value of the key func (s *Store) TestAndSet(key string, prevValue string, value string, expireTime time.Time, index uint64) ([]byte, error) { + s.mutex.Lock() + defer s.mutex.Unlock() + // Update stats s.BasicStats.TestAndSets++ @@ -424,7 +467,7 @@ func (s *Store) TestAndSet(key string, prevValue string, value string, expireTim if resp.Value == prevValue { // If test success, do set - return s.Set(key, value, expireTime, index) + return s.internalSet(key, value, expireTime, index) } else { // If fails, return err @@ -459,6 +502,7 @@ func (s *Store) monitorExpiration(key string, update chan time.Time, expireTime return } else { + s.mutex.Lock() s.Tree.delete(key) @@ -469,6 +513,7 @@ func (s *Store) monitorExpiration(key string, update chan time.Time, expireTime Expiration: &node.ExpireTime, Index: s.Index, } + s.mutex.Unlock() msg, err := json.Marshal(resp) @@ -527,9 +572,32 @@ func (s *Store) addToResponseMap(index uint64, resp *Response) { } } +func (s *Store) clone() *Store { + newStore := & Store{ + ResponseMaxSize: s.ResponseMaxSize, + ResponseCurrSize: s.ResponseCurrSize, + ResponseStartIndex: s.ResponseStartIndex, + Index: s.Index, + BasicStats: s.BasicStats, + } + + newStore.Tree = s.Tree.clone() + newStore.ResponseMap = make(map[string]*Response) + + for index, response := range s.ResponseMap { + newStore.ResponseMap[index] = response + } + + return newStore +} + // Save the current state of the storage system func (s *Store) Save() ([]byte, error) { - b, err := json.Marshal(s) + s.mutex.Lock() + cloneStore := s.clone() + s.mutex.Unlock() + + b, err := json.Marshal(cloneStore) if err != nil { fmt.Println(err) return nil, err @@ -539,7 +607,8 @@ func (s *Store) Save() ([]byte, error) { // Recovery the state of the stroage system from a previous state func (s *Store) Recovery(state []byte) error { - + s.mutex.Lock() + defer s.mutex.Unlock() // we need to stop all the current watchers // recovery will clear watcherHub s.watcher.stopWatchers() diff --git a/store/store_test.go b/store/store_test.go index d28c57406..875750c82 100644 --- a/store/store_test.go +++ b/store/store_test.go @@ -131,7 +131,7 @@ func TestExpire(t *testing.T) { } -func BenchmarkSet(b *testing.B) { +func BenchmarkStoreSet(b *testing.B) { s := CreateStore(100) keys := GenKeys(10000, 5) @@ -147,7 +147,7 @@ func BenchmarkSet(b *testing.B) { } } -func BenchmarkGet(b *testing.B) { +func BenchmarkStoreGet(b *testing.B) { s := CreateStore(100) keys := GenKeys(100, 5) @@ -166,11 +166,25 @@ func BenchmarkGet(b *testing.B) { } } -func BenchmarkSetAndGet(b *testing.B) { +func BenchmarkStoreSnapshotCopy(b *testing.B) { + s := CreateStore(100) + keys := GenKeys(10000, 5) + + for i, key := range keys { + s.Set(key, "barbarbarbarbar", time.Unix(0, 0), uint64(i)) + } + + var state []byte + + b.ResetTimer() + for i := 0; i < b.N; i++ { + s.clone() + } + b.SetBytes(int64(len(state))) } -func BenchmarkSnapshotSave(b *testing.B) { +func BenchmarkSnapshotSaveJson(b *testing.B) { s := CreateStore(100) keys := GenKeys(10000, 5) diff --git a/store/test.go b/store/test.go new file mode 100644 index 000000000..a725e87db --- /dev/null +++ b/store/test.go @@ -0,0 +1,21 @@ +package store + +import ( + "math/rand" + "strconv" +) + +// GenKeys randomly generate num of keys with max depth +func GenKeys(num int, depth int) []string { + keys := make([]string, num) + for i := 0; i < num; i++ { + + keys[i] = "/foo/" + depth := rand.Intn(depth) + + for j := 0; j < depth; j++ { + keys[i] += "/" + strconv.Itoa(rand.Int()) + } + } + return keys +} diff --git a/store/tree.go b/store/tree.go index ef47d3dde..5d136bdfa 100644 --- a/store/tree.go +++ b/store/tree.go @@ -4,6 +4,7 @@ import ( "path" "sort" "strings" + "time" ) //------------------------------------------------------------------------------ @@ -158,25 +159,23 @@ func (t *tree) get(key string) (Node, bool) { } // get the internalNode of the key -func (t *tree) list(directory string) ([]Node, []string, []bool, bool) { +func (t *tree) list(directory string) (interface{}, []string, []bool, bool) { treeNode, ok := t.internalGet(directory) if !ok { return nil, nil, nil, ok } else { if !treeNode.Dir { - nodes := make([]Node, 1) - nodes[0] = treeNode.InternalNode - return nodes, nil, nil, true + return &treeNode.InternalNode, nil, nil, true } length := len(treeNode.NodeMap) - nodes := make([]Node, length) + nodes := make([]*Node, length) keys := make([]string, length) dirs := make([]bool, length) i := 0 for key, node := range treeNode.NodeMap { - nodes[i] = node.InternalNode + nodes[i] = &node.InternalNode keys[i] = key if node.Dir { dirs[i] = true @@ -223,6 +222,42 @@ func (t *tree) traverse(f func(string, *Node), sort bool) { } } +// clone() will return a deep cloned tree +func (t *tree) clone() *tree { + newTree := new(tree) + newTree.Root = &treeNode{ + Node{ + "/", + time.Unix(0, 0), + nil, + }, + true, + make(map[string]*treeNode), + } + recursiveClone(t.Root, newTree.Root) + return newTree +} + +// recursiveClone is a helper function for clone() +func recursiveClone(tnSrc *treeNode, tnDes *treeNode) { + if !tnSrc.Dir { + tnDes.InternalNode = tnSrc.InternalNode + return + + } else { + tnDes.InternalNode = tnSrc.InternalNode + tnDes.Dir = true + tnDes.NodeMap = make(map[string]*treeNode) + + for key, tn := range tnSrc.NodeMap { + newTn := new(treeNode) + recursiveClone(tn, newTn) + tnDes.NodeMap[key] = newTn + } + + } +} + // deep first search to traverse the tree // apply the func f to each internal node func dfs(key string, t *treeNode, f func(string, *Node)) { @@ -281,3 +316,4 @@ func split(key string) []string { nodesName = nodesName[1:] return nodesName } + diff --git a/store/tree_store_test.go b/store/tree_store_test.go index f23bf3845..fe5a71589 100644 --- a/store/tree_store_test.go +++ b/store/tree_store_test.go @@ -12,18 +12,18 @@ func TestStoreGet(t *testing.T) { ts := &tree{ &treeNode{ - CreateTestNode("/"), + NewTestNode("/"), true, make(map[string]*treeNode), }, } // create key - ts.set("/foo", CreateTestNode("bar")) + ts.set("/foo", NewTestNode("bar")) // change value - ts.set("/foo", CreateTestNode("barbar")) + ts.set("/foo", NewTestNode("barbar")) // create key - ts.set("/hello/foo", CreateTestNode("barbarbar")) + ts.set("/hello/foo", NewTestNode("barbarbar")) treeNode, ok := ts.get("/foo") if !ok { @@ -43,7 +43,7 @@ func TestStoreGet(t *testing.T) { } // create a key under other key - ok = ts.set("/foo/foo", CreateTestNode("bar")) + ok = ts.set("/foo/foo", NewTestNode("bar")) if ok { t.Fatalf("shoud not add key under a exisiting key") } @@ -61,14 +61,15 @@ func TestStoreGet(t *testing.T) { } // test list - ts.set("/hello/fooo", CreateTestNode("barbarbar")) - ts.set("/hello/foooo/foo", CreateTestNode("barbarbar")) + ts.set("/hello/fooo", NewTestNode("barbarbar")) + ts.set("/hello/foooo/foo", NewTestNode("barbarbar")) nodes, keys, dirs, ok := ts.list("/hello") if !ok { t.Fatalf("cannot list!") } else { + nodes, _ := nodes.([]*Node) length := len(nodes) for i := 0; i < length; i++ { @@ -80,7 +81,7 @@ func TestStoreGet(t *testing.T) { for i := 0; i < 100; i++ { value := strconv.Itoa(rand.Int()) - ts.set(keys[i], CreateTestNode(value)) + ts.set(keys[i], NewTestNode(value)) treeNode, ok := ts.get(keys[i]) if !ok { @@ -94,6 +95,48 @@ func TestStoreGet(t *testing.T) { ts.traverse(f, true) } +func TestTreeClone(t *testing.T) { + keys := GenKeys(10000, 10) + + ts := &tree{ + &treeNode{ + NewTestNode("/"), + true, + make(map[string]*treeNode), + }, + } + + backTs := &tree{ + &treeNode{ + NewTestNode("/"), + true, + make(map[string]*treeNode), + }, + } + + // generate the first tree + for _, key := range keys { + value := strconv.Itoa(rand.Int()) + ts.set(key, NewTestNode(value)) + backTs.set(key, NewTestNode(value)) + } + + copyTs := ts.clone() + + // test if they are identical + copyTs.traverse(ts.contain, false) + + // remove all the keys from first tree + for _, key := range keys { + ts.delete(key) + } + + // test if they are identical + // make sure changes in the first tree will affect the copy one + copyTs.traverse(backTs.contain, false) + +} + func BenchmarkTreeStoreSet(b *testing.B) { keys := GenKeys(10000, 10) @@ -103,7 +146,7 @@ func BenchmarkTreeStoreSet(b *testing.B) { ts := &tree{ &treeNode{ - CreateTestNode("/"), + NewTestNode("/"), true, make(map[string]*treeNode), }, @@ -111,7 +154,7 @@ func BenchmarkTreeStoreSet(b *testing.B) { for _, key := range keys { value := strconv.Itoa(rand.Int()) - ts.set(key, CreateTestNode(value)) + ts.set(key, NewTestNode(value)) } } } @@ -122,7 +165,7 @@ func BenchmarkTreeStoreGet(b *testing.B) { ts := &tree{ &treeNode{ - CreateTestNode("/"), + NewTestNode("/"), true, make(map[string]*treeNode), }, @@ -130,7 +173,7 @@ func BenchmarkTreeStoreGet(b *testing.B) { for _, key := range keys { value := strconv.Itoa(rand.Int()) - ts.set(key, CreateTestNode(value)) + ts.set(key, NewTestNode(value)) } b.ResetTimer() @@ -141,13 +184,12 @@ func BenchmarkTreeStoreGet(b *testing.B) { } } -func BenchmarkTreeStoreList(b *testing.B) { - +func BenchmarkTreeStoreCopy(b *testing.B) { keys := GenKeys(10000, 10) ts := &tree{ &treeNode{ - CreateTestNode("/"), + NewTestNode("/"), true, make(map[string]*treeNode), }, @@ -155,7 +197,40 @@ func BenchmarkTreeStoreList(b *testing.B) { for _, key := range keys { value := strconv.Itoa(rand.Int()) - ts.set(key, CreateTestNode(value)) + ts.set(key, NewTestNode(value)) + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + ts.clone() + } +} + + +func BenchmarkMakeSlice(b *testing.B) { + + for i := 0; i < b.N; i++ { + for i:=0; i < 100000; i++ { + _ = make([]Node, 1) + } + } +} + +func BenchmarkTreeStoreList(b *testing.B) { + + keys := GenKeys(10000, 10) + + ts := &tree{ + &treeNode{ + NewTestNode("/"), + true, + make(map[string]*treeNode), + }, + } + + for _, key := range keys { + value := strconv.Itoa(rand.Int()) + ts.set(key, NewTestNode(value)) } b.ResetTimer() @@ -166,10 +241,17 @@ func BenchmarkTreeStoreList(b *testing.B) { } } -func f(key string, n *Node) { - fmt.Println(key, "=", n.Value) +func (t *tree) contain(key string, node *Node) { + _, ok := t.get(key) + if !ok { + panic("tree do not contain the given key") + } } -func CreateTestNode(value string) Node { +func f(key string, n *Node) { + return +} + +func NewTestNode(value string) Node { return Node{value, time.Unix(0, 0), nil} } From c6a43213102073415edcdfb2dc936e46588032ed Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Mon, 5 Aug 2013 16:06:23 -0700 Subject: [PATCH 3/9] gofmt --- store/store.go | 17 ++++++++--------- store/tree.go | 5 ++--- store/tree_store_test.go | 11 +++++------ 3 files changed, 15 insertions(+), 18 deletions(-) diff --git a/store/store.go b/store/store.go index e850ff592..f5cd7a8ae 100644 --- a/store/store.go +++ b/store/store.go @@ -5,8 +5,8 @@ import ( "fmt" "path" "strconv" - "time" "sync" + "time" ) //------------------------------------------------------------------------------ @@ -21,7 +21,7 @@ type Store struct { // key-value store structure Tree *tree - mutex sync.Mutex + mutex sync.Mutex // WatcherHub is where we register all the clients // who issue a watch request @@ -350,7 +350,6 @@ func (s *Store) RawGet(key string) ([]*Response, error) { return resps, nil } - nodes, _ := nodes.([]*Node) resps := make([]*Response, len(nodes)) @@ -573,12 +572,12 @@ func (s *Store) addToResponseMap(index uint64, resp *Response) { } func (s *Store) clone() *Store { - newStore := & Store{ - ResponseMaxSize: s.ResponseMaxSize, - ResponseCurrSize: s.ResponseCurrSize, - ResponseStartIndex: s.ResponseStartIndex, - Index: s.Index, - BasicStats: s.BasicStats, + newStore := &Store{ + ResponseMaxSize: s.ResponseMaxSize, + ResponseCurrSize: s.ResponseCurrSize, + ResponseStartIndex: s.ResponseStartIndex, + Index: s.Index, + BasicStats: s.BasicStats, } newStore.Tree = s.Tree.clone() diff --git a/store/tree.go b/store/tree.go index 5d136bdfa..4dd124869 100644 --- a/store/tree.go +++ b/store/tree.go @@ -230,10 +230,10 @@ func (t *tree) clone() *tree { "/", time.Unix(0, 0), nil, - }, + }, true, make(map[string]*treeNode), - } + } recursiveClone(t.Root, newTree.Root) return newTree } @@ -316,4 +316,3 @@ func split(key string) []string { nodesName = nodesName[1:] return nodesName } - diff --git a/store/tree_store_test.go b/store/tree_store_test.go index fe5a71589..3c17e7d91 100644 --- a/store/tree_store_test.go +++ b/store/tree_store_test.go @@ -103,16 +103,16 @@ func TestTreeClone(t *testing.T) { NewTestNode("/"), true, make(map[string]*treeNode), - }, - } + }, + } backTs := &tree{ &treeNode{ NewTestNode("/"), true, make(map[string]*treeNode), - }, - } + }, + } // generate the first tree for _, key := range keys { @@ -206,11 +206,10 @@ func BenchmarkTreeStoreCopy(b *testing.B) { } } - func BenchmarkMakeSlice(b *testing.B) { for i := 0; i < b.N; i++ { - for i:=0; i < 100000; i++ { + for i := 0; i < 100000; i++ { _ = make([]Node, 1) } } From 793d5187a56a459591a4c71cd1d71cb505ca5ed0 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Mon, 5 Aug 2013 17:02:27 -0700 Subject: [PATCH 4/9] fix wrong join redirection --- etcd.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/etcd.go b/etcd.go index fc225b8db..f24ed6306 100644 --- a/etcd.go +++ b/etcd.go @@ -625,10 +625,9 @@ func joinCluster(s *raft.Server, serverName string) error { } if resp.StatusCode == http.StatusTemporaryRedirect { address := resp.Header.Get("Location") - debugf("Leader is %s", address) debugf("Send Join Request to %s", address) json.NewEncoder(&b).Encode(command) - resp, err = t.Post(fmt.Sprintf("%s/join", address), &b) + resp, err = t.Post(address, &b) } else if resp.StatusCode == http.StatusBadRequest { debug("Reach max number machines in the cluster") return fmt.Errorf(errors[103]) From 09cfd892985e2ed5ed411d8a4254edcb50d833ec Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Mon, 5 Aug 2013 17:03:15 -0700 Subject: [PATCH 5/9] better error name for watcher --- command.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/command.go b/command.go index 3ad371819..8674ec9a9 100644 --- a/command.go +++ b/command.go @@ -102,7 +102,7 @@ func (c *WatchCommand) Apply(server *raft.Server) (interface{}, error) { res := <-watcher.C if res == nil { - return nil, fmt.Errorf("watcher is cleared") + return nil, fmt.Errorf("Clearing watch") } return json.Marshal(res) From bebcf4b73309f5b38963bb20796568b414138fa6 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Mon, 5 Aug 2013 17:03:31 -0700 Subject: [PATCH 6/9] simplify tree list --- store/store.go | 6 ++++-- store/tree.go | 19 +++++++------------ store/tree_store_test.go | 4 ++-- 3 files changed, 13 insertions(+), 16 deletions(-) diff --git a/store/store.go b/store/store.go index f5cd7a8ae..00ea1b926 100644 --- a/store/store.go +++ b/store/store.go @@ -322,7 +322,7 @@ func (s *Store) RawGet(key string) ([]*Response, error) { key = path.Clean("/" + key) - nodes, keys, dirs, ok := s.Tree.list(key) + nodes, keys, ok := s.Tree.list(key) if ok { @@ -366,7 +366,7 @@ func (s *Store) RawGet(key string) ([]*Response, error) { Key: path.Join(key, keys[i]), } - if !dirs[i] { + if len(nodes[i].Value) != 0 { resps[i].Value = nodes[i].Value } else { resps[i].Dir = true @@ -592,6 +592,8 @@ func (s *Store) clone() *Store { // Save the current state of the storage system func (s *Store) Save() ([]byte, error) { + // first we clone the store + // json is very slow, we cannot hold the lock for such a long time s.mutex.Lock() cloneStore := s.clone() s.mutex.Unlock() diff --git a/store/tree.go b/store/tree.go index 4dd124869..11b8092b2 100644 --- a/store/tree.go +++ b/store/tree.go @@ -42,7 +42,7 @@ func (s tnWithKeySlice) Swap(i, j int) { s[i], s[j] = s[j], s[i] } // CONSTANT VARIABLE // Represent an empty node -var emptyNode = Node{".", PERMANENT, nil} +var emptyNode = Node{"", PERMANENT, nil} //------------------------------------------------------------------------------ // @@ -159,33 +159,28 @@ func (t *tree) get(key string) (Node, bool) { } // get the internalNode of the key -func (t *tree) list(directory string) (interface{}, []string, []bool, bool) { +func (t *tree) list(directory string) (interface{}, []string, bool) { treeNode, ok := t.internalGet(directory) if !ok { - return nil, nil, nil, ok + return nil, nil, ok + } else { if !treeNode.Dir { - return &treeNode.InternalNode, nil, nil, true + return &treeNode.InternalNode, nil, ok } length := len(treeNode.NodeMap) nodes := make([]*Node, length) keys := make([]string, length) - dirs := make([]bool, length) - i := 0 + i := 0 for key, node := range treeNode.NodeMap { nodes[i] = &node.InternalNode keys[i] = key - if node.Dir { - dirs[i] = true - } else { - dirs[i] = false - } i++ } - return nodes, keys, dirs, ok + return nodes, keys, ok } } diff --git a/store/tree_store_test.go b/store/tree_store_test.go index 3c17e7d91..385989e37 100644 --- a/store/tree_store_test.go +++ b/store/tree_store_test.go @@ -64,7 +64,7 @@ func TestStoreGet(t *testing.T) { ts.set("/hello/fooo", NewTestNode("barbarbar")) ts.set("/hello/foooo/foo", NewTestNode("barbarbar")) - nodes, keys, dirs, ok := ts.list("/hello") + nodes, keys, ok := ts.list("/hello") if !ok { t.Fatalf("cannot list!") @@ -73,7 +73,7 @@ func TestStoreGet(t *testing.T) { length := len(nodes) for i := 0; i < length; i++ { - fmt.Println(keys[i], "=", nodes[i].Value, "[", dirs[i], "]") + fmt.Println(keys[i], "=", nodes[i].Value) } } From d239e5e0ddf86b1435cf3e78260f5730028fcefc Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Mon, 5 Aug 2013 17:29:01 -0700 Subject: [PATCH 7/9] fix join redirection --- etcd.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/etcd.go b/etcd.go index f24ed6306..12901f91e 100644 --- a/etcd.go +++ b/etcd.go @@ -627,7 +627,12 @@ func joinCluster(s *raft.Server, serverName string) error { address := resp.Header.Get("Location") debugf("Send Join Request to %s", address) json.NewEncoder(&b).Encode(command) - resp, err = t.Post(address, &b) + segs := strings.Split(address, "://") + if len(segs) != 2 { + return fmt.Errorf("Unable to join: wrong redirection info") + } + path := segs[1] + resp, err = t.Post(path, &b) } else if resp.StatusCode == http.StatusBadRequest { debug("Reach max number machines in the cluster") return fmt.Errorf(errors[103]) From 65c34457de5da2657cad664036db57b7f0e97788 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Mon, 5 Aug 2013 21:28:42 -0700 Subject: [PATCH 8/9] fix test file --- store/store_test.go | 2 +- store/test.go | 2 +- store/tree_store_test.go | 33 ++++++++++++--------------------- 3 files changed, 14 insertions(+), 23 deletions(-) diff --git a/store/store_test.go b/store/store_test.go index 875750c82..001f5ddd5 100644 --- a/store/store_test.go +++ b/store/store_test.go @@ -150,7 +150,7 @@ func BenchmarkStoreSet(b *testing.B) { func BenchmarkStoreGet(b *testing.B) { s := CreateStore(100) - keys := GenKeys(100, 5) + keys := GenKeys(10000, 5) for i, key := range keys { s.Set(key, "barbarbarbarbar", time.Unix(0, 0), uint64(i)) diff --git a/store/test.go b/store/test.go index a725e87db..ac23261be 100644 --- a/store/test.go +++ b/store/test.go @@ -11,7 +11,7 @@ func GenKeys(num int, depth int) []string { for i := 0; i < num; i++ { keys[i] = "/foo/" - depth := rand.Intn(depth) + depth := rand.Intn(depth) + 1 for j := 0; j < depth; j++ { keys[i] += "/" + strconv.Itoa(rand.Int()) diff --git a/store/tree_store_test.go b/store/tree_store_test.go index 385989e37..99281f3ed 100644 --- a/store/tree_store_test.go +++ b/store/tree_store_test.go @@ -1,7 +1,7 @@ package store import ( - "fmt" + //"fmt" "math/rand" "strconv" "testing" @@ -64,20 +64,20 @@ func TestStoreGet(t *testing.T) { ts.set("/hello/fooo", NewTestNode("barbarbar")) ts.set("/hello/foooo/foo", NewTestNode("barbarbar")) - nodes, keys, ok := ts.list("/hello") + //nodes, keys, ok := ts.list("/hello") - if !ok { - t.Fatalf("cannot list!") - } else { - nodes, _ := nodes.([]*Node) - length := len(nodes) + // if !ok { + // t.Fatalf("cannot list!") + // } else { + // nodes, _ := nodes.([]*Node) + // length := len(nodes) - for i := 0; i < length; i++ { - fmt.Println(keys[i], "=", nodes[i].Value) - } - } + // for i := 0; i < length; i++ { + // fmt.Println(keys[i], "=", nodes[i].Value) + // } + // } - keys = GenKeys(100, 10) + keys := GenKeys(100, 10) for i := 0; i < 100; i++ { value := strconv.Itoa(rand.Int()) @@ -206,15 +206,6 @@ func BenchmarkTreeStoreCopy(b *testing.B) { } } -func BenchmarkMakeSlice(b *testing.B) { - - for i := 0; i < b.N; i++ { - for i := 0; i < 100000; i++ { - _ = make([]Node, 1) - } - } -} - func BenchmarkTreeStoreList(b *testing.B) { keys := GenKeys(10000, 10) From d0e9449ba244213f09386acf65a02cab5357b05e Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Tue, 6 Aug 2013 09:08:22 -0700 Subject: [PATCH 9/9] use url package to parse url; fix commented codes --- client_handlers.go | 14 ++++++++++++++ etcd.go | 25 ++++++++++++++++--------- etcd_long_test.go | 20 ++++++++++++++++++++ store/store.go | 6 ++++++ store/tree_store_test.go | 24 ++++++++++++------------ test.go | 22 ++++++++++++++++++++++ 6 files changed, 90 insertions(+), 21 deletions(-) diff --git a/client_handlers.go b/client_handlers.go index 7ee302e46..cd659b0d5 100644 --- a/client_handlers.go +++ b/client_handlers.go @@ -315,6 +315,20 @@ func WatchHttpHandler(w http.ResponseWriter, req *http.Request) { } +// TestHandler +func TestHttpHandler(w http.ResponseWriter, req *http.Request) { + testType := req.URL.Path[len("/test/"):] + + if testType == "speed" { + directSet() + w.WriteHeader(http.StatusOK) + w.Write([]byte("speed test success")) + return + } + + w.WriteHeader(http.StatusBadRequest) +} + // Convert string duration to time format func durationToExpireTime(strDuration string) (time.Time, error) { if strDuration != "" { diff --git a/etcd.go b/etcd.go index 12901f91e..7a56b04cf 100644 --- a/etcd.go +++ b/etcd.go @@ -14,8 +14,10 @@ import ( "io/ioutil" "net" "net/http" + "net/url" "os" "os/signal" + "path" "runtime/pprof" "strings" "time" @@ -267,9 +269,6 @@ func startRaft(securityType int) { raftServer.Start() - // start to response to raft requests - go startRaftTransport(info.RaftPort, securityType) - if raftServer.IsLogEmpty() { // start as a leader in a new cluster @@ -339,6 +338,9 @@ func startRaft(securityType int) { go raftServer.Snapshot() } + // start to response to raft requests + go startRaftTransport(info.RaftPort, securityType) + } // Create transporter using by raft server @@ -436,6 +438,7 @@ func startClientTransport(port int, st int) { http.HandleFunc("/machines", MachinesHttpHandler) http.HandleFunc("/", VersionHttpHandler) http.HandleFunc("/stats", StatsHttpHandler) + http.HandleFunc("/test/", TestHttpHandler) switch st { @@ -624,15 +627,19 @@ func joinCluster(s *raft.Server, serverName string) error { return nil } if resp.StatusCode == http.StatusTemporaryRedirect { + address := resp.Header.Get("Location") debugf("Send Join Request to %s", address) - json.NewEncoder(&b).Encode(command) - segs := strings.Split(address, "://") - if len(segs) != 2 { - return fmt.Errorf("Unable to join: wrong redirection info") + u, err := url.Parse(address) + + if err != nil { + return fmt.Errorf("Unable to join: %s", err.Error()) } - path := segs[1] - resp, err = t.Post(path, &b) + + json.NewEncoder(&b).Encode(command) + + resp, err = t.Post(path.Join(u.Host, u.Path), &b) + } else if resp.StatusCode == http.StatusBadRequest { debug("Reach max number machines in the cluster") return fmt.Errorf(errors[103]) diff --git a/etcd_long_test.go b/etcd_long_test.go index 566a42bd5..db603b540 100644 --- a/etcd_long_test.go +++ b/etcd_long_test.go @@ -3,6 +3,7 @@ package main import ( "fmt" "math/rand" + "net/http" "os" "strconv" "strings" @@ -119,3 +120,22 @@ func TestKillRandom(t *testing.T) { <-leaderChan } + +func BenchmarkEtcdDirectCall(b *testing.B) { + procAttr := new(os.ProcAttr) + procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr} + + clusterSize := 3 + _, etcds, _ := createCluster(clusterSize, procAttr) + + defer destroyCluster(etcds) + + time.Sleep(time.Second) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + resp, _ := http.Get("http://0.0.0.0:4001/test/speed") + resp.Body.Close() + } + +} diff --git a/store/store.go b/store/store.go index 00ea1b926..5ef4bc2d4 100644 --- a/store/store.go +++ b/store/store.go @@ -21,6 +21,12 @@ type Store struct { // key-value store structure Tree *tree + // This mutex protects everything except add watcher member. + // Add watch member does not depend on the current state of the store. + // And watch will return when other protected function is called and reach + // the watching condition. + // It is needed so that clone() can atomically replicate the Store + // and do the log snapshot in a go routine. mutex sync.Mutex // WatcherHub is where we register all the clients diff --git a/store/tree_store_test.go b/store/tree_store_test.go index 99281f3ed..ad8222ffb 100644 --- a/store/tree_store_test.go +++ b/store/tree_store_test.go @@ -1,7 +1,7 @@ package store import ( - //"fmt" + "fmt" "math/rand" "strconv" "testing" @@ -64,20 +64,20 @@ func TestStoreGet(t *testing.T) { ts.set("/hello/fooo", NewTestNode("barbarbar")) ts.set("/hello/foooo/foo", NewTestNode("barbarbar")) - //nodes, keys, ok := ts.list("/hello") + nodes, keys, ok := ts.list("/hello") - // if !ok { - // t.Fatalf("cannot list!") - // } else { - // nodes, _ := nodes.([]*Node) - // length := len(nodes) + if !ok { + t.Fatalf("cannot list!") + } else { + nodes, _ := nodes.([]*Node) + length := len(nodes) - // for i := 0; i < length; i++ { - // fmt.Println(keys[i], "=", nodes[i].Value) - // } - // } + for i := 0; i < length; i++ { + fmt.Println(keys[i], "=", nodes[i].Value) + } + } - keys := GenKeys(100, 10) + keys = GenKeys(100, 10) for i := 0; i < 100; i++ { value := strconv.Itoa(rand.Int()) diff --git a/test.go b/test.go index b95abe5e1..02ce50c5a 100644 --- a/test.go +++ b/test.go @@ -166,6 +166,28 @@ func getLeader(addr string) (string, error) { } +func directSet() { + c := make(chan bool, 1000) + for i := 0; i < 1000; i++ { + go send(c) + } + + for i := 0; i < 1000; i++ { + <-c + } +} + +func send(c chan bool) { + for i := 0; i < 10; i++ { + command := &SetCommand{} + command.Key = "foo" + command.Value = "bar" + command.ExpireTime = time.Unix(0, 0) + raftServer.Do(command) + } + c <- true +} + // Dial with timeout func dialTimeoutFast(network, addr string) (net.Conn, error) { return net.DialTimeout(network, addr, time.Millisecond*10)