diff --git a/server/v1/watch_key_handler.go b/server/v1/watch_key_handler.go index fe5b768d1..75abb0724 100644 --- a/server/v1/watch_key_handler.go +++ b/server/v1/watch_key_handler.go @@ -25,7 +25,7 @@ func WatchKeyHandler(w http.ResponseWriter, req *http.Request, s Server) error { } // Start the watcher on the store. - watcher, err := s.Store().Watch(key, false, sinceIndex) + watcher, err := s.Store().Watch(key, false, false, sinceIndex) if err != nil { return etcdErr.NewError(500, key, s.Store().Index()) } diff --git a/server/v2/get_handler.go b/server/v2/get_handler.go index c32b80137..23549e47d 100644 --- a/server/v2/get_handler.go +++ b/server/v2/get_handler.go @@ -9,15 +9,11 @@ import ( etcdErr "github.com/coreos/etcd/error" "github.com/coreos/etcd/log" - "github.com/coreos/etcd/store" "github.com/coreos/raft" "github.com/gorilla/mux" ) func GetHandler(w http.ResponseWriter, req *http.Request, s Server) error { - var err error - var event *store.Event - vars := mux.Vars(req) key := "/" + vars["key"] @@ -40,52 +36,91 @@ func GetHandler(w http.ResponseWriter, req *http.Request, s Server) error { } recursive := (req.FormValue("recursive") == "true") - sorted := (req.FormValue("sorted") == "true") + sort := (req.FormValue("sorted") == "true") + waitIndex := req.FormValue("waitIndex") + stream := (req.FormValue("stream") == "true") - if req.FormValue("wait") == "true" { // watch - // Create a command to watch from a given index (default 0). - var sinceIndex uint64 = 0 + if req.FormValue("wait") == "true" { + return handleWatch(key, recursive, stream, waitIndex, w, s) + } - waitIndex := req.FormValue("waitIndex") - if waitIndex != "" { - sinceIndex, err = strconv.ParseUint(string(req.FormValue("waitIndex")), 10, 64) - if err != nil { - return etcdErr.NewError(etcdErr.EcodeIndexNaN, "Watch From Index", s.Store().Index()) - } - } + return handleGet(key, recursive, sort, w, s) +} - // Start the watcher on the store. - watcher, err := s.Store().Watch(key, recursive, sinceIndex) +func handleWatch(key string, recursive, stream bool, waitIndex string, w http.ResponseWriter, s Server) error { + // Create a command to watch from a given index (default 0). + var sinceIndex uint64 = 0 + var err error + + if waitIndex != "" { + sinceIndex, err = strconv.ParseUint(waitIndex, 10, 64) if err != nil { - return err - } - - cn, _ := w.(http.CloseNotifier) - closeChan := cn.CloseNotify() - - select { - case <-closeChan: - watcher.Remove() - return nil - case event = <-watcher.EventChan: - } - - } else { //get - // Retrieve the key from the store. - event, err = s.Store().Get(key, recursive, sorted) - if err != nil { - return err + return etcdErr.NewError(etcdErr.EcodeIndexNaN, "Watch From Index", s.Store().Index()) } } + watcher, err := s.Store().Watch(key, recursive, stream, sinceIndex) + if err != nil { + return err + } + + cn, _ := w.(http.CloseNotifier) + closeChan := cn.CloseNotify() + + writeHeaders(w, s) + + if stream { + // watcher hub will not help to remove stream watcher + // so we need to remove here + defer watcher.Remove() + for { + select { + case <-closeChan: + return nil + case event, ok := <-watcher.EventChan: + if !ok { + // If the channel is closed this may be an indication of + // that notifications are much more than we are able to + // send to the client in time. Then we simply end streaming. + return nil + } + + b, _ := json.Marshal(event) + _, err := w.Write(b) + if err != nil { + return nil + } + w.(http.Flusher).Flush() + } + } + } + + select { + case <-closeChan: + watcher.Remove() + case event := <-watcher.EventChan: + b, _ := json.Marshal(event) + w.Write(b) + } + return nil +} + +func handleGet(key string, recursive, sort bool, w http.ResponseWriter, s Server) error { + event, err := s.Store().Get(key, recursive, sort) + if err != nil { + return err + } + + writeHeaders(w, s) + b, _ := json.Marshal(event) + w.Write(b) + return nil +} + +func writeHeaders(w http.ResponseWriter, s Server) { w.Header().Set("Content-Type", "application/json") w.Header().Add("X-Etcd-Index", fmt.Sprint(s.Store().Index())) w.Header().Add("X-Raft-Index", fmt.Sprint(s.CommitIndex())) w.Header().Add("X-Raft-Term", fmt.Sprint(s.Term())) w.WriteHeader(http.StatusOK) - b, _ := json.Marshal(event) - - w.Write(b) - - return nil } diff --git a/store/store.go b/store/store.go index 577a4c31d..8593c5178 100644 --- a/store/store.go +++ b/store/store.go @@ -53,7 +53,7 @@ type Store interface { Delete(nodePath string, recursive, dir bool) (*Event, error) CompareAndDelete(nodePath string, prevValue string, prevIndex uint64) (*Event, error) - Watch(prefix string, recursive bool, sinceIndex uint64) (*Watcher, error) + Watch(prefix string, recursive, stream bool, sinceIndex uint64) (*Watcher, error) Save() ([]byte, error) Recovery(state []byte) error @@ -340,7 +340,7 @@ func (s *store) CompareAndDelete(nodePath string, prevValue string, prevIndex ui return e, nil } -func (s *store) Watch(key string, recursive bool, sinceIndex uint64) (*Watcher, error) { +func (s *store) Watch(key string, recursive, stream bool, sinceIndex uint64) (*Watcher, error) { key = path.Clean(path.Join("/", key)) nextIndex := s.CurrentIndex + 1 @@ -351,10 +351,10 @@ func (s *store) Watch(key string, recursive bool, sinceIndex uint64) (*Watcher, var err *etcdErr.Error if sinceIndex == 0 { - w, err = s.WatcherHub.watch(key, recursive, nextIndex) + w, err = s.WatcherHub.watch(key, recursive, stream, nextIndex) } else { - w, err = s.WatcherHub.watch(key, recursive, sinceIndex) + w, err = s.WatcherHub.watch(key, recursive, stream, sinceIndex) } if err != nil { diff --git a/store/store_test.go b/store/store_test.go index f469aa18f..5eae1b46d 100644 --- a/store/store_test.go +++ b/store/store_test.go @@ -489,7 +489,7 @@ func TestStoreCompareAndSwapPrevIndexFailsIfNotMatch(t *testing.T) { // Ensure that the store can watch for key creation. func TestStoreWatchCreate(t *testing.T) { s := newStore() - w, _ := s.Watch("/foo", false, 0) + w, _ := s.Watch("/foo", false, false, 0) c := w.EventChan s.Create("/foo", false, "bar", false, Permanent) e := nbselect(c) @@ -502,7 +502,7 @@ func TestStoreWatchCreate(t *testing.T) { // Ensure that the store can watch for recursive key creation. func TestStoreWatchRecursiveCreate(t *testing.T) { s := newStore() - w, _ := s.Watch("/foo", true, 0) + w, _ := s.Watch("/foo", true, false, 0) s.Create("/foo/bar", false, "baz", false, Permanent) e := nbselect(w.EventChan) assert.Equal(t, e.Action, "create", "") @@ -513,7 +513,7 @@ func TestStoreWatchRecursiveCreate(t *testing.T) { func TestStoreWatchUpdate(t *testing.T) { s := newStore() s.Create("/foo", false, "bar", false, Permanent) - w, _ := s.Watch("/foo", false, 0) + w, _ := s.Watch("/foo", false, false, 0) s.Update("/foo", "baz", Permanent) e := nbselect(w.EventChan) assert.Equal(t, e.Action, "update", "") @@ -524,7 +524,7 @@ func TestStoreWatchUpdate(t *testing.T) { func TestStoreWatchRecursiveUpdate(t *testing.T) { s := newStore() s.Create("/foo/bar", false, "baz", false, Permanent) - w, _ := s.Watch("/foo", true, 0) + w, _ := s.Watch("/foo", true, false, 0) s.Update("/foo/bar", "baz", Permanent) e := nbselect(w.EventChan) assert.Equal(t, e.Action, "update", "") @@ -535,7 +535,7 @@ func TestStoreWatchRecursiveUpdate(t *testing.T) { func TestStoreWatchDelete(t *testing.T) { s := newStore() s.Create("/foo", false, "bar", false, Permanent) - w, _ := s.Watch("/foo", false, 0) + w, _ := s.Watch("/foo", false, false, 0) s.Delete("/foo", false, false) e := nbselect(w.EventChan) assert.Equal(t, e.Action, "delete", "") @@ -546,7 +546,7 @@ func TestStoreWatchDelete(t *testing.T) { func TestStoreWatchRecursiveDelete(t *testing.T) { s := newStore() s.Create("/foo/bar", false, "baz", false, Permanent) - w, _ := s.Watch("/foo", true, 0) + w, _ := s.Watch("/foo", true, false, 0) s.Delete("/foo/bar", false, false) e := nbselect(w.EventChan) assert.Equal(t, e.Action, "delete", "") @@ -557,7 +557,7 @@ func TestStoreWatchRecursiveDelete(t *testing.T) { func TestStoreWatchCompareAndSwap(t *testing.T) { s := newStore() s.Create("/foo", false, "bar", false, Permanent) - w, _ := s.Watch("/foo", false, 0) + w, _ := s.Watch("/foo", false, false, 0) s.CompareAndSwap("/foo", "bar", 0, "baz", Permanent) e := nbselect(w.EventChan) assert.Equal(t, e.Action, "compareAndSwap", "") @@ -568,7 +568,7 @@ func TestStoreWatchCompareAndSwap(t *testing.T) { func TestStoreWatchRecursiveCompareAndSwap(t *testing.T) { s := newStore() s.Create("/foo/bar", false, "baz", false, Permanent) - w, _ := s.Watch("/foo", true, 0) + w, _ := s.Watch("/foo", true, false, 0) s.CompareAndSwap("/foo/bar", "baz", 0, "bat", Permanent) e := nbselect(w.EventChan) assert.Equal(t, e.Action, "compareAndSwap", "") @@ -588,7 +588,7 @@ func TestStoreWatchExpire(t *testing.T) { s.Create("/foo", false, "bar", false, time.Now().Add(500*time.Millisecond)) s.Create("/foofoo", false, "barbarbar", false, time.Now().Add(500*time.Millisecond)) - w, _ := s.Watch("/", true, 0) + w, _ := s.Watch("/", true, false, 0) c := w.EventChan e := nbselect(c) assert.Nil(t, e, "") @@ -596,12 +596,34 @@ func TestStoreWatchExpire(t *testing.T) { e = nbselect(c) assert.Equal(t, e.Action, "expire", "") assert.Equal(t, e.Node.Key, "/foo", "") - w, _ = s.Watch("/", true, 4) + w, _ = s.Watch("/", true, false, 4) e = nbselect(w.EventChan) assert.Equal(t, e.Action, "expire", "") assert.Equal(t, e.Node.Key, "/foofoo", "") } +// Ensure that the store can watch in streaming mode. +func TestStoreWatchStream(t *testing.T) { + s := newStore() + w, _ := s.Watch("/foo", false, true, 0) + // first modification + s.Create("/foo", false, "bar", false, Permanent) + e := nbselect(w.EventChan) + assert.Equal(t, e.Action, "create", "") + assert.Equal(t, e.Node.Key, "/foo", "") + assert.Equal(t, e.Node.Value, "bar", "") + e = nbselect(w.EventChan) + assert.Nil(t, e, "") + // second modification + s.Update("/foo", "baz", Permanent) + e = nbselect(w.EventChan) + assert.Equal(t, e.Action, "update", "") + assert.Equal(t, e.Node.Key, "/foo", "") + assert.Equal(t, e.Node.Value, "baz", "") + e = nbselect(w.EventChan) + assert.Nil(t, e, "") +} + // Ensure that the store can recover from a previously saved state. func TestStoreRecover(t *testing.T) { s := newStore() diff --git a/store/watcher.go b/store/watcher.go index 6576d8baf..7a11656c6 100644 --- a/store/watcher.go +++ b/store/watcher.go @@ -18,8 +18,10 @@ package store type Watcher struct { EventChan chan *Event + stream bool recursive bool sinceIndex uint64 + removed bool remove func() } @@ -42,13 +44,23 @@ func (w *Watcher) notify(e *Event, originalPath bool, deleted bool) bool { // For example a watcher is watching at "/foo/bar". And we deletes "/foo". The watcher // should get notified even if "/foo" is not the path it is watching. if (w.recursive || originalPath || deleted) && e.Index() >= w.sinceIndex { - w.EventChan <- e + // We cannot block here if the EventChan capacity is full, otherwise + // etcd will hang. EventChan capacity is full when the rate of + // notifications are higher than our send rate. + // If this happens, we close the channel. + select { + case w.EventChan <- e: + default: + // We have missed a notification. Close the channel to indicate this situation. + close(w.EventChan) + } return true } return false } // Remove removes the watcher from watcherHub +// The actual remove function is guaranteed to only be executed once func (w *Watcher) Remove() { if w.remove != nil { w.remove() diff --git a/store/watcher_hub.go b/store/watcher_hub.go index 9b7aaba3b..aeda171a9 100644 --- a/store/watcher_hub.go +++ b/store/watcher_hub.go @@ -38,7 +38,7 @@ func newWatchHub(capacity int) *watcherHub { // If recursive is true, the first change after index under key will be sent to the event channel of the watcher. // If recursive is false, the first change after index at key will be sent to the event channel of the watcher. // If index is zero, watch will start from the current index + 1. -func (wh *watcherHub) watch(key string, recursive bool, index uint64) (*Watcher, *etcdErr.Error) { +func (wh *watcherHub) watch(key string, recursive, stream bool, index uint64) (*Watcher, *etcdErr.Error) { event, err := wh.EventHistory.scan(key, recursive, index) if err != nil { @@ -48,6 +48,7 @@ func (wh *watcherHub) watch(key string, recursive bool, index uint64) (*Watcher, w := &Watcher{ EventChan: make(chan *Event, 1), // use a buffered channel recursive: recursive, + stream: stream, sinceIndex: index, } @@ -73,8 +74,14 @@ func (wh *watcherHub) watch(key string, recursive bool, index uint64) (*Watcher, } w.remove = func() { + if w.removed { // avoid remove it twice + return + } + wh.mutex.Lock() defer wh.mutex.Unlock() + + w.removed = true l.Remove(elem) atomic.AddInt64(&wh.count, -1) if l.Len() == 0 { @@ -120,11 +127,13 @@ func (wh *watcherHub) notifyWatchers(e *Event, path string, deleted bool) { w, _ := curr.Value.(*Watcher) if w.notify(e, e.Node.Key == path, deleted) { - // if we successfully notify a watcher - // we need to remove the watcher from the list - // and decrease the counter - l.Remove(curr) - atomic.AddInt64(&wh.count, -1) + if !w.stream { // do not remove the stream watcher + // if we successfully notify a watcher + // we need to remove the watcher from the list + // and decrease the counter + l.Remove(curr) + atomic.AddInt64(&wh.count, -1) + } } curr = next // update current to the next element in the list diff --git a/store/watcher_test.go b/store/watcher_test.go index 2600fa161..c2cd154ea 100644 --- a/store/watcher_test.go +++ b/store/watcher_test.go @@ -23,7 +23,7 @@ import ( func TestWatcher(t *testing.T) { s := newStore() wh := s.WatcherHub - w, err := wh.watch("/foo", true, 1) + w, err := wh.watch("/foo", true, false, 1) if err != nil { t.Fatalf("%v", err) } @@ -46,7 +46,7 @@ func TestWatcher(t *testing.T) { t.Fatal("recv != send") } - w, _ = wh.watch("/foo", false, 2) + w, _ = wh.watch("/foo", false, false, 2) c = w.EventChan e = newEvent(Create, "/foo/bar", 2, 2) @@ -71,7 +71,7 @@ func TestWatcher(t *testing.T) { } // ensure we are doing exact matching rather than prefix matching - w, _ = wh.watch("/fo", true, 1) + w, _ = wh.watch("/fo", true, false, 1) c = w.EventChan select {