diff --git a/command.go b/command.go index ecbcb8d81..47daa0711 100644 --- a/command.go +++ b/command.go @@ -33,7 +33,7 @@ func (c *SetCommand) CommandName() string { // Set the value of key to value func (c *SetCommand) Apply(server *raft.Server) (interface{}, error) { - return store.Set(c.Key, c.Value, c.ExpireTime) + return store.Set(c.Key, c.Value, c.ExpireTime, server.CommittedIndex()) } // Get the path for http request @@ -73,7 +73,7 @@ func (c *DeleteCommand) CommandName() string { // Delete the key func (c *DeleteCommand) Apply(server *raft.Server) (interface{}, error) { - return store.Delete(c.Key) + return store.Delete(c.Key, server.CommittedIndex()) } // Watch command @@ -87,10 +87,10 @@ func (c *WatchCommand) CommandName() string { } func (c *WatchCommand) Apply(server *raft.Server) (interface{}, error) { - ch := make(chan store.Response) + ch := make(chan store.Response, 1) // add to the watchers list - store.AddWatcher(c.Key, ch, 0) + store.AddWatcher(c.Key, ch, 1) // wait for the notification for any changing res := <-ch diff --git a/store/store.go b/store/store.go index f1e90f777..6e80b0418 100644 --- a/store/store.go +++ b/store/store.go @@ -27,6 +27,15 @@ type Store struct { // the string channel to send messages to the outside world // now we use it to send changes to the hub of the web service messager *chan string + + // previous responses + Responses []Response + + // at some point, we may need to compact the Response + ResponseStartIndex uint64 + + // current Index + Index uint64 } type Node struct { @@ -43,14 +52,19 @@ type Node struct { type Response struct { Action int `json:"action"` Key string `json:"key"` - OldValue string `json:"oldValue"` - NewValue string `json:"newValue"` + PrevValue string `json:"prevValue"` + Value string `json:"Value"` // if the key existed before the action, this field should be true // if the key did not exist before the action, this field should be false Exist bool `json:"exist"` Expiration time.Time `json:"expiration"` + + // countdown until expiration in seconds + TTL int64 `json:"TTL"` + + Index uint64 `json:"index"` } func init() { @@ -62,6 +76,8 @@ func init() { func createStore() *Store { s := new(Store) s.Nodes = make(map[string]Node) + s.Responses = make([]Response, 0) + s.ResponseStartIndex = 0 return s } @@ -76,7 +92,12 @@ func (s *Store) SetMessager(messager *chan string) { } // set the key to value, return the old value if the key exists -func Set(key string, value string, expireTime time.Time) ([]byte, error) { +func Set(key string, value string, expireTime time.Time, index uint64) ([]byte, error) { + + //update index + s.Index = index + + key = "/" + key key = path.Clean(key) @@ -88,7 +109,15 @@ func Set(key string, value string, expireTime time.Time) ([]byte, error) { // the key may be expired, we should not add the node // also if the node exist, we need to delete the node if isExpire && expireTime.Sub(time.Now()) < 0 { - return Delete(key) + return Delete(key, index) + } + + var TTL int64 + // update ttl + if isExpire { + TTL = int64(expireTime.Sub(time.Now()) / time.Second) + } else { + TTL = -1 } // get the node @@ -108,12 +137,13 @@ func Set(key string, value string, expireTime time.Time) ([]byte, error) { node.update = make(chan time.Time) go expire(key, node.update, expireTime) } + } // update the information of the node s.Nodes[key] = Node{value, expireTime, node.update} - resp := Response{SET, key, node.Value, value, true, expireTime} + resp := Response{SET, key, node.Value, value, true, expireTime, TTL, index} msg, err := json.Marshal(resp) @@ -125,9 +155,11 @@ func Set(key string, value string, expireTime time.Time) ([]byte, error) { *s.messager <- string(msg) } + s.Responses = append(s.Responses, resp) + return msg, err - // add new node + // add new node } else { update := make(chan time.Time) @@ -138,7 +170,7 @@ func Set(key string, value string, expireTime time.Time) ([]byte, error) { go expire(key, update, expireTime) } - resp := Response{SET, key, "", value, false, expireTime} + resp := Response{SET, key, "", value, false, expireTime, TTL, index} msg, err := json.Marshal(resp) @@ -150,7 +182,9 @@ func Set(key string, value string, expireTime time.Time) ([]byte, error) { *s.messager <- string(msg) } - + + s.Responses = append(s.Responses, resp) + fmt.Println(len(s.Responses), " ") return msg, err } } @@ -170,7 +204,7 @@ func expire(key string, update chan time.Time, expireTime time.Time) { delete(s.Nodes, key) - resp := Response{DELETE, key, node.Value, "", true, node.ExpireTime} + resp := Response{DELETE, key, node.Value, "", true, node.ExpireTime, 0, s.Index} msg, err := json.Marshal(resp) @@ -207,14 +241,30 @@ func Get(key string) Response { node, ok := s.Nodes[key] if ok { - return Response{GET, key, node.Value, node.Value, true, node.ExpireTime} + var TTL int64 + var isExpire bool = false + + isExpire = !node.ExpireTime.Equal(PERMANENT) + + // update ttl + if isExpire { + TTL = int64(node.ExpireTime.Sub(time.Now()) / time.Second) + } else { + TTL = -1 + } + + return Response{GET, key, node.Value, node.Value, true, node.ExpireTime, TTL, s.Index} } else { - return Response{GET, key, "", "", false, time.Unix(0, 0)} + + return Response{GET, key, "", "", false, time.Unix(0, 0), 0, s.Index} } } // delete the key -func Delete(key string) ([]byte, error) { +func Delete(key string, index uint64) ([]byte, error) { + //update index + s.Index = index + key = path.Clean(key) node, ok := s.Nodes[key] @@ -233,7 +283,7 @@ func Delete(key string) ([]byte, error) { } - resp := Response{DELETE, key, node.Value, "", true, node.ExpireTime} + resp := Response{DELETE, key, node.Value, "", true, node.ExpireTime, 0, index} msg, err := json.Marshal(resp) @@ -245,11 +295,15 @@ func Delete(key string) ([]byte, error) { *s.messager <- string(msg) } + s.Responses = append(s.Responses, resp) return msg, err } else { - return json.Marshal(Response{DELETE, key, "", "", false, time.Unix(0, 0)}) + resp := Response{DELETE, key, "", "", false, time.Unix(0, 0), 0, index} + s.Responses = append(s.Responses, resp) + + return json.Marshal(resp) } } diff --git a/store/watcher.go b/store/watcher.go index 6b11e4769..1236ff811 100644 --- a/store/watcher.go +++ b/store/watcher.go @@ -3,11 +3,7 @@ package store import ( "path" "strings" -) - -const ( - SHORT = iota - LONG + "fmt" ) type WatcherHub struct { @@ -16,7 +12,6 @@ type WatcherHub struct { type Watcher struct { c chan Response - wType int } // global watcher @@ -39,22 +34,32 @@ func GetWatcherHub() *WatcherHub { } // register a function with channel and prefix to the watcher -func AddWatcher(prefix string, c chan Response, wType int) error { +func AddWatcher(prefix string, c chan Response, sinceIndex uint64) error { prefix = "/" + path.Clean(prefix) + if sinceIndex != 0 && sinceIndex >= s.ResponseStartIndex { + + for i := sinceIndex; i < s.Index; i++ { + if check(prefix, i) { + c <- s.Responses[i] + return nil + } + } + } + _, ok := w.watchers[prefix] if !ok { w.watchers[prefix] = make([]Watcher, 0) - watcher := Watcher{c, wType} + watcher := Watcher{c} w.watchers[prefix] = append(w.watchers[prefix], watcher) } else { - watcher := Watcher{c, wType} + watcher := Watcher{c} w.watchers[prefix] = append(w.watchers[prefix], watcher) } @@ -62,6 +67,30 @@ func AddWatcher(prefix string, c chan Response, wType int) error { return nil } +// check if the response has what we are waching +func check(prefix string, index uint64) bool { + + index = index - s.ResponseStartIndex + + if index < 0 { + return false + } + + path := s.Responses[index].Key + fmt.Println("checking ", path, " ", prefix) + if strings.HasPrefix(path, prefix) { + fmt.Println("checking found") + prefixLen := len(prefix) + if len(path) == prefixLen || path[prefixLen] == '/' { + return true + } + + } + + return false +} + + // notify the watcher a action happened func notify(resp Response) error { resp.Key = path.Clean(resp.Key) @@ -81,9 +110,6 @@ func notify(resp Response) error { // notify all the watchers for _, watcher := range watchers { watcher.c <- resp - if watcher.wType == LONG { - newWatchers = append(newWatchers, watcher) - } } if len(newWatchers) == 0 {