diff --git a/storage/watchable_store.go b/storage/watchable_store.go index 1485b8468..cf73e17d7 100644 --- a/storage/watchable_store.go +++ b/storage/watchable_store.go @@ -33,7 +33,7 @@ const ( ) type watchable interface { - watch(key []byte, prefix bool, startRev, id int64, ch chan<- storagepb.Event) (*watching, CancelFunc) + watch(key []byte, prefix bool, startRev, id int64, ch chan<- []storagepb.Event) (*watching, CancelFunc) } type watchableStore struct { @@ -75,10 +75,11 @@ func (s *watchableStore) Put(key, value []byte) (rev int64) { if err != nil { log.Panicf("unexpected range error (%v)", err) } - s.handle(rev, storagepb.Event{ + ev := storagepb.Event{ Type: storagepb.PUT, Kv: &kvs[0], - }) + } + s.handle(rev, []storagepb.Event{ev}) return rev } @@ -92,14 +93,15 @@ func (s *watchableStore) DeleteRange(key, end []byte) (n, rev int64) { log.Panicf("unexpected range error (%v)", err) } n, rev = s.store.DeleteRange(key, end) - for _, kv := range kvs { - s.handle(rev, storagepb.Event{ + evs := make([]storagepb.Event, len(kvs)) + for i, kv := range kvs { + evs[i] = storagepb.Event{ Type: storagepb.DELETE, Kv: &storagepb.KeyValue{ Key: kv.Key, - }, - }) + }} } + s.handle(rev, evs) return n, rev } @@ -138,24 +140,33 @@ func (s *watchableStore) TxnEnd(txnID int64) error { } _, rev, _ := s.store.Range(nil, nil, 0, 0) + + evs := []storagepb.Event{} + for k := range s.tx.putm { kvs, _, err := s.store.Range([]byte(k), nil, 0, 0) if err != nil { log.Panicf("unexpected range error (%v)", err) } - s.handle(rev, storagepb.Event{ + ev := storagepb.Event{ Type: storagepb.PUT, Kv: &kvs[0], - }) + } + evs = append(evs, ev) } + for k := range s.tx.delm { - s.handle(rev, storagepb.Event{ + ev := storagepb.Event{ Type: storagepb.DELETE, Kv: &storagepb.KeyValue{ Key: []byte(k), }, - }) + } + evs = append(evs, ev) } + + s.handle(rev, evs) + s.mu.Unlock() return nil } @@ -170,11 +181,11 @@ func (s *watchableStore) NewWatcher() Watcher { watcherGauge.Inc() return &watcher{ watchable: s, - ch: make(chan storagepb.Event, chanBufLen), + ch: make(chan []storagepb.Event, chanBufLen), } } -func (s *watchableStore) watch(key []byte, prefix bool, startRev, id int64, ch chan<- storagepb.Event) (*watching, CancelFunc) { +func (s *watchableStore) watch(key []byte, prefix bool, startRev, id int64, ch chan<- []storagepb.Event) (*watching, CancelFunc) { s.mu.Lock() defer s.mu.Unlock() @@ -301,6 +312,9 @@ func (s *watchableStore) syncWatchings() { ks, vs := tx.UnsafeRange(keyBucketName, minBytes, maxBytes, 0) tx.Unlock() + evs := []storagepb.Event{} + + // get the list of all events from all key-value pairs for i, v := range vs { var kv storagepb.KeyValue if err := kv.Unmarshal(v); err != nil { @@ -308,8 +322,7 @@ func (s *watchableStore) syncWatchings() { } k := string(kv.Key) - wm, ok := keyToUnsynced[k] - if !ok { + if _, ok := keyToUnsynced[k]; !ok { continue } @@ -322,56 +335,53 @@ func (s *watchableStore) syncWatchings() { } ev.Kv = &kv - for w := range wm { - ev.WatchID = w.id + evs = append(evs, ev) + } - select { - case w.ch <- ev: - pendingEventsGauge.Inc() - default: - // TODO: handle the full unsynced watchings. - // continue to process other watchings for now, the full ones - // will be processed next time and hopefully it will not be full. - continue - } - if err := unsafeAddWatching(&s.synced, k, w); err != nil { - log.Panicf("error unsafeAddWatching (%v) for key %s", err, k) - } - delete(s.unsynced, w) + for w, es := range newWatchingToEventMap(keyToUnsynced, evs) { + select { + case w.ch <- es: + pendingEventsGauge.Add(float64(len(es))) + default: + // TODO: handle the full unsynced watchings. + // continue to process other watchings for now, the full ones + // will be processed next time and hopefully it will not be full. + continue } + k := string(w.key) + if err := unsafeAddWatching(&s.synced, k, w); err != nil { + log.Panicf("error unsafeAddWatching (%v) for key %s", err, k) + } + delete(s.unsynced, w) } slowWatchingGauge.Set(float64(len(s.unsynced))) } // handle handles the change of the happening event on all watchings. -func (s *watchableStore) handle(rev int64, ev storagepb.Event) { - s.notify(rev, ev) +func (s *watchableStore) handle(rev int64, evs []storagepb.Event) { + s.notify(rev, evs) } // notify notifies the fact that given event at the given rev just happened to // watchings that watch on the key of the event. -func (s *watchableStore) notify(rev int64, ev storagepb.Event) { - // check all prefixes of the key to notify all corresponded watchings - for i := 0; i <= len(ev.Kv.Key); i++ { - k := string(ev.Kv.Key[:i]) - if wm, ok := s.synced[k]; ok { - for w := range wm { - // the watching needs to be notified when either it watches prefix or - // the key is exactly matched. - if !w.prefix && i != len(ev.Kv.Key) { - continue - } - ev.WatchID = w.id - select { - case w.ch <- ev: - pendingEventsGauge.Inc() - default: - w.cur = rev - s.unsynced[w] = struct{}{} - delete(wm, w) - slowWatchingGauge.Inc() - } +func (s *watchableStore) notify(rev int64, evs []storagepb.Event) { + we := newWatchingToEventMap(s.synced, evs) + for _, wm := range s.synced { + for w := range wm { + if _, ok := we[w]; !ok { + continue + } + es := we[w] + select { + case w.ch <- es: + pendingEventsGauge.Add(float64(len(es))) + default: + // move slow watching to unsynced + w.cur = rev + s.unsynced[w] = struct{}{} + delete(wm, w) + slowWatchingGauge.Inc() } } } @@ -418,7 +428,7 @@ type watching struct { // a chan to send out the watched events. // The chan might be shared with other watchings. - ch chan<- storagepb.Event + ch chan<- []storagepb.Event } // unsafeAddWatching puts watching with key k into watchableStore's synced. @@ -441,3 +451,38 @@ func unsafeAddWatching(synced *map[string]map[*watching]struct{}, k string, wa * mp[k][wa] = struct{}{} return nil } + +// newWatchingToEventMap creates a map that has watching as key and events as +// value. It enables quick events look up by watching. +func newWatchingToEventMap(sm map[string]map[*watching]struct{}, evs []storagepb.Event) map[*watching][]storagepb.Event { + watchingToEvents := make(map[*watching][]storagepb.Event) + for _, ev := range evs { + key := string(ev.Kv.Key) + + // check all prefixes of the key to notify all corresponded watchings + for i := 0; i <= len(key); i++ { + k := string(key[:i]) + + wm, ok := sm[k] + if !ok { + continue + } + + for w := range wm { + // the watching needs to be notified when either it watches prefix or + // the key is exactly matched. + if !w.prefix && i != len(ev.Kv.Key) { + continue + } + ev.WatchID = w.id + + if _, ok := watchingToEvents[w]; !ok { + watchingToEvents[w] = []storagepb.Event{} + } + watchingToEvents[w] = append(watchingToEvents[w], ev) + } + } + } + + return watchingToEvents +}