diff --git a/server/v2/get_handler.go b/server/v2/get_handler.go index f2a793b43..270193d77 100644 --- a/server/v2/get_handler.go +++ b/server/v2/get_handler.go @@ -80,7 +80,14 @@ func handleWatch(key string, recursive, stream bool, waitIndex string, w http.Re case <-closeChan: chunkWriter.Close() return nil - case event := <-watcher.EventChan: + case event, ok := <-watcher.EventChan: + if !ok { + // If the channel is closed this may be an indication of + // that notifications are much more than we are able to + // send to the client in time. Then we simply end streaming. + return nil + } + b, _ := json.Marshal(event) _, err := chunkWriter.Write(b) if err != nil { diff --git a/store/watcher.go b/store/watcher.go index 781397e2a..7a11656c6 100644 --- a/store/watcher.go +++ b/store/watcher.go @@ -44,17 +44,15 @@ func (w *Watcher) notify(e *Event, originalPath bool, deleted bool) bool { // For example a watcher is watching at "/foo/bar". And we deletes "/foo". The watcher // should get notified even if "/foo" is not the path it is watching. if (w.recursive || originalPath || deleted) && e.Index() >= w.sinceIndex { + // We cannot block here if the EventChan capacity is full, otherwise + // etcd will hang. EventChan capacity is full when the rate of + // notifications are higher than our send rate. + // If this happens, we close the channel. select { case w.EventChan <- e: - - // the stream watcher might be slow - // but we cannot block here. blocking will lead the whole etcd system to hang. - // create a go-routine to handle the blocking case default: - go func() { - // TODO add a warning here should be helpful - w.EventChan <- e - }() + // We have missed a notification. Close the channel to indicate this situation. + close(w.EventChan) } return true }