storage: support multiple watchings per watcher

We want to support multiple watchings per watcher chan, so that a
single goroutine can watch multiple keys/prefixes.
Xiang Li 2015-11-02 17:43:41 -08:00
parent 4fd65ecd4c
commit a1129dd5a5
6 changed files with 122 additions and 82 deletions
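
For context, the usage pattern this enables: one watcher owns one event chan, any number of watchings feed into it, and a single goroutine drains it. A minimal sketch, assuming the interfaces introduced below are imported from github.com/coreos/etcd/storage; the keys and the loop body are illustrative only:

    func watchExample(s storage.WatchableKV) {
        w := s.NewWatcher()
        defer w.Close()

        // Multiple watchings share w's single buffered chan.
        cancelFoo := w.Watch([]byte("foo"), true, 0)  // prefix watching
        cancelBar := w.Watch([]byte("bar"), false, 0) // exact-key watching
        defer cancelFoo()
        defer cancelBar()

        // One goroutine serves every watching.
        for ev := range w.Chan() {
            _ = ev // placeholder for application logic
        }
    }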

View File

@ -80,15 +80,9 @@ type KV interface {
type WatchableKV interface {
KV
// Watcher watches the events happening or happened on the given key
// or key prefix from the given startRev.
// The whole event history can be watched unless compacted.
// If `prefix` is true, watch observes all events whose key prefix could be the given `key`.
// If `startRev` <=0, watch observes events after currentRev.
//
// Canceling the watcher releases resources associated with it, so code
// should always call cancel as soon as watch is done.
Watcher(key []byte, prefix bool, startRev int64) (Watcher, CancelFunc)
// NewWatcher returns a Watcher that can be used to
// watch events that happened or are happening on the KV.
NewWatcher() Watcher
}
// ConsistentWatchableKV is a WatchableKV that understands the consistency
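
The caller-visible change in one place (a sketch; the test updates below show the same migration in context):

    // Before: each call created a watcher with its own chan.
    //   wa, cancel := s.Watcher([]byte("foo"), true, 0)
    //   ev := <-wa.Event()

    // After: one watcher chan shared by any number of watchings.
    w := s.NewWatcher()
    cancel := w.Watch([]byte("foo"), true, 0)
    ev := <-w.Chan()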

View File

@ -733,12 +733,14 @@ func TestWatchableKVWatch(t *testing.T) {
s := WatchableKV(newWatchableStore(tmpPath))
defer cleanup(s, tmpPath)
wa, cancel := s.Watcher([]byte("foo"), true, 0)
w := s.NewWatcher()
cancel := w.Watch([]byte("foo"), true, 0)
defer cancel()
s.Put([]byte("foo"), []byte("bar"))
select {
case ev := <-wa.Event():
case ev := <-w.Chan():
wev := storagepb.Event{
Type: storagepb.PUT,
Kv: &storagepb.KeyValue{
@ -758,7 +760,7 @@ func TestWatchableKVWatch(t *testing.T) {
s.Put([]byte("foo1"), []byte("bar1"))
select {
case ev := <-wa.Event():
case ev := <-w.Chan():
wev := storagepb.Event{
Type: storagepb.PUT,
Kv: &storagepb.KeyValue{
@ -776,11 +778,11 @@ func TestWatchableKVWatch(t *testing.T) {
t.Fatalf("failed to watch the event")
}
wa, cancel = s.Watcher([]byte("foo1"), false, 1)
cancel = w.Watch([]byte("foo1"), false, 1)
defer cancel()
select {
case ev := <-wa.Event():
case ev := <-w.Chan():
wev := storagepb.Event{
Type: storagepb.PUT,
Kv: &storagepb.KeyValue{
@ -800,7 +802,7 @@ func TestWatchableKVWatch(t *testing.T) {
s.Put([]byte("foo1"), []byte("bar11"))
select {
case ev := <-wa.Event():
case ev := <-w.Chan():
wev := storagepb.Event{
Type: storagepb.PUT,
Kv: &storagepb.KeyValue{

View File

@ -22,17 +22,29 @@ import (
"github.com/coreos/etcd/storage/storagepb"
)
const (
// chanBufLen is the length of the buffered chan
// for sending out watched events.
// TODO: find a good buf value. 1024 is just a random one that
// seems to be reasonable.
chanBufLen = 1024
)
type watchable interface {
watch(key []byte, prefix bool, startRev int64, ch chan<- storagepb.Event) (*watching, CancelFunc)
}
type watchableStore struct {
mu sync.Mutex
*store
// contains all unsynced watchers that needs to sync events that have happened
unsynced map[*watcher]struct{}
// contains all unsynced watchings that need to sync events that have happened
unsynced map[*watching]struct{}
// contains all synced watchers that are tracking the events that will happen
// The key of the map is the key that the watcher is watching on.
synced map[string][]*watcher
// contains all synced watchings that are tracking the events that will happen
// The key of the map is the key that the watching is watching on.
synced map[string][]*watching
tx *ongoingTx
stopc chan struct{}
@ -42,12 +54,12 @@ type watchableStore struct {
func newWatchableStore(path string) *watchableStore {
s := &watchableStore{
store: newStore(path),
unsynced: make(map[*watcher]struct{}),
synced: make(map[string][]*watcher),
unsynced: make(map[*watching]struct{}),
synced: make(map[string][]*watching),
stopc: make(chan struct{}),
}
s.wg.Add(1)
go s.syncWatchersLoop()
go s.syncWatchingsLoop()
return s
}
@ -152,11 +164,24 @@ func (s *watchableStore) Close() error {
return s.store.Close()
}
func (s *watchableStore) Watcher(key []byte, prefix bool, startRev int64) (Watcher, CancelFunc) {
func (s *watchableStore) NewWatcher() Watcher {
return &watcher{
watchable: s,
ch: make(chan storagepb.Event, chanBufLen),
}
}
func (s *watchableStore) watch(key []byte, prefix bool, startRev int64, ch chan<- storagepb.Event) (*watching, CancelFunc) {
s.mu.Lock()
defer s.mu.Unlock()
wa := newWatcher(key, prefix, startRev)
wa := &watching{
key: key,
prefix: prefix,
cur: startRev,
ch: ch,
}
k := string(key)
if startRev == 0 {
s.synced[k] = append(s.synced[k], wa)
@ -169,9 +194,7 @@ func (s *watchableStore) Watcher(key []byte, prefix bool, startRev int64) (Watch
cancel := CancelFunc(func() {
s.mu.Lock()
defer s.mu.Unlock()
wa.stopWithError(ErrCanceled)
// remove global references of the watcher
// remove global references to the watching
if _, ok := s.unsynced[wa]; ok {
delete(s.unsynced, wa)
slowWatchersGauge.Dec()
@ -191,13 +214,13 @@ func (s *watchableStore) Watcher(key []byte, prefix bool, startRev int64) (Watch
return wa, cancel
}
// keepSyncWatchers syncs the watchers in the unsyncd map every 100ms.
func (s *watchableStore) syncWatchersLoop() {
// syncWatchingsLoop syncs the watchings in the unsynced map every 100ms.
func (s *watchableStore) syncWatchingsLoop() {
defer s.wg.Done()
for {
s.mu.Lock()
s.syncWatchers()
s.syncWatchings()
s.mu.Unlock()
select {
@ -208,8 +231,8 @@ func (s *watchableStore) syncWatchersLoop() {
}
}
// syncWatchers syncs the watchers in the unsyncd map.
func (s *watchableStore) syncWatchers() {
// syncWatchings syncs the watchings in the unsynced map.
func (s *watchableStore) syncWatchings() {
_, curRev, _ := s.store.Range(nil, nil, 0, 0)
for w := range s.unsynced {
var end []byte
@ -225,7 +248,7 @@ func (s *watchableStore) syncWatchers() {
}
evs, nextRev, err := s.store.RangeEvents(w.key, end, int64(limit), w.cur)
if err != nil {
w.stopWithError(err)
// TODO: send error event to watching
delete(s.unsynced, w)
continue
}
@ -247,20 +270,20 @@ func (s *watchableStore) syncWatchers() {
slowWatchersGauge.Set(float64(len(s.unsynced)))
}
// handle handles the change of the happening event on all watchers.
// handle handles an event that just happened by notifying the matching watchings.
func (s *watchableStore) handle(rev int64, ev storagepb.Event) {
s.notify(rev, ev)
}
// notify notifies the fact that the given event at the given rev just happened to
// watchers that watch on the key of the event.
// watchings that watch on the key of the event.
func (s *watchableStore) notify(rev int64, ev storagepb.Event) {
// check all prefixes of the key to notify all corresponded watchers
// check all prefixes of the key to notify all corresponding watchings
for i := 0; i <= len(ev.Kv.Key); i++ {
ws := s.synced[string(ev.Kv.Key[:i])]
nws := ws[:0]
for _, w := range ws {
// the watcher needs to be notified when either it watches prefix or
// the watching needs to be notified when either it watches a prefix or
// the key is exactly matched.
if !w.prefix && i != len(ev.Kv.Key) {
continue
@ -301,3 +324,19 @@ func (tx *ongoingTx) del(k string) {
tx.delm[k] = true
tx.putm[k] = false
}
type watching struct {
// the watching key
key []byte
// prefix indicates if watching is on a key or a prefix.
// If prefix is true, the watching is on a prefix.
prefix bool
// cur is the current watching revision.
// If cur is behind the current revision of the KV,
// watching is unsynced and needs to catch up.
cur int64
// a chan to send out the watched events.
// The chan might be shared with other watchings.
ch chan<- storagepb.Event
}
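
notify finds interested watchings by looking up every prefix of the event key in the synced map. For an event on key "foo1" it checks the buckets for "", "f", "fo", "foo", and "foo1": a prefix watching registered under "foo" fires, while an exact watching under "foo" is skipped because i != len(key). A standalone, runnable sketch of that matching rule, with hypothetical data:

    package main

    import "fmt"

    // watchingSketch stands in for the watching struct: only the
    // fields the matching rule needs.
    type watchingSketch struct {
        key    string
        prefix bool
    }

    // notifySketch mirrors watchableStore.notify: walk every prefix of
    // the event key and fire the watchings in that synced bucket.
    func notifySketch(synced map[string][]watchingSketch, eventKey string) {
        for i := 0; i <= len(eventKey); i++ {
            for _, w := range synced[eventKey[:i]] {
                // An exact (non-prefix) watching matches only the full key.
                if !w.prefix && i != len(eventKey) {
                    continue
                }
                fmt.Printf("notify watching on %q (prefix=%v)\n", w.key, w.prefix)
            }
        }
    }

    func main() {
        synced := map[string][]watchingSketch{
            "foo":  {{"foo", true}, {"foo", false}},
            "foo1": {{"foo1", false}},
        }
        notifySketch(synced, "foo1")
        // prints:
        //   notify watching on "foo" (prefix=true)
        //   notify watching on "foo1" (prefix=false)
    }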

View File

@ -37,14 +37,14 @@ func BenchmarkWatchableStoreUnsyncedCancel(b *testing.B) {
// in unsynced for this benchmark.
s := &watchableStore{
store: newStore(tmpPath),
unsynced: make(map[*watcher]struct{}),
unsynced: make(map[*watching]struct{}),
// For previous implementation, use:
// unsynced: make([]*watcher, 0),
// unsynced: make([]*watching, 0),
// to make the test not crash from assigning to nil map.
// 'synced' doesn't get populated in this test.
synced: make(map[string][]*watcher),
synced: make(map[string][]*watching),
}
defer func() {
@ -60,10 +60,12 @@ func BenchmarkWatchableStoreUnsyncedCancel(b *testing.B) {
testValue := []byte("bar")
s.Put(testKey, testValue)
w := s.NewWatcher()
cancels := make([]CancelFunc, watcherSize)
for i := 0; i < watcherSize; i++ {
// non-0 value to keep watchers in unsynced
_, cancel := s.Watcher(testKey, true, 1)
cancel := w.Watch(testKey, true, 1)
cancels[i] = cancel
}

View File

@ -20,55 +20,56 @@ import (
"github.com/coreos/etcd/storage/storagepb"
)
// Watcher watches on the KV. It will be notified if an event happens
// on the watched key or prefix.
type Watcher interface {
// Event returns a channel that receives observed event that matches the
// context of watcher. When watch finishes or is canceled or aborted, the
// channel is closed and returns empty event.
// Successive calls to Event return the same value.
Event() <-chan storagepb.Event
// Watch watches the events happening or happened on the given key
// or key prefix from the given startRev.
// The whole event history can be watched unless compacted.
// If `prefix` is true, watch observes all events whose key has the given `key` as a prefix.
// If `startRev` <= 0, watch observes events after currentRev.
Watch(key []byte, prefix bool, startRev int64) CancelFunc
// Err returns a non-nil error value after Event is closed. Err returns
// Compacted if the history was compacted, Canceled if watch is canceled,
// or EOF if watch reaches the end revision. No other values for Err are defined.
// After Event is closed, successive calls to Err return the same value.
Err() error
// Chan returns a chan. All watched events will be sent to the returned chan.
Chan() <-chan storagepb.Event
// Close closes the event chan and releases all related resources.
Close()
}
// watcher contains a collection of watchings that share
// one chan to send out watched events and other control events.
type watcher struct {
key []byte
prefix bool
cur int64
watchable watchable
ch chan storagepb.Event
ch chan storagepb.Event
mu sync.Mutex
err error
mu sync.Mutex // guards fields below it
closed bool
cancels []CancelFunc
}
func newWatcher(key []byte, prefix bool, start int64) *watcher {
return &watcher{
key: key,
prefix: prefix,
cur: start,
ch: make(chan storagepb.Event, 10),
// TODO: return error if ws is closed?
func (ws *watcher) Watch(key []byte, prefix bool, startRev int64) CancelFunc {
_, c := ws.watchable.watch(key, prefix, startRev, ws.ch)
ws.mu.Lock()
defer ws.mu.Unlock()
if ws.closed {
return nil
}
// TODO: cancelFunc needs to be removed from the cancels when it is called.
ws.cancels = append(ws.cancels, c)
return c
}
func (w *watcher) Event() <-chan storagepb.Event { return w.ch }
func (w *watcher) Err() error {
w.mu.Lock()
defer w.mu.Unlock()
return w.err
func (ws *watcher) Chan() <-chan storagepb.Event {
return ws.ch
}
func (w *watcher) stopWithError(err error) {
if w.err != nil {
return
func (ws *watcher) Close() {
ws.mu.Lock()
defer ws.mu.Unlock()
for _, cancel := range ws.cancels {
cancel()
}
close(w.ch)
w.mu.Lock()
w.err = err
w.mu.Unlock()
ws.closed = true
close(ws.ch)
}
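
Since every watching funnels into the same chan, a consumer that cares which watching fired has to demultiplex by the event key; Close cancels all registered watchings and closes the chan, so a plain range loop terminates cleanly. A sketch using the standard bytes package, with hypothetical keys:

    func demuxExample(w storage.Watcher) {
        // Assumes watchings were registered on the "cfg/" prefix
        // and on the exact key "leader".
        for ev := range w.Chan() { // ends once Close() closes the chan
            switch {
            case bytes.Equal(ev.Kv.Key, []byte("leader")):
                // exact key matched; handle leader change here
            case bytes.HasPrefix(ev.Kv.Key, []byte("cfg/")):
                // prefix matched; handle config update here
            }
        }
    }

Note the TODO in Watch above: a CancelFunc is not removed from ws.cancels when it is invoked, so Close will call already-canceled watchings' cancels a second time.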

View File

@ -20,12 +20,14 @@ import (
)
func BenchmarkKVWatcherMemoryUsage(b *testing.B) {
s := newWatchableStore(tmpPath)
defer cleanup(s, tmpPath)
watchable := newWatchableStore(tmpPath)
defer cleanup(watchable, tmpPath)
w := watchable.NewWatcher()
b.ReportAllocs()
b.StartTimer()
for i := 0; i < b.N; i++ {
s.Watcher([]byte(fmt.Sprint("foo", i)), false, 0)
w.Watch([]byte(fmt.Sprint("foo", i)), false, 0)
}
}