From c0eac7ab72a14a28f3f7a4e07393e28ee885af6c Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Fri, 26 Feb 2016 08:55:28 -0800 Subject: [PATCH] storage: support watch on ranges --- etcdserver/api/v3rpc/watch.go | 33 +--- storage/kv_test.go | 129 ++++++------ storage/watchable_store.go | 250 ++++-------------------- storage/watchable_store_bench_test.go | 8 +- storage/watchable_store_test.go | 73 ++++--- storage/watcher.go | 9 +- storage/watcher_bench_test.go | 2 +- storage/watcher_group.go | 269 ++++++++++++++++++++++++++ storage/watcher_test.go | 15 +- 9 files changed, 421 insertions(+), 367 deletions(-) create mode 100644 storage/watcher_group.go diff --git a/etcdserver/api/v3rpc/watch.go b/etcdserver/api/v3rpc/watch.go index 1660732c9..1579815e8 100644 --- a/etcdserver/api/v3rpc/watch.go +++ b/etcdserver/api/v3rpc/watch.go @@ -16,7 +16,6 @@ package v3rpc import ( "io" - "reflect" "github.com/coreos/etcd/etcdserver" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" @@ -100,9 +99,10 @@ func (sws *serverWatchStream) recvLoop() error { } creq := uv.CreateRequest - toWatch := creq.Key - isPrefix := len(creq.RangeEnd) != 0 - badPrefix := isPrefix && !reflect.DeepEqual(getPrefix(toWatch), creq.RangeEnd) + if len(creq.RangeEnd) == 1 && creq.RangeEnd[0] == 0 { + // support >= key queries + creq.RangeEnd = []byte{} + } rev := creq.StartRevision wsrev := sws.watchStream.Rev() @@ -112,16 +112,15 @@ func (sws *serverWatchStream) recvLoop() error { rev = wsrev + 1 } // do not allow future watch revision - // do not allow range that is not a prefix id := storage.WatchID(-1) - if !futureRev && !badPrefix { - id = sws.watchStream.Watch(toWatch, isPrefix, rev) + if !futureRev { + id = sws.watchStream.Watch(creq.Key, creq.RangeEnd, rev) } sws.ctrlStream <- &pb.WatchResponse{ Header: sws.newResponseHeader(wsrev), WatchId: int64(id), Created: true, - Canceled: futureRev || badPrefix, + Canceled: futureRev, } case *pb.WatchRequest_CancelRequest: if uv.CancelRequest != nil { @@ -237,21 +236,3 @@ func (sws *serverWatchStream) newResponseHeader(rev int64) *pb.ResponseHeader { RaftTerm: sws.raftTimer.Term(), } } - -// TODO: remove getPrefix when storage supports full range watchers - -func getPrefix(key []byte) []byte { - end := make([]byte, len(key)) - copy(end, key) - for i := len(end) - 1; i >= 0; i-- { - if end[i] < 0xff { - end[i] = end[i] + 1 - end = end[:i+1] - return end - } - } - // next prefix does not exist (e.g., 0xffff); - // default to WithFromKey policy - end = []byte{0} - return end -} diff --git a/storage/kv_test.go b/storage/kv_test.go index cf0f9183b..0ed0cf8b2 100644 --- a/storage/kv_test.go +++ b/storage/kv_test.go @@ -722,13 +722,10 @@ func TestWatchableKVWatch(t *testing.T) { w := s.NewWatchStream() defer w.Close() - wid := w.Watch([]byte("foo"), true, 0) + wid := w.Watch([]byte("foo"), []byte("fop"), 0) - s.Put([]byte("foo"), []byte("bar"), 1) - select { - case resp := <-w.Chan(): - wev := storagepb.Event{ - Type: storagepb.PUT, + wev := []storagepb.Event{ + {Type: storagepb.PUT, Kv: &storagepb.KeyValue{ Key: []byte("foo"), Value: []byte("bar"), @@ -737,23 +734,8 @@ func TestWatchableKVWatch(t *testing.T) { Version: 1, Lease: 1, }, - } - if resp.WatchID != wid { - t.Errorf("resp.WatchID got = %d, want = %d", resp.WatchID, wid) - } - ev := resp.Events[0] - if !reflect.DeepEqual(ev, wev) { - t.Errorf("watched event = %+v, want %+v", ev, wev) - } - case <-time.After(5 * time.Second): - // CPU might be too slow, and the routine is not able to switch around - testutil.FatalStack(t, "failed to watch the event") - } - - s.Put([]byte("foo1"), []byte("bar1"), 2) - select { - case resp := <-w.Chan(): - wev := storagepb.Event{ + }, + { Type: storagepb.PUT, Kv: &storagepb.KeyValue{ Key: []byte("foo1"), @@ -763,49 +745,8 @@ func TestWatchableKVWatch(t *testing.T) { Version: 1, Lease: 2, }, - } - if resp.WatchID != wid { - t.Errorf("resp.WatchID got = %d, want = %d", resp.WatchID, wid) - } - ev := resp.Events[0] - if !reflect.DeepEqual(ev, wev) { - t.Errorf("watched event = %+v, want %+v", ev, wev) - } - case <-time.After(5 * time.Second): - testutil.FatalStack(t, "failed to watch the event") - } - - w = s.NewWatchStream() - wid = w.Watch([]byte("foo1"), false, 1) - - select { - case resp := <-w.Chan(): - wev := storagepb.Event{ - Type: storagepb.PUT, - Kv: &storagepb.KeyValue{ - Key: []byte("foo1"), - Value: []byte("bar1"), - CreateRevision: 3, - ModRevision: 3, - Version: 1, - Lease: 2, - }, - } - if resp.WatchID != wid { - t.Errorf("resp.WatchID got = %d, want = %d", resp.WatchID, wid) - } - ev := resp.Events[0] - if !reflect.DeepEqual(ev, wev) { - t.Errorf("watched event = %+v, want %+v", ev, wev) - } - case <-time.After(5 * time.Second): - testutil.FatalStack(t, "failed to watch the event") - } - - s.Put([]byte("foo1"), []byte("bar11"), 3) - select { - case resp := <-w.Chan(): - wev := storagepb.Event{ + }, + { Type: storagepb.PUT, Kv: &storagepb.KeyValue{ Key: []byte("foo1"), @@ -815,13 +756,63 @@ func TestWatchableKVWatch(t *testing.T) { Version: 2, Lease: 3, }, - } + }, + } + + s.Put([]byte("foo"), []byte("bar"), 1) + select { + case resp := <-w.Chan(): if resp.WatchID != wid { t.Errorf("resp.WatchID got = %d, want = %d", resp.WatchID, wid) } ev := resp.Events[0] - if !reflect.DeepEqual(ev, wev) { - t.Errorf("watched event = %+v, want %+v", ev, wev) + if !reflect.DeepEqual(ev, wev[0]) { + t.Errorf("watched event = %+v, want %+v", ev, wev[0]) + } + case <-time.After(5 * time.Second): + // CPU might be too slow, and the routine is not able to switch around + testutil.FatalStack(t, "failed to watch the event") + } + + s.Put([]byte("foo1"), []byte("bar1"), 2) + select { + case resp := <-w.Chan(): + if resp.WatchID != wid { + t.Errorf("resp.WatchID got = %d, want = %d", resp.WatchID, wid) + } + ev := resp.Events[0] + if !reflect.DeepEqual(ev, wev[1]) { + t.Errorf("watched event = %+v, want %+v", ev, wev[1]) + } + case <-time.After(5 * time.Second): + testutil.FatalStack(t, "failed to watch the event") + } + + w = s.NewWatchStream() + wid = w.Watch([]byte("foo1"), []byte("foo2"), 3) + + select { + case resp := <-w.Chan(): + if resp.WatchID != wid { + t.Errorf("resp.WatchID got = %d, want = %d", resp.WatchID, wid) + } + ev := resp.Events[0] + if !reflect.DeepEqual(ev, wev[1]) { + t.Errorf("watched event = %+v, want %+v", ev, wev[1]) + } + case <-time.After(5 * time.Second): + testutil.FatalStack(t, "failed to watch the event") + } + + s.Put([]byte("foo1"), []byte("bar11"), 3) + select { + case resp := <-w.Chan(): + if resp.WatchID != wid { + t.Errorf("resp.WatchID got = %d, want = %d", resp.WatchID, wid) + } + ev := resp.Events[0] + if !reflect.DeepEqual(ev, wev[2]) { + t.Errorf("watched event = %+v, want %+v", ev, wev[2]) } case <-time.After(5 * time.Second): testutil.FatalStack(t, "failed to watch the event") diff --git a/storage/watchable_store.go b/storage/watchable_store.go index d6e3d6daf..e9d3b8b8e 100644 --- a/storage/watchable_store.go +++ b/storage/watchable_store.go @@ -16,8 +16,6 @@ package storage import ( "log" - "math" - "strings" "sync" "time" @@ -34,103 +32,8 @@ const ( chanBufLen = 1024 ) -var ( - // watchBatchMaxRevs is the maximum distinct revisions that - // may be sent to an unsynced watcher at a time. Declared as - // var instead of const for testing purposes. - watchBatchMaxRevs = 1000 -) - -type eventBatch struct { - // evs is a batch of revision-ordered events - evs []storagepb.Event - // revs is the minimum unique revisions observed for this batch - revs int - // moreRev is first revision with more events following this batch - moreRev int64 -} - -type ( - watcherSetByKey map[string]watcherSet - watcherSet map[*watcher]struct{} - watcherBatch map[*watcher]*eventBatch -) - -func (eb *eventBatch) add(ev storagepb.Event) { - if eb.revs > watchBatchMaxRevs { - // maxed out batch size - return - } - - if len(eb.evs) == 0 { - // base case - eb.revs = 1 - eb.evs = append(eb.evs, ev) - return - } - - // revision accounting - ebRev := eb.evs[len(eb.evs)-1].Kv.ModRevision - evRev := ev.Kv.ModRevision - if evRev > ebRev { - eb.revs++ - if eb.revs > watchBatchMaxRevs { - eb.moreRev = evRev - return - } - } - - eb.evs = append(eb.evs, ev) -} - -func (wb watcherBatch) add(w *watcher, ev storagepb.Event) { - eb := wb[w] - if eb == nil { - eb = &eventBatch{} - wb[w] = eb - } - eb.add(ev) -} - -func (w watcherSet) add(wa *watcher) { - if _, ok := w[wa]; ok { - panic("add watcher twice!") - } - w[wa] = struct{}{} -} - -func (w watcherSetByKey) add(wa *watcher) { - set := w[string(wa.key)] - if set == nil { - set = make(watcherSet) - w[string(wa.key)] = set - } - set.add(wa) -} - -func (w watcherSetByKey) getSetByKey(key string) (watcherSet, bool) { - set, ok := w[key] - return set, ok -} - -func (w watcherSetByKey) delete(wa *watcher) bool { - k := string(wa.key) - if v, ok := w[k]; ok { - if _, ok := v[wa]; ok { - delete(v, wa) - // if there is nothing in the set, - // remove the set - if len(v) == 0 { - delete(w, k) - } - return true - } - } - return false -} - type watchable interface { - watch(key []byte, prefix bool, startRev int64, id WatchID, ch chan<- WatchResponse) (*watcher, cancelFunc) + watch(key, end []byte, startRev int64, id WatchID, ch chan<- WatchResponse) (*watcher, cancelFunc) rev() int64 } @@ -140,11 +43,11 @@ type watchableStore struct { *store // contains all unsynced watchers that needs to sync with events that have happened - unsynced watcherSetByKey + unsynced watcherGroup // contains all synced watchers that are in sync with the progress of the store. // The key of the map is the key that the watcher watches on. - synced watcherSetByKey + synced watcherGroup stopc chan struct{} wg sync.WaitGroup @@ -157,8 +60,8 @@ type cancelFunc func() func newWatchableStore(b backend.Backend, le lease.Lessor) *watchableStore { s := &watchableStore{ store: NewStore(b, le), - unsynced: make(watcherSetByKey), - synced: make(watcherSetByKey), + unsynced: newWatcherGroup(), + synced: newWatcherGroup(), stopc: make(chan struct{}), } if s.le != nil { @@ -268,16 +171,16 @@ func (s *watchableStore) NewWatchStream() WatchStream { } } -func (s *watchableStore) watch(key []byte, prefix bool, startRev int64, id WatchID, ch chan<- WatchResponse) (*watcher, cancelFunc) { +func (s *watchableStore) watch(key, end []byte, startRev int64, id WatchID, ch chan<- WatchResponse) (*watcher, cancelFunc) { s.mu.Lock() defer s.mu.Unlock() wa := &watcher{ - key: key, - prefix: prefix, - cur: startRev, - id: id, - ch: ch, + key: key, + end: end, + cur: startRev, + id: id, + ch: ch, } s.store.mu.Lock() @@ -342,15 +245,16 @@ func (s *watchableStore) syncWatchers() { s.store.mu.Lock() defer s.store.mu.Unlock() - if len(s.unsynced) == 0 { + if s.unsynced.size() == 0 { return } // in order to find key-value pairs from unsynced watchers, we need to // find min revision index, and these revisions can be used to // query the backend store of key-value pairs - prefixes, minRev := s.scanUnsync() curRev := s.store.currentRev.main + compactionRev := s.store.compactMainRev + minRev := s.unsynced.scanMinRev(curRev, compactionRev) minBytes, maxBytes := newRevBytes(), newRevBytes() revToBytes(revision{main: minRev}, minBytes) revToBytes(revision{main: curRev + 1}, maxBytes) @@ -360,10 +264,10 @@ func (s *watchableStore) syncWatchers() { tx := s.store.b.BatchTx() tx.Lock() revs, vs := tx.UnsafeRange(keyBucketName, minBytes, maxBytes, 0) - evs := kvsToEvents(revs, vs, s.unsynced, prefixes) + evs := kvsToEvents(&s.unsynced, revs, vs) tx.Unlock() - for w, eb := range newWatcherBatch(s.unsynced, evs) { + for w, eb := range newWatcherBatch(&s.unsynced, evs) { select { // s.store.Rev also uses Lock, so just return directly case w.ch <- WatchResponse{WatchID: w.id, Events: eb.evs, Revision: s.store.currentRev.main}: @@ -383,56 +287,18 @@ func (s *watchableStore) syncWatchers() { s.unsynced.delete(w) } - slowWatcherGauge.Set(float64(len(s.unsynced))) -} - -func (s *watchableStore) scanUnsync() (prefixes map[string]struct{}, minRev int64) { - curRev := s.store.currentRev.main - compactionRev := s.store.compactMainRev - - prefixes = make(map[string]struct{}) - minRev = int64(math.MaxInt64) - for _, set := range s.unsynced { - for w := range set { - k := string(w.key) - - if w.cur > curRev { - panic("watcher current revision should not exceed current revision") - } - - if w.cur < compactionRev { - select { - case w.ch <- WatchResponse{WatchID: w.id, CompactRevision: compactionRev}: - s.unsynced.delete(w) - default: - // retry next time - } - continue - } - - if minRev > w.cur { - minRev = w.cur - } - - if w.prefix { - prefixes[k] = struct{}{} - } - } - } - - return prefixes, minRev + slowWatcherGauge.Set(float64(s.unsynced.size())) } // kvsToEvents gets all events for the watchers from all key-value pairs -func kvsToEvents(revs, vals [][]byte, wsk watcherSetByKey, pfxs map[string]struct{}) (evs []storagepb.Event) { +func kvsToEvents(wg *watcherGroup, revs, vals [][]byte) (evs []storagepb.Event) { for i, v := range vals { var kv storagepb.KeyValue if err := kv.Unmarshal(v); err != nil { log.Panicf("storage: cannot unmarshal event: %v", err) } - k := string(kv.Key) - if _, ok := wsk.getSetByKey(k); !ok && !matchPrefix(k, pfxs) { + if !wg.contains(string(kv.Key)) { continue } @@ -450,26 +316,19 @@ func kvsToEvents(revs, vals [][]byte, wsk watcherSetByKey, pfxs map[string]struc // notify notifies the fact that given event at the given rev just happened to // watchers that watch on the key of the event. func (s *watchableStore) notify(rev int64, evs []storagepb.Event) { - we := newWatcherBatch(s.synced, evs) - for _, wm := range s.synced { - for w := range wm { - eb, ok := we[w] - if !ok { - continue - } - if eb.revs != 1 { - panic("unexpected multiple revisions in notification") - } - select { - case w.ch <- WatchResponse{WatchID: w.id, Events: eb.evs, Revision: s.Rev()}: - pendingEventsGauge.Add(float64(len(eb.evs))) - default: - // move slow watcher to unsynced - w.cur = rev - s.unsynced.add(w) - delete(wm, w) - slowWatcherGauge.Inc() - } + for w, eb := range newWatcherBatch(&s.synced, evs) { + if eb.revs != 1 { + panic("unexpected multiple revisions in notification") + } + select { + case w.ch <- WatchResponse{WatchID: w.id, Events: eb.evs, Revision: s.Rev()}: + pendingEventsGauge.Add(float64(len(eb.evs))) + default: + // move slow watcher to unsynced + w.cur = rev + s.unsynced.add(w) + s.synced.delete(w) + slowWatcherGauge.Inc() } } } @@ -479,9 +338,9 @@ func (s *watchableStore) rev() int64 { return s.store.Rev() } type watcher struct { // the watcher key key []byte - // prefix indicates if watcher is on a key or a prefix. - // If prefix is true, the watcher is on a prefix. - prefix bool + // end indicates the end of the range to watch. + // If end is set, the watcher is on a range. + end []byte // cur is the current watcher revision. // If cur is behind the current revision of the KV, // watcher is unsynced and needs to catch up. @@ -492,42 +351,3 @@ type watcher struct { // The chan might be shared with other watchers. ch chan<- WatchResponse } - -// newWatcherBatch maps watchers to their matched events. It enables quick -// events look up by watcher. -func newWatcherBatch(sm watcherSetByKey, evs []storagepb.Event) watcherBatch { - wb := make(watcherBatch) - for _, ev := range evs { - key := string(ev.Kv.Key) - - // check all prefixes of the key to notify all corresponded watchers - for i := 0; i <= len(key); i++ { - for w := range sm[key[:i]] { - // don't double notify - if ev.Kv.ModRevision < w.cur { - continue - } - - // the watcher needs to be notified when either it watches prefix or - // the key is exactly matched. - if !w.prefix && i != len(ev.Kv.Key) { - continue - } - wb.add(w, ev) - } - } - } - - return wb -} - -// matchPrefix returns true if key has any matching prefix -// from prefixes map. -func matchPrefix(key string, prefixes map[string]struct{}) bool { - for p := range prefixes { - if strings.HasPrefix(key, p) { - return true - } - } - return false -} diff --git a/storage/watchable_store_bench_test.go b/storage/watchable_store_bench_test.go index 91302be68..f58324337 100644 --- a/storage/watchable_store_bench_test.go +++ b/storage/watchable_store_bench_test.go @@ -40,11 +40,11 @@ func BenchmarkWatchableStoreUnsyncedCancel(b *testing.B) { // in unsynced for this benchmark. ws := &watchableStore{ store: s, - unsynced: make(watcherSetByKey), + unsynced: newWatcherGroup(), // to make the test not crash from assigning to nil map. // 'synced' doesn't get populated in this test. - synced: make(watcherSetByKey), + synced: newWatcherGroup(), } defer func() { @@ -69,7 +69,7 @@ func BenchmarkWatchableStoreUnsyncedCancel(b *testing.B) { watchIDs := make([]WatchID, watcherN) for i := 0; i < watcherN; i++ { // non-0 value to keep watchers in unsynced - watchIDs[i] = w.Watch(testKey, true, 1) + watchIDs[i] = w.Watch(testKey, nil, 1) } // random-cancel N watchers to make it not biased towards @@ -109,7 +109,7 @@ func BenchmarkWatchableStoreSyncedCancel(b *testing.B) { watchIDs := make([]WatchID, watcherN) for i := 0; i < watcherN; i++ { // 0 for startRev to keep watchers in synced - watchIDs[i] = w.Watch(testKey, true, 0) + watchIDs[i] = w.Watch(testKey, nil, 0) } // randomly cancel watchers to make it not biased towards diff --git a/storage/watchable_store_test.go b/storage/watchable_store_test.go index 51f040dd2..7aa0a5895 100644 --- a/storage/watchable_store_test.go +++ b/storage/watchable_store_test.go @@ -40,11 +40,11 @@ func TestWatch(t *testing.T) { s.Put(testKey, testValue, lease.NoLease) w := s.NewWatchStream() - w.Watch(testKey, true, 0) + w.Watch(testKey, nil, 0) - if _, ok := s.synced[string(testKey)]; !ok { + if !s.synced.contains(string(testKey)) { // the key must have had an entry in synced - t.Errorf("existence = %v, want true", ok) + t.Errorf("existence = false, want true") } } @@ -61,15 +61,15 @@ func TestNewWatcherCancel(t *testing.T) { s.Put(testKey, testValue, lease.NoLease) w := s.NewWatchStream() - wt := w.Watch(testKey, true, 0) + wt := w.Watch(testKey, nil, 0) if err := w.Cancel(wt); err != nil { t.Error(err) } - if _, ok := s.synced[string(testKey)]; ok { + if s.synced.contains(string(testKey)) { // the key shoud have been deleted - t.Errorf("existence = %v, want false", ok) + t.Errorf("existence = true, want false") } } @@ -83,11 +83,11 @@ func TestCancelUnsynced(t *testing.T) { // in unsynced to test if syncWatchers works as expected. s := &watchableStore{ store: NewStore(b, &lease.FakeLessor{}), - unsynced: make(watcherSetByKey), + unsynced: newWatcherGroup(), // to make the test not crash from assigning to nil map. // 'synced' doesn't get populated in this test. - synced: make(watcherSetByKey), + synced: newWatcherGroup(), } defer func() { @@ -112,7 +112,7 @@ func TestCancelUnsynced(t *testing.T) { watchIDs := make([]WatchID, watcherN) for i := 0; i < watcherN; i++ { // use 1 to keep watchers in unsynced - watchIDs[i] = w.Watch(testKey, true, 1) + watchIDs[i] = w.Watch(testKey, nil, 1) } for _, idx := range watchIDs { @@ -125,8 +125,8 @@ func TestCancelUnsynced(t *testing.T) { // // unsynced should be empty // because cancel removes watcher from unsynced - if len(s.unsynced) != 0 { - t.Errorf("unsynced size = %d, want 0", len(s.unsynced)) + if size := s.unsynced.size(); size != 0 { + t.Errorf("unsynced size = %d, want 0", size) } } @@ -138,8 +138,8 @@ func TestSyncWatchers(t *testing.T) { s := &watchableStore{ store: NewStore(b, &lease.FakeLessor{}), - unsynced: make(watcherSetByKey), - synced: make(watcherSetByKey), + unsynced: newWatcherGroup(), + synced: newWatcherGroup(), } defer func() { @@ -158,13 +158,13 @@ func TestSyncWatchers(t *testing.T) { for i := 0; i < watcherN; i++ { // specify rev as 1 to keep watchers in unsynced - w.Watch(testKey, true, 1) + w.Watch(testKey, nil, 1) } // Before running s.syncWatchers() synced should be empty because we manually // populate unsynced only - sws, _ := s.synced.getSetByKey(string(testKey)) - uws, _ := s.unsynced.getSetByKey(string(testKey)) + sws := s.synced.watcherSetByKey(string(testKey)) + uws := s.unsynced.watcherSetByKey(string(testKey)) if len(sws) != 0 { t.Fatalf("synced[string(testKey)] size = %d, want 0", len(sws)) @@ -177,8 +177,8 @@ func TestSyncWatchers(t *testing.T) { // this should move all unsynced watchers to synced ones s.syncWatchers() - sws, _ = s.synced.getSetByKey(string(testKey)) - uws, _ = s.unsynced.getSetByKey(string(testKey)) + sws = s.synced.watcherSetByKey(string(testKey)) + uws = s.unsynced.watcherSetByKey(string(testKey)) // After running s.syncWatchers(), synced should not be empty because syncwatchers // populates synced in this test case @@ -240,7 +240,7 @@ func TestWatchCompacted(t *testing.T) { } w := s.NewWatchStream() - wt := w.Watch(testKey, true, compactRev-1) + wt := w.Watch(testKey, nil, compactRev-1) select { case resp := <-w.Chan(): @@ -275,7 +275,7 @@ func TestWatchBatchUnsynced(t *testing.T) { } w := s.NewWatchStream() - w.Watch(v, false, 1) + w.Watch(v, nil, 1) for i := 0; i < batches; i++ { if resp := <-w.Chan(); len(resp.Events) != watchBatchMaxRevs { t.Fatalf("len(events) = %d, want %d", len(resp.Events), watchBatchMaxRevs) @@ -284,8 +284,8 @@ func TestWatchBatchUnsynced(t *testing.T) { s.store.mu.Lock() defer s.store.mu.Unlock() - if len(s.synced) != 1 { - t.Errorf("synced size = %d, want 1", len(s.synced)) + if size := s.synced.size(); size != 1 { + t.Errorf("synced size = %d, want 1", size) } } @@ -311,14 +311,14 @@ func TestNewMapwatcherToEventMap(t *testing.T) { } tests := []struct { - sync watcherSetByKey + sync []*watcher evs []storagepb.Event wwe map[*watcher][]storagepb.Event }{ // no watcher in sync, some events should return empty wwe { - watcherSetByKey{}, + nil, evs, map[*watcher][]storagepb.Event{}, }, @@ -326,9 +326,7 @@ func TestNewMapwatcherToEventMap(t *testing.T) { // one watcher in sync, one event that does not match the key of that // watcher should return empty wwe { - watcherSetByKey{ - string(k2): {ws[2]: struct{}{}}, - }, + []*watcher{ws[2]}, evs[:1], map[*watcher][]storagepb.Event{}, }, @@ -336,9 +334,7 @@ func TestNewMapwatcherToEventMap(t *testing.T) { // one watcher in sync, one event that matches the key of that // watcher should return wwe with that matching watcher { - watcherSetByKey{ - string(k1): {ws[1]: struct{}{}}, - }, + []*watcher{ws[1]}, evs[1:2], map[*watcher][]storagepb.Event{ ws[1]: evs[1:2], @@ -349,10 +345,7 @@ func TestNewMapwatcherToEventMap(t *testing.T) { // that matches the key of only one of the watcher should return wwe // with the matching watcher { - watcherSetByKey{ - string(k0): {ws[0]: struct{}{}}, - string(k2): {ws[2]: struct{}{}}, - }, + []*watcher{ws[0], ws[2]}, evs[2:], map[*watcher][]storagepb.Event{ ws[2]: evs[2:], @@ -362,10 +355,7 @@ func TestNewMapwatcherToEventMap(t *testing.T) { // two watchers in sync that watches the same key, two events that // match the keys should return wwe with those two watchers { - watcherSetByKey{ - string(k0): {ws[0]: struct{}{}}, - string(k1): {ws[1]: struct{}{}}, - }, + []*watcher{ws[0], ws[1]}, evs[:2], map[*watcher][]storagepb.Event{ ws[0]: evs[:1], @@ -375,7 +365,12 @@ func TestNewMapwatcherToEventMap(t *testing.T) { } for i, tt := range tests { - gwe := newWatcherBatch(tt.sync, tt.evs) + wg := newWatcherGroup() + for _, w := range tt.sync { + wg.add(w) + } + + gwe := newWatcherBatch(&wg, tt.evs) if len(gwe) != len(tt.wwe) { t.Errorf("#%d: len(gwe) got = %d, want = %d", i, len(gwe), len(tt.wwe)) } diff --git a/storage/watcher.go b/storage/watcher.go index b1507fce3..f609db6e9 100644 --- a/storage/watcher.go +++ b/storage/watcher.go @@ -29,16 +29,15 @@ type WatchID int64 type WatchStream interface { // Watch creates a watcher. The watcher watches the events happening or - // happened on the given key or key prefix from the given startRev. + // happened on the given key or range [key, end) from the given startRev. // // The whole event history can be watched unless compacted. - // If `prefix` is true, watch observes all events whose key prefix could be the given `key`. // If `startRev` <=0, watch observes events after currentRev. // // The returned `id` is the ID of this watcher. It appears as WatchID // in events that are sent to the created watcher through stream channel. // - Watch(key []byte, prefix bool, startRev int64) WatchID + Watch(key, end []byte, startRev int64) WatchID // Chan returns a chan. All watch response will be sent to the returned chan. Chan() <-chan WatchResponse @@ -87,7 +86,7 @@ type watchStream struct { // Watch creates a new watcher in the stream and returns its WatchID. // TODO: return error if ws is closed? -func (ws *watchStream) Watch(key []byte, prefix bool, startRev int64) WatchID { +func (ws *watchStream) Watch(key, end []byte, startRev int64) WatchID { ws.mu.Lock() defer ws.mu.Unlock() if ws.closed { @@ -97,7 +96,7 @@ func (ws *watchStream) Watch(key []byte, prefix bool, startRev int64) WatchID { id := ws.nextID ws.nextID++ - _, c := ws.watchable.watch(key, prefix, startRev, id, ws.ch) + _, c := ws.watchable.watch(key, end, startRev, id, ws.ch) ws.cancels[id] = c return id diff --git a/storage/watcher_bench_test.go b/storage/watcher_bench_test.go index ef4bd1f89..0d14b858e 100644 --- a/storage/watcher_bench_test.go +++ b/storage/watcher_bench_test.go @@ -33,6 +33,6 @@ func BenchmarkKVWatcherMemoryUsage(b *testing.B) { b.ReportAllocs() b.StartTimer() for i := 0; i < b.N; i++ { - w.Watch([]byte(fmt.Sprint("foo", i)), false, 0) + w.Watch([]byte(fmt.Sprint("foo", i)), nil, 0) } } diff --git a/storage/watcher_group.go b/storage/watcher_group.go new file mode 100644 index 000000000..4cbf54913 --- /dev/null +++ b/storage/watcher_group.go @@ -0,0 +1,269 @@ +// Copyright 2016 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package storage + +import ( + "math" + + "github.com/coreos/etcd/pkg/adt" + "github.com/coreos/etcd/storage/storagepb" +) + +var ( + // watchBatchMaxRevs is the maximum distinct revisions that + // may be sent to an unsynced watcher at a time. Declared as + // var instead of const for testing purposes. + watchBatchMaxRevs = 1000 +) + +type eventBatch struct { + // evs is a batch of revision-ordered events + evs []storagepb.Event + // revs is the minimum unique revisions observed for this batch + revs int + // moreRev is first revision with more events following this batch + moreRev int64 +} + +func (eb *eventBatch) add(ev storagepb.Event) { + if eb.revs > watchBatchMaxRevs { + // maxed out batch size + return + } + + if len(eb.evs) == 0 { + // base case + eb.revs = 1 + eb.evs = append(eb.evs, ev) + return + } + + // revision accounting + ebRev := eb.evs[len(eb.evs)-1].Kv.ModRevision + evRev := ev.Kv.ModRevision + if evRev > ebRev { + eb.revs++ + if eb.revs > watchBatchMaxRevs { + eb.moreRev = evRev + return + } + } + + eb.evs = append(eb.evs, ev) +} + +type watcherBatch map[*watcher]*eventBatch + +func (wb watcherBatch) add(w *watcher, ev storagepb.Event) { + eb := wb[w] + if eb == nil { + eb = &eventBatch{} + wb[w] = eb + } + eb.add(ev) +} + +// newWatcherBatch maps watchers to their matched events. It enables quick +// events look up by watcher. +func newWatcherBatch(wg *watcherGroup, evs []storagepb.Event) watcherBatch { + wb := make(watcherBatch) + for _, ev := range evs { + for w := range wg.watcherSetByKey(string(ev.Kv.Key)) { + if ev.Kv.ModRevision >= w.cur { + // don't double notify + wb.add(w, ev) + } + } + } + return wb +} + +type watcherSet map[*watcher]struct{} + +func (w watcherSet) add(wa *watcher) { + if _, ok := w[wa]; ok { + panic("add watcher twice!") + } + w[wa] = struct{}{} +} + +func (w watcherSet) union(ws watcherSet) { + for wa := range ws { + w.add(wa) + } +} + +func (w watcherSet) delete(wa *watcher) { + if _, ok := w[wa]; !ok { + panic("removing missing watcher!") + } + delete(w, wa) +} + +type watcherSetByKey map[string]watcherSet + +func (w watcherSetByKey) add(wa *watcher) { + set := w[string(wa.key)] + if set == nil { + set = make(watcherSet) + w[string(wa.key)] = set + } + set.add(wa) +} + +func (w watcherSetByKey) delete(wa *watcher) bool { + k := string(wa.key) + if v, ok := w[k]; ok { + if _, ok := v[wa]; ok { + delete(v, wa) + if len(v) == 0 { + // remove the set; nothing left + delete(w, k) + } + return true + } + } + return false +} + +type interval struct { + begin string + end string +} + +type watcherSetByInterval map[interval]watcherSet + +// watcherGroup is a collection of watchers organized by their ranges +type watcherGroup struct { + // keyWatchers has the watchers that watch on a single key + keyWatchers watcherSetByKey + // ranges has the watchers that watch a range; it is sorted by interval + ranges adt.IntervalTree + // watchers is the set of all watchers + watchers watcherSet +} + +func newWatcherGroup() watcherGroup { + return watcherGroup{ + keyWatchers: make(watcherSetByKey), + watchers: make(watcherSet), + } +} + +// add puts a watcher in the group. +func (wg *watcherGroup) add(wa *watcher) { + wg.watchers.add(wa) + if wa.end == nil { + wg.keyWatchers.add(wa) + return + } + + // interval already registered? + ivl := adt.NewStringAffineInterval(string(wa.key), string(wa.end)) + if iv := wg.ranges.Find(ivl); iv != nil { + iv.Val.(watcherSet).add(wa) + return + } + + // not registered, put in interval tree + ws := make(watcherSet) + ws.add(wa) + wg.ranges.Insert(ivl, ws) +} + +// contains is whether the given key has a watcher in the group. +func (wg *watcherGroup) contains(key string) bool { + _, ok := wg.keyWatchers[key] + return ok || wg.ranges.Contains(adt.NewStringAffinePoint(key)) +} + +// size gives the number of unique watchers in the group. +func (wg *watcherGroup) size() int { return len(wg.watchers) } + +// delete removes a watcher from the group. +func (wg *watcherGroup) delete(wa *watcher) bool { + if _, ok := wg.watchers[wa]; !ok { + return false + } + wg.watchers.delete(wa) + if wa.end == nil { + wg.keyWatchers.delete(wa) + return true + } + + ivl := adt.NewStringAffineInterval(string(wa.key), string(wa.end)) + iv := wg.ranges.Find(ivl) + if iv == nil { + return false + } + + ws := iv.Val.(watcherSet) + delete(ws, wa) + if len(ws) == 0 { + // remove interval missing watchers + if ok := wg.ranges.Delete(ivl); !ok { + panic("could not remove watcher from interval tree") + } + } + + return true +} + +func (wg *watcherGroup) scanMinRev(curRev int64, compactRev int64) int64 { + minRev := int64(math.MaxInt64) + for w := range wg.watchers { + if w.cur > curRev { + panic("watcher current revision should not exceed current revision") + } + if w.cur < compactRev { + select { + case w.ch <- WatchResponse{WatchID: w.id, CompactRevision: compactRev}: + wg.delete(w) + default: + // retry next time + } + continue + } + if minRev > w.cur { + minRev = w.cur + } + } + return minRev +} + +// watcherSetByKey gets the set of watchers that recieve events on the given key. +func (wg *watcherGroup) watcherSetByKey(key string) watcherSet { + wkeys := wg.keyWatchers[key] + wranges := wg.ranges.Stab(adt.NewStringAffinePoint(key)) + + // zero-copy cases + switch { + case len(wranges) == 0: + // no need to merge ranges or copy; reuse single-key set + return wkeys + case len(wranges) == 0 && len(wkeys) == 0: + return nil + case len(wranges) == 1 && len(wkeys) == 0: + return wranges[0].Val.(watcherSet) + } + + // copy case + ret := make(watcherSet) + ret.union(wg.keyWatchers[key]) + for _, item := range wranges { + ret.union(item.Val.(watcherSet)) + } + return ret +} diff --git a/storage/watcher_test.go b/storage/watcher_test.go index 41dcdd96e..1f75eba00 100644 --- a/storage/watcher_test.go +++ b/storage/watcher_test.go @@ -35,7 +35,7 @@ func TestWatcherWatchID(t *testing.T) { idm := make(map[WatchID]struct{}) for i := 0; i < 10; i++ { - id := w.Watch([]byte("foo"), false, 0) + id := w.Watch([]byte("foo"), nil, 0) if _, ok := idm[id]; ok { t.Errorf("#%d: id %d exists", i, id) } @@ -57,7 +57,7 @@ func TestWatcherWatchID(t *testing.T) { // unsynced watchers for i := 10; i < 20; i++ { - id := w.Watch([]byte("foo2"), false, 1) + id := w.Watch([]byte("foo2"), nil, 1) if _, ok := idm[id]; ok { t.Errorf("#%d: id %d exists", i, id) } @@ -86,12 +86,11 @@ func TestWatcherWatchPrefix(t *testing.T) { idm := make(map[WatchID]struct{}) - prefixMatch := true val := []byte("bar") - keyWatch, keyPut := []byte("foo"), []byte("foobar") + keyWatch, keyEnd, keyPut := []byte("foo"), []byte("fop"), []byte("foobar") for i := 0; i < 10; i++ { - id := w.Watch(keyWatch, prefixMatch, 0) + id := w.Watch(keyWatch, keyEnd, 0) if _, ok := idm[id]; ok { t.Errorf("#%d: unexpected duplicated id %x", i, id) } @@ -118,12 +117,12 @@ func TestWatcherWatchPrefix(t *testing.T) { } } - keyWatch1, keyPut1 := []byte("foo1"), []byte("foo1bar") + keyWatch1, keyEnd1, keyPut1 := []byte("foo1"), []byte("foo2"), []byte("foo1bar") s.Put(keyPut1, val, lease.NoLease) // unsynced watchers for i := 10; i < 15; i++ { - id := w.Watch(keyWatch1, prefixMatch, 1) + id := w.Watch(keyWatch1, keyEnd1, 1) if _, ok := idm[id]; ok { t.Errorf("#%d: id %d exists", i, id) } @@ -159,7 +158,7 @@ func TestWatchStreamCancelWatcherByID(t *testing.T) { w := s.NewWatchStream() defer w.Close() - id := w.Watch([]byte("foo"), false, 0) + id := w.Watch([]byte("foo"), nil, 0) tests := []struct { cancelID WatchID