mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
Merge pull request #461 from xiangli-cmu/stream_watcher
feat(stream watchers) add stream watcher support
This commit is contained in:
commit
c64c739fab
@ -25,7 +25,7 @@ func WatchKeyHandler(w http.ResponseWriter, req *http.Request, s Server) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Start the watcher on the store.
|
// Start the watcher on the store.
|
||||||
watcher, err := s.Store().Watch(key, false, sinceIndex)
|
watcher, err := s.Store().Watch(key, false, false, sinceIndex)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return etcdErr.NewError(500, key, s.Store().Index())
|
return etcdErr.NewError(500, key, s.Store().Index())
|
||||||
}
|
}
|
||||||
|
@ -9,15 +9,11 @@ import (
|
|||||||
|
|
||||||
etcdErr "github.com/coreos/etcd/error"
|
etcdErr "github.com/coreos/etcd/error"
|
||||||
"github.com/coreos/etcd/log"
|
"github.com/coreos/etcd/log"
|
||||||
"github.com/coreos/etcd/store"
|
|
||||||
"github.com/coreos/raft"
|
"github.com/coreos/raft"
|
||||||
"github.com/gorilla/mux"
|
"github.com/gorilla/mux"
|
||||||
)
|
)
|
||||||
|
|
||||||
func GetHandler(w http.ResponseWriter, req *http.Request, s Server) error {
|
func GetHandler(w http.ResponseWriter, req *http.Request, s Server) error {
|
||||||
var err error
|
|
||||||
var event *store.Event
|
|
||||||
|
|
||||||
vars := mux.Vars(req)
|
vars := mux.Vars(req)
|
||||||
key := "/" + vars["key"]
|
key := "/" + vars["key"]
|
||||||
|
|
||||||
@ -40,22 +36,30 @@ func GetHandler(w http.ResponseWriter, req *http.Request, s Server) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
recursive := (req.FormValue("recursive") == "true")
|
recursive := (req.FormValue("recursive") == "true")
|
||||||
sorted := (req.FormValue("sorted") == "true")
|
sort := (req.FormValue("sorted") == "true")
|
||||||
|
waitIndex := req.FormValue("waitIndex")
|
||||||
|
stream := (req.FormValue("stream") == "true")
|
||||||
|
|
||||||
if req.FormValue("wait") == "true" { // watch
|
if req.FormValue("wait") == "true" {
|
||||||
|
return handleWatch(key, recursive, stream, waitIndex, w, s)
|
||||||
|
}
|
||||||
|
|
||||||
|
return handleGet(key, recursive, sort, w, s)
|
||||||
|
}
|
||||||
|
|
||||||
|
func handleWatch(key string, recursive, stream bool, waitIndex string, w http.ResponseWriter, s Server) error {
|
||||||
// Create a command to watch from a given index (default 0).
|
// Create a command to watch from a given index (default 0).
|
||||||
var sinceIndex uint64 = 0
|
var sinceIndex uint64 = 0
|
||||||
|
var err error
|
||||||
|
|
||||||
waitIndex := req.FormValue("waitIndex")
|
|
||||||
if waitIndex != "" {
|
if waitIndex != "" {
|
||||||
sinceIndex, err = strconv.ParseUint(string(req.FormValue("waitIndex")), 10, 64)
|
sinceIndex, err = strconv.ParseUint(waitIndex, 10, 64)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return etcdErr.NewError(etcdErr.EcodeIndexNaN, "Watch From Index", s.Store().Index())
|
return etcdErr.NewError(etcdErr.EcodeIndexNaN, "Watch From Index", s.Store().Index())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start the watcher on the store.
|
watcher, err := s.Store().Watch(key, recursive, stream, sinceIndex)
|
||||||
watcher, err := s.Store().Watch(key, recursive, sinceIndex)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -63,29 +67,60 @@ func GetHandler(w http.ResponseWriter, req *http.Request, s Server) error {
|
|||||||
cn, _ := w.(http.CloseNotifier)
|
cn, _ := w.(http.CloseNotifier)
|
||||||
closeChan := cn.CloseNotify()
|
closeChan := cn.CloseNotify()
|
||||||
|
|
||||||
|
writeHeaders(w, s)
|
||||||
|
|
||||||
|
if stream {
|
||||||
|
// watcher hub will not help to remove stream watcher
|
||||||
|
// so we need to remove here
|
||||||
|
defer watcher.Remove()
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-closeChan:
|
||||||
|
return nil
|
||||||
|
case event, ok := <-watcher.EventChan:
|
||||||
|
if !ok {
|
||||||
|
// If the channel is closed this may be an indication of
|
||||||
|
// that notifications are much more than we are able to
|
||||||
|
// send to the client in time. Then we simply end streaming.
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
b, _ := json.Marshal(event)
|
||||||
|
_, err := w.Write(b)
|
||||||
|
if err != nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
w.(http.Flusher).Flush()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-closeChan:
|
case <-closeChan:
|
||||||
watcher.Remove()
|
watcher.Remove()
|
||||||
|
case event := <-watcher.EventChan:
|
||||||
|
b, _ := json.Marshal(event)
|
||||||
|
w.Write(b)
|
||||||
|
}
|
||||||
return nil
|
return nil
|
||||||
case event = <-watcher.EventChan:
|
|
||||||
}
|
}
|
||||||
|
|
||||||
} else { //get
|
func handleGet(key string, recursive, sort bool, w http.ResponseWriter, s Server) error {
|
||||||
// Retrieve the key from the store.
|
event, err := s.Store().Get(key, recursive, sort)
|
||||||
event, err = s.Store().Get(key, recursive, sorted)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
writeHeaders(w, s)
|
||||||
|
b, _ := json.Marshal(event)
|
||||||
|
w.Write(b)
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func writeHeaders(w http.ResponseWriter, s Server) {
|
||||||
w.Header().Set("Content-Type", "application/json")
|
w.Header().Set("Content-Type", "application/json")
|
||||||
w.Header().Add("X-Etcd-Index", fmt.Sprint(s.Store().Index()))
|
w.Header().Add("X-Etcd-Index", fmt.Sprint(s.Store().Index()))
|
||||||
w.Header().Add("X-Raft-Index", fmt.Sprint(s.CommitIndex()))
|
w.Header().Add("X-Raft-Index", fmt.Sprint(s.CommitIndex()))
|
||||||
w.Header().Add("X-Raft-Term", fmt.Sprint(s.Term()))
|
w.Header().Add("X-Raft-Term", fmt.Sprint(s.Term()))
|
||||||
w.WriteHeader(http.StatusOK)
|
w.WriteHeader(http.StatusOK)
|
||||||
b, _ := json.Marshal(event)
|
|
||||||
|
|
||||||
w.Write(b)
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
@ -53,7 +53,7 @@ type Store interface {
|
|||||||
Delete(nodePath string, recursive, dir bool) (*Event, error)
|
Delete(nodePath string, recursive, dir bool) (*Event, error)
|
||||||
CompareAndDelete(nodePath string, prevValue string, prevIndex uint64) (*Event, error)
|
CompareAndDelete(nodePath string, prevValue string, prevIndex uint64) (*Event, error)
|
||||||
|
|
||||||
Watch(prefix string, recursive bool, sinceIndex uint64) (*Watcher, error)
|
Watch(prefix string, recursive, stream bool, sinceIndex uint64) (*Watcher, error)
|
||||||
|
|
||||||
Save() ([]byte, error)
|
Save() ([]byte, error)
|
||||||
Recovery(state []byte) error
|
Recovery(state []byte) error
|
||||||
@ -340,7 +340,7 @@ func (s *store) CompareAndDelete(nodePath string, prevValue string, prevIndex ui
|
|||||||
return e, nil
|
return e, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *store) Watch(key string, recursive bool, sinceIndex uint64) (*Watcher, error) {
|
func (s *store) Watch(key string, recursive, stream bool, sinceIndex uint64) (*Watcher, error) {
|
||||||
key = path.Clean(path.Join("/", key))
|
key = path.Clean(path.Join("/", key))
|
||||||
nextIndex := s.CurrentIndex + 1
|
nextIndex := s.CurrentIndex + 1
|
||||||
|
|
||||||
@ -351,10 +351,10 @@ func (s *store) Watch(key string, recursive bool, sinceIndex uint64) (*Watcher,
|
|||||||
var err *etcdErr.Error
|
var err *etcdErr.Error
|
||||||
|
|
||||||
if sinceIndex == 0 {
|
if sinceIndex == 0 {
|
||||||
w, err = s.WatcherHub.watch(key, recursive, nextIndex)
|
w, err = s.WatcherHub.watch(key, recursive, stream, nextIndex)
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
w, err = s.WatcherHub.watch(key, recursive, sinceIndex)
|
w, err = s.WatcherHub.watch(key, recursive, stream, sinceIndex)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -489,7 +489,7 @@ func TestStoreCompareAndSwapPrevIndexFailsIfNotMatch(t *testing.T) {
|
|||||||
// Ensure that the store can watch for key creation.
|
// Ensure that the store can watch for key creation.
|
||||||
func TestStoreWatchCreate(t *testing.T) {
|
func TestStoreWatchCreate(t *testing.T) {
|
||||||
s := newStore()
|
s := newStore()
|
||||||
w, _ := s.Watch("/foo", false, 0)
|
w, _ := s.Watch("/foo", false, false, 0)
|
||||||
c := w.EventChan
|
c := w.EventChan
|
||||||
s.Create("/foo", false, "bar", false, Permanent)
|
s.Create("/foo", false, "bar", false, Permanent)
|
||||||
e := nbselect(c)
|
e := nbselect(c)
|
||||||
@ -502,7 +502,7 @@ func TestStoreWatchCreate(t *testing.T) {
|
|||||||
// Ensure that the store can watch for recursive key creation.
|
// Ensure that the store can watch for recursive key creation.
|
||||||
func TestStoreWatchRecursiveCreate(t *testing.T) {
|
func TestStoreWatchRecursiveCreate(t *testing.T) {
|
||||||
s := newStore()
|
s := newStore()
|
||||||
w, _ := s.Watch("/foo", true, 0)
|
w, _ := s.Watch("/foo", true, false, 0)
|
||||||
s.Create("/foo/bar", false, "baz", false, Permanent)
|
s.Create("/foo/bar", false, "baz", false, Permanent)
|
||||||
e := nbselect(w.EventChan)
|
e := nbselect(w.EventChan)
|
||||||
assert.Equal(t, e.Action, "create", "")
|
assert.Equal(t, e.Action, "create", "")
|
||||||
@ -513,7 +513,7 @@ func TestStoreWatchRecursiveCreate(t *testing.T) {
|
|||||||
func TestStoreWatchUpdate(t *testing.T) {
|
func TestStoreWatchUpdate(t *testing.T) {
|
||||||
s := newStore()
|
s := newStore()
|
||||||
s.Create("/foo", false, "bar", false, Permanent)
|
s.Create("/foo", false, "bar", false, Permanent)
|
||||||
w, _ := s.Watch("/foo", false, 0)
|
w, _ := s.Watch("/foo", false, false, 0)
|
||||||
s.Update("/foo", "baz", Permanent)
|
s.Update("/foo", "baz", Permanent)
|
||||||
e := nbselect(w.EventChan)
|
e := nbselect(w.EventChan)
|
||||||
assert.Equal(t, e.Action, "update", "")
|
assert.Equal(t, e.Action, "update", "")
|
||||||
@ -524,7 +524,7 @@ func TestStoreWatchUpdate(t *testing.T) {
|
|||||||
func TestStoreWatchRecursiveUpdate(t *testing.T) {
|
func TestStoreWatchRecursiveUpdate(t *testing.T) {
|
||||||
s := newStore()
|
s := newStore()
|
||||||
s.Create("/foo/bar", false, "baz", false, Permanent)
|
s.Create("/foo/bar", false, "baz", false, Permanent)
|
||||||
w, _ := s.Watch("/foo", true, 0)
|
w, _ := s.Watch("/foo", true, false, 0)
|
||||||
s.Update("/foo/bar", "baz", Permanent)
|
s.Update("/foo/bar", "baz", Permanent)
|
||||||
e := nbselect(w.EventChan)
|
e := nbselect(w.EventChan)
|
||||||
assert.Equal(t, e.Action, "update", "")
|
assert.Equal(t, e.Action, "update", "")
|
||||||
@ -535,7 +535,7 @@ func TestStoreWatchRecursiveUpdate(t *testing.T) {
|
|||||||
func TestStoreWatchDelete(t *testing.T) {
|
func TestStoreWatchDelete(t *testing.T) {
|
||||||
s := newStore()
|
s := newStore()
|
||||||
s.Create("/foo", false, "bar", false, Permanent)
|
s.Create("/foo", false, "bar", false, Permanent)
|
||||||
w, _ := s.Watch("/foo", false, 0)
|
w, _ := s.Watch("/foo", false, false, 0)
|
||||||
s.Delete("/foo", false, false)
|
s.Delete("/foo", false, false)
|
||||||
e := nbselect(w.EventChan)
|
e := nbselect(w.EventChan)
|
||||||
assert.Equal(t, e.Action, "delete", "")
|
assert.Equal(t, e.Action, "delete", "")
|
||||||
@ -546,7 +546,7 @@ func TestStoreWatchDelete(t *testing.T) {
|
|||||||
func TestStoreWatchRecursiveDelete(t *testing.T) {
|
func TestStoreWatchRecursiveDelete(t *testing.T) {
|
||||||
s := newStore()
|
s := newStore()
|
||||||
s.Create("/foo/bar", false, "baz", false, Permanent)
|
s.Create("/foo/bar", false, "baz", false, Permanent)
|
||||||
w, _ := s.Watch("/foo", true, 0)
|
w, _ := s.Watch("/foo", true, false, 0)
|
||||||
s.Delete("/foo/bar", false, false)
|
s.Delete("/foo/bar", false, false)
|
||||||
e := nbselect(w.EventChan)
|
e := nbselect(w.EventChan)
|
||||||
assert.Equal(t, e.Action, "delete", "")
|
assert.Equal(t, e.Action, "delete", "")
|
||||||
@ -557,7 +557,7 @@ func TestStoreWatchRecursiveDelete(t *testing.T) {
|
|||||||
func TestStoreWatchCompareAndSwap(t *testing.T) {
|
func TestStoreWatchCompareAndSwap(t *testing.T) {
|
||||||
s := newStore()
|
s := newStore()
|
||||||
s.Create("/foo", false, "bar", false, Permanent)
|
s.Create("/foo", false, "bar", false, Permanent)
|
||||||
w, _ := s.Watch("/foo", false, 0)
|
w, _ := s.Watch("/foo", false, false, 0)
|
||||||
s.CompareAndSwap("/foo", "bar", 0, "baz", Permanent)
|
s.CompareAndSwap("/foo", "bar", 0, "baz", Permanent)
|
||||||
e := nbselect(w.EventChan)
|
e := nbselect(w.EventChan)
|
||||||
assert.Equal(t, e.Action, "compareAndSwap", "")
|
assert.Equal(t, e.Action, "compareAndSwap", "")
|
||||||
@ -568,7 +568,7 @@ func TestStoreWatchCompareAndSwap(t *testing.T) {
|
|||||||
func TestStoreWatchRecursiveCompareAndSwap(t *testing.T) {
|
func TestStoreWatchRecursiveCompareAndSwap(t *testing.T) {
|
||||||
s := newStore()
|
s := newStore()
|
||||||
s.Create("/foo/bar", false, "baz", false, Permanent)
|
s.Create("/foo/bar", false, "baz", false, Permanent)
|
||||||
w, _ := s.Watch("/foo", true, 0)
|
w, _ := s.Watch("/foo", true, false, 0)
|
||||||
s.CompareAndSwap("/foo/bar", "baz", 0, "bat", Permanent)
|
s.CompareAndSwap("/foo/bar", "baz", 0, "bat", Permanent)
|
||||||
e := nbselect(w.EventChan)
|
e := nbselect(w.EventChan)
|
||||||
assert.Equal(t, e.Action, "compareAndSwap", "")
|
assert.Equal(t, e.Action, "compareAndSwap", "")
|
||||||
@ -588,7 +588,7 @@ func TestStoreWatchExpire(t *testing.T) {
|
|||||||
s.Create("/foo", false, "bar", false, time.Now().Add(500*time.Millisecond))
|
s.Create("/foo", false, "bar", false, time.Now().Add(500*time.Millisecond))
|
||||||
s.Create("/foofoo", false, "barbarbar", false, time.Now().Add(500*time.Millisecond))
|
s.Create("/foofoo", false, "barbarbar", false, time.Now().Add(500*time.Millisecond))
|
||||||
|
|
||||||
w, _ := s.Watch("/", true, 0)
|
w, _ := s.Watch("/", true, false, 0)
|
||||||
c := w.EventChan
|
c := w.EventChan
|
||||||
e := nbselect(c)
|
e := nbselect(c)
|
||||||
assert.Nil(t, e, "")
|
assert.Nil(t, e, "")
|
||||||
@ -596,12 +596,34 @@ func TestStoreWatchExpire(t *testing.T) {
|
|||||||
e = nbselect(c)
|
e = nbselect(c)
|
||||||
assert.Equal(t, e.Action, "expire", "")
|
assert.Equal(t, e.Action, "expire", "")
|
||||||
assert.Equal(t, e.Node.Key, "/foo", "")
|
assert.Equal(t, e.Node.Key, "/foo", "")
|
||||||
w, _ = s.Watch("/", true, 4)
|
w, _ = s.Watch("/", true, false, 4)
|
||||||
e = nbselect(w.EventChan)
|
e = nbselect(w.EventChan)
|
||||||
assert.Equal(t, e.Action, "expire", "")
|
assert.Equal(t, e.Action, "expire", "")
|
||||||
assert.Equal(t, e.Node.Key, "/foofoo", "")
|
assert.Equal(t, e.Node.Key, "/foofoo", "")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Ensure that the store can watch in streaming mode.
|
||||||
|
func TestStoreWatchStream(t *testing.T) {
|
||||||
|
s := newStore()
|
||||||
|
w, _ := s.Watch("/foo", false, true, 0)
|
||||||
|
// first modification
|
||||||
|
s.Create("/foo", false, "bar", false, Permanent)
|
||||||
|
e := nbselect(w.EventChan)
|
||||||
|
assert.Equal(t, e.Action, "create", "")
|
||||||
|
assert.Equal(t, e.Node.Key, "/foo", "")
|
||||||
|
assert.Equal(t, e.Node.Value, "bar", "")
|
||||||
|
e = nbselect(w.EventChan)
|
||||||
|
assert.Nil(t, e, "")
|
||||||
|
// second modification
|
||||||
|
s.Update("/foo", "baz", Permanent)
|
||||||
|
e = nbselect(w.EventChan)
|
||||||
|
assert.Equal(t, e.Action, "update", "")
|
||||||
|
assert.Equal(t, e.Node.Key, "/foo", "")
|
||||||
|
assert.Equal(t, e.Node.Value, "baz", "")
|
||||||
|
e = nbselect(w.EventChan)
|
||||||
|
assert.Nil(t, e, "")
|
||||||
|
}
|
||||||
|
|
||||||
// Ensure that the store can recover from a previously saved state.
|
// Ensure that the store can recover from a previously saved state.
|
||||||
func TestStoreRecover(t *testing.T) {
|
func TestStoreRecover(t *testing.T) {
|
||||||
s := newStore()
|
s := newStore()
|
||||||
|
@ -18,8 +18,10 @@ package store
|
|||||||
|
|
||||||
type Watcher struct {
|
type Watcher struct {
|
||||||
EventChan chan *Event
|
EventChan chan *Event
|
||||||
|
stream bool
|
||||||
recursive bool
|
recursive bool
|
||||||
sinceIndex uint64
|
sinceIndex uint64
|
||||||
|
removed bool
|
||||||
remove func()
|
remove func()
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -42,13 +44,23 @@ func (w *Watcher) notify(e *Event, originalPath bool, deleted bool) bool {
|
|||||||
// For example a watcher is watching at "/foo/bar". And we deletes "/foo". The watcher
|
// For example a watcher is watching at "/foo/bar". And we deletes "/foo". The watcher
|
||||||
// should get notified even if "/foo" is not the path it is watching.
|
// should get notified even if "/foo" is not the path it is watching.
|
||||||
if (w.recursive || originalPath || deleted) && e.Index() >= w.sinceIndex {
|
if (w.recursive || originalPath || deleted) && e.Index() >= w.sinceIndex {
|
||||||
w.EventChan <- e
|
// We cannot block here if the EventChan capacity is full, otherwise
|
||||||
|
// etcd will hang. EventChan capacity is full when the rate of
|
||||||
|
// notifications are higher than our send rate.
|
||||||
|
// If this happens, we close the channel.
|
||||||
|
select {
|
||||||
|
case w.EventChan <- e:
|
||||||
|
default:
|
||||||
|
// We have missed a notification. Close the channel to indicate this situation.
|
||||||
|
close(w.EventChan)
|
||||||
|
}
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
// Remove removes the watcher from watcherHub
|
// Remove removes the watcher from watcherHub
|
||||||
|
// The actual remove function is guaranteed to only be executed once
|
||||||
func (w *Watcher) Remove() {
|
func (w *Watcher) Remove() {
|
||||||
if w.remove != nil {
|
if w.remove != nil {
|
||||||
w.remove()
|
w.remove()
|
||||||
|
@ -38,7 +38,7 @@ func newWatchHub(capacity int) *watcherHub {
|
|||||||
// If recursive is true, the first change after index under key will be sent to the event channel of the watcher.
|
// If recursive is true, the first change after index under key will be sent to the event channel of the watcher.
|
||||||
// If recursive is false, the first change after index at key will be sent to the event channel of the watcher.
|
// If recursive is false, the first change after index at key will be sent to the event channel of the watcher.
|
||||||
// If index is zero, watch will start from the current index + 1.
|
// If index is zero, watch will start from the current index + 1.
|
||||||
func (wh *watcherHub) watch(key string, recursive bool, index uint64) (*Watcher, *etcdErr.Error) {
|
func (wh *watcherHub) watch(key string, recursive, stream bool, index uint64) (*Watcher, *etcdErr.Error) {
|
||||||
event, err := wh.EventHistory.scan(key, recursive, index)
|
event, err := wh.EventHistory.scan(key, recursive, index)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -48,6 +48,7 @@ func (wh *watcherHub) watch(key string, recursive bool, index uint64) (*Watcher,
|
|||||||
w := &Watcher{
|
w := &Watcher{
|
||||||
EventChan: make(chan *Event, 1), // use a buffered channel
|
EventChan: make(chan *Event, 1), // use a buffered channel
|
||||||
recursive: recursive,
|
recursive: recursive,
|
||||||
|
stream: stream,
|
||||||
sinceIndex: index,
|
sinceIndex: index,
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -73,8 +74,14 @@ func (wh *watcherHub) watch(key string, recursive bool, index uint64) (*Watcher,
|
|||||||
}
|
}
|
||||||
|
|
||||||
w.remove = func() {
|
w.remove = func() {
|
||||||
|
if w.removed { // avoid remove it twice
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
wh.mutex.Lock()
|
wh.mutex.Lock()
|
||||||
defer wh.mutex.Unlock()
|
defer wh.mutex.Unlock()
|
||||||
|
|
||||||
|
w.removed = true
|
||||||
l.Remove(elem)
|
l.Remove(elem)
|
||||||
atomic.AddInt64(&wh.count, -1)
|
atomic.AddInt64(&wh.count, -1)
|
||||||
if l.Len() == 0 {
|
if l.Len() == 0 {
|
||||||
@ -120,12 +127,14 @@ func (wh *watcherHub) notifyWatchers(e *Event, path string, deleted bool) {
|
|||||||
w, _ := curr.Value.(*Watcher)
|
w, _ := curr.Value.(*Watcher)
|
||||||
|
|
||||||
if w.notify(e, e.Node.Key == path, deleted) {
|
if w.notify(e, e.Node.Key == path, deleted) {
|
||||||
|
if !w.stream { // do not remove the stream watcher
|
||||||
// if we successfully notify a watcher
|
// if we successfully notify a watcher
|
||||||
// we need to remove the watcher from the list
|
// we need to remove the watcher from the list
|
||||||
// and decrease the counter
|
// and decrease the counter
|
||||||
l.Remove(curr)
|
l.Remove(curr)
|
||||||
atomic.AddInt64(&wh.count, -1)
|
atomic.AddInt64(&wh.count, -1)
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
curr = next // update current to the next element in the list
|
curr = next // update current to the next element in the list
|
||||||
}
|
}
|
||||||
|
@ -23,7 +23,7 @@ import (
|
|||||||
func TestWatcher(t *testing.T) {
|
func TestWatcher(t *testing.T) {
|
||||||
s := newStore()
|
s := newStore()
|
||||||
wh := s.WatcherHub
|
wh := s.WatcherHub
|
||||||
w, err := wh.watch("/foo", true, 1)
|
w, err := wh.watch("/foo", true, false, 1)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("%v", err)
|
t.Fatalf("%v", err)
|
||||||
}
|
}
|
||||||
@ -46,7 +46,7 @@ func TestWatcher(t *testing.T) {
|
|||||||
t.Fatal("recv != send")
|
t.Fatal("recv != send")
|
||||||
}
|
}
|
||||||
|
|
||||||
w, _ = wh.watch("/foo", false, 2)
|
w, _ = wh.watch("/foo", false, false, 2)
|
||||||
c = w.EventChan
|
c = w.EventChan
|
||||||
|
|
||||||
e = newEvent(Create, "/foo/bar", 2, 2)
|
e = newEvent(Create, "/foo/bar", 2, 2)
|
||||||
@ -71,7 +71,7 @@ func TestWatcher(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// ensure we are doing exact matching rather than prefix matching
|
// ensure we are doing exact matching rather than prefix matching
|
||||||
w, _ = wh.watch("/fo", true, 1)
|
w, _ = wh.watch("/fo", true, false, 1)
|
||||||
c = w.EventChan
|
c = w.EventChan
|
||||||
|
|
||||||
select {
|
select {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user