diff --git a/command.go b/command.go index e7369cf44..cd259ef10 100644 --- a/command.go +++ b/command.go @@ -90,7 +90,7 @@ func (c *WatchCommand) Apply(server *raft.Server) ([]byte, error) { ch := make(chan store.Response) // add to the watchers list - store.AddWatcher(c.Key, ch) + store.AddWatcher(c.Key, ch, 0) // wait for the notification for any changing res := <-ch diff --git a/handlers.go b/handlers.go index ce6167ae1..fa846b504 100644 --- a/handlers.go +++ b/handlers.go @@ -100,7 +100,7 @@ func SetHttpHandler(w http.ResponseWriter, req *http.Request) { command := &SetCommand{} command.Key = key - values := strings.Split(string(content), " ") + values := strings.Split(string(content), ",") command.Value = values[0] diff --git a/store/store.go b/store/store.go index 0bb1027d7..a11983980 100644 --- a/store/store.go +++ b/store/store.go @@ -109,10 +109,9 @@ func Set(key string, value string, expireTime time.Time) ([]byte, error) { go expire(key, node.update, expireTime) } } - + // update the information of the node - node.ExpireTime = expireTime - node.Value = value + s.Nodes[key] = Node{value, expireTime, node.update} resp := Response{SET, key, node.Value, value, true, expireTime} @@ -192,6 +191,7 @@ func expire(key string, update chan time.Time, expireTime time.Time) { // if the node become a permanent one, the go routine is // not needed if updateTime.Equal(PERMANENT) { + fmt.Println("permanent") return } // update duration diff --git a/store/watcher.go b/store/watcher.go index 4d5932962..dd9ae8ce7 100644 --- a/store/watcher.go +++ b/store/watcher.go @@ -3,44 +3,60 @@ package store import ( "path" "strings" - -//"fmt" ) -type Watchers struct { - chanMap map[string][]chan Response +const ( + SHORT = iota + LONG +) + +type WatcherHub struct { + watchers map[string][]Watcher +} + +type Watcher struct { + c chan Response + wType int } // global watcher -var w *Watchers +var w *WatcherHub // init the global watcher func init() { - w = createWatcher() + w = createWatcherHub() } // create a new watcher -func createWatcher() *Watchers { - w := new(Watchers) - w.chanMap = make(map[string][]chan Response) +func createWatcherHub() *WatcherHub { + w := new(WatcherHub) + w.watchers = make(map[string][]Watcher) return w } -func Watcher() *Watchers { +func GetWatcherHub() *WatcherHub { return w } // register a function with channel and prefix to the watcher -func AddWatcher(prefix string, c chan Response) error { +func AddWatcher(prefix string, c chan Response, wType int) error { prefix = "/" + path.Clean(prefix) - _, ok := w.chanMap[prefix] + _, ok := w.watchers[prefix] + if !ok { - w.chanMap[prefix] = make([]chan Response, 0) - w.chanMap[prefix] = append(w.chanMap[prefix], c) + + w.watchers[prefix] = make([]Watcher, 0) + + watcher := Watcher{c, wType} + + w.watchers[prefix] = append(w.watchers[prefix], watcher) } else { - w.chanMap[prefix] = append(w.chanMap[prefix], c) + + watcher := Watcher{c, wType} + + w.watchers[prefix] = append(w.watchers[prefix], watcher) } return nil @@ -57,18 +73,26 @@ func notify(resp Response) error { for _, segment := range segments { currPath = path.Join(currPath, segment) - chans, ok := w.chanMap[currPath] + watchers, ok := w.watchers[currPath] if ok { + newWatchers := make([]Watcher, 0) // notify all the watchers - for _, c := range chans { - c <- resp + for _, watcher := range watchers { + watcher.c <- resp + if watcher.wType == LONG { + newWatchers = append(newWatchers, watcher) + } } - // we have notified all the watchers at this path - // delete the map - delete(w.chanMap, currPath) + if len(newWatchers) == 0 { + // we have notified all the watchers at this path + // delete the map + delete(w.watchers, currPath) + } else { + w.watchers[currPath] = newWatchers + } } }