watch sinceindex works

This commit is contained in:
Xiang Li 2013-06-29 15:29:10 -07:00
parent eef6f45e09
commit 047f8ab6a8
3 changed files with 67 additions and 24 deletions

View File

@ -87,10 +87,10 @@ func (c *WatchCommand) CommandName() string {
} }
func (c *WatchCommand) Apply(server *raft.Server) (interface{}, error) { func (c *WatchCommand) Apply(server *raft.Server) (interface{}, error) {
ch := make(chan store.Response) ch := make(chan store.Response, 1)
// add to the watchers list // add to the watchers list
store.AddWatcher(c.Key, ch, 0) store.AddWatcher(c.Key, ch, 1)
// wait for the notification for any changing // wait for the notification for any changing
res := <-ch res := <-ch

View File

@ -28,8 +28,14 @@ type Store struct {
// now we use it to send changes to the hub of the web service // now we use it to send changes to the hub of the web service
messager *chan string messager *chan string
// previous responses
Responses []Response
// at some point, we may need to compact the Response
ResponseStartIndex uint64
// current Index // current Index
index uint64 Index uint64
} }
type Node struct { type Node struct {
@ -70,6 +76,8 @@ func init() {
func createStore() *Store { func createStore() *Store {
s := new(Store) s := new(Store)
s.Nodes = make(map[string]Node) s.Nodes = make(map[string]Node)
s.Responses = make([]Response, 0)
s.ResponseStartIndex = 0
return s return s
} }
@ -87,7 +95,9 @@ func (s *Store) SetMessager(messager *chan string) {
func Set(key string, value string, expireTime time.Time, index uint64) ([]byte, error) { func Set(key string, value string, expireTime time.Time, index uint64) ([]byte, error) {
//update index //update index
s.index = index s.Index = index
key = "/" + key
key = path.Clean(key) key = path.Clean(key)
@ -113,8 +123,6 @@ func Set(key string, value string, expireTime time.Time, index uint64) ([]byte,
// get the node // get the node
node, ok := s.Nodes[key] node, ok := s.Nodes[key]
if ok { if ok {
// if node is not permanent before // if node is not permanent before
// update its expireTime // update its expireTime
@ -147,6 +155,8 @@ func Set(key string, value string, expireTime time.Time, index uint64) ([]byte,
*s.messager <- string(msg) *s.messager <- string(msg)
} }
s.Responses = append(s.Responses, resp)
return msg, err return msg, err
// add new node // add new node
@ -173,6 +183,8 @@ func Set(key string, value string, expireTime time.Time, index uint64) ([]byte,
*s.messager <- string(msg) *s.messager <- string(msg)
} }
s.Responses = append(s.Responses, resp)
fmt.Println(len(s.Responses), " ")
return msg, err return msg, err
} }
} }
@ -192,7 +204,7 @@ func expire(key string, update chan time.Time, expireTime time.Time) {
delete(s.Nodes, key) delete(s.Nodes, key)
resp := Response{DELETE, key, node.Value, "", true, node.ExpireTime, 0, s.index} resp := Response{DELETE, key, node.Value, "", true, node.ExpireTime, 0, s.Index}
msg, err := json.Marshal(resp) msg, err := json.Marshal(resp)
@ -241,16 +253,17 @@ func Get(key string) Response {
TTL = -1 TTL = -1
} }
return Response{GET, key, node.Value, node.Value, true, node.ExpireTime, TTL, s.index} return Response{GET, key, node.Value, node.Value, true, node.ExpireTime, TTL, s.Index}
} else { } else {
return Response{GET, key, "", "", false, time.Unix(0, 0), 0, s.index}
return Response{GET, key, "", "", false, time.Unix(0, 0), 0, s.Index}
} }
} }
// delete the key // delete the key
func Delete(key string, index uint64) ([]byte, error) { func Delete(key string, index uint64) ([]byte, error) {
//update index //update index
s.index = index s.Index = index
key = path.Clean(key) key = path.Clean(key)
@ -282,11 +295,15 @@ func Delete(key string, index uint64) ([]byte, error) {
*s.messager <- string(msg) *s.messager <- string(msg)
} }
s.Responses = append(s.Responses, resp)
return msg, err return msg, err
} else { } else {
return json.Marshal(Response{DELETE, key, "", "", false, time.Unix(0, 0), 0, index}) resp := Response{DELETE, key, "", "", false, time.Unix(0, 0), 0, index}
s.Responses = append(s.Responses, resp)
return json.Marshal(resp)
} }
} }

View File

@ -3,11 +3,7 @@ package store
import ( import (
"path" "path"
"strings" "strings"
) "fmt"
const (
SHORT = iota
LONG
) )
type WatcherHub struct { type WatcherHub struct {
@ -16,7 +12,6 @@ type WatcherHub struct {
type Watcher struct { type Watcher struct {
c chan Response c chan Response
wType int
} }
// global watcher // global watcher
@ -39,22 +34,32 @@ func GetWatcherHub() *WatcherHub {
} }
// register a function with channel and prefix to the watcher // register a function with channel and prefix to the watcher
func AddWatcher(prefix string, c chan Response, wType int) error { func AddWatcher(prefix string, c chan Response, sinceIndex uint64) error {
prefix = "/" + path.Clean(prefix) prefix = "/" + path.Clean(prefix)
if sinceIndex != 0 && sinceIndex >= s.ResponseStartIndex {
for i := sinceIndex; i < s.Index; i++ {
if check(prefix, i) {
c <- s.Responses[i]
return nil
}
}
}
_, ok := w.watchers[prefix] _, ok := w.watchers[prefix]
if !ok { if !ok {
w.watchers[prefix] = make([]Watcher, 0) w.watchers[prefix] = make([]Watcher, 0)
watcher := Watcher{c, wType} watcher := Watcher{c}
w.watchers[prefix] = append(w.watchers[prefix], watcher) w.watchers[prefix] = append(w.watchers[prefix], watcher)
} else { } else {
watcher := Watcher{c, wType} watcher := Watcher{c}
w.watchers[prefix] = append(w.watchers[prefix], watcher) w.watchers[prefix] = append(w.watchers[prefix], watcher)
} }
@ -62,6 +67,30 @@ func AddWatcher(prefix string, c chan Response, wType int) error {
return nil return nil
} }
// check if the response has what we are waching
func check(prefix string, index uint64) bool {
index = index - s.ResponseStartIndex
if index < 0 {
return false
}
path := s.Responses[index].Key
fmt.Println("checking ", path, " ", prefix)
if strings.HasPrefix(path, prefix) {
fmt.Println("checking found")
prefixLen := len(prefix)
if len(path) == prefixLen || path[prefixLen] == '/' {
return true
}
}
return false
}
// notify the watcher a action happened // notify the watcher a action happened
func notify(resp Response) error { func notify(resp Response) error {
resp.Key = path.Clean(resp.Key) resp.Key = path.Clean(resp.Key)
@ -81,9 +110,6 @@ func notify(resp Response) error {
// notify all the watchers // notify all the watchers
for _, watcher := range watchers { for _, watcher := range watchers {
watcher.c <- resp watcher.c <- resp
if watcher.wType == LONG {
newWatchers = append(newWatchers, watcher)
}
} }
if len(newWatchers) == 0 { if len(newWatchers) == 0 {