diff --git a/store/store.go b/store/store.go new file mode 100644 index 000000000..f2f31aba1 --- /dev/null +++ b/store/store.go @@ -0,0 +1,201 @@ +package store + +import ( + "path" + "encoding/json" + "time" + "fmt" + ) + +// CONSTANTS +const ( + ERROR = -1 + iota + SET + DELETE + GET +) + +type Store struct { + Nodes map[string]Node `json:"nodes"` +} + +type Node struct { + Value string `json:"value"` + ExpireTime time.Time `json:"expireTime"` + update chan time.Time `json:"-"` +} + +type Response struct { + Action int `json:"action"` + Key string `json:"key"` + OldValue string `json:"oldValue"` + NewValue string `json:"newValue"` + Exist bool `json:"exist"` + Expiration time.Time `json:"expiration"` +} + + +// global store +var s *Store + +func init() { + s = createStore() +} + +// make a new stroe +func createStore() *Store{ + s := new(Store) + s.Nodes = make(map[string]Node) + return s +} + +func GetStore() *Store { + return s +} + +// set the key to value, return the old value if the key exists +func Set(key string, value string, expireTime time.Time) Response { + + key = path.Clean(key) + + var isExpire bool = false + + isExpire = !expireTime.Equal(time.Unix(0,0)) + + // when the slow follower receive the set command + // the key may be expired, we need also to delete + // the previous value of key + if isExpire && expireTime.Sub(time.Now()) < 0 { + return Delete(key) + } + + node, ok := s.Nodes[key] + + if ok { + //update := make(chan time.Time) + //s.Nodes[key] = Node{value, expireTime, update} + + node.ExpireTime = expireTime + node.Value = value + notify(SET, key, node.Value, value, true) + // if node is not permanent before + // update its expireTime + if !node.ExpireTime.Equal(time.Unix(0,0)) { + node.update <- expireTime + + } else { + + // if we want the permanent to have expire time + // we need to create a chan and create a func + if isExpire { + node.update = make(chan time.Time) + + go expire(key, node.update, expireTime) + } + } + + return Response{SET, key, node.Value, value, true, expireTime} + + } else { + + update := make(chan time.Time) + + s.Nodes[key] = Node{value, expireTime, update} + + notify(SET, key, "", value, false) + + if isExpire { + go expire(key, update, expireTime) + } + + return Response{SET, key, "", value, false, time.Unix(0, 0)} + } +} + +// delete the key when it expires +func expire(key string, update chan time.Time, expireTime time.Time) { + duration := expireTime.Sub(time.Now()) + + for { + select { + // timeout delte key + case <-time.After(duration): + fmt.Println("expired at ", time.Now()) + Delete(key) + return + case updateTime := <-update: + //update duration + if updateTime.Equal(time.Unix(0,0)) { + fmt.Println("node became stable") + return + } + duration = updateTime.Sub(time.Now()) + } + } +} + +// get the value of the key +func Get(key string) Response { + key = path.Clean(key) + + node, ok := s.Nodes[key] + + if ok { + return Response{GET, key, node.Value, node.Value, true, node.ExpireTime} + } else { + return Response{GET, key, "", "", false, time.Unix(0, 0)} + } +} + +// delete the key, return the old value if the key exists +func Delete(key string) Response { + key = path.Clean(key) + + node, ok := s.Nodes[key] + + if ok { + delete(s.Nodes, key) + + notify(DELETE, key, node.Value, "", true) + + return Response{DELETE, key, node.Value, "", true, node.ExpireTime} + } else { + return Response{DELETE, key, "", "", false, time.Unix(0, 0)} + } +} + +// save the current state of the storage system +func (s *Store)Save() ([]byte, error) { + b, err := json.Marshal(s) + if err != nil { + fmt.Println(err) + return nil, err + } + return b, nil +} + +// recovery the state of the stroage system from a previous state +func (s *Store)Recovery(state []byte) error { + err := json.Unmarshal(state, s) + clean() + return err +} + +// clean all expired keys +func clean() { + for key, node := range s.Nodes{ + // stable node + if node.ExpireTime.Equal(time.Unix(0,0)) { + continue + } else { + if node.ExpireTime.Sub(time.Now()) >= time.Second { + node.update = make(chan time.Time) + go expire(key, node.update, node.ExpireTime) + } else { + // we should delete this node + delete(s.Nodes, key) + } + } + + } +} diff --git a/store/store_test.go b/store/store_test.go new file mode 100644 index 000000000..a51220ffd --- /dev/null +++ b/store/store_test.go @@ -0,0 +1,126 @@ +package store + +import ( + "testing" + "time" + "fmt" +) + +func TestStoreGet(t *testing.T) { + + Set("foo", "bar", time.Unix(0, 0)) + + res := Get("foo") + + if res.NewValue != "bar" { + t.Fatalf("Cannot get stored value") + } + + Delete("foo") + res = Get("foo") + + if res.Exist { + t.Fatalf("Got deleted value") + } +} + +// func TestSaveAndRecovery(t *testing.T) { + +// Set("foo", "bar", time.Unix(0, 0)) +// Set("foo2", "bar2", time.Now().Add(time.Second * 5)) +// state, err := s.Save() + +// if err != nil { +// t.Fatalf("Cannot Save") +// } + +// newStore := createStore() + +// // wait for foo2 expires +// time.Sleep(time.Second * 6) + +// newStore.Recovery(state) + +// res := newStore.Get("foo") + +// if res.OldValue != "bar" { +// t.Fatalf("Cannot recovery") +// } + +// res = newStore.Get("foo2") + +// if res.Exist { +// t.Fatalf("Get expired value") +// } + + +// s.Delete("foo") + +// } + +func TestExpire(t *testing.T) { + fmt.Println(time.Now()) + fmt.Println("TEST EXPIRE") + + // test expire + Set("foo", "bar", time.Now().Add(time.Second * 1)) + time.Sleep(2*time.Second) + + res := Get("foo") + + if res.Exist { + t.Fatalf("Got expired value") + } + + //test change expire time + Set("foo", "bar", time.Now().Add(time.Second * 10)) + + res = Get("foo") + + if !res.Exist { + t.Fatalf("Cannot get Value") + } + + Set("foo", "barbar", time.Now().Add(time.Second * 1)) + + time.Sleep(2 * time.Second) + + res = Get("foo") + + if res.Exist { + t.Fatalf("Got expired value") + } + + + // test change expire to stable + Set("foo", "bar", time.Now().Add(time.Second * 1)) + + Set("foo", "bar", time.Unix(0,0)) + + time.Sleep(2*time.Second) + + res = s.Get("foo") + + if !res.Exist { + t.Fatalf("Cannot get Value") + } + + // test stable to expire + s.Set("foo", "bar", time.Now().Add(time.Second * 1)) + time.Sleep(2*time.Second) + res = s.Get("foo") + + if res.Exist { + t.Fatalf("Got expired value") + } + + // test set older node + s.Set("foo", "bar", time.Now().Add(-time.Second * 1)) + res = s.Get("foo") + + if res.Exist { + t.Fatalf("Got expired value") + } + + +} diff --git a/store/tree_store.bak b/store/tree_store.bak new file mode 100644 index 000000000..60616b3f5 --- /dev/null +++ b/store/tree_store.bak @@ -0,0 +1,85 @@ +package main + +import ( + "path" + "strings" + ) + +type store struct { + nodes map[string]node +} + +type node struct { + value string + dir bool // just for clearity + nodes map[string]node +} + +// set the key to value, return the old value if the key exists +func (s *store) set(key string, value string) string, error { + + key = path.Clean(key) + + nodeNames := strings.Split(key, "/") + + levelNodes := s.nodes + for i = 0; i < len(nodes) - 1; ++i { + node, ok := levelNodes[nodeNames[i]] + // add new dir + if !ok { + node := Node{nodeNames[i], true, make(map[string]node)} + levelNodes[nodeNames[i]] := node + } else if ok && !node.dir { + return nil, errors.New("The key is a directory") + } + else { + levelNodes = levelNodes.nodes + } + } + // add the last node and value + node, ok := levelNodes[nodeNames[i]] + + if !ok { + node := Node{nodeNames[i], false, nil} + levelNodes[nodeNames] = node + return nil, nil + } else { + oldValue := node.value + node.value = value + return oldValue ,nil + } + +} + +// get the node of the key +func (s *store) get(key string) node { + key = path.Clean(key) + + nodeNames := strings.Split(key, "/") + + levelNodes := s.nodes + + for i = 0; i < len(nodes) - 1; ++i { + node, ok := levelNodes[nodeNames[i]] + if !ok || !node.dir { + return nil + } + levelNodes = levelNodes.nodes + } + + node, ok := levelNodes[nodeNames[i]] + if ok { + return node + } + return nil + +} + +// delete the key, return the old value if the key exists +func (s *store) delete(key string) string { + return nil +} + +func (n *node) Value() string{ + return n.value +} diff --git a/store/watcher.go b/store/watcher.go new file mode 100644 index 000000000..7fcc9ac8b --- /dev/null +++ b/store/watcher.go @@ -0,0 +1,80 @@ +package store + +import ( + "path" + "strings" + //"fmt" + "time" + ) + + +type Watchers struct { + chanMap map[string][]chan Response +} + +// global watcher +var w *Watchers + + +// init the global watcher +func init() { + w = createWatcher() +} + +// create a new watcher +func createWatcher() *Watchers { + w := new(Watchers) + w.chanMap = make(map[string][]chan Response) + return w +} + +func Watcher() *Watchers { + return w +} + +// register a function with channel and prefix to the watcher +func AddWatcher(prefix string, c chan Response) error { + + prefix = "/" + path.Clean(prefix) + + _, ok := w.chanMap[prefix] + if !ok { + w.chanMap[prefix] = make([]chan Response, 0) + w.chanMap[prefix] = append(w.chanMap[prefix], c) + } else { + w.chanMap[prefix] = append(w.chanMap[prefix], c) + } + + return nil +} + +// notify the watcher a action happened +func notify(action int, key string, oldValue string, newValue string, exist bool) error { + key = path.Clean(key) + segments := strings.Split(key, "/") + currPath := "/" + + // walk through all the pathes + for _, segment := range segments { + currPath = path.Join(currPath, segment) + + chans, ok := w.chanMap[currPath] + + if ok { + + n := Response {action, key, oldValue, newValue, exist, time.Unix(0, 0)} + + // notify all the watchers + for _, c := range chans { + c <- n + } + + // we have notified all the watchers at this path + // delete the map + delete(w.chanMap, currPath) + } + + } + + return nil +} \ No newline at end of file diff --git a/store/watcher_test.go b/store/watcher_test.go new file mode 100644 index 000000000..7c6ecb56b --- /dev/null +++ b/store/watcher_test.go @@ -0,0 +1,29 @@ +package store + +import ( + "testing" + "fmt" + "time" +) + +func TestWatch(t *testing.T) { + // watcher := createWatcher() + c := make(chan Response) + d := make(chan Response) + w.add("/", c) + go say(c) + w.add("/prefix/", d) + go say(d) + s.Set("/prefix/foo", "bar", time.Unix(0, 0)) +} + +func say(c chan Response) { + result := <-c + + if result.Action != -1 { + fmt.Println("yes") + } else { + fmt.Println("no") + } + +} diff --git a/web/conn.go b/web/conn.go new file mode 100644 index 000000000..ab0e6d7c6 --- /dev/null +++ b/web/conn.go @@ -0,0 +1,30 @@ +package web + +import ( + "code.google.com/p/go.net/websocket" +) + +type connection struct { + // The websocket connection. + ws *websocket.Conn + + // Buffered channel of outbound messages. + send chan string +} + +func (c *connection) writer() { + for message := range c.send { + err := websocket.Message.Send(c.ws, message) + if err != nil { + break + } + } + c.ws.Close() +} + +func wsHandler(ws *websocket.Conn) { + c := &connection{send: make(chan string, 256), ws: ws} + h.register <- c + defer func() { h.unregister <- c }() + c.writer() +} \ No newline at end of file diff --git a/web/home.html b/web/home.html new file mode 100644 index 000000000..4a83f24af --- /dev/null +++ b/web/home.html @@ -0,0 +1,87 @@ + +
+