Merge pull request #5 from xiangli-cmu/master

clean up tree.go and watcher.go
This commit is contained in:
Xiang Li 2013-07-09 13:44:30 -07:00
commit ab847c248b
5 changed files with 224 additions and 181 deletions

View File

@ -33,7 +33,7 @@ func (c *SetCommand) CommandName() string {
// Set the value of key to value // Set the value of key to value
func (c *SetCommand) Apply(server *raft.Server) (interface{}, error) { func (c *SetCommand) Apply(server *raft.Server) (interface{}, error) {
return store.Set(c.Key, c.Value, c.ExpireTime, server.CommitIndex()) return etcdStore.Set(c.Key, c.Value, c.ExpireTime, server.CommitIndex())
} }
// TestAndSet command // TestAndSet command
@ -51,7 +51,7 @@ func (c *TestAndSetCommand) CommandName() string {
// Set the value of key to value // Set the value of key to value
func (c *TestAndSetCommand) Apply(server *raft.Server) (interface{}, error) { func (c *TestAndSetCommand) Apply(server *raft.Server) (interface{}, error) {
return store.TestAndSet(c.Key, c.PrevValue, c.Value, c.ExpireTime, server.CommitIndex()) return etcdStore.TestAndSet(c.Key, c.PrevValue, c.Value, c.ExpireTime, server.CommitIndex())
} }
// Get command // Get command
@ -66,7 +66,7 @@ func (c *GetCommand) CommandName() string {
// Set the value of key to value // Set the value of key to value
func (c *GetCommand) Apply(server *raft.Server) (interface{}, error) { func (c *GetCommand) Apply(server *raft.Server) (interface{}, error) {
res := store.Get(c.Key) res := etcdStore.Get(c.Key)
return json.Marshal(res) return json.Marshal(res)
} }
@ -82,7 +82,7 @@ func (c *ListCommand) CommandName() string {
// Set the value of key to value // Set the value of key to value
func (c *ListCommand) Apply(server *raft.Server) (interface{}, error) { func (c *ListCommand) Apply(server *raft.Server) (interface{}, error) {
return store.List(c.Prefix) return etcdStore.List(c.Prefix)
} }
// Delete command // Delete command
@ -97,7 +97,7 @@ func (c *DeleteCommand) CommandName() string {
// Delete the key // Delete the key
func (c *DeleteCommand) Apply(server *raft.Server) (interface{}, error) { func (c *DeleteCommand) Apply(server *raft.Server) (interface{}, error) {
return store.Delete(c.Key, server.CommitIndex()) return etcdStore.Delete(c.Key, server.CommitIndex())
} }
// Watch command // Watch command
@ -112,13 +112,13 @@ func (c *WatchCommand) CommandName() string {
} }
func (c *WatchCommand) Apply(server *raft.Server) (interface{}, error) { func (c *WatchCommand) Apply(server *raft.Server) (interface{}, error) {
ch := make(chan store.Response, 1) watcher := store.CreateWatcher()
// add to the watchers list // add to the watchers list
store.AddWatcher(c.Key, ch, c.SinceIndex) etcdStore.AddWatcher(c.Key, watcher, c.SinceIndex)
// wait for the notification for any changing // wait for the notification for any changing
res := <-ch res := <-watcher.C
return json.Marshal(res) return json.Marshal(res)
} }

11
etcd.go
View File

@ -113,9 +113,7 @@ type Info struct {
var server *raft.Server var server *raft.Server
var serverTransHandler transHandler var serverTransHandler transHandler
var logger *log.Logger var etcdStore *store.Store
var storeMsg chan string
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
// //
@ -129,7 +127,6 @@ var storeMsg chan string
func main() { func main() {
var err error var err error
logger = log.New(os.Stdout, "", log.LstdFlags)
flag.Parse() flag.Parse()
// Setup commands. // Setup commands.
@ -162,10 +159,10 @@ func main() {
serverTransHandler = createTranHandler(st) serverTransHandler = createTranHandler(st)
// Setup new raft server. // Setup new raft server.
s := store.CreateStore(maxSize) etcdStore = store.CreateStore(maxSize)
// create raft server // create raft server
server, err = raft.NewServer(name, dirPath, serverTransHandler, s, nil) server, err = raft.NewServer(name, dirPath, serverTransHandler, etcdStore, nil)
if err != nil { if err != nil {
fatal("%v", err) fatal("%v", err)
@ -226,7 +223,7 @@ func main() {
if webPort != -1 { if webPort != -1 {
// start web // start web
s.SetMessager(&storeMsg) etcdStore.SetMessager(&storeMsg)
go webHelper() go webHelper()
go web.Start(server, webPort) go web.Start(server, webPort)
} }

View File

@ -38,6 +38,7 @@ func VoteHttpHandler(w http.ResponseWriter, req *http.Request) {
func AppendEntriesHttpHandler(w http.ResponseWriter, req *http.Request) { func AppendEntriesHttpHandler(w http.ResponseWriter, req *http.Request) {
aereq := &raft.AppendEntriesRequest{} aereq := &raft.AppendEntriesRequest{}
err := decodeJsonRequest(req, aereq) err := decodeJsonRequest(req, aereq)
if err == nil { if err == nil {
debug("[recv] POST http://%s/log/append [%d]", server.Name(), len(aereq.Entries)) debug("[recv] POST http://%s/log/append [%d]", server.Name(), len(aereq.Entries))
if resp := server.AppendEntries(aereq); resp != nil { if resp := server.AppendEntries(aereq); resp != nil {
@ -121,7 +122,7 @@ func SetHttpHandler(w *http.ResponseWriter, req *http.Request) {
duration, err := strconv.Atoi(strDuration) duration, err := strconv.Atoi(strDuration)
if err != nil { if err != nil {
warn("raftd: Bad duration: %v", err) warn("Bad duration: %v", err)
(*w).WriteHeader(http.StatusInternalServerError) (*w).WriteHeader(http.StatusInternalServerError)
return return
} }
@ -150,7 +151,7 @@ func TestAndSetHttpHandler(w http.ResponseWriter, req *http.Request) {
duration, err := strconv.Atoi(strDuration) duration, err := strconv.Atoi(strDuration)
if err != nil { if err != nil {
warn("raftd: Bad duration: %v", err) warn("Bad duration: %v", err)
w.WriteHeader(http.StatusInternalServerError) w.WriteHeader(http.StatusInternalServerError)
return return
} }
@ -266,7 +267,7 @@ func ListHttpHandler(w http.ResponseWriter, req *http.Request) {
command.Prefix = prefix command.Prefix = prefix
if body, err := command.Apply(server); err != nil { if body, err := command.Apply(server); err != nil {
warn("raftd: Unable to write file: %v", err) warn("Unable to write file: %v", err)
w.WriteHeader(http.StatusInternalServerError) w.WriteHeader(http.StatusInternalServerError)
return return
} else { } else {
@ -309,7 +310,7 @@ func WatchHttpHandler(w http.ResponseWriter, req *http.Request) {
} }
if body, err := command.Apply(server); err != nil { if body, err := command.Apply(server); err != nil {
warn("raftd: Unable to write file: %v", err) warn("Unable to write file: %v", err)
w.WriteHeader(http.StatusInternalServerError) w.WriteHeader(http.StatusInternalServerError)
return return
} else { } else {

View File

@ -4,87 +4,107 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"path" "path"
"time"
"strconv" "strconv"
"time"
) )
// global store //------------------------------------------------------------------------------
var s *Store //
// Typedefs
// CONSTANTS //
const ( //------------------------------------------------------------------------------
ERROR = -1 + iota
SET
DELETE
GET
)
var PERMANENT = time.Unix(0, 0)
// The main struct of the Key-Value store
type Store struct { type Store struct {
// // use the build-in hash map as the key-value store structure
// Nodes map[string]Node `json:"nodes"`
// use treeMap as the key-value stroe structure // key-value store structure
Tree *tree Tree *tree
// the string channel to send messages to the outside world
// now we use it to send changes to the hub of the web service // WatcherHub is where we register all the clients
// who issue a watch request
watcher *WatcherHub
// The string channel to send messages to the outside world
// Now we use it to send changes to the hub of the web service
messager *chan string messager *chan string
// // A map to keep the recent response to the clients
ResponseMap map[string]Response ResponseMap map[string]Response
// // The max number of the recent responses we can record
ResponseMaxSize int ResponseMaxSize int
// The current number of the recent responses we have recorded
ResponseCurrSize uint ResponseCurrSize uint
// at some point, we may need to compact the Response // The index of the first recent responses we have
ResponseStartIndex uint64 ResponseStartIndex uint64
// current Index // Current index of the raft machine
Index uint64 Index uint64
} }
// A Node represents a Value in the Key-Value pair in the store
// It has its value, expire time and a channel used to update the
// expire time (since we do countdown in a go routine, we need to
// communicate with it via channel)
type Node struct { type Node struct {
// The string value of the node
Value string `json:"value"` Value string `json:"value"`
// if the node is a permanent one the ExprieTime will be Unix(0,0) // If the node is a permanent one the ExprieTime will be Unix(0,0)
// Otherwise after the expireTime, the node will be deleted // Otherwise after the expireTime, the node will be deleted
ExpireTime time.Time `json:"expireTime"` ExpireTime time.Time `json:"expireTime"`
// a channel to update the expireTime of the node // A channel to update the expireTime of the node
update chan time.Time `json:"-"` update chan time.Time `json:"-"`
} }
// The response from the store to the user who issue a command
type Response struct { type Response struct {
Action int `json:"action"` Action string `json:"action"`
Key string `json:"key"` Key string `json:"key"`
PrevValue string `json:"prevValue"` PrevValue string `json:"prevValue"`
Value string `json:"value"` Value string `json:"value"`
// if the key existed before the action, this field should be true // If the key existed before the action, this field should be true
// if the key did not exist before the action, this field should be false // If the key did not exist before the action, this field should be false
Exist bool `json:"exist"` Exist bool `json:"exist"`
Expiration time.Time `json:"expiration"` Expiration time.Time `json:"expiration"`
// countdown until expiration in seconds // Time to live in second
TTL int64 `json:"ttl"` TTL int64 `json:"ttl"`
// The command index of the raft machine when the command is executed
Index uint64 `json:"index"` Index uint64 `json:"index"`
} }
// A listNode represent the simplest Key-Value pair with its type
// It is only used when do list opeartion
// We want to have a file system like store, thus we distingush "file"
// and "directory"
type ListNode struct { type ListNode struct {
Key string Key string
Value string Value string
Type string Type string
} }
// make a new stroe var PERMANENT = time.Unix(0, 0)
//------------------------------------------------------------------------------
//
// Methods
//
//------------------------------------------------------------------------------
// Create a new stroe
// Arguement max is the max number of response we want to record
func CreateStore(max int) *Store { func CreateStore(max int) *Store {
s = new(Store) s := new(Store)
s.messager = nil s.messager = nil
s.ResponseMap = make(map[string]Response) s.ResponseMap = make(map[string]Response)
s.ResponseStartIndex = 0 s.ResponseStartIndex = 0
s.ResponseMaxSize = max s.ResponseMaxSize = max
@ -92,9 +112,9 @@ func CreateStore(max int) *Store {
s.Tree = &tree{ s.Tree = &tree{
&treeNode{ &treeNode{
Node { Node{
"/", "/",
time.Unix(0,0), time.Unix(0, 0),
nil, nil,
}, },
true, true,
@ -102,88 +122,87 @@ func CreateStore(max int) *Store {
}, },
} }
s.watcher = createWatcherHub()
return s return s
} }
// return a pointer to the store // Set the messager of the store
func GetStore() *Store {
return s
}
// set the messager of the store
func (s *Store) SetMessager(messager *chan string) { func (s *Store) SetMessager(messager *chan string) {
s.messager = messager s.messager = messager
} }
// set the key to value, return the old value if the key exists // Set the key to value with expiration time
func Set(key string, value string, expireTime time.Time, index uint64) ([]byte, error) { func (s *Store) Set(key string, value string, expireTime time.Time, index uint64) ([]byte, error) {
//update index //Update index
s.Index = index s.Index = index
key = "/" + key key = path.Clean("/" + key)
key = path.Clean(key) isExpire := !expireTime.Equal(PERMANENT)
var isExpire bool = false // When the slow follower receive the set command
isExpire = !expireTime.Equal(PERMANENT)
// when the slow follower receive the set command
// the key may be expired, we should not add the node // the key may be expired, we should not add the node
// also if the node exist, we need to delete the node // also if the node exist, we need to delete the node
if isExpire && expireTime.Sub(time.Now()) < 0 { if isExpire && expireTime.Sub(time.Now()) < 0 {
return Delete(key, index) return s.Delete(key, index)
} }
var TTL int64 var TTL int64
// update ttl
// Update ttl
if isExpire { if isExpire {
TTL = int64(expireTime.Sub(time.Now()) / time.Second) TTL = int64(expireTime.Sub(time.Now()) / time.Second)
} else { } else {
// For permanent value, we set ttl to -1
TTL = -1 TTL = -1
} }
// get the node // Get the node
node, ok := s.Tree.get(key) node, ok := s.Tree.get(key)
if ok { if ok {
// if node is not permanent before // Update when node exists
// update its expireTime
// Node is not permanent
if !node.ExpireTime.Equal(PERMANENT) { if !node.ExpireTime.Equal(PERMANENT) {
// If node is not permanent
// Update its expireTime
node.update <- expireTime node.update <- expireTime
} else { } else {
// if we want the permanent node to have expire time
// we need to create a chan and create a go routine // If we want the permanent node to have expire time
// We need to create create a go routine with a channel
if isExpire { if isExpire {
node.update = make(chan time.Time) node.update = make(chan time.Time)
go expire(key, node.update, expireTime) go s.monitorExpiration(key, node.update, expireTime)
} }
} }
// update the information of the node // Update the information of the node
s.Tree.set(key, Node{value, expireTime, node.update}) s.Tree.set(key, Node{value, expireTime, node.update})
resp := Response{SET, key, node.Value, value, true, expireTime, TTL, index} resp := Response{"SET", key, node.Value, value, true, expireTime, TTL, index}
s.watcher.notify(resp)
msg, err := json.Marshal(resp) msg, err := json.Marshal(resp)
notify(resp) // Send to the messager
// send to the messager
if s.messager != nil && err == nil { if s.messager != nil && err == nil {
*s.messager <- string(msg) *s.messager <- string(msg)
} }
updateMap(index, &resp) s.addToResponseMap(index, &resp)
return msg, err return msg, err
// add new node // Add new node
} else { } else {
update := make(chan time.Time) update := make(chan time.Time)
@ -191,32 +210,31 @@ func Set(key string, value string, expireTime time.Time, index uint64) ([]byte,
s.Tree.set(key, Node{value, expireTime, update}) s.Tree.set(key, Node{value, expireTime, update})
if isExpire { if isExpire {
go expire(key, update, expireTime) go s.monitorExpiration(key, update, expireTime)
} }
resp := Response{SET, key, "", value, false, expireTime, TTL, index} resp := Response{"SET", key, "", value, false, expireTime, TTL, index}
msg, err := json.Marshal(resp) msg, err := json.Marshal(resp)
// nofity the watcher // Nofity the watcher
notify(resp) s.watcher.notify(resp)
// notify the web interface // Send to the messager
if s.messager != nil && err == nil { if s.messager != nil && err == nil {
*s.messager <- string(msg) *s.messager <- string(msg)
} }
updateMap(index, &resp) s.addToResponseMap(index, &resp)
return msg, err return msg, err
} }
} }
// get the value of the key // Get the value of the key
func Get(key string) Response { func (s *Store) Get(key string) Response {
key = "/" + key
key = path.Clean(key) key = path.Clean("/" + key)
node, ok := s.Tree.get(key) node, ok := s.Tree.get(key)
@ -226,22 +244,24 @@ func Get(key string) Response {
isExpire = !node.ExpireTime.Equal(PERMANENT) isExpire = !node.ExpireTime.Equal(PERMANENT)
// update ttl // Update ttl
if isExpire { if isExpire {
TTL = int64(node.ExpireTime.Sub(time.Now()) / time.Second) TTL = int64(node.ExpireTime.Sub(time.Now()) / time.Second)
} else { } else {
TTL = -1 TTL = -1
} }
return Response{GET, key, node.Value, node.Value, true, node.ExpireTime, TTL, s.Index} return Response{"GET", key, node.Value, node.Value, true, node.ExpireTime, TTL, s.Index}
} else {
return Response{GET, key, "", "", false, time.Unix(0, 0), 0, s.Index} } else {
// we do not found the key
return Response{"GET", key, "", "", false, time.Unix(0, 0), 0, s.Index}
} }
} }
// // List all the item in the prefix // List all the item in the prefix
func List(prefix string) ([]byte, error) { func (s *Store) List(prefix string) ([]byte, error) {
nodes, keys, dirs, ok := s.Tree.list(prefix) nodes, keys, dirs, ok := s.Tree.list(prefix)
var ln []ListNode var ln []ListNode
@ -256,15 +276,14 @@ func List(prefix string) ([]byte, error) {
return json.Marshal(ln) return json.Marshal(ln)
} }
// delete the key // Delete the key
func Delete(key string, index uint64) ([]byte, error) { func (s *Store) Delete(key string, index uint64) ([]byte, error) {
//update index
key = "/" + key
key = path.Clean("/" + key)
//Update index
s.Index = index s.Index = index
key = path.Clean(key)
node, ok := s.Tree.get(key) node, ok := s.Tree.get(key)
if ok { if ok {
@ -275,17 +294,17 @@ func Delete(key string, index uint64) ([]byte, error) {
} else { } else {
// kill the expire go routine // Kill the expire go routine
node.update <- PERMANENT node.update <- PERMANENT
s.Tree.delete(key) s.Tree.delete(key)
} }
resp := Response{DELETE, key, node.Value, "", true, node.ExpireTime, 0, index} resp := Response{"DELETE", key, node.Value, "", true, node.ExpireTime, 0, index}
msg, err := json.Marshal(resp) msg, err := json.Marshal(resp)
notify(resp) s.watcher.notify(resp)
// notify the messager // notify the messager
if s.messager != nil && err == nil { if s.messager != nil && err == nil {
@ -293,50 +312,70 @@ func Delete(key string, index uint64) ([]byte, error) {
*s.messager <- string(msg) *s.messager <- string(msg)
} }
updateMap(index, &resp) s.addToResponseMap(index, &resp)
return msg, err return msg, err
} else { } else {
resp := Response{DELETE, key, "", "", false, time.Unix(0, 0), 0, index} resp := Response{"DELETE", key, "", "", false, time.Unix(0, 0), 0, index}
updateMap(index, &resp) s.addToResponseMap(index, &resp)
return json.Marshal(resp) return json.Marshal(resp)
} }
} }
// set the value of the key to the value if the given prevValue is equal to the value of the key // Set the value of the key to the value if the given prevValue is equal to the value of the key
func TestAndSet(key string, prevValue string, value string, expireTime time.Time, index uint64) ([]byte, error) { func (s *Store) TestAndSet(key string, prevValue string, value string, expireTime time.Time, index uint64) ([]byte, error) {
resp := Get(key) resp := s.Get(key)
if resp.PrevValue == prevValue { if resp.PrevValue == prevValue {
return Set(key, value, expireTime, index)
// If test success, do set
return s.Set(key, value, expireTime, index)
} else { } else {
// If fails, return the result of get which contains the current
// status of the key-value pair
return json.Marshal(resp) return json.Marshal(resp)
} }
} }
// should be used as a go routine to delete the key when it expires // Add a channel to the watchHub.
func expire(key string, update chan time.Time, expireTime time.Time) { // The watchHub will send response to the channel when any key under the prefix
// changes [since the sinceIndex if given]
func (s *Store) AddWatcher(prefix string, watcher *Watcher, sinceIndex uint64) error {
return s.watcher.addWatcher(prefix, watcher, sinceIndex, s.ResponseStartIndex, s.Index, &s.ResponseMap)
}
// This function should be created as a go routine to delete the key-value pair
// when it reaches expiration time
func (s *Store) monitorExpiration(key string, update chan time.Time, expireTime time.Time) {
duration := expireTime.Sub(time.Now()) duration := expireTime.Sub(time.Now())
for { for {
select { select {
// timeout delete the node
// Timeout delete the node
case <-time.After(duration): case <-time.After(duration):
node, ok := s.Tree.get(key) node, ok := s.Tree.get(key)
if !ok { if !ok {
return return
} else { } else {
s.Tree.delete(key) s.Tree.delete(key)
resp := Response{DELETE, key, node.Value, "", true, node.ExpireTime, 0, s.Index} resp := Response{"DELETE", key, node.Value, "", true, node.ExpireTime, 0, s.Index}
msg, err := json.Marshal(resp) msg, err := json.Marshal(resp)
notify(resp) s.watcher.notify(resp)
// notify the messager // notify the messager
if s.messager != nil && err == nil { if s.messager != nil && err == nil {
@ -349,21 +388,25 @@ func expire(key string, update chan time.Time, expireTime time.Time) {
} }
case updateTime := <-update: case updateTime := <-update:
//update duration // Update duration
// if the node become a permanent one, the go routine is // If the node become a permanent one, the go routine is
// not needed // not needed
if updateTime.Equal(PERMANENT) { if updateTime.Equal(PERMANENT) {
fmt.Println("permanent")
return return
} }
// update duration
// Update duration
duration = updateTime.Sub(time.Now()) duration = updateTime.Sub(time.Now())
} }
} }
} }
func updateMap(index uint64, resp *Response) { // When we receive a command that will change the state of the key-value store
// We will add the result of it to the ResponseMap for the use of watch command
// Also we may remove the oldest response when we add new one
func (s *Store) addToResponseMap(index uint64, resp *Response) {
// zero case
if s.ResponseMaxSize == 0 { if s.ResponseMaxSize == 0 {
return return
} }
@ -372,11 +415,13 @@ func updateMap(index uint64, resp *Response) {
s.ResponseMap[strIndex] = *resp s.ResponseMap[strIndex] = *resp
// unlimited // unlimited
if s.ResponseMaxSize < 0{ if s.ResponseMaxSize < 0 {
s.ResponseCurrSize++ s.ResponseCurrSize++
return return
} }
// if we reach the max point, we need to delete the most latest
// response and update the startIndex
if s.ResponseCurrSize == uint(s.ResponseMaxSize) { if s.ResponseCurrSize == uint(s.ResponseMaxSize) {
s.ResponseStartIndex++ s.ResponseStartIndex++
delete(s.ResponseMap, strconv.FormatUint(s.ResponseStartIndex, 10)) delete(s.ResponseMap, strconv.FormatUint(s.ResponseStartIndex, 10))
@ -385,8 +430,7 @@ func updateMap(index uint64, resp *Response) {
} }
} }
// Save the current state of the storage system
// save the current state of the storage system
func (s *Store) Save() ([]byte, error) { func (s *Store) Save() ([]byte, error) {
b, err := json.Marshal(s) b, err := json.Marshal(s)
if err != nil { if err != nil {
@ -396,31 +440,35 @@ func (s *Store) Save() ([]byte, error) {
return b, nil return b, nil
} }
// recovery the state of the stroage system from a previous state // Recovery the state of the stroage system from a previous state
func (s *Store) Recovery(state []byte) error { func (s *Store) Recovery(state []byte) error {
err := json.Unmarshal(state, s) err := json.Unmarshal(state, s)
// clean the expired nodes // The only thing need to change after the recovery is the
clean() // node with expiration time, we need to delete all the node
// that have been expired and setup go routines to monitor the
// other ones
s.checkExpiration()
return err return err
} }
// clean all expired keys // Clean the expired nodes
func clean() { // Set up go routines to mon
s.Tree.traverse(cleanNode, false) func (s *Store) checkExpiration() {
s.Tree.traverse(s.checkNode, false)
} }
// Check each node
func cleanNode(key string, node *Node) { func (s *Store) checkNode(key string, node *Node) {
if node.ExpireTime.Equal(PERMANENT) { if node.ExpireTime.Equal(PERMANENT) {
return return
} else { } else {
if node.ExpireTime.Sub(time.Now()) >= time.Second { if node.ExpireTime.Sub(time.Now()) >= time.Second {
node.update = make(chan time.Time) node.update = make(chan time.Time)
go expire(key, node.update, node.ExpireTime) go s.monitorExpiration(key, node.update, node.ExpireTime)
} else { } else {
// we should delete this node // we should delete this node

View File

@ -5,43 +5,44 @@ import (
"strconv" "strconv"
"strings" "strings"
) )
//------------------------------------------------------------------------------
//
// Typedefs
//
//------------------------------------------------------------------------------
// WatcherHub is where the client register its watcher
type WatcherHub struct { type WatcherHub struct {
watchers map[string][]Watcher watchers map[string][]*Watcher
} }
// Currently watcher only contains a response channel
type Watcher struct { type Watcher struct {
c chan Response C chan Response
} }
// global watcher // Create a new watcherHub
var w *WatcherHub
// init the global watcher
func init() {
w = createWatcherHub()
}
// create a new watcher
func createWatcherHub() *WatcherHub { func createWatcherHub() *WatcherHub {
w := new(WatcherHub) w := new(WatcherHub)
w.watchers = make(map[string][]Watcher) w.watchers = make(map[string][]*Watcher)
return w return w
} }
func GetWatcherHub() *WatcherHub { // Create a new watcher
return w func CreateWatcher() *Watcher {
return &Watcher{C: make(chan Response, 1)}
} }
// register a function with channel and prefix to the watcher // Add a watcher to the watcherHub
func AddWatcher(prefix string, c chan Response, sinceIndex uint64) error { func (w *WatcherHub) addWatcher(prefix string, watcher *Watcher, sinceIndex uint64,
responseStartIndex uint64, currentIndex uint64, resMap *map[string]Response) error {
prefix = "/" + path.Clean(prefix) prefix = path.Clean("/" + prefix)
if sinceIndex != 0 && sinceIndex >= s.ResponseStartIndex { if sinceIndex != 0 && sinceIndex >= responseStartIndex {
for i := sinceIndex; i <= s.Index; i++ { for i := sinceIndex; i <= currentIndex; i++ {
if check(prefix, i) { if checkResponse(prefix, i, resMap) {
c <- s.ResponseMap[strconv.FormatUint(i, 10)] watcher.C <- (*resMap)[strconv.FormatUint(i, 10)]
return nil return nil
} }
} }
@ -51,25 +52,21 @@ func AddWatcher(prefix string, c chan Response, sinceIndex uint64) error {
if !ok { if !ok {
w.watchers[prefix] = make([]Watcher, 0) w.watchers[prefix] = make([]*Watcher, 0)
watcher := Watcher{c}
w.watchers[prefix] = append(w.watchers[prefix], watcher) w.watchers[prefix] = append(w.watchers[prefix], watcher)
} else { } else {
watcher := Watcher{c}
w.watchers[prefix] = append(w.watchers[prefix], watcher) w.watchers[prefix] = append(w.watchers[prefix], watcher)
} }
return nil return nil
} }
// check if the response has what we are watching // Check if the response has what we are watching
func check(prefix string, index uint64) bool { func checkResponse(prefix string, index uint64, resMap *map[string]Response) bool {
resp, ok := s.ResponseMap[strconv.FormatUint(index, 10)] resp, ok := (*resMap)[strconv.FormatUint(index, 10)]
if !ok { if !ok {
// not storage system command // not storage system command
@ -89,8 +86,8 @@ func check(prefix string, index uint64) bool {
} }
// notify the watcher a action happened // Notify the watcher a action happened
func notify(resp Response) error { func (w *WatcherHub) notify(resp Response) error {
resp.Key = path.Clean(resp.Key) resp.Key = path.Clean(resp.Key)
segments := strings.Split(resp.Key, "/") segments := strings.Split(resp.Key, "/")
@ -104,10 +101,10 @@ func notify(resp Response) error {
if ok { if ok {
newWatchers := make([]Watcher, 0) newWatchers := make([]*Watcher, 0)
// notify all the watchers // notify all the watchers
for _, watcher := range watchers { for _, watcher := range watchers {
watcher.c <- resp watcher.C <- resp
} }
if len(newWatchers) == 0 { if len(newWatchers) == 0 {