From 1caf2a336432ed4d8947cb8b3abb99383c691780 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Sun, 15 Sep 2013 22:28:42 -0400 Subject: [PATCH] remove old store --- command.go | 21 +- etcd.go | 3 - etcd_handlers.go | 45 ++- file_system/file_system.go | 12 +- machines.go | 8 +- name_url_map.go | 10 +- raft_server.go | 3 +- snapshot.go | 10 +- store/keyword_test.go | 37 --- store/keywords.go | 33 -- store/stats.go | 33 -- store/store.go | 663 ------------------------------------- store/store_test.go | 258 --------------- store/test.go | 21 -- store/tree.go | 318 ------------------ store/tree_store_test.go | 247 -------------- store/watcher.go | 129 -------- store/watcher_test.go | 84 ----- util.go | 9 +- 19 files changed, 54 insertions(+), 1890 deletions(-) delete mode 100644 store/keyword_test.go delete mode 100644 store/keywords.go delete mode 100644 store/stats.go delete mode 100644 store/store.go delete mode 100644 store/store_test.go delete mode 100644 store/test.go delete mode 100644 store/tree.go delete mode 100644 store/tree_store_test.go delete mode 100644 store/watcher.go delete mode 100644 store/watcher_test.go diff --git a/command.go b/command.go index 3c901893a..1c014e4d0 100644 --- a/command.go +++ b/command.go @@ -73,23 +73,6 @@ func (c *UpdateCommand) Apply(server *raft.Server) (interface{}, error) { return json.Marshal(e) } -// Set command -type SetCommand struct { - Key string `json:"key"` - Value string `json:"value"` - ExpireTime time.Time `json:"expireTime"` -} - -// The name of the set command in the log -func (c *SetCommand) CommandName() string { - return commandName("set") -} - -// Set the key-value pair -func (c *SetCommand) Apply(server *raft.Server) (interface{}, error) { - return etcdStore.Set(c.Key, c.Value, c.ExpireTime, server.CommitIndex()) -} - // TestAndSet command type TestAndSetCommand struct { Key string `json:"key"` @@ -240,7 +223,7 @@ func (c *JoinCommand) Apply(raftServer *raft.Server) (interface{}, error) { value := fmt.Sprintf("raft=%s&etcd=%s&raftVersion=%s", c.RaftURL, c.EtcdURL, c.RaftVersion) etcdFs.Create(key, value, fileSystem.Permanent, raftServer.CommitIndex(), raftServer.Term()) - if c.Name != r.Name() { + if c.Name != r.Name() { // do not add self to the peer list r.peersStats[c.Name] = &raftPeerStats{MinLatency: 1 << 63} } @@ -267,7 +250,7 @@ func (c *RemoveCommand) Apply(raftServer *raft.Server) (interface{}, error) { // remove machine in etcd storage key := path.Join("_etcd/machines", c.Name) - _, err := etcdStore.Delete(key, raftServer.CommitIndex()) + _, err := etcdFs.Delete(key, false, raftServer.CommitIndex(), raftServer.Term()) delete(r.peersStats, c.Name) if err != nil { diff --git a/etcd.go b/etcd.go index bd2144183..cba7809f4 100644 --- a/etcd.go +++ b/etcd.go @@ -11,7 +11,6 @@ import ( "time" "github.com/coreos/etcd/file_system" - "github.com/coreos/etcd/store" "github.com/coreos/go-raft" ) @@ -137,7 +136,6 @@ type TLSConfig struct { // //------------------------------------------------------------------------------ -var etcdStore *store.Store var etcdFs *fileSystem.FileSystem //------------------------------------------------------------------------------ @@ -206,7 +204,6 @@ func main() { info := getInfo(dirPath) // Create etcd key-value store - etcdStore = store.CreateStore(maxSize) etcdFs = fileSystem.New() snapConf = newSnapshotConf() diff --git a/etcd_handlers.go b/etcd_handlers.go index a8d0508c1..cde882c2e 100644 --- a/etcd_handlers.go +++ b/etcd_handlers.go @@ -7,6 +7,7 @@ import ( "strings" etcdErr "github.com/coreos/etcd/error" + "github.com/coreos/etcd/file_system" "github.com/coreos/go-raft" ) @@ -83,17 +84,13 @@ func Multiplexer(w http.ResponseWriter, req *http.Request) error { //-------------------------------------- func CreateHttpHandler(w http.ResponseWriter, req *http.Request) error { - key := req.URL.Path[len("/v2/keys"):] + key := getNodePath(req.URL.Path) debugf("recv.post[%v] [%v%v]\n", req.RemoteAddr, req.Host, req.URL) - req.ParseForm() + value := req.FormValue("value") - value := req.Form.Get("value") - - ttl := req.FormValue("ttl") - - expireTime, err := durationToExpireTime(ttl) + expireTime, err := durationToExpireTime(req.FormValue("ttl")) if err != nil { return etcdErr.NewError(etcdErr.EcodeTTLNaN, "Create") @@ -110,22 +107,20 @@ func CreateHttpHandler(w http.ResponseWriter, req *http.Request) error { } func UpdateHttpHandler(w http.ResponseWriter, req *http.Request) error { - key := req.URL.Path[len("/v2/keys"):] + key := getNodePath(req.URL.Path) debugf("recv.put[%v] [%v%v]\n", req.RemoteAddr, req.Host, req.URL) value := req.FormValue("value") - ttl := req.FormValue("ttl") - - expireTime, err := durationToExpireTime(ttl) + expireTime, err := durationToExpireTime(req.FormValue("ttl")) if err != nil { return etcdErr.NewError(etcdErr.EcodeTTLNaN, "Update") } // TODO: update should give at least one option - if value == "" && ttl == "" { + if value == "" && expireTime.Sub(fileSystem.Permanent) == 0 { return nil } @@ -168,7 +163,7 @@ func UpdateHttpHandler(w http.ResponseWriter, req *http.Request) error { // Delete Handler func DeleteHttpHandler(w http.ResponseWriter, req *http.Request) error { - key := req.URL.Path[len("/v2/keys"):] + key := getNodePath(req.URL.Path) debugf("recv.delete[%v] [%v%v]\n", req.RemoteAddr, req.Host, req.URL) @@ -228,7 +223,7 @@ func VersionHttpHandler(w http.ResponseWriter, req *http.Request) error { // Handler to return the basic stats of etcd func StatsHttpHandler(w http.ResponseWriter, req *http.Request) error { w.WriteHeader(http.StatusOK) - w.Write(etcdStore.Stats()) + //w.Write(etcdStore.Stats()) w.Write(r.Stats()) return nil } @@ -236,10 +231,18 @@ func StatsHttpHandler(w http.ResponseWriter, req *http.Request) error { func GetHttpHandler(w http.ResponseWriter, req *http.Request) error { var err error var event interface{} - key := req.URL.Path[len("/v1/keys"):] - debugf("recv.get[%v] [%v%v]\n", req.RemoteAddr, req.Host, req.URL) + if req.FormValue("consistent") == "true" && r.State() != raft.Leader { + // help client to redirect the request to the current leader + leader := r.Leader() + url, _ := nameToEtcdURL(leader) + redirect(url, w, req) + return nil + } + + key := getNodePath(req.URL.Path) + recursive := req.FormValue("recursive") if req.FormValue("wait") == "true" { // watch @@ -267,15 +270,6 @@ func GetHttpHandler(w http.ResponseWriter, req *http.Request) error { } else { //get - if req.FormValue("consistent") == "true" { - if r.State() != raft.Leader { - leader := r.Leader() - url, _ := nameToEtcdURL(leader) - redirect(url, w, req) - return nil - } - } - command := &GetCommand{ Key: key, } @@ -295,6 +289,7 @@ func GetHttpHandler(w http.ResponseWriter, req *http.Request) error { if err != nil { return err + } else { event, _ := event.([]byte) w.WriteHeader(http.StatusOK) diff --git a/file_system/file_system.go b/file_system/file_system.go index ee67f9655..1de7830fb 100644 --- a/file_system/file_system.go +++ b/file_system/file_system.go @@ -332,30 +332,30 @@ func (fs *FileSystem) checkDir(parent *Node, dirName string) (*Node, error) { // Save function will not be able to save the state of watchers. // Save function will not save the parent field of the node. Or there will // be cyclic dependencies issue for the json package. -func (fs *FileSystem) Save() []byte { +func (fs *FileSystem) Save() ([]byte, error) { cloneFs := New() cloneFs.Root = fs.Root.Clone() b, err := json.Marshal(fs) if err != nil { - panic(err) + return nil, err } - return b + return b, nil } // recovery function recovery the store system from a static state. // It needs to recovery the parent field of the nodes. // It needs to delete the expired nodes since the saved time and also // need to create monitor go routines. -func (fs *FileSystem) Recover(state []byte) { +func (fs *FileSystem) Recovery(state []byte) error { err := json.Unmarshal(state, fs) if err != nil { - panic(err) + return err } fs.Root.recoverAndclean() - + return nil } diff --git a/machines.go b/machines.go index fbaa48d6f..1988353d5 100644 --- a/machines.go +++ b/machines.go @@ -2,9 +2,13 @@ package main // machineNum returns the number of machines in the cluster func machineNum() int { - response, _ := etcdStore.RawGet("_etcd/machines") + e, err := etcdFs.Get("/_etcd/machines", false, false, r.CommitIndex(), r.Term()) - return len(response) + if err != nil { + return 0 + } + + return len(e.KVPairs) } // getMachines gets the current machines in the cluster diff --git a/name_url_map.go b/name_url_map.go index 0e5abb1b7..38e1ecc15 100644 --- a/name_url_map.go +++ b/name_url_map.go @@ -49,16 +49,20 @@ func addNameToURL(name string, version string, raftURL string, etcdURL string) { } func readURL(nodeName string, urlName string) (string, bool) { - // if fails, try to recover from etcd storage + if nodeName == "" { + return "", false + } + + // convert nodeName to url from etcd storage key := path.Join("/_etcd/machines", nodeName) - resps, err := etcdStore.RawGet(key) + e, err := etcdFs.Get(key, false, false, r.CommitIndex(), r.Term()) if err != nil { return "", false } - m, err := url.ParseQuery(resps[0].Value) + m, err := url.ParseQuery(e.Value) if err != nil { panic("Failed to parse machines entry") diff --git a/raft_server.go b/raft_server.go index 628136479..d88f266f8 100644 --- a/raft_server.go +++ b/raft_server.go @@ -36,7 +36,7 @@ func newRaftServer(name string, url string, listenHost string, tlsConf *TLSConfi raftTransporter := newTransporter(tlsConf.Scheme, tlsConf.Client, ElectionTimeout) // Create raft server - server, err := raft.NewServer(name, dirPath, raftTransporter, etcdStore, nil) + server, err := raft.NewServer(name, dirPath, raftTransporter, etcdFs, nil) check(err) @@ -312,7 +312,6 @@ func (r *raftServer) Stats() []byte { func registerCommands() { raft.RegisterCommand(&JoinCommand{}) raft.RegisterCommand(&RemoveCommand{}) - raft.RegisterCommand(&SetCommand{}) raft.RegisterCommand(&GetCommand{}) raft.RegisterCommand(&DeleteCommand{}) raft.RegisterCommand(&WatchCommand{}) diff --git a/snapshot.go b/snapshot.go index 7b9da8034..a6caefd32 100644 --- a/snapshot.go +++ b/snapshot.go @@ -20,17 +20,17 @@ var snapConf *snapshotConf func newSnapshotConf() *snapshotConf { // check snapshot every 3 seconds and the threshold is 20K - return &snapshotConf{time.Second * 3, etcdStore.TotalWrites(), 20 * 1000} + return &snapshotConf{time.Second * 3, 0, 20 * 1000} } func monitorSnapshot() { for { time.Sleep(snapConf.checkingInterval) - currentWrites := etcdStore.TotalWrites() - snapConf.lastWrites - - if currentWrites > snapConf.writesThr { + //currentWrites := etcdStore.TotalWrites() - snapConf.lastWrites + currentWrites := 0 + if uint64(currentWrites) > snapConf.writesThr { r.TakeSnapshot() - snapConf.lastWrites = etcdStore.TotalWrites() + snapConf.lastWrites = 0 } } } diff --git a/store/keyword_test.go b/store/keyword_test.go deleted file mode 100644 index 7c54a9fde..000000000 --- a/store/keyword_test.go +++ /dev/null @@ -1,37 +0,0 @@ -package store - -import ( - "testing" -) - -func TestKeywords(t *testing.T) { - keyword := CheckKeyword("_etcd") - if !keyword { - t.Fatal("_etcd should be keyword") - } - - keyword = CheckKeyword("/_etcd") - - if !keyword { - t.Fatal("/_etcd should be keyword") - } - - keyword = CheckKeyword("/_etcd/") - - if !keyword { - t.Fatal("/_etcd/ contains keyword prefix") - } - - keyword = CheckKeyword("/_etcd/node1") - - if !keyword { - t.Fatal("/_etcd/* contains keyword prefix") - } - - keyword = CheckKeyword("/nokeyword/_etcd/node1") - - if keyword { - t.Fatal("this does not contain keyword prefix") - } - -} diff --git a/store/keywords.go b/store/keywords.go deleted file mode 100644 index 2e4ceb75b..000000000 --- a/store/keywords.go +++ /dev/null @@ -1,33 +0,0 @@ -package store - -import ( - "path" - "strings" -) - -// keywords for internal useage -// Key for string keyword; Value for only checking prefix -var keywords = map[string]bool{ - "/_etcd": true, - "/ephemeralNodes": true, -} - -// CheckKeyword will check if the key contains the keyword. -// For now, we only check for prefix. -func CheckKeyword(key string) bool { - key = path.Clean("/" + key) - - // find the second "/" - i := strings.Index(key[1:], "/") - - var prefix string - - if i == -1 { - prefix = key - } else { - prefix = key[:i+1] - } - _, ok := keywords[prefix] - - return ok -} diff --git a/store/stats.go b/store/stats.go deleted file mode 100644 index b57f4db3d..000000000 --- a/store/stats.go +++ /dev/null @@ -1,33 +0,0 @@ -package store - -import ( - "encoding/json" -) - -type EtcdStats struct { - // Number of get requests - Gets uint64 `json:"gets"` - - // Number of sets requests - Sets uint64 `json:"sets"` - - // Number of delete requests - Deletes uint64 `json:"deletes"` - - // Number of testAndSet requests - TestAndSets uint64 `json:"testAndSets"` -} - -// Stats returns the basic statistics information of etcd storage since its recent start -func (s *Store) Stats() []byte { - b, _ := json.Marshal(s.BasicStats) - return b -} - -// TotalWrites returns the total write operations -// It helps with snapshot -func (s *Store) TotalWrites() uint64 { - bs := s.BasicStats - - return bs.Deletes + bs.Sets + bs.TestAndSets -} diff --git a/store/store.go b/store/store.go deleted file mode 100644 index 916c1394e..000000000 --- a/store/store.go +++ /dev/null @@ -1,663 +0,0 @@ -package store - -import ( - "encoding/json" - "fmt" - "path" - "strconv" - "sync" - "time" - - etcdErr "github.com/coreos/etcd/error" -) - -//------------------------------------------------------------------------------ -// -// Typedefs -// -//------------------------------------------------------------------------------ - -// The main struct of the Key-Value store -type Store struct { - - // key-value store structure - Tree *tree - - // This mutex protects everything except add watcher member. - // Add watch member does not depend on the current state of the store. - // And watch will return when other protected function is called and reach - // the watching condition. - // It is needed so that clone() can atomically replicate the Store - // and do the log snapshot in a go routine. - mutex sync.RWMutex - - // WatcherHub is where we register all the clients - // who issue a watch request - watcher *WatcherHub - - // The string channel to send messages to the outside world - // Now we use it to send changes to the hub of the web service - messager chan<- string - - // A map to keep the recent response to the clients - ResponseMap map[string]*Response - - // The max number of the recent responses we can record - ResponseMaxSize int - - // The current number of the recent responses we have recorded - ResponseCurrSize uint - - // The index of the first recent responses we have - ResponseStartIndex uint64 - - // Current index of the raft machine - Index uint64 - - // Basic statistics information of etcd storage - BasicStats EtcdStats -} - -// A Node represents a Value in the Key-Value pair in the store -// It has its value, expire time and a channel used to update the -// expire time (since we do countdown in a go routine, we need to -// communicate with it via channel) -type Node struct { - // The string value of the node - Value string `json:"value"` - - // If the node is a permanent one the ExprieTime will be Unix(0,0) - // Otherwise after the expireTime, the node will be deleted - ExpireTime time.Time `json:"expireTime"` - - // A channel to update the expireTime of the node - update chan time.Time `json:"-"` -} - -// The response from the store to the user who issue a command -type Response struct { - Action string `json:"action"` - Key string `json:"key"` - Dir bool `json:"dir,omitempty"` - PrevValue string `json:"prevValue,omitempty"` - Value string `json:"value,omitempty"` - - // If the key did not exist before the action, - // this field should be set to true - NewKey bool `json:"newKey,omitempty"` - - Expiration *time.Time `json:"expiration,omitempty"` - - // Time to live in second - TTL int64 `json:"ttl,omitempty"` - - // The command index of the raft machine when the command is executed - Index uint64 `json:"index"` -} - -// A listNode represent the simplest Key-Value pair with its type -// It is only used when do list opeartion -// We want to have a file system like store, thus we distingush "file" -// and "directory" -type ListNode struct { - Key string - Value string - Type string -} - -var PERMANENT = time.Unix(0, 0) - -//------------------------------------------------------------------------------ -// -// Methods -// -//------------------------------------------------------------------------------ - -// Create a new stroe -// Arguement max is the max number of response we want to record -func CreateStore(max int) *Store { - s := new(Store) - - s.messager = nil - - s.ResponseMap = make(map[string]*Response) - s.ResponseStartIndex = 0 - s.ResponseMaxSize = max - s.ResponseCurrSize = 0 - - s.Tree = &tree{ - &treeNode{ - Node{ - "/", - time.Unix(0, 0), - nil, - }, - true, - make(map[string]*treeNode), - }, - } - - s.watcher = newWatcherHub() - - return s -} - -// Set the messager of the store -func (s *Store) SetMessager(messager chan<- string) { - s.messager = messager -} - -func (s *Store) Set(key string, value string, expireTime time.Time, index uint64) ([]byte, error) { - s.mutex.Lock() - defer s.mutex.Unlock() - - return s.internalSet(key, value, expireTime, index) - -} - -// Set the key to value with expiration time -func (s *Store) internalSet(key string, value string, expireTime time.Time, index uint64) ([]byte, error) { - //Update index - s.Index = index - - //Update stats - s.BasicStats.Sets++ - - key = path.Clean("/" + key) - - isExpire := !expireTime.Equal(PERMANENT) - - // base response - resp := Response{ - Action: "SET", - Key: key, - Value: value, - Index: index, - } - - // When the slow follower receive the set command - // the key may be expired, we should not add the node - // also if the node exist, we need to delete the node - if isExpire && expireTime.Sub(time.Now()) < 0 { - return s.internalDelete(key, index) - } - - var TTL int64 - - // Update ttl - if isExpire { - TTL = int64(expireTime.Sub(time.Now()) / time.Second) - resp.Expiration = &expireTime - resp.TTL = TTL - } - - // Get the node - node, ok := s.Tree.get(key) - - if ok { - // Update when node exists - - // Node is not permanent - if !node.ExpireTime.Equal(PERMANENT) { - - // If node is not permanent - // Update its expireTime - node.update <- expireTime - - } else { - - // If we want the permanent node to have expire time - // We need to create a go routine with a channel - if isExpire { - node.update = make(chan time.Time) - go s.monitorExpiration(key, node.update, expireTime) - } - - } - - // Update the information of the node - s.Tree.set(key, Node{value, expireTime, node.update}) - - resp.PrevValue = node.Value - - s.watcher.notify(resp) - - msg, err := json.Marshal(resp) - - // Send to the messager - if s.messager != nil && err == nil { - s.messager <- string(msg) - } - - s.addToResponseMap(index, &resp) - - return msg, err - - // Add new node - } else { - - update := make(chan time.Time) - - ok := s.Tree.set(key, Node{value, expireTime, update}) - - if !ok { - return nil, etcdErr.NewError(102, "set: "+key) - } - - if isExpire { - go s.monitorExpiration(key, update, expireTime) - } - - resp.NewKey = true - - msg, err := json.Marshal(resp) - - // Nofity the watcher - s.watcher.notify(resp) - - // Send to the messager - if s.messager != nil && err == nil { - s.messager <- string(msg) - } - - s.addToResponseMap(index, &resp) - return msg, err - } - -} - -// Get the value of the key and return the raw response -func (s *Store) internalGet(key string) *Response { - - key = path.Clean("/" + key) - - node, ok := s.Tree.get(key) - - if ok { - var TTL int64 - var isExpire bool = false - - isExpire = !node.ExpireTime.Equal(PERMANENT) - - resp := &Response{ - Action: "GET", - Key: key, - Value: node.Value, - Index: s.Index, - } - - // Update ttl - if isExpire { - TTL = int64(node.ExpireTime.Sub(time.Now()) / time.Second) - resp.Expiration = &node.ExpireTime - resp.TTL = TTL - } - - return resp - - } else { - // we do not found the key - return nil - } -} - -// Get all the items under key -// If key is a file return the file -// If key is a directory reuturn an array of files -func (s *Store) Get(key string) ([]byte, error) { - s.mutex.RLock() - defer s.mutex.RUnlock() - - resps, err := s.RawGet(key) - - if err != nil { - return nil, err - } - - key = path.Clean("/" + key) - - // If the number of resps == 1 and the response key - // is the key we query, a signal key-value should - // be returned - if len(resps) == 1 && resps[0].Key == key { - return json.Marshal(resps[0]) - } - - return json.Marshal(resps) -} - -func (s *Store) rawGetNode(key string, node *Node) ([]*Response, error) { - resps := make([]*Response, 1) - - isExpire := !node.ExpireTime.Equal(PERMANENT) - - resps[0] = &Response{ - Action: "GET", - Index: s.Index, - Key: key, - Value: node.Value, - } - - // Update ttl - if isExpire { - TTL := int64(node.ExpireTime.Sub(time.Now()) / time.Second) - resps[0].Expiration = &node.ExpireTime - resps[0].TTL = TTL - } - - return resps, nil -} - -func (s *Store) rawGetNodeList(key string, keys []string, nodes []*Node) ([]*Response, error) { - resps := make([]*Response, len(nodes)) - - // TODO: check if nodes and keys are the same length - for i := 0; i < len(nodes); i++ { - var TTL int64 - var isExpire bool = false - - isExpire = !nodes[i].ExpireTime.Equal(PERMANENT) - - resps[i] = &Response{ - Action: "GET", - Index: s.Index, - Key: path.Join(key, keys[i]), - } - - if len(nodes[i].Value) != 0 { - resps[i].Value = nodes[i].Value - } else { - resps[i].Dir = true - } - - // Update ttl - if isExpire { - TTL = int64(nodes[i].ExpireTime.Sub(time.Now()) / time.Second) - resps[i].Expiration = &nodes[i].ExpireTime - resps[i].TTL = TTL - } - - } - - return resps, nil -} - -func (s *Store) RawGet(key string) ([]*Response, error) { - // Update stats - s.BasicStats.Gets++ - - key = path.Clean("/" + key) - - nodes, keys, ok := s.Tree.list(key) - if !ok { - return nil, etcdErr.NewError(100, "get: "+key) - } - - switch node := nodes.(type) { - case *Node: - return s.rawGetNode(key, node) - case []*Node: - return s.rawGetNodeList(key, keys, node) - default: - panic("invalid cast ") - } -} - -func (s *Store) Delete(key string, index uint64) ([]byte, error) { - s.mutex.Lock() - defer s.mutex.Unlock() - return s.internalDelete(key, index) -} - -// Delete the key -func (s *Store) internalDelete(key string, index uint64) ([]byte, error) { - - // Update stats - s.BasicStats.Deletes++ - - key = path.Clean("/" + key) - - // Update index - s.Index = index - - node, ok := s.Tree.get(key) - - if !ok { - return nil, etcdErr.NewError(100, "delete: "+key) - } - - resp := Response{ - Action: "DELETE", - Key: key, - PrevValue: node.Value, - Index: index, - } - - if node.ExpireTime.Equal(PERMANENT) { - - s.Tree.delete(key) - - } else { - resp.Expiration = &node.ExpireTime - // Kill the expire go routine - node.update <- PERMANENT - s.Tree.delete(key) - - } - - msg, err := json.Marshal(resp) - - s.watcher.notify(resp) - - // notify the messager - if s.messager != nil && err == nil { - s.messager <- string(msg) - } - - s.addToResponseMap(index, &resp) - - return msg, err -} - -// Set the value of the key to the value if the given prevValue is equal to the value of the key -func (s *Store) TestAndSet(key string, prevValue string, value string, expireTime time.Time, index uint64) ([]byte, error) { - s.mutex.Lock() - defer s.mutex.Unlock() - - // Update stats - s.BasicStats.TestAndSets++ - - resp := s.internalGet(key) - - if resp == nil { - if prevValue != "" { - errmsg := fmt.Sprintf("TestAndSet: key not found and previousValue is not empty %s:%s ", key, prevValue) - return nil, etcdErr.NewError(100, errmsg) - } - return s.internalSet(key, value, expireTime, index) - } - - if resp.Value == prevValue { - - // If test succeed, do set - return s.internalSet(key, value, expireTime, index) - } else { - - // If fails, return err - return nil, etcdErr.NewError(101, fmt.Sprintf("TestAndSet: %s!=%s", - resp.Value, prevValue)) - } - -} - -// Add a channel to the watchHub. -// The watchHub will send response to the channel when any key under the prefix -// changes [since the sinceIndex if given] -func (s *Store) AddWatcher(prefix string, watcher *Watcher, sinceIndex uint64) error { - return s.watcher.addWatcher(prefix, watcher, sinceIndex, s.ResponseStartIndex, s.Index, s.ResponseMap) -} - -// This function should be created as a go routine to delete the key-value pair -// when it reaches expiration time - -func (s *Store) monitorExpiration(key string, update chan time.Time, expireTime time.Time) { - - duration := expireTime.Sub(time.Now()) - - for { - select { - - // Timeout delete the node - case <-time.After(duration): - node, ok := s.Tree.get(key) - - if !ok { - return - - } else { - s.mutex.Lock() - - s.Tree.delete(key) - - resp := Response{ - Action: "DELETE", - Key: key, - PrevValue: node.Value, - Expiration: &node.ExpireTime, - Index: s.Index, - } - s.mutex.Unlock() - - msg, err := json.Marshal(resp) - - s.watcher.notify(resp) - - // notify the messager - if s.messager != nil && err == nil { - s.messager <- string(msg) - } - - return - - } - - case updateTime := <-update: - // Update duration - // If the node become a permanent one, the go routine is - // not needed - if updateTime.Equal(PERMANENT) { - return - } - - // Update duration - duration = updateTime.Sub(time.Now()) - } - } -} - -// When we receive a command that will change the state of the key-value store -// We will add the result of it to the ResponseMap for the use of watch command -// Also we may remove the oldest response when we add new one -func (s *Store) addToResponseMap(index uint64, resp *Response) { - - // zero case - if s.ResponseMaxSize == 0 { - return - } - - strIndex := strconv.FormatUint(index, 10) - s.ResponseMap[strIndex] = resp - - // unlimited - if s.ResponseMaxSize < 0 { - s.ResponseCurrSize++ - return - } - - // if we reach the max point, we need to delete the most latest - // response and update the startIndex - if s.ResponseCurrSize == uint(s.ResponseMaxSize) { - s.ResponseStartIndex++ - delete(s.ResponseMap, strconv.FormatUint(s.ResponseStartIndex, 10)) - } else { - s.ResponseCurrSize++ - } -} - -func (s *Store) clone() *Store { - newStore := &Store{ - ResponseMaxSize: s.ResponseMaxSize, - ResponseCurrSize: s.ResponseCurrSize, - ResponseStartIndex: s.ResponseStartIndex, - Index: s.Index, - BasicStats: s.BasicStats, - } - - newStore.Tree = s.Tree.clone() - newStore.ResponseMap = make(map[string]*Response) - - for index, response := range s.ResponseMap { - newStore.ResponseMap[index] = response - } - - return newStore -} - -// Save the current state of the storage system -func (s *Store) Save() ([]byte, error) { - // first we clone the store - // json is very slow, we cannot hold the lock for such a long time - s.mutex.Lock() - cloneStore := s.clone() - s.mutex.Unlock() - - b, err := json.Marshal(cloneStore) - if err != nil { - fmt.Println(err) - return nil, err - } - return b, nil -} - -// Recovery the state of the stroage system from a previous state -func (s *Store) Recovery(state []byte) error { - s.mutex.Lock() - defer s.mutex.Unlock() - // we need to stop all the current watchers - // recovery will clear watcherHub - s.watcher.stopWatchers() - - err := json.Unmarshal(state, s) - - // The only thing need to change after the recovery is the - // node with expiration time, we need to delete all the node - // that have been expired and setup go routines to monitor the - // other ones - s.checkExpiration() - - return err -} - -// Clean the expired nodes -// Set up go routines to mon -func (s *Store) checkExpiration() { - s.Tree.traverse(s.checkNode, false) -} - -// Check each node -func (s *Store) checkNode(key string, node *Node) { - - if node.ExpireTime.Equal(PERMANENT) { - return - } else { - if node.ExpireTime.Sub(time.Now()) >= time.Second { - - node.update = make(chan time.Time) - go s.monitorExpiration(key, node.update, node.ExpireTime) - - } else { - // we should delete this node - s.Tree.delete(key) - } - } -} diff --git a/store/store_test.go b/store/store_test.go deleted file mode 100644 index 6bd719008..000000000 --- a/store/store_test.go +++ /dev/null @@ -1,258 +0,0 @@ -package store - -import ( - "encoding/json" - "testing" - "time" -) - -func TestStoreGetDelete(t *testing.T) { - - s := CreateStore(100) - s.Set("foo", "bar", time.Unix(0, 0), 1) - res, err := s.Get("foo") - - if err != nil { - t.Fatalf("Unknown error") - } - - var result Response - json.Unmarshal(res, &result) - - if result.Value != "bar" { - t.Fatalf("Cannot get stored value") - } - - s.Delete("foo", 2) - _, err = s.Get("foo") - - if err == nil { - t.Fatalf("Got deleted value") - } -} - -func TestTestAndSet(t *testing.T) { - s := CreateStore(100) - s.Set("foo", "bar", time.Unix(0, 0), 1) - - _, err := s.TestAndSet("foo", "barbar", "barbar", time.Unix(0, 0), 2) - - if err == nil { - t.Fatalf("test bar == barbar should fail") - } - - _, err = s.TestAndSet("foo", "bar", "barbar", time.Unix(0, 0), 3) - - if err != nil { - t.Fatalf("test bar == bar should succeed") - } - - _, err = s.TestAndSet("foo", "", "barbar", time.Unix(0, 0), 4) - - if err == nil { - t.Fatalf("test empty == bar should fail") - } - - _, err = s.TestAndSet("fooo", "bar", "barbar", time.Unix(0, 0), 5) - - if err == nil { - t.Fatalf("test bar == non-existing key should fail") - } - - _, err = s.TestAndSet("fooo", "", "bar", time.Unix(0, 0), 6) - - if err != nil { - t.Fatalf("test empty == non-existing key should succeed") - } - -} - -func TestSaveAndRecovery(t *testing.T) { - - s := CreateStore(100) - s.Set("foo", "bar", time.Unix(0, 0), 1) - s.Set("foo2", "bar2", time.Now().Add(time.Second*5), 2) - state, err := s.Save() - - if err != nil { - t.Fatalf("Cannot Save %s", err) - } - - newStore := CreateStore(100) - - // wait for foo2 expires - time.Sleep(time.Second * 6) - - newStore.Recovery(state) - - res, err := newStore.Get("foo") - - var result Response - json.Unmarshal(res, &result) - - if result.Value != "bar" { - t.Fatalf("Recovery Fail") - } - - res, err = newStore.Get("foo2") - - if err == nil { - t.Fatalf("Get expired value") - } - - s.Delete("foo", 3) - -} - -func TestExpire(t *testing.T) { - // test expire - s := CreateStore(100) - s.Set("foo", "bar", time.Now().Add(time.Second*1), 0) - time.Sleep(2 * time.Second) - - _, err := s.Get("foo") - - if err == nil { - t.Fatalf("Got expired value") - } - - //test change expire time - s.Set("foo", "bar", time.Now().Add(time.Second*10), 1) - - _, err = s.Get("foo") - - if err != nil { - t.Fatalf("Cannot get Value") - } - - s.Set("foo", "barbar", time.Now().Add(time.Second*1), 2) - - time.Sleep(2 * time.Second) - - _, err = s.Get("foo") - - if err == nil { - t.Fatalf("Got expired value") - } - - // test change expire to stable - s.Set("foo", "bar", time.Now().Add(time.Second*1), 3) - - s.Set("foo", "bar", time.Unix(0, 0), 4) - - time.Sleep(2 * time.Second) - - _, err = s.Get("foo") - - if err != nil { - t.Fatalf("Cannot get Value") - } - - // test stable to expire - s.Set("foo", "bar", time.Now().Add(time.Second*1), 5) - time.Sleep(2 * time.Second) - _, err = s.Get("foo") - - if err == nil { - t.Fatalf("Got expired value") - } - - // test set older node - s.Set("foo", "bar", time.Now().Add(-time.Second*1), 6) - _, err = s.Get("foo") - - if err == nil { - t.Fatalf("Got expired value") - } - -} - -func BenchmarkStoreSet(b *testing.B) { - s := CreateStore(100) - - keys := GenKeys(10000, 5) - - b.ResetTimer() - for i := 0; i < b.N; i++ { - - for i, key := range keys { - s.Set(key, "barbarbarbarbar", time.Unix(0, 0), uint64(i)) - } - - s = CreateStore(100) - } -} - -func BenchmarkStoreGet(b *testing.B) { - s := CreateStore(100) - - keys := GenKeys(10000, 5) - - for i, key := range keys { - s.Set(key, "barbarbarbarbar", time.Unix(0, 0), uint64(i)) - } - - b.ResetTimer() - for i := 0; i < b.N; i++ { - - for _, key := range keys { - s.Get(key) - } - - } -} - -func BenchmarkStoreSnapshotCopy(b *testing.B) { - s := CreateStore(100) - - keys := GenKeys(10000, 5) - - for i, key := range keys { - s.Set(key, "barbarbarbarbar", time.Unix(0, 0), uint64(i)) - } - - var state []byte - - b.ResetTimer() - for i := 0; i < b.N; i++ { - s.clone() - } - b.SetBytes(int64(len(state))) -} - -func BenchmarkSnapshotSaveJson(b *testing.B) { - s := CreateStore(100) - - keys := GenKeys(10000, 5) - - for i, key := range keys { - s.Set(key, "barbarbarbarbar", time.Unix(0, 0), uint64(i)) - } - - var state []byte - - b.ResetTimer() - for i := 0; i < b.N; i++ { - state, _ = s.Save() - } - b.SetBytes(int64(len(state))) -} - -func BenchmarkSnapshotRecovery(b *testing.B) { - s := CreateStore(100) - - keys := GenKeys(10000, 5) - - for i, key := range keys { - s.Set(key, "barbarbarbarbar", time.Unix(0, 0), uint64(i)) - } - - state, _ := s.Save() - - b.ResetTimer() - for i := 0; i < b.N; i++ { - newStore := CreateStore(100) - newStore.Recovery(state) - } - b.SetBytes(int64(len(state))) -} diff --git a/store/test.go b/store/test.go deleted file mode 100644 index eaddaa69d..000000000 --- a/store/test.go +++ /dev/null @@ -1,21 +0,0 @@ -package store - -import ( - "math/rand" - "strconv" -) - -// GenKeys randomly generate num of keys with max depth -func GenKeys(num int, depth int) []string { - keys := make([]string, num) - for i := 0; i < num; i++ { - - keys[i] = "/foo" - depth := rand.Intn(depth) + 1 - - for j := 0; j < depth; j++ { - keys[i] += "/" + strconv.Itoa(rand.Int()%20) - } - } - return keys -} diff --git a/store/tree.go b/store/tree.go deleted file mode 100644 index 3d6d1bfa4..000000000 --- a/store/tree.go +++ /dev/null @@ -1,318 +0,0 @@ -package store - -import ( - "path" - "sort" - "strings" - "time" -) - -//------------------------------------------------------------------------------ -// -// Typedefs -// -//------------------------------------------------------------------------------ - -// A file system like tree structure. Each non-leaf node of the tree has a hashmap to -// store its children nodes. Leaf nodes has no hashmap (a nil pointer) -type tree struct { - Root *treeNode -} - -// A treeNode wraps a Node. It has a hashmap to keep records of its children treeNodes. -type treeNode struct { - InternalNode Node - Dir bool - NodeMap map[string]*treeNode -} - -// TreeNode with its key. We use it when we need to sort the treeNodes. -type tnWithKey struct { - key string - tn *treeNode -} - -// Define type and functions to match sort interface -type tnWithKeySlice []tnWithKey - -func (s tnWithKeySlice) Len() int { return len(s) } -func (s tnWithKeySlice) Less(i, j int) bool { return s[i].key < s[j].key } -func (s tnWithKeySlice) Swap(i, j int) { s[i], s[j] = s[j], s[i] } - -// CONSTANT VARIABLE - -// Represent an empty node -var emptyNode = Node{"", PERMANENT, nil} - -//------------------------------------------------------------------------------ -// -// Methods -// -//------------------------------------------------------------------------------ - -// Set the key to the given value, return true if success -// If any intermidate path of the key is not a directory type, it will fail -// For example if the /foo = Node(bar) exists, set /foo/foo = Node(barbar) -// will fail. -func (t *tree) set(key string, value Node) bool { - - nodesName := split(key) - - // avoid set value to "/" - if len(nodesName) == 1 && len(nodesName[0]) == 0 { - return false - } - - nodeMap := t.Root.NodeMap - - i := 0 - newDir := false - - // go through all the path - for i = 0; i < len(nodesName)-1; i++ { - - // if we meet a new directory, all the directory after it must be new - if newDir { - tn := &treeNode{emptyNode, true, make(map[string]*treeNode)} - nodeMap[nodesName[i]] = tn - nodeMap = tn.NodeMap - continue - } - - // get the node from the nodeMap of the current level - tn, ok := nodeMap[nodesName[i]] - - if !ok { - // add a new directory and set newDir to true - newDir = true - tn := &treeNode{emptyNode, true, make(map[string]*treeNode)} - nodeMap[nodesName[i]] = tn - nodeMap = tn.NodeMap - - } else if ok && !tn.Dir { - - // if we meet a non-directory node, we cannot set the key - return false - } else { - - // update the nodeMap to next level - nodeMap = tn.NodeMap - } - - } - - // Add the last node - tn, ok := nodeMap[nodesName[i]] - - if !ok { - // we add a new treeNode - tn := &treeNode{value, false, nil} - nodeMap[nodesName[i]] = tn - - } else { - if tn.Dir { - return false - } - // we change the value of a old Treenode - tn.InternalNode = value - } - return true - -} - -// Get the tree node of the key -func (t *tree) internalGet(key string) (*treeNode, bool) { - nodesName := split(key) - - // should be able to get root - if len(nodesName) == 1 && nodesName[0] == "" { - return t.Root, true - } - - nodeMap := t.Root.NodeMap - - var i int - - for i = 0; i < len(nodesName)-1; i++ { - node, ok := nodeMap[nodesName[i]] - if !ok || !node.Dir { - return nil, false - } - nodeMap = node.NodeMap - } - - tn, ok := nodeMap[nodesName[i]] - if ok { - return tn, ok - } else { - return nil, ok - } -} - -// get the internalNode of the key -func (t *tree) get(key string) (Node, bool) { - tn, ok := t.internalGet(key) - - if ok { - if tn.Dir { - return emptyNode, false - } - return tn.InternalNode, ok - } else { - return emptyNode, ok - } -} - -// get the internalNode of the key -func (t *tree) list(directory string) (interface{}, []string, bool) { - treeNode, ok := t.internalGet(directory) - - if !ok { - return nil, nil, ok - - } else { - if !treeNode.Dir { - return &treeNode.InternalNode, nil, ok - } - length := len(treeNode.NodeMap) - nodes := make([]*Node, length) - keys := make([]string, length) - - i := 0 - for key, node := range treeNode.NodeMap { - nodes[i] = &node.InternalNode - keys[i] = key - i++ - } - - return nodes, keys, ok - } -} - -// delete the key, return true if success -func (t *tree) delete(key string) bool { - nodesName := split(key) - - nodeMap := t.Root.NodeMap - - var i int - - for i = 0; i < len(nodesName)-1; i++ { - node, ok := nodeMap[nodesName[i]] - if !ok || !node.Dir { - return false - } - nodeMap = node.NodeMap - } - - node, ok := nodeMap[nodesName[i]] - if ok && !node.Dir { - delete(nodeMap, nodesName[i]) - return true - } - return false -} - -// traverse wrapper -func (t *tree) traverse(f func(string, *Node), sort bool) { - if sort { - sortDfs("", t.Root, f) - } else { - dfs("", t.Root, f) - } -} - -// clone() will return a deep cloned tree -func (t *tree) clone() *tree { - newTree := new(tree) - newTree.Root = &treeNode{ - Node{ - "/", - time.Unix(0, 0), - nil, - }, - true, - make(map[string]*treeNode), - } - recursiveClone(t.Root, newTree.Root) - return newTree -} - -// recursiveClone is a helper function for clone() -func recursiveClone(tnSrc *treeNode, tnDes *treeNode) { - if !tnSrc.Dir { - tnDes.InternalNode = tnSrc.InternalNode - return - - } else { - tnDes.InternalNode = tnSrc.InternalNode - tnDes.Dir = true - tnDes.NodeMap = make(map[string]*treeNode) - - for key, tn := range tnSrc.NodeMap { - newTn := new(treeNode) - recursiveClone(tn, newTn) - tnDes.NodeMap[key] = newTn - } - - } -} - -// deep first search to traverse the tree -// apply the func f to each internal node -func dfs(key string, t *treeNode, f func(string, *Node)) { - - // base case - if len(t.NodeMap) == 0 { - f(key, &t.InternalNode) - - // recursion - } else { - for tnKey, tn := range t.NodeMap { - tnKey := key + "/" + tnKey - dfs(tnKey, tn, f) - } - } -} - -// sort deep first search to traverse the tree -// apply the func f to each internal node -func sortDfs(key string, t *treeNode, f func(string, *Node)) { - // base case - if len(t.NodeMap) == 0 { - f(key, &t.InternalNode) - - // recursion - } else { - - s := make(tnWithKeySlice, len(t.NodeMap)) - i := 0 - - // copy - for tnKey, tn := range t.NodeMap { - tnKey := key + "/" + tnKey - s[i] = tnWithKey{tnKey, tn} - i++ - } - - // sort - sort.Sort(s) - - // traverse - for i = 0; i < len(t.NodeMap); i++ { - sortDfs(s[i].key, s[i].tn, f) - } - } -} - -// split the key by '/', get the intermediate node name -func split(key string) []string { - key = "/" + key - key = path.Clean(key) - - // get the intermidate nodes name - nodesName := strings.Split(key, "/") - // we do not need the root node, since we start with it - nodesName = nodesName[1:] - return nodesName -} diff --git a/store/tree_store_test.go b/store/tree_store_test.go deleted file mode 100644 index ad8222ffb..000000000 --- a/store/tree_store_test.go +++ /dev/null @@ -1,247 +0,0 @@ -package store - -import ( - "fmt" - "math/rand" - "strconv" - "testing" - "time" -) - -func TestStoreGet(t *testing.T) { - - ts := &tree{ - &treeNode{ - NewTestNode("/"), - true, - make(map[string]*treeNode), - }, - } - - // create key - ts.set("/foo", NewTestNode("bar")) - // change value - ts.set("/foo", NewTestNode("barbar")) - // create key - ts.set("/hello/foo", NewTestNode("barbarbar")) - treeNode, ok := ts.get("/foo") - - if !ok { - t.Fatalf("Expect to get node, but not") - } - if treeNode.Value != "barbar" { - t.Fatalf("Expect value barbar, but got %s", treeNode.Value) - } - - // create key - treeNode, ok = ts.get("/hello/foo") - if !ok { - t.Fatalf("Expect to get node, but not") - } - if treeNode.Value != "barbarbar" { - t.Fatalf("Expect value barbarbar, but got %s", treeNode.Value) - } - - // create a key under other key - ok = ts.set("/foo/foo", NewTestNode("bar")) - if ok { - t.Fatalf("shoud not add key under a exisiting key") - } - - // delete a key - ok = ts.delete("/foo") - if !ok { - t.Fatalf("cannot delete key") - } - - // delete a directory - ok = ts.delete("/hello") - if ok { - t.Fatalf("Expect cannot delet /hello, but deleted! ") - } - - // test list - ts.set("/hello/fooo", NewTestNode("barbarbar")) - ts.set("/hello/foooo/foo", NewTestNode("barbarbar")) - - nodes, keys, ok := ts.list("/hello") - - if !ok { - t.Fatalf("cannot list!") - } else { - nodes, _ := nodes.([]*Node) - length := len(nodes) - - for i := 0; i < length; i++ { - fmt.Println(keys[i], "=", nodes[i].Value) - } - } - - keys = GenKeys(100, 10) - - for i := 0; i < 100; i++ { - value := strconv.Itoa(rand.Int()) - ts.set(keys[i], NewTestNode(value)) - treeNode, ok := ts.get(keys[i]) - - if !ok { - continue - } - if treeNode.Value != value { - t.Fatalf("Expect value %s, but got %s", value, treeNode.Value) - } - - } - ts.traverse(f, true) -} - -func TestTreeClone(t *testing.T) { - keys := GenKeys(10000, 10) - - ts := &tree{ - &treeNode{ - NewTestNode("/"), - true, - make(map[string]*treeNode), - }, - } - - backTs := &tree{ - &treeNode{ - NewTestNode("/"), - true, - make(map[string]*treeNode), - }, - } - - // generate the first tree - for _, key := range keys { - value := strconv.Itoa(rand.Int()) - ts.set(key, NewTestNode(value)) - backTs.set(key, NewTestNode(value)) - } - - copyTs := ts.clone() - - // test if they are identical - copyTs.traverse(ts.contain, false) - - // remove all the keys from first tree - for _, key := range keys { - ts.delete(key) - } - - // test if they are identical - // make sure changes in the first tree will affect the copy one - copyTs.traverse(backTs.contain, false) - -} - -func BenchmarkTreeStoreSet(b *testing.B) { - - keys := GenKeys(10000, 10) - - b.ResetTimer() - for i := 0; i < b.N; i++ { - - ts := &tree{ - &treeNode{ - NewTestNode("/"), - true, - make(map[string]*treeNode), - }, - } - - for _, key := range keys { - value := strconv.Itoa(rand.Int()) - ts.set(key, NewTestNode(value)) - } - } -} - -func BenchmarkTreeStoreGet(b *testing.B) { - - keys := GenKeys(10000, 10) - - ts := &tree{ - &treeNode{ - NewTestNode("/"), - true, - make(map[string]*treeNode), - }, - } - - for _, key := range keys { - value := strconv.Itoa(rand.Int()) - ts.set(key, NewTestNode(value)) - } - - b.ResetTimer() - for i := 0; i < b.N; i++ { - for _, key := range keys { - ts.get(key) - } - } -} - -func BenchmarkTreeStoreCopy(b *testing.B) { - keys := GenKeys(10000, 10) - - ts := &tree{ - &treeNode{ - NewTestNode("/"), - true, - make(map[string]*treeNode), - }, - } - - for _, key := range keys { - value := strconv.Itoa(rand.Int()) - ts.set(key, NewTestNode(value)) - } - - b.ResetTimer() - for i := 0; i < b.N; i++ { - ts.clone() - } -} - -func BenchmarkTreeStoreList(b *testing.B) { - - keys := GenKeys(10000, 10) - - ts := &tree{ - &treeNode{ - NewTestNode("/"), - true, - make(map[string]*treeNode), - }, - } - - for _, key := range keys { - value := strconv.Itoa(rand.Int()) - ts.set(key, NewTestNode(value)) - } - - b.ResetTimer() - for i := 0; i < b.N; i++ { - for _, key := range keys { - ts.list(key) - } - } -} - -func (t *tree) contain(key string, node *Node) { - _, ok := t.get(key) - if !ok { - panic("tree do not contain the given key") - } -} - -func f(key string, n *Node) { - return -} - -func NewTestNode(value string) Node { - return Node{value, time.Unix(0, 0), nil} -} diff --git a/store/watcher.go b/store/watcher.go deleted file mode 100644 index 17de27b21..000000000 --- a/store/watcher.go +++ /dev/null @@ -1,129 +0,0 @@ -package store - -import ( - "path" - "strconv" - "strings" -) - -//------------------------------------------------------------------------------ -// -// Typedefs -// -//------------------------------------------------------------------------------ - -// WatcherHub is where the client register its watcher -type WatcherHub struct { - watchers map[string][]*Watcher -} - -// Currently watcher only contains a response channel -type Watcher struct { - C chan *Response -} - -// Create a new watcherHub -func newWatcherHub() *WatcherHub { - w := new(WatcherHub) - w.watchers = make(map[string][]*Watcher) - return w -} - -// Create a new watcher -func NewWatcher() *Watcher { - return &Watcher{C: make(chan *Response, 1)} -} - -// Add a watcher to the watcherHub -func (w *WatcherHub) addWatcher(prefix string, watcher *Watcher, sinceIndex uint64, - responseStartIndex uint64, currentIndex uint64, resMap map[string]*Response) error { - - prefix = path.Clean("/" + prefix) - - if sinceIndex != 0 && sinceIndex >= responseStartIndex { - for i := sinceIndex; i <= currentIndex; i++ { - if checkResponse(prefix, i, resMap) { - watcher.C <- resMap[strconv.FormatUint(i, 10)] - return nil - } - } - } - - _, ok := w.watchers[prefix] - - if !ok { - w.watchers[prefix] = make([]*Watcher, 0) - } - - w.watchers[prefix] = append(w.watchers[prefix], watcher) - - return nil -} - -// Check if the response has what we are watching -func checkResponse(prefix string, index uint64, resMap map[string]*Response) bool { - - resp, ok := resMap[strconv.FormatUint(index, 10)] - - if !ok { - // not storage system command - return false - } else { - path := resp.Key - if strings.HasPrefix(path, prefix) { - prefixLen := len(prefix) - if len(path) == prefixLen || path[prefixLen] == '/' { - return true - } - - } - } - - return false -} - -// Notify the watcher a action happened -func (w *WatcherHub) notify(resp Response) error { - resp.Key = path.Clean(resp.Key) - - segments := strings.Split(resp.Key, "/") - currPath := "/" - - // walk through all the pathes - for _, segment := range segments { - currPath = path.Join(currPath, segment) - - watchers, ok := w.watchers[currPath] - - if ok { - - newWatchers := make([]*Watcher, 0) - // notify all the watchers - for _, watcher := range watchers { - watcher.C <- &resp - } - - if len(newWatchers) == 0 { - // we have notified all the watchers at this path - // delete the map - delete(w.watchers, currPath) - } else { - w.watchers[currPath] = newWatchers - } - } - - } - - return nil -} - -// stopWatchers stops all the watchers -// This function is used when the etcd recovery from a snapshot at runtime -func (w *WatcherHub) stopWatchers() { - for _, subWatchers := range w.watchers { - for _, watcher := range subWatchers { - watcher.C <- nil - } - } - w.watchers = nil -} diff --git a/store/watcher_test.go b/store/watcher_test.go deleted file mode 100644 index b5730ed93..000000000 --- a/store/watcher_test.go +++ /dev/null @@ -1,84 +0,0 @@ -package store - -import ( - "testing" - "time" -) - -func TestWatch(t *testing.T) { - - s := CreateStore(100) - - watchers := make([]*Watcher, 10) - - for i, _ := range watchers { - - // create a new watcher - watchers[i] = NewWatcher() - // add to the watchers list - s.AddWatcher("foo", watchers[i], 0) - - } - - s.Set("/foo/foo", "bar", time.Unix(0, 0), 1) - - for _, watcher := range watchers { - - // wait for the notification for any changing - res := <-watcher.C - - if res == nil { - t.Fatal("watcher is cleared") - } - } - - for i, _ := range watchers { - - // create a new watcher - watchers[i] = NewWatcher() - // add to the watchers list - s.AddWatcher("foo/foo/foo", watchers[i], 0) - - } - - s.watcher.stopWatchers() - - for _, watcher := range watchers { - - // wait for the notification for any changing - res := <-watcher.C - - if res != nil { - t.Fatal("watcher is cleared") - } - } -} - -// BenchmarkWatch creates 10K watchers watch at /foo/[path] each time. -// Path is randomly chosen with max depth 10. -// It should take less than 15ms to wake up 10K watchers. -func BenchmarkWatch(b *testing.B) { - s := CreateStore(100) - - keys := GenKeys(10000, 10) - - b.ResetTimer() - for i := 0; i < b.N; i++ { - watchers := make([]*Watcher, 10000) - for i := 0; i < 10000; i++ { - // create a new watcher - watchers[i] = NewWatcher() - // add to the watchers list - s.AddWatcher(keys[i], watchers[i], 0) - } - - s.watcher.stopWatchers() - - for _, watcher := range watchers { - // wait for the notification for any changing - <-watcher.C - } - - s.watcher = newWatcherHub() - } -} diff --git a/util.go b/util.go index 80673d3b0..960d12f4a 100644 --- a/util.go +++ b/util.go @@ -47,7 +47,7 @@ var storeMsg chan string // Help to send msg from store to webHub func webHelper() { storeMsg = make(chan string) - etcdStore.SetMessager(storeMsg) + // etcdStore.SetMessager(storeMsg) for { // transfer the new msg to webHub web.Hub().Send(<-storeMsg) @@ -177,6 +177,11 @@ func check(err error) { } } +func getNodePath(urlPath string) string { + pathPrefixLen := len("/" + version + "/keys") + return urlPath[pathPrefixLen:] +} + //-------------------------------------- // Log //-------------------------------------- @@ -259,7 +264,7 @@ func directSet() { func send(c chan bool) { for i := 0; i < 10; i++ { - command := &SetCommand{} + command := &UpdateCommand{} command.Key = "foo" command.Value = "bar" command.ExpireTime = time.Unix(0, 0)