From f8ca35fd77d8265766c9751945afec72d9ef110f Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Fri, 7 Jun 2013 11:35:49 -0700 Subject: [PATCH] add notification struct --- store.go | 6 +++++- watcher.go | 48 +++++++++++++++++++++++++++++++----------------- watcher_test.go | 22 +++++++++++++++++----- 3 files changed, 53 insertions(+), 23 deletions(-) diff --git a/store.go b/store.go index bc0d721df..6e61578dd 100644 --- a/store.go +++ b/store.go @@ -26,14 +26,16 @@ func (s *Store) Set(key string, value string) (string, bool) { if ok { s.Nodes[key] = value return oldValue, true + } else { + s.Nodes[key] = value return "", false } } -// get the node of the key +// get the value of the key func (s *Store) Get(key string) (string, error) { key = path.Clean(key) @@ -60,6 +62,7 @@ func (s *Store) Delete(key string) (string, error) { } } +// save the current state of the storage system func (s *Store) Save() ([]byte, error) { b, err := json.Marshal(s) if err != nil { @@ -68,6 +71,7 @@ func (s *Store) Save() ([]byte, error) { return b, nil } +// recovery the state of the stroage system from a previous state func (s *Store) Recovery(state []byte) error { err := json.Unmarshal(state, s) return err diff --git a/watcher.go b/watcher.go index 3760ec401..2799aa744 100644 --- a/watcher.go +++ b/watcher.go @@ -6,43 +6,44 @@ import ( "fmt" ) +// CONSTANTS + type Watcher struct { - chanMap map[string][]chan int + chanMap map[string][]chan Notification +} + +type Notification struct { + action int + key string + oldValue string + newValue string } func createWatcher() *Watcher { w := new(Watcher) - w.chanMap = make(map[string][]chan int) + w.chanMap = make(map[string][]chan Notification) return w } -func (w *Watcher) add(prefix string, c chan int) error { +func (w *Watcher) add(prefix string, c chan Notification, f func(chan Notification)) error { prefix = path.Clean(prefix) fmt.Println("Add ", prefix) + _, ok := w.chanMap[prefix] if !ok { - w.chanMap[prefix] = make([]chan int, 0) + w.chanMap[prefix] = make([]chan Notification, 0) w.chanMap[prefix] = append(w.chanMap[prefix], c) } else { w.chanMap[prefix] = append(w.chanMap[prefix], c) } + fmt.Println(len(w.chanMap[prefix]), "@", prefix) - go wait(c) + + go f(c) return nil } -func wait(c chan int) { - result := <-c - - if result == 0 { - fmt.Println("yes") - } else { - fmt.Println("no") - } - -} - func (w *Watcher) notify(action int, key string, oldValue string, newValue string) error { key = path.Clean(key) @@ -50,17 +51,30 @@ func (w *Watcher) notify(action int, key string, oldValue string, newValue strin currPath := "/" + // walk through all the pathes for _, segment := range segments { + currPath := path.Join(currPath, segment) + fmt.Println(currPath) + chans, ok := w.chanMap[currPath] + if ok { fmt.Println("found ", currPath) + + n := Notification {action, key, oldValue, newValue} + // notify all the watchers for _, c := range chans { - c <- 0 + c <- n } + + // we have notified all the watchers at this path + // delete the map delete(w.chanMap, currPath) } + } + return nil } \ No newline at end of file diff --git a/watcher_test.go b/watcher_test.go index 5a2f1b6dc..2f899b77a 100644 --- a/watcher_test.go +++ b/watcher_test.go @@ -2,13 +2,25 @@ package raftd import ( "testing" + "fmt" ) func TestWatch(t *testing.T) { watcher := createWatcher() - c := make(chan int) - d := make(chan int) - watcher.add("/prefix/", c) - watcher.add("/prefix/", d) + c := make(chan Notification) + d := make(chan Notification) + watcher.add("/", c, say) + watcher.add("/prefix/", d, say) watcher.notify(0, "/prefix/hihihi", "1", "1") -} \ No newline at end of file +} + +func say(c chan Notification) { + result := <-c + + if result.action != -1 { + fmt.Println("yes") + } else { + fmt.Println("no") + } + +}