From 66f4e0aa192e87a29379024db2aa341035b74074 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Sat, 3 Aug 2013 23:29:05 -0700 Subject: [PATCH 1/7] clear watchers before recover from a snapshot --- client_handlers.go | 2 +- command.go | 4 ++++ error.go | 5 ++++- store/store.go | 11 ++++++++--- store/watcher.go | 21 ++++++++++++++++----- 5 files changed, 33 insertions(+), 10 deletions(-) diff --git a/client_handlers.go b/client_handlers.go index 6b1142b8e..7ee302e46 100644 --- a/client_handlers.go +++ b/client_handlers.go @@ -300,8 +300,8 @@ func WatchHttpHandler(w http.ResponseWriter, req *http.Request) { } if body, err := command.Apply(raftServer); err != nil { - warnf("Unable to do watch command: %v", err) w.WriteHeader(http.StatusInternalServerError) + w.Write(newJsonError(500, key)) } else { w.WriteHeader(http.StatusOK) diff --git a/command.go b/command.go index aea0caddc..209a7d792 100644 --- a/command.go +++ b/command.go @@ -101,6 +101,10 @@ func (c *WatchCommand) Apply(server *raft.Server) (interface{}, error) { // wait for the notification for any changing res := <-watcher.C + if res == nil { + return nil, fmt.Errorf("watcher is cleared") + } + return json.Marshal(res) } diff --git a/error.go b/error.go index 86442311a..a748be48a 100644 --- a/error.go +++ b/error.go @@ -20,7 +20,7 @@ func init() { errors[201] = "PrevValue is Required in POST form" errors[202] = "The given TTL in POST form is not a number" errors[203] = "The given index in POST form is not a number" - + // raft related errors errors[300] = "Raft Internal Error" errors[301] = "During Leader Election" @@ -28,6 +28,9 @@ func init() { // keyword errors[400] = "The prefix of the given key is a keyword in etcd" + // etcd related errors + errors[500] = "watcher is cleared due to etcd recovery" + } type jsonError struct { diff --git a/store/store.go b/store/store.go index 4afbfb868..000f725ac 100644 --- a/store/store.go +++ b/store/store.go @@ -29,7 +29,7 @@ type Store struct { messager *chan string // A map to keep the recent response to the clients - ResponseMap map[string]Response + ResponseMap map[string]*Response // The max number of the recent responses we can record ResponseMaxSize int @@ -109,7 +109,7 @@ func CreateStore(max int) *Store { s.messager = nil - s.ResponseMap = make(map[string]Response) + s.ResponseMap = make(map[string]*Response) s.ResponseStartIndex = 0 s.ResponseMaxSize = max s.ResponseCurrSize = 0 @@ -502,7 +502,7 @@ func (s *Store) addToResponseMap(index uint64, resp *Response) { } strIndex := strconv.FormatUint(index, 10) - s.ResponseMap[strIndex] = *resp + s.ResponseMap[strIndex] = resp // unlimited if s.ResponseMaxSize < 0 { @@ -532,6 +532,11 @@ func (s *Store) Save() ([]byte, error) { // Recovery the state of the stroage system from a previous state func (s *Store) Recovery(state []byte) error { + + // we need to stop all the current watchers + // recovery will clear watcherHub + s.watcher.stopWatchers() + err := json.Unmarshal(state, s) // The only thing need to change after the recovery is the diff --git a/store/watcher.go b/store/watcher.go index 627ec86ee..faa0cffed 100644 --- a/store/watcher.go +++ b/store/watcher.go @@ -19,7 +19,7 @@ type WatcherHub struct { // Currently watcher only contains a response channel type Watcher struct { - C chan Response + C chan *Response } // Create a new watcherHub @@ -31,12 +31,12 @@ func createWatcherHub() *WatcherHub { // Create a new watcher func CreateWatcher() *Watcher { - return &Watcher{C: make(chan Response, 1)} + return &Watcher{C: make(chan *Response, 1)} } // Add a watcher to the watcherHub func (w *WatcherHub) addWatcher(prefix string, watcher *Watcher, sinceIndex uint64, - responseStartIndex uint64, currentIndex uint64, resMap *map[string]Response) error { + responseStartIndex uint64, currentIndex uint64, resMap *map[string]*Response) error { prefix = path.Clean("/" + prefix) @@ -65,7 +65,7 @@ func (w *WatcherHub) addWatcher(prefix string, watcher *Watcher, sinceIndex uint } // Check if the response has what we are watching -func checkResponse(prefix string, index uint64, resMap *map[string]Response) bool { +func checkResponse(prefix string, index uint64, resMap *map[string]*Response) bool { resp, ok := (*resMap)[strconv.FormatUint(index, 10)] @@ -104,7 +104,7 @@ func (w *WatcherHub) notify(resp Response) error { newWatchers := make([]*Watcher, 0) // notify all the watchers for _, watcher := range watchers { - watcher.C <- resp + watcher.C <- &resp } if len(newWatchers) == 0 { @@ -120,3 +120,14 @@ func (w *WatcherHub) notify(resp Response) error { return nil } + +// stopWatchers stops all the watchers +// This function is used when the etcd recovery from a snapshot at runtime +func (w *WatcherHub) stopWatchers() { + for _, subWatchers := range w.watchers{ + for _, watcher := range subWatchers{ + watcher.C <- nil + } + } + w.watchers = nil +} \ No newline at end of file From ce33c4d29f7dd1e90411d3cb916e0b41086770fa Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Sat, 3 Aug 2013 23:30:15 -0700 Subject: [PATCH 2/7] gofmt --- store/watcher.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/store/watcher.go b/store/watcher.go index faa0cffed..6bc353f58 100644 --- a/store/watcher.go +++ b/store/watcher.go @@ -124,10 +124,10 @@ func (w *WatcherHub) notify(resp Response) error { // stopWatchers stops all the watchers // This function is used when the etcd recovery from a snapshot at runtime func (w *WatcherHub) stopWatchers() { - for _, subWatchers := range w.watchers{ - for _, watcher := range subWatchers{ + for _, subWatchers := range w.watchers { + for _, watcher := range subWatchers { watcher.C <- nil } } w.watchers = nil -} \ No newline at end of file +} From 8a4b2e83a5a4850d765018d70d07bf26e72808f7 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Sun, 4 Aug 2013 17:17:40 -0700 Subject: [PATCH 3/7] change create->new, follow go spec --- command.go | 2 +- store/store.go | 2 +- store/watcher.go | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/command.go b/command.go index 209a7d792..3ad371819 100644 --- a/command.go +++ b/command.go @@ -93,7 +93,7 @@ func (c *WatchCommand) CommandName() string { func (c *WatchCommand) Apply(server *raft.Server) (interface{}, error) { // create a new watcher - watcher := store.CreateWatcher() + watcher := store.NewWatcher() // add to the watchers list etcdStore.AddWatcher(c.Key, watcher, c.SinceIndex) diff --git a/store/store.go b/store/store.go index 000f725ac..ae6a759f2 100644 --- a/store/store.go +++ b/store/store.go @@ -126,7 +126,7 @@ func CreateStore(max int) *Store { }, } - s.watcher = createWatcherHub() + s.watcher = newWatcherHub() return s } diff --git a/store/watcher.go b/store/watcher.go index 6bc353f58..dfd91774a 100644 --- a/store/watcher.go +++ b/store/watcher.go @@ -23,14 +23,14 @@ type Watcher struct { } // Create a new watcherHub -func createWatcherHub() *WatcherHub { +func newWatcherHub() *WatcherHub { w := new(WatcherHub) w.watchers = make(map[string][]*Watcher) return w } // Create a new watcher -func CreateWatcher() *Watcher { +func NewWatcher() *Watcher { return &Watcher{C: make(chan *Response, 1)} } From dc2eae8595d2bab7f8ee71ea8c1a89de8c1d69db Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Sun, 4 Aug 2013 17:42:28 -0700 Subject: [PATCH 4/7] fix watcher test and keyword test --- store/keyword_test.go | 10 ++++---- store/watcher_test.bak | 29 ---------------------- store/watcher_test.go | 56 ++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 61 insertions(+), 34 deletions(-) delete mode 100644 store/watcher_test.bak create mode 100644 store/watcher_test.go diff --git a/store/keyword_test.go b/store/keyword_test.go index 064d0f6de..f2d15565d 100644 --- a/store/keyword_test.go +++ b/store/keyword_test.go @@ -5,30 +5,30 @@ import ( ) func TestKeywords(t *testing.T) { - keyword := CheckKeyword("machines") + keyword := CheckKeyword("_etcd") if !keyword { t.Fatal("machines should be keyword") } - keyword = CheckKeyword("/machines") + keyword = CheckKeyword("/_etcd") if !keyword { t.Fatal("/machines should be keyword") } - keyword = CheckKeyword("/machines/") + keyword = CheckKeyword("/_etcd/") if !keyword { t.Fatal("/machines/ contains keyword prefix") } - keyword = CheckKeyword("/machines/node1") + keyword = CheckKeyword("/_etcd/node1") if !keyword { t.Fatal("/machines/* contains keyword prefix") } - keyword = CheckKeyword("/nokeyword/machines/node1") + keyword = CheckKeyword("/nokeyword/_etcd/node1") if keyword { t.Fatal("this does not contain keyword prefix") diff --git a/store/watcher_test.bak b/store/watcher_test.bak deleted file mode 100644 index ad5e80283..000000000 --- a/store/watcher_test.bak +++ /dev/null @@ -1,29 +0,0 @@ -package store - -import ( - "fmt" - "testing" - "time" -) - -func TestWatch(t *testing.T) { - // watcher := createWatcher() - c := make(chan Response) - d := make(chan Response) - w.add("/", c) - go say(c) - w.add("/prefix/", d) - go say(d) - s.Set("/prefix/foo", "bar", time.Unix(0, 0)) -} - -func say(c chan Response) { - result := <-c - - if result.Action != -1 { - fmt.Println("yes") - } else { - fmt.Println("no") - } - -} diff --git a/store/watcher_test.go b/store/watcher_test.go new file mode 100644 index 000000000..77c20e179 --- /dev/null +++ b/store/watcher_test.go @@ -0,0 +1,56 @@ +package store + +import ( + "fmt" + "testing" + "time" +) + +func TestWatch(t *testing.T) { + + s := CreateStore(100) + + watchers := make([]*Watcher, 10) + + for i, _ := range watchers { + + // create a new watcher + watchers[i] = NewWatcher() + // add to the watchers list + s.AddWatcher("foo", watchers[i], 0) + + } + + s.Set("/foo/foo", "bar", time.Unix(0, 0), 1) + + for _, watcher := range watchers { + + // wait for the notification for any changing + res := <-watcher.C + + if res == nil { + t.Fatal("watcher is cleared") + } + } + + for i, _ := range watchers { + + // create a new watcher + watchers[i] = NewWatcher() + // add to the watchers list + s.AddWatcher("foo/foo/foo", watchers[i], 0) + + } + + s.watcher.stopWatchers() + + for _, watcher := range watchers { + + // wait for the notification for any changing + res := <-watcher.C + + if res != nil { + t.Fatal("watcher is cleared") + } + } +} From 06c9f1893f8b22238be643145b5e66bab30315bc Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Sun, 4 Aug 2013 18:16:49 -0700 Subject: [PATCH 5/7] avoid set value to root --- store/tree.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/store/tree.go b/store/tree.go index 8ac086c97..3192d55bc 100644 --- a/store/tree.go +++ b/store/tree.go @@ -57,6 +57,11 @@ func (t *tree) set(key string, value Node) bool { nodesName := split(key) + // avoid set value to "/" + if len(nodesName) == 1 && len(nodesName[0]) == 0 { + return false + } + nodeMap := t.Root.NodeMap i := 0 From 10b5bc9871c9188cfbe57ac27b28c82d91cb4c86 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Sun, 4 Aug 2013 22:02:40 -0700 Subject: [PATCH 6/7] fix wrong snapshot path --- etcd.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/etcd.go b/etcd.go index db8d8d45f..fc225b8db 100644 --- a/etcd.go +++ b/etcd.go @@ -520,7 +520,7 @@ func getInfo(path string) *Info { logPath := fmt.Sprintf("%s/log", path) confPath := fmt.Sprintf("%s/conf", path) - snapshotPath := fmt.Sprintf("%s/snapshotPath", path) + snapshotPath := fmt.Sprintf("%s/snapshot", path) os.Remove(infoPath) os.Remove(logPath) os.Remove(confPath) From 026f93cd837e70a74e020233630e7669ca890bab Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Sun, 4 Aug 2013 23:10:16 -0700 Subject: [PATCH 7/7] add watch benchmark --- store/watcher_test.go | 41 ++++++++++++++++++++++++++++++++++++++++- 1 file changed, 40 insertions(+), 1 deletion(-) diff --git a/store/watcher_test.go b/store/watcher_test.go index 77c20e179..389d4643f 100644 --- a/store/watcher_test.go +++ b/store/watcher_test.go @@ -1,7 +1,8 @@ package store import ( - "fmt" + "math/rand" + "strconv" "testing" "time" ) @@ -54,3 +55,41 @@ func TestWatch(t *testing.T) { } } } + +// BenchmarkWatch creates 10K watchers watch at /foo/[paht] each time. +// Path is randomly chosen with max depth 10. +// It should take less than 15ms to wake up 10K watchers. +func BenchmarkWatch(b *testing.B) { + s := CreateStore(100) + + key := make([]string, 10000) + for i := 0; i < 10000; i++ { + + key[i] = "/foo/" + depth := rand.Intn(10) + + for j := 0; j < depth; j++ { + key[i] += "/" + strconv.Itoa(rand.Int()%10) + } + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + watchers := make([]*Watcher, 10000) + for i := 0; i < 10000; i++ { + // create a new watcher + watchers[i] = NewWatcher() + // add to the watchers list + s.AddWatcher(key[i], watchers[i], 0) + } + + s.watcher.stopWatchers() + + for _, watcher := range watchers { + // wait for the notification for any changing + <-watcher.C + } + + s.watcher = newWatcherHub() + } +}