Merge pull request #4202 from gyuho/prefix_watch

storage: check prefix in unsynced
This commit is contained in:
Gyu-Ho Lee
2016-01-13 11:32:43 -08:00
2 changed files with 94 additions and 1 deletions

View File

@@ -18,6 +18,7 @@ import (
"fmt"
"log"
"math"
"strings"
"sync"
"time"
@@ -284,6 +285,7 @@ func (s *watchableStore) syncWatchers() {
// TODO: change unsynced struct type same to this
keyToUnsynced := make(map[string]map[*watcher]struct{})
prefixes := make(map[string]struct{})
for w := range s.unsynced {
k := string(w.key)
@@ -307,6 +309,10 @@ func (s *watchableStore) syncWatchers() {
keyToUnsynced[k] = make(map[*watcher]struct{})
}
keyToUnsynced[k][w] = struct{}{}
if w.prefix {
prefixes[k] = struct{}{}
}
}
minBytes, maxBytes := newRevBytes(), newRevBytes()
@@ -330,7 +336,7 @@ func (s *watchableStore) syncWatchers() {
}
k := string(kv.Key)
if _, ok := keyToUnsynced[k]; !ok {
if _, ok := keyToUnsynced[k]; !ok && !matchPrefix(k, prefixes) {
continue
}
@@ -496,3 +502,14 @@ func newWatcherToEventMap(sm map[string]map[*watcher]struct{}, evs []storagepb.E
return watcherToEvents
}
// matchPrefix returns true if key has any matching prefix
// from prefixes map.
func matchPrefix(key string, prefixes map[string]struct{}) bool {
for p := range prefixes {
if strings.HasPrefix(key, p) {
return true
}
}
return false
}

View File

@@ -15,6 +15,7 @@
package storage
import (
"bytes"
"testing"
"github.com/coreos/etcd/lease"
@@ -73,6 +74,81 @@ func TestWatcherWatchID(t *testing.T) {
}
}
// TestWatcherWatchPrefix tests if Watch operation correctly watches
// and returns events with matching prefixes.
func TestWatcherWatchPrefix(t *testing.T) {
b, tmpPath := backend.NewDefaultTmpBackend()
s := WatchableKV(newWatchableStore(b, &lease.FakeLessor{}))
defer cleanup(s, b, tmpPath)
w := s.NewWatchStream()
defer w.Close()
idm := make(map[WatchID]struct{})
prefixMatch := true
val := []byte("bar")
keyWatch, keyPut := []byte("foo"), []byte("foobar")
for i := 0; i < 10; i++ {
id := w.Watch(keyWatch, prefixMatch, 0)
if _, ok := idm[id]; ok {
t.Errorf("#%d: unexpected duplicated id %x", i, id)
}
idm[id] = struct{}{}
s.Put(keyPut, val, lease.NoLease)
resp := <-w.Chan()
if resp.WatchID != id {
t.Errorf("#%d: watch id in event = %d, want %d", i, resp.WatchID, id)
}
if err := w.Cancel(id); err != nil {
t.Errorf("#%d: unexpected cancel error %v", i, err)
}
if len(resp.Events) != 1 {
t.Errorf("#%d: len(resp.Events) got = %d, want = 1", i, len(resp.Events))
}
if len(resp.Events) == 1 {
if !bytes.Equal(resp.Events[0].Kv.Key, keyPut) {
t.Errorf("#%d: resp.Events got = %s, want = %s", i, resp.Events[0].Kv.Key, keyPut)
}
}
}
keyWatch1, keyPut1 := []byte("foo1"), []byte("foo1bar")
s.Put(keyPut1, val, lease.NoLease)
// unsynced watchers
for i := 10; i < 15; i++ {
id := w.Watch(keyWatch1, prefixMatch, 1)
if _, ok := idm[id]; ok {
t.Errorf("#%d: id %d exists", i, id)
}
idm[id] = struct{}{}
resp := <-w.Chan()
if resp.WatchID != id {
t.Errorf("#%d: watch id in event = %d, want %d", i, resp.WatchID, id)
}
if err := w.Cancel(id); err != nil {
t.Error(err)
}
if len(resp.Events) != 1 {
t.Errorf("#%d: len(resp.Events) got = %d, want = 1", i, len(resp.Events))
}
if len(resp.Events) == 1 {
if !bytes.Equal(resp.Events[0].Kv.Key, keyPut1) {
t.Errorf("#%d: resp.Events got = %s, want = %s", i, resp.Events[0].Kv.Key, keyPut1)
}
}
}
}
// TestWatchStreamCancel ensures cancel calls the cancel func of the watcher
// with given id inside watchStream.
func TestWatchStreamCancelWatcherByID(t *testing.T) {