fix a bug in set(update node need to create a new one, not get the pointer of the node)

This commit is contained in:
Xiang Li 2013-06-20 21:34:27 -07:00
parent 1228694383
commit c3bfe19ae9
4 changed files with 50 additions and 26 deletions

View File

@ -90,7 +90,7 @@ func (c *WatchCommand) Apply(server *raft.Server) ([]byte, error) {
ch := make(chan store.Response) ch := make(chan store.Response)
// add to the watchers list // add to the watchers list
store.AddWatcher(c.Key, ch) store.AddWatcher(c.Key, ch, 0)
// wait for the notification for any changing // wait for the notification for any changing
res := <-ch res := <-ch

View File

@ -100,7 +100,7 @@ func SetHttpHandler(w http.ResponseWriter, req *http.Request) {
command := &SetCommand{} command := &SetCommand{}
command.Key = key command.Key = key
values := strings.Split(string(content), " ") values := strings.Split(string(content), ",")
command.Value = values[0] command.Value = values[0]

View File

@ -109,10 +109,9 @@ func Set(key string, value string, expireTime time.Time) ([]byte, error) {
go expire(key, node.update, expireTime) go expire(key, node.update, expireTime)
} }
} }
// update the information of the node // update the information of the node
node.ExpireTime = expireTime s.Nodes[key] = Node{value, expireTime, node.update}
node.Value = value
resp := Response{SET, key, node.Value, value, true, expireTime} resp := Response{SET, key, node.Value, value, true, expireTime}
@ -192,6 +191,7 @@ func expire(key string, update chan time.Time, expireTime time.Time) {
// if the node become a permanent one, the go routine is // if the node become a permanent one, the go routine is
// not needed // not needed
if updateTime.Equal(PERMANENT) { if updateTime.Equal(PERMANENT) {
fmt.Println("permanent")
return return
} }
// update duration // update duration

View File

@ -3,44 +3,60 @@ package store
import ( import (
"path" "path"
"strings" "strings"
//"fmt"
) )
type Watchers struct { const (
chanMap map[string][]chan Response SHORT = iota
LONG
)
type WatcherHub struct {
watchers map[string][]Watcher
}
type Watcher struct {
c chan Response
wType int
} }
// global watcher // global watcher
var w *Watchers var w *WatcherHub
// init the global watcher // init the global watcher
func init() { func init() {
w = createWatcher() w = createWatcherHub()
} }
// create a new watcher // create a new watcher
func createWatcher() *Watchers { func createWatcherHub() *WatcherHub {
w := new(Watchers) w := new(WatcherHub)
w.chanMap = make(map[string][]chan Response) w.watchers = make(map[string][]Watcher)
return w return w
} }
func Watcher() *Watchers { func GetWatcherHub() *WatcherHub {
return w return w
} }
// register a function with channel and prefix to the watcher // register a function with channel and prefix to the watcher
func AddWatcher(prefix string, c chan Response) error { func AddWatcher(prefix string, c chan Response, wType int) error {
prefix = "/" + path.Clean(prefix) prefix = "/" + path.Clean(prefix)
_, ok := w.chanMap[prefix] _, ok := w.watchers[prefix]
if !ok { if !ok {
w.chanMap[prefix] = make([]chan Response, 0)
w.chanMap[prefix] = append(w.chanMap[prefix], c) w.watchers[prefix] = make([]Watcher, 0)
watcher := Watcher{c, wType}
w.watchers[prefix] = append(w.watchers[prefix], watcher)
} else { } else {
w.chanMap[prefix] = append(w.chanMap[prefix], c)
watcher := Watcher{c, wType}
w.watchers[prefix] = append(w.watchers[prefix], watcher)
} }
return nil return nil
@ -57,18 +73,26 @@ func notify(resp Response) error {
for _, segment := range segments { for _, segment := range segments {
currPath = path.Join(currPath, segment) currPath = path.Join(currPath, segment)
chans, ok := w.chanMap[currPath] watchers, ok := w.watchers[currPath]
if ok { if ok {
newWatchers := make([]Watcher, 0)
// notify all the watchers // notify all the watchers
for _, c := range chans { for _, watcher := range watchers {
c <- resp watcher.c <- resp
if watcher.wType == LONG {
newWatchers = append(newWatchers, watcher)
}
} }
// we have notified all the watchers at this path if len(newWatchers) == 0 {
// delete the map // we have notified all the watchers at this path
delete(w.chanMap, currPath) // delete the map
delete(w.watchers, currPath)
} else {
w.watchers[currPath] = newWatchers
}
} }
} }