storage: add watchSet and watchSetByKey type

This commit is contained in:
Xiang Li 2016-02-02 18:56:36 -08:00
parent 83411b92b4
commit e5b35b82c5
3 changed files with 45 additions and 38 deletions

View File

@ -35,6 +35,27 @@ const (
chanBufLen = 1024 chanBufLen = 1024
) )
type (
watcherSetByKey map[string]watcherSet
watcherSet map[*watcher]struct{}
)
func (w watcherSet) add(wa *watcher) {
if _, ok := w[wa]; ok {
panic("add watcher twice!")
}
w[wa] = struct{}{}
}
func (w watcherSetByKey) add(wa *watcher) {
set := w[string(wa.key)]
if set == nil {
set = make(watcherSet)
w[string(wa.key)] = set
}
set.add(wa)
}
type watchable interface { type watchable interface {
watch(key []byte, prefix bool, startRev int64, id WatchID, ch chan<- WatchResponse) (*watcher, cancelFunc) watch(key []byte, prefix bool, startRev int64, id WatchID, ch chan<- WatchResponse) (*watcher, cancelFunc)
rev() int64 rev() int64
@ -46,11 +67,11 @@ type watchableStore struct {
*store *store
// contains all unsynced watchers that needs to sync with events that have happened // contains all unsynced watchers that needs to sync with events that have happened
unsynced map[*watcher]struct{} unsynced watcherSet
// contains all synced watchers that are in sync with the progress of the store. // contains all synced watchers that are in sync with the progress of the store.
// The key of the map is the key that the watcher watches on. // The key of the map is the key that the watcher watches on.
synced map[string]map[*watcher]struct{} synced watcherSetByKey
stopc chan struct{} stopc chan struct{}
wg sync.WaitGroup wg sync.WaitGroup
@ -63,8 +84,8 @@ type cancelFunc func()
func newWatchableStore(b backend.Backend, le lease.Lessor) *watchableStore { func newWatchableStore(b backend.Backend, le lease.Lessor) *watchableStore {
s := &watchableStore{ s := &watchableStore{
store: NewStore(b, le), store: NewStore(b, le),
unsynced: make(map[*watcher]struct{}), unsynced: make(watcherSet),
synced: make(map[string]map[*watcher]struct{}), synced: make(watcherSetByKey),
stopc: make(chan struct{}), stopc: make(chan struct{}),
} }
if s.le != nil { if s.le != nil {
@ -185,7 +206,7 @@ func (s *watchableStore) watch(key []byte, prefix bool, startRev int64, id Watch
k := string(key) k := string(key)
if startRev == 0 { if startRev == 0 {
if err := unsafeAddWatcher(&s.synced, k, wa); err != nil { if err := unsafeAddWatcher(s.synced, k, wa); err != nil {
log.Panicf("error unsafeAddWatcher (%v) for key %s", err, k) log.Panicf("error unsafeAddWatcher (%v) for key %s", err, k)
} }
} else { } else {
@ -261,7 +282,7 @@ func (s *watchableStore) syncWatchers() {
compactionRev := s.store.compactMainRev compactionRev := s.store.compactMainRev
// TODO: change unsynced struct type same to this // TODO: change unsynced struct type same to this
keyToUnsynced := make(map[string]map[*watcher]struct{}) keyToUnsynced := make(watcherSetByKey)
prefixes := make(map[string]struct{}) prefixes := make(map[string]struct{})
for w := range s.unsynced { for w := range s.unsynced {
@ -282,10 +303,7 @@ func (s *watchableStore) syncWatchers() {
minRev = w.cur minRev = w.cur
} }
if _, ok := keyToUnsynced[k]; !ok { keyToUnsynced.add(w)
keyToUnsynced[k] = make(map[*watcher]struct{})
}
keyToUnsynced[k][w] = struct{}{}
if w.prefix { if w.prefix {
prefixes[k] = struct{}{} prefixes[k] = struct{}{}
@ -341,7 +359,7 @@ func (s *watchableStore) syncWatchers() {
continue continue
} }
k := string(w.key) k := string(w.key)
if err := unsafeAddWatcher(&s.synced, k, w); err != nil { if err := unsafeAddWatcher(s.synced, k, w); err != nil {
log.Panicf("error unsafeAddWatcher (%v) for key %s", err, k) log.Panicf("error unsafeAddWatcher (%v) for key %s", err, k)
} }
delete(s.unsynced, w) delete(s.unsynced, w)
@ -400,28 +418,17 @@ type watcher struct {
// unsafeAddWatcher puts watcher with key k into watchableStore's synced. // unsafeAddWatcher puts watcher with key k into watchableStore's synced.
// Make sure to this is thread-safe using mutex before and after. // Make sure to this is thread-safe using mutex before and after.
func unsafeAddWatcher(synced *map[string]map[*watcher]struct{}, k string, wa *watcher) error { func unsafeAddWatcher(synced watcherSetByKey, k string, wa *watcher) error {
if wa == nil { if wa == nil {
return fmt.Errorf("nil watcher received") return fmt.Errorf("nil watcher received")
} }
mp := *synced synced.add(wa)
if v, ok := mp[k]; ok {
if _, ok := v[wa]; ok {
return fmt.Errorf("put the same watcher twice: %+v", wa)
} else {
v[wa] = struct{}{}
}
return nil
}
mp[k] = make(map[*watcher]struct{})
mp[k][wa] = struct{}{}
return nil return nil
} }
// newWatcherToEventMap creates a map that has watcher as key and events as // newWatcherToEventMap creates a map that has watcher as key and events as
// value. It enables quick events look up by watcher. // value. It enables quick events look up by watcher.
func newWatcherToEventMap(sm map[string]map[*watcher]struct{}, evs []storagepb.Event) map[*watcher][]storagepb.Event { func newWatcherToEventMap(sm watcherSetByKey, evs []storagepb.Event) map[*watcher][]storagepb.Event {
watcherToEvents := make(map[*watcher][]storagepb.Event) watcherToEvents := make(map[*watcher][]storagepb.Event)
for _, ev := range evs { for _, ev := range evs {
key := string(ev.Kv.Key) key := string(ev.Kv.Key)

View File

@ -40,11 +40,11 @@ func BenchmarkWatchableStoreUnsyncedCancel(b *testing.B) {
// in unsynced for this benchmark. // in unsynced for this benchmark.
ws := &watchableStore{ ws := &watchableStore{
store: s, store: s,
unsynced: make(map[*watcher]struct{}), unsynced: make(watcherSet),
// to make the test not crash from assigning to nil map. // to make the test not crash from assigning to nil map.
// 'synced' doesn't get populated in this test. // 'synced' doesn't get populated in this test.
synced: make(map[string]map[*watcher]struct{}), synced: make(watcherSetByKey),
} }
defer func() { defer func() {

View File

@ -82,11 +82,11 @@ func TestCancelUnsynced(t *testing.T) {
// in unsynced to test if syncWatchers works as expected. // in unsynced to test if syncWatchers works as expected.
s := &watchableStore{ s := &watchableStore{
store: NewStore(b, &lease.FakeLessor{}), store: NewStore(b, &lease.FakeLessor{}),
unsynced: make(map[*watcher]struct{}), unsynced: make(watcherSet),
// to make the test not crash from assigning to nil map. // to make the test not crash from assigning to nil map.
// 'synced' doesn't get populated in this test. // 'synced' doesn't get populated in this test.
synced: make(map[string]map[*watcher]struct{}), synced: make(watcherSetByKey),
} }
defer func() { defer func() {
@ -137,8 +137,8 @@ func TestSyncWatchers(t *testing.T) {
s := &watchableStore{ s := &watchableStore{
store: NewStore(b, &lease.FakeLessor{}), store: NewStore(b, &lease.FakeLessor{}),
unsynced: make(map[*watcher]struct{}), unsynced: make(watcherSet),
synced: make(map[string]map[*watcher]struct{}), synced: make(watcherSetByKey),
} }
defer func() { defer func() {
@ -238,7 +238,7 @@ func TestUnsafeAddWatcher(t *testing.T) {
// to test if unsafeAddWatcher is correctly updating // to test if unsafeAddWatcher is correctly updating
// synced map when adding new watcher. // synced map when adding new watcher.
for i, wa := range ws { for i, wa := range ws {
if err := unsafeAddWatcher(&s.synced, string(testKey), wa); err != nil { if err := unsafeAddWatcher(s.synced, string(testKey), wa); err != nil {
t.Errorf("#%d: error = %v, want nil", i, err) t.Errorf("#%d: error = %v, want nil", i, err)
} }
if v, ok := s.synced[string(testKey)]; !ok { if v, ok := s.synced[string(testKey)]; !ok {
@ -276,14 +276,14 @@ func TestNewMapwatcherToEventMap(t *testing.T) {
} }
tests := []struct { tests := []struct {
sync map[string]map[*watcher]struct{} sync watcherSetByKey
evs []storagepb.Event evs []storagepb.Event
wwe map[*watcher][]storagepb.Event wwe map[*watcher][]storagepb.Event
}{ }{
// no watcher in sync, some events should return empty wwe // no watcher in sync, some events should return empty wwe
{ {
map[string]map[*watcher]struct{}{}, watcherSetByKey{},
evs, evs,
map[*watcher][]storagepb.Event{}, map[*watcher][]storagepb.Event{},
}, },
@ -291,7 +291,7 @@ func TestNewMapwatcherToEventMap(t *testing.T) {
// one watcher in sync, one event that does not match the key of that // one watcher in sync, one event that does not match the key of that
// watcher should return empty wwe // watcher should return empty wwe
{ {
map[string]map[*watcher]struct{}{ watcherSetByKey{
string(k2): {ws[2]: struct{}{}}, string(k2): {ws[2]: struct{}{}},
}, },
evs[:1], evs[:1],
@ -301,7 +301,7 @@ func TestNewMapwatcherToEventMap(t *testing.T) {
// one watcher in sync, one event that matches the key of that // one watcher in sync, one event that matches the key of that
// watcher should return wwe with that matching watcher // watcher should return wwe with that matching watcher
{ {
map[string]map[*watcher]struct{}{ watcherSetByKey{
string(k1): {ws[1]: struct{}{}}, string(k1): {ws[1]: struct{}{}},
}, },
evs[1:2], evs[1:2],
@ -314,7 +314,7 @@ func TestNewMapwatcherToEventMap(t *testing.T) {
// that matches the key of only one of the watcher should return wwe // that matches the key of only one of the watcher should return wwe
// with the matching watcher // with the matching watcher
{ {
map[string]map[*watcher]struct{}{ watcherSetByKey{
string(k0): {ws[0]: struct{}{}}, string(k0): {ws[0]: struct{}{}},
string(k2): {ws[2]: struct{}{}}, string(k2): {ws[2]: struct{}{}},
}, },
@ -327,7 +327,7 @@ func TestNewMapwatcherToEventMap(t *testing.T) {
// two watchers in sync that watches the same key, two events that // two watchers in sync that watches the same key, two events that
// match the keys should return wwe with those two watchers // match the keys should return wwe with those two watchers
{ {
map[string]map[*watcher]struct{}{ watcherSetByKey{
string(k0): {ws[0]: struct{}{}}, string(k0): {ws[0]: struct{}{}},
string(k1): {ws[1]: struct{}{}}, string(k1): {ws[1]: struct{}{}},
}, },