From a1129dd5a5c075b08f789d0851685ccc45ac121b Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Mon, 2 Nov 2015 17:43:41 -0800 Subject: [PATCH] storage: support multiple watching per watcher We want to support multiple watchings per one watcher chan. Then we can have one single go routine to watch multiple keys/prefixs. --- storage/kv.go | 12 +--- storage/kv_test.go | 14 +++-- storage/watchable_store.go | 85 +++++++++++++++++++-------- storage/watchable_store_bench_test.go | 10 ++-- storage/watcher.go | 75 +++++++++++------------ storage/watcher_bench_test.go | 8 ++- 6 files changed, 122 insertions(+), 82 deletions(-) diff --git a/storage/kv.go b/storage/kv.go index 5f9013abb..44abb5d30 100644 --- a/storage/kv.go +++ b/storage/kv.go @@ -80,15 +80,9 @@ type KV interface { type WatchableKV interface { KV - // Watcher watches the events happening or happened on the given key - // or key prefix from the given startRev. - // The whole event history can be watched unless compacted. - // If `prefix` is true, watch observes all events whose key prefix could be the given `key`. - // If `startRev` <=0, watch observes events after currentRev. - // - // Canceling the watcher releases resources associated with it, so code - // should always call cancel as soon as watch is done. - Watcher(key []byte, prefix bool, startRev int64) (Watcher, CancelFunc) + // NewWatcher returns a Watcher that can be used to + // watch events happened or happending on the KV. + NewWatcher() Watcher } // ConsistentWatchableKV is a WatchableKV that understands the consistency diff --git a/storage/kv_test.go b/storage/kv_test.go index dce4b7862..4b3f8e980 100644 --- a/storage/kv_test.go +++ b/storage/kv_test.go @@ -733,12 +733,14 @@ func TestWatchableKVWatch(t *testing.T) { s := WatchableKV(newWatchableStore(tmpPath)) defer cleanup(s, tmpPath) - wa, cancel := s.Watcher([]byte("foo"), true, 0) + w := s.NewWatcher() + + cancel := w.Watch([]byte("foo"), true, 0) defer cancel() s.Put([]byte("foo"), []byte("bar")) select { - case ev := <-wa.Event(): + case ev := <-w.Chan(): wev := storagepb.Event{ Type: storagepb.PUT, Kv: &storagepb.KeyValue{ @@ -758,7 +760,7 @@ func TestWatchableKVWatch(t *testing.T) { s.Put([]byte("foo1"), []byte("bar1")) select { - case ev := <-wa.Event(): + case ev := <-w.Chan(): wev := storagepb.Event{ Type: storagepb.PUT, Kv: &storagepb.KeyValue{ @@ -776,11 +778,11 @@ func TestWatchableKVWatch(t *testing.T) { t.Fatalf("failed to watch the event") } - wa, cancel = s.Watcher([]byte("foo1"), false, 1) + cancel = w.Watch([]byte("foo1"), false, 1) defer cancel() select { - case ev := <-wa.Event(): + case ev := <-w.Chan(): wev := storagepb.Event{ Type: storagepb.PUT, Kv: &storagepb.KeyValue{ @@ -800,7 +802,7 @@ func TestWatchableKVWatch(t *testing.T) { s.Put([]byte("foo1"), []byte("bar11")) select { - case ev := <-wa.Event(): + case ev := <-w.Chan(): wev := storagepb.Event{ Type: storagepb.PUT, Kv: &storagepb.KeyValue{ diff --git a/storage/watchable_store.go b/storage/watchable_store.go index 847a57ff9..5535965a5 100644 --- a/storage/watchable_store.go +++ b/storage/watchable_store.go @@ -22,17 +22,29 @@ import ( "github.com/coreos/etcd/storage/storagepb" ) +const ( + // chanBufLen is the length of the buffered chan + // for sending out watched events. + // TODO: find a good buf value. 1024 is just a random one that + // seems to be reasonable. + chanBufLen = 1024 +) + +type watchable interface { + watch(key []byte, prefix bool, startRev int64, ch chan<- storagepb.Event) (*watching, CancelFunc) +} + type watchableStore struct { mu sync.Mutex *store - // contains all unsynced watchers that needs to sync events that have happened - unsynced map[*watcher]struct{} + // contains all unsynced watching that needs to sync events that have happened + unsynced map[*watching]struct{} - // contains all synced watchers that are tracking the events that will happen - // The key of the map is the key that the watcher is watching on. - synced map[string][]*watcher + // contains all synced watching that are tracking the events that will happen + // The key of the map is the key that the watching is watching on. + synced map[string][]*watching tx *ongoingTx stopc chan struct{} @@ -42,12 +54,12 @@ type watchableStore struct { func newWatchableStore(path string) *watchableStore { s := &watchableStore{ store: newStore(path), - unsynced: make(map[*watcher]struct{}), - synced: make(map[string][]*watcher), + unsynced: make(map[*watching]struct{}), + synced: make(map[string][]*watching), stopc: make(chan struct{}), } s.wg.Add(1) - go s.syncWatchersLoop() + go s.syncWatchingsLoop() return s } @@ -152,11 +164,24 @@ func (s *watchableStore) Close() error { return s.store.Close() } -func (s *watchableStore) Watcher(key []byte, prefix bool, startRev int64) (Watcher, CancelFunc) { +func (s *watchableStore) NewWatcher() Watcher { + return &watcher{ + watchable: s, + ch: make(chan storagepb.Event, chanBufLen), + } +} + +func (s *watchableStore) watch(key []byte, prefix bool, startRev int64, ch chan<- storagepb.Event) (*watching, CancelFunc) { s.mu.Lock() defer s.mu.Unlock() - wa := newWatcher(key, prefix, startRev) + wa := &watching{ + key: key, + prefix: prefix, + cur: startRev, + ch: ch, + } + k := string(key) if startRev == 0 { s.synced[k] = append(s.synced[k], wa) @@ -169,9 +194,7 @@ func (s *watchableStore) Watcher(key []byte, prefix bool, startRev int64) (Watch cancel := CancelFunc(func() { s.mu.Lock() defer s.mu.Unlock() - wa.stopWithError(ErrCanceled) - - // remove global references of the watcher + // remove global references of the watching if _, ok := s.unsynced[wa]; ok { delete(s.unsynced, wa) slowWatchersGauge.Dec() @@ -191,13 +214,13 @@ func (s *watchableStore) Watcher(key []byte, prefix bool, startRev int64) (Watch return wa, cancel } -// keepSyncWatchers syncs the watchers in the unsyncd map every 100ms. -func (s *watchableStore) syncWatchersLoop() { +// syncWatchingsLoop syncs the watching in the unsyncd map every 100ms. +func (s *watchableStore) syncWatchingsLoop() { defer s.wg.Done() for { s.mu.Lock() - s.syncWatchers() + s.syncWatchings() s.mu.Unlock() select { @@ -208,8 +231,8 @@ func (s *watchableStore) syncWatchersLoop() { } } -// syncWatchers syncs the watchers in the unsyncd map. -func (s *watchableStore) syncWatchers() { +// syncWatchings syncs the watchings in the unsyncd map. +func (s *watchableStore) syncWatchings() { _, curRev, _ := s.store.Range(nil, nil, 0, 0) for w := range s.unsynced { var end []byte @@ -225,7 +248,7 @@ func (s *watchableStore) syncWatchers() { } evs, nextRev, err := s.store.RangeEvents(w.key, end, int64(limit), w.cur) if err != nil { - w.stopWithError(err) + // TODO: send error event to watching delete(s.unsynced, w) continue } @@ -247,20 +270,20 @@ func (s *watchableStore) syncWatchers() { slowWatchersGauge.Set(float64(len(s.unsynced))) } -// handle handles the change of the happening event on all watchers. +// handle handles the change of the happening event on all watchings. func (s *watchableStore) handle(rev int64, ev storagepb.Event) { s.notify(rev, ev) } // notify notifies the fact that given event at the given rev just happened to -// watchers that watch on the key of the event. +// watchings that watch on the key of the event. func (s *watchableStore) notify(rev int64, ev storagepb.Event) { - // check all prefixes of the key to notify all corresponded watchers + // check all prefixes of the key to notify all corresponded watchings for i := 0; i <= len(ev.Kv.Key); i++ { ws := s.synced[string(ev.Kv.Key[:i])] nws := ws[:0] for _, w := range ws { - // the watcher needs to be notified when either it watches prefix or + // the watching needs to be notified when either it watches prefix or // the key is exactly matched. if !w.prefix && i != len(ev.Kv.Key) { continue @@ -301,3 +324,19 @@ func (tx *ongoingTx) del(k string) { tx.delm[k] = true tx.putm[k] = false } + +type watching struct { + // the watching key + key []byte + // prefix indicates if watching is on a key or a prefix. + // If prefix is true, the watching is on a prefix. + prefix bool + // cur is the current watching revision. + // If cur is behind the current revision of the KV, + // watching is unsynced and needs to catch up. + cur int64 + + // a chan to send out the watched events. + // The chan might be shared with other watchings. + ch chan<- storagepb.Event +} diff --git a/storage/watchable_store_bench_test.go b/storage/watchable_store_bench_test.go index 4c84b6097..437022f9a 100644 --- a/storage/watchable_store_bench_test.go +++ b/storage/watchable_store_bench_test.go @@ -37,14 +37,14 @@ func BenchmarkWatchableStoreUnsyncedCancel(b *testing.B) { // in unsynced for this benchmark. s := &watchableStore{ store: newStore(tmpPath), - unsynced: make(map[*watcher]struct{}), + unsynced: make(map[*watching]struct{}), // For previous implementation, use: - // unsynced: make([]*watcher, 0), + // unsynced: make([]*watching, 0), // to make the test not crash from assigning to nil map. // 'synced' doesn't get populated in this test. - synced: make(map[string][]*watcher), + synced: make(map[string][]*watching), } defer func() { @@ -60,10 +60,12 @@ func BenchmarkWatchableStoreUnsyncedCancel(b *testing.B) { testValue := []byte("bar") s.Put(testKey, testValue) + w := s.NewWatcher() + cancels := make([]CancelFunc, watcherSize) for i := 0; i < watcherSize; i++ { // non-0 value to keep watchers in unsynced - _, cancel := s.Watcher(testKey, true, 1) + cancel := w.Watch(testKey, true, 1) cancels[i] = cancel } diff --git a/storage/watcher.go b/storage/watcher.go index e80ab1c0c..d85fb0416 100644 --- a/storage/watcher.go +++ b/storage/watcher.go @@ -20,55 +20,56 @@ import ( "github.com/coreos/etcd/storage/storagepb" ) -// Watcher watches on the KV. It will be notified if there is an event -// happened on the watched key or prefix. type Watcher interface { - // Event returns a channel that receives observed event that matches the - // context of watcher. When watch finishes or is canceled or aborted, the - // channel is closed and returns empty event. - // Successive calls to Event return the same value. - Event() <-chan storagepb.Event + // Watch watches the events happening or happened on the given key + // or key prefix from the given startRev. + // The whole event history can be watched unless compacted. + // If `prefix` is true, watch observes all events whose key prefix could be the given `key`. + // If `startRev` <=0, watch observes events after currentRev. + Watch(key []byte, prefix bool, startRev int64) CancelFunc - // Err returns a non-nil error value after Event is closed. Err returns - // Compacted if the history was compacted, Canceled if watch is canceled, - // or EOF if watch reaches the end revision. No other values for Err are defined. - // After Event is closed, successive calls to Err return the same value. - Err() error + // Chan returns a chan. All watched events will be sent to the returned chan. + Chan() <-chan storagepb.Event + + // Close closes the WatchChan and release all related resources. + Close() } +// watcher contains a collection of watching that share +// one chan to send out watched events and other control events. type watcher struct { - key []byte - prefix bool - cur int64 + watchable watchable + ch chan storagepb.Event - ch chan storagepb.Event - mu sync.Mutex - err error + mu sync.Mutex // guards fields below it + closed bool + cancels []CancelFunc } -func newWatcher(key []byte, prefix bool, start int64) *watcher { - return &watcher{ - key: key, - prefix: prefix, - cur: start, - ch: make(chan storagepb.Event, 10), +// TODO: return error if ws is closed? +func (ws *watcher) Watch(key []byte, prefix bool, startRev int64) CancelFunc { + _, c := ws.watchable.watch(key, prefix, startRev, ws.ch) + ws.mu.Lock() + defer ws.mu.Unlock() + if ws.closed { + return nil } + // TODO: cancelFunc needs to be removed from the cancels when it is called. + ws.cancels = append(ws.cancels, c) + return c } -func (w *watcher) Event() <-chan storagepb.Event { return w.ch } - -func (w *watcher) Err() error { - w.mu.Lock() - defer w.mu.Unlock() - return w.err +func (ws *watcher) Chan() <-chan storagepb.Event { + return ws.ch } -func (w *watcher) stopWithError(err error) { - if w.err != nil { - return +func (ws *watcher) Close() { + ws.mu.Lock() + defer ws.mu.Unlock() + + for _, cancel := range ws.cancels { + cancel() } - close(w.ch) - w.mu.Lock() - w.err = err - w.mu.Unlock() + ws.closed = true + close(ws.ch) } diff --git a/storage/watcher_bench_test.go b/storage/watcher_bench_test.go index fd556a49a..0610c90f0 100644 --- a/storage/watcher_bench_test.go +++ b/storage/watcher_bench_test.go @@ -20,12 +20,14 @@ import ( ) func BenchmarkKVWatcherMemoryUsage(b *testing.B) { - s := newWatchableStore(tmpPath) - defer cleanup(s, tmpPath) + watchable := newWatchableStore(tmpPath) + defer cleanup(watchable, tmpPath) + + w := watchable.NewWatcher() b.ReportAllocs() b.StartTimer() for i := 0; i < b.N; i++ { - s.Watcher([]byte(fmt.Sprint("foo", i)), false, 0) + w.Watch([]byte(fmt.Sprint("foo", i)), false, 0) } }