mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
feat(stream watchers) end streaming if too many notifications
This commit is contained in:
parent
22a25a18b3
commit
c247d807af
@ -80,7 +80,14 @@ func handleWatch(key string, recursive, stream bool, waitIndex string, w http.Re
|
||||
case <-closeChan:
|
||||
chunkWriter.Close()
|
||||
return nil
|
||||
case event := <-watcher.EventChan:
|
||||
case event, ok := <-watcher.EventChan:
|
||||
if !ok {
|
||||
// If the channel is closed this may be an indication of
|
||||
// that notifications are much more than we are able to
|
||||
// send to the client in time. Then we simply end streaming.
|
||||
return nil
|
||||
}
|
||||
|
||||
b, _ := json.Marshal(event)
|
||||
_, err := chunkWriter.Write(b)
|
||||
if err != nil {
|
||||
|
@ -44,17 +44,15 @@ func (w *Watcher) notify(e *Event, originalPath bool, deleted bool) bool {
|
||||
// For example a watcher is watching at "/foo/bar". And we deletes "/foo". The watcher
|
||||
// should get notified even if "/foo" is not the path it is watching.
|
||||
if (w.recursive || originalPath || deleted) && e.Index() >= w.sinceIndex {
|
||||
// We cannot block here if the EventChan capacity is full, otherwise
|
||||
// etcd will hang. EventChan capacity is full when the rate of
|
||||
// notifications are higher than our send rate.
|
||||
// If this happens, we close the channel.
|
||||
select {
|
||||
case w.EventChan <- e:
|
||||
|
||||
// the stream watcher might be slow
|
||||
// but we cannot block here. blocking will lead the whole etcd system to hang.
|
||||
// create a go-routine to handle the blocking case
|
||||
default:
|
||||
go func() {
|
||||
// TODO add a warning here should be helpful
|
||||
w.EventChan <- e
|
||||
}()
|
||||
// We have missed a notification. Close the channel to indicate this situation.
|
||||
close(w.EventChan)
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user