From 4f427bca430618365d34d06e9b4dbd50019196a1 Mon Sep 17 00:00:00 2001 From: Gyu-Ho Lee Date: Wed, 13 Jan 2016 11:09:07 -0800 Subject: [PATCH] storage: check prefix in unsynced Current syncWatchers method skips the events that have prefixes that are being watched when the prefix is not existent as a key. This fixes https://github.com/coreos/etcd/issues/4191 by adding prefix checking to not skip those events. --- storage/watchable_store.go | 19 +++++++++- storage/watcher_test.go | 76 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 94 insertions(+), 1 deletion(-) diff --git a/storage/watchable_store.go b/storage/watchable_store.go index ae5f0f71e..435d282b7 100644 --- a/storage/watchable_store.go +++ b/storage/watchable_store.go @@ -18,6 +18,7 @@ import ( "fmt" "log" "math" + "strings" "sync" "time" @@ -284,6 +285,7 @@ func (s *watchableStore) syncWatchers() { // TODO: change unsynced struct type same to this keyToUnsynced := make(map[string]map[*watcher]struct{}) + prefixes := make(map[string]struct{}) for w := range s.unsynced { k := string(w.key) @@ -307,6 +309,10 @@ func (s *watchableStore) syncWatchers() { keyToUnsynced[k] = make(map[*watcher]struct{}) } keyToUnsynced[k][w] = struct{}{} + + if w.prefix { + prefixes[k] = struct{}{} + } } minBytes, maxBytes := newRevBytes(), newRevBytes() @@ -330,7 +336,7 @@ func (s *watchableStore) syncWatchers() { } k := string(kv.Key) - if _, ok := keyToUnsynced[k]; !ok { + if _, ok := keyToUnsynced[k]; !ok && !matchPrefix(k, prefixes) { continue } @@ -496,3 +502,14 @@ func newWatcherToEventMap(sm map[string]map[*watcher]struct{}, evs []storagepb.E return watcherToEvents } + +// matchPrefix returns true if key has any matching prefix +// from prefixes map. +func matchPrefix(key string, prefixes map[string]struct{}) bool { + for p := range prefixes { + if strings.HasPrefix(key, p) { + return true + } + } + return false +} diff --git a/storage/watcher_test.go b/storage/watcher_test.go index e520d9983..d6374e14b 100644 --- a/storage/watcher_test.go +++ b/storage/watcher_test.go @@ -15,6 +15,7 @@ package storage import ( + "bytes" "testing" "github.com/coreos/etcd/lease" @@ -73,6 +74,81 @@ func TestWatcherWatchID(t *testing.T) { } } +// TestWatcherWatchPrefix tests if Watch operation correctly watches +// and returns events with matching prefixes. +func TestWatcherWatchPrefix(t *testing.T) { + b, tmpPath := backend.NewDefaultTmpBackend() + s := WatchableKV(newWatchableStore(b, &lease.FakeLessor{})) + defer cleanup(s, b, tmpPath) + + w := s.NewWatchStream() + defer w.Close() + + idm := make(map[WatchID]struct{}) + + prefixMatch := true + val := []byte("bar") + keyWatch, keyPut := []byte("foo"), []byte("foobar") + + for i := 0; i < 10; i++ { + id := w.Watch(keyWatch, prefixMatch, 0) + if _, ok := idm[id]; ok { + t.Errorf("#%d: unexpected duplicated id %x", i, id) + } + idm[id] = struct{}{} + + s.Put(keyPut, val, lease.NoLease) + + resp := <-w.Chan() + if resp.WatchID != id { + t.Errorf("#%d: watch id in event = %d, want %d", i, resp.WatchID, id) + } + + if err := w.Cancel(id); err != nil { + t.Errorf("#%d: unexpected cancel error %v", i, err) + } + + if len(resp.Events) != 1 { + t.Errorf("#%d: len(resp.Events) got = %d, want = 1", i, len(resp.Events)) + } + if len(resp.Events) == 1 { + if !bytes.Equal(resp.Events[0].Kv.Key, keyPut) { + t.Errorf("#%d: resp.Events got = %s, want = %s", i, resp.Events[0].Kv.Key, keyPut) + } + } + } + + keyWatch1, keyPut1 := []byte("foo1"), []byte("foo1bar") + s.Put(keyPut1, val, lease.NoLease) + + // unsynced watchers + for i := 10; i < 15; i++ { + id := w.Watch(keyWatch1, prefixMatch, 1) + if _, ok := idm[id]; ok { + t.Errorf("#%d: id %d exists", i, id) + } + idm[id] = struct{}{} + + resp := <-w.Chan() + if resp.WatchID != id { + t.Errorf("#%d: watch id in event = %d, want %d", i, resp.WatchID, id) + } + + if err := w.Cancel(id); err != nil { + t.Error(err) + } + + if len(resp.Events) != 1 { + t.Errorf("#%d: len(resp.Events) got = %d, want = 1", i, len(resp.Events)) + } + if len(resp.Events) == 1 { + if !bytes.Equal(resp.Events[0].Kv.Key, keyPut1) { + t.Errorf("#%d: resp.Events got = %s, want = %s", i, resp.Events[0].Kv.Key, keyPut1) + } + } + } +} + // TestWatchStreamCancel ensures cancel calls the cancel func of the watcher // with given id inside watchStream. func TestWatchStreamCancelWatcherByID(t *testing.T) {