etcd/mvcc/watcher_test.go
Xiang Li dced92f8bd *: support watch with filters
Users can now filter events by type. The API is also extensible.
It might make sense for the proxy to filter out events based on
more expensive/customized filters.
2016-06-28 13:46:57 -07:00

323 lines
7.9 KiB
Go

// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package mvcc
import (
"bytes"
"fmt"
"os"
"reflect"
"testing"
"time"
"github.com/coreos/etcd/lease"
"github.com/coreos/etcd/mvcc/backend"
"github.com/coreos/etcd/mvcc/mvccpb"
)
// TestWatcherWatchID verifies that every watcher created on a watch stream
// is handed a WatchID not seen before, and that the response delivered for
// that watcher carries the same WatchID.
func TestWatcherWatchID(t *testing.T) {
	be, path := backend.NewDefaultTmpBackend()
	kv := WatchableKV(newWatchableStore(be, &lease.FakeLessor{}, nil))
	defer cleanup(kv, be, path)

	stream := kv.NewWatchStream()
	defer stream.Close()

	// seen records every WatchID handed out so far, across both phases.
	seen := make(map[WatchID]struct{})

	// Phase one: watch "foo" from start revision 0, then trigger an event
	// with a put issued after the watcher exists.
	for n := 0; n < 10; n++ {
		wid := stream.Watch([]byte("foo"), nil, 0)
		if _, dup := seen[wid]; dup {
			t.Errorf("#%d: id %d exists", n, wid)
		}
		seen[wid] = struct{}{}

		kv.Put([]byte("foo"), []byte("bar"), lease.NoLease)

		if got := (<-stream.Chan()).WatchID; got != wid {
			t.Errorf("#%d: watch id in event = %d, want %d", n, got, wid)
		}

		// Cancel so that the next iteration's put is observed by exactly
		// one watcher.
		if cerr := stream.Cancel(wid); cerr != nil {
			t.Error(cerr)
		}
	}

	// Write "foo2" once up front; the watchers below start at revision 1
	// and rely on this already-stored write instead of a fresh put.
	kv.Put([]byte("foo2"), []byte("bar"), lease.NoLease)

	// unsynced watchers
	for n := 10; n < 20; n++ {
		wid := stream.Watch([]byte("foo2"), nil, 1)
		if _, dup := seen[wid]; dup {
			t.Errorf("#%d: id %d exists", n, wid)
		}
		seen[wid] = struct{}{}

		if got := (<-stream.Chan()).WatchID; got != wid {
			t.Errorf("#%d: watch id in event = %d, want %d", n, got, wid)
		}

		if cerr := stream.Cancel(wid); cerr != nil {
			t.Error(cerr)
		}
	}
}
// TestWatcherWatchPrefix verifies that a watcher registered on a key range
// [key, end) receives the events for keys falling inside that range, tagged
// with the watcher's own WatchID.
func TestWatcherWatchPrefix(t *testing.T) {
	be, path := backend.NewDefaultTmpBackend()
	kv := WatchableKV(newWatchableStore(be, &lease.FakeLessor{}, nil))
	defer cleanup(kv, be, path)

	stream := kv.NewWatchStream()
	defer stream.Close()

	// seen records every WatchID handed out so far, across both phases.
	seen := make(map[WatchID]struct{})
	value := []byte("bar")

	// Phase one: watch ["foo", "fop") from start revision 0 and put a key
	// inside the range after the watcher exists.
	var (
		rangeStart = []byte("foo")
		rangeEnd   = []byte("fop")
		putKey     = []byte("foobar")
	)
	for n := 0; n < 10; n++ {
		wid := stream.Watch(rangeStart, rangeEnd, 0)
		if _, dup := seen[wid]; dup {
			t.Errorf("#%d: unexpected duplicated id %x", n, wid)
		}
		seen[wid] = struct{}{}

		kv.Put(putKey, value, lease.NoLease)

		got := <-stream.Chan()
		if got.WatchID != wid {
			t.Errorf("#%d: watch id in event = %d, want %d", n, got.WatchID, wid)
		}

		if cerr := stream.Cancel(wid); cerr != nil {
			t.Errorf("#%d: unexpected cancel error %v", n, cerr)
		}

		// Exactly one event is expected; only inspect its key when the
		// count is right.
		switch count := len(got.Events); {
		case count != 1:
			t.Errorf("#%d: len(resp.Events) got = %d, want = 1", n, count)
		case !bytes.Equal(got.Events[0].Kv.Key, putKey):
			t.Errorf("#%d: resp.Events got = %s, want = %s", n, got.Events[0].Kv.Key, putKey)
		}
	}

	// Phase two: the put happens once, before any watcher is created; the
	// watchers start at revision 1 and rely on that stored write.
	var (
		rangeStart1 = []byte("foo1")
		rangeEnd1   = []byte("foo2")
		putKey1     = []byte("foo1bar")
	)
	kv.Put(putKey1, value, lease.NoLease)

	// unsynced watchers
	for n := 10; n < 15; n++ {
		wid := stream.Watch(rangeStart1, rangeEnd1, 1)
		if _, dup := seen[wid]; dup {
			t.Errorf("#%d: id %d exists", n, wid)
		}
		seen[wid] = struct{}{}

		got := <-stream.Chan()
		if got.WatchID != wid {
			t.Errorf("#%d: watch id in event = %d, want %d", n, got.WatchID, wid)
		}

		if cerr := stream.Cancel(wid); cerr != nil {
			t.Error(cerr)
		}

		switch count := len(got.Events); {
		case count != 1:
			t.Errorf("#%d: len(resp.Events) got = %d, want = 1", n, count)
		case !bytes.Equal(got.Events[0].Kv.Key, putKey1):
			t.Errorf("#%d: resp.Events got = %s, want = %s", n, got.Events[0].Kv.Key, putKey1)
		}
	}
}
// TestWatchDeleteRange tests that a range watcher receives, in a single
// response, one DELETE event per key removed by a DeleteRange call.
func TestWatchDeleteRange(t *testing.T) {
	b, tmpPath := backend.NewDefaultTmpBackend()
	s := newWatchableStore(b, &lease.FakeLessor{}, nil)

	defer func() {
		s.store.Close()
		os.Remove(tmpPath)
	}()

	testKeyPrefix := []byte("foo")

	// Populate foo_0, foo_1 and foo_2 before the watcher is created.
	for i := 0; i < 3; i++ {
		s.Put([]byte(fmt.Sprintf("%s_%d", testKeyPrefix, i)), []byte("bar"), lease.NoLease)
	}

	w := s.NewWatchStream()
	// Close the stream when the test ends, as the other tests in this file
	// do. Registered after the store teardown so that it runs before it.
	defer w.Close()

	// Watch and delete the same range: ["foo", "foo_99").
	from, to := testKeyPrefix, []byte(fmt.Sprintf("%s_%d", testKeyPrefix, 99))
	w.Watch(from, to, 0)

	s.DeleteRange(from, to)

	// All three deletions are expected to share one ModRevision.
	// NOTE(review): 5 presumably follows from an initial revision of 1 plus
	// three puts and one delete — confirm against the store implementation.
	we := []mvccpb.Event{
		{Type: mvccpb.DELETE, Kv: &mvccpb.KeyValue{Key: []byte("foo_0"), ModRevision: 5}},
		{Type: mvccpb.DELETE, Kv: &mvccpb.KeyValue{Key: []byte("foo_1"), ModRevision: 5}},
		{Type: mvccpb.DELETE, Kv: &mvccpb.KeyValue{Key: []byte("foo_2"), ModRevision: 5}},
	}

	select {
	case r := <-w.Chan():
		if !reflect.DeepEqual(r.Events, we) {
			t.Errorf("event = %v, want %v", r.Events, we)
		}
	case <-time.After(10 * time.Second):
		t.Fatal("failed to receive event after 10 seconds!")
	}
}
// TestWatchStreamCancelWatcherByID verifies Cancel on a watch stream: it
// succeeds once for an existing watcher id, reports ErrWatcherNotExist for
// an already-cancelled or unknown id, and leaves no entry behind in the
// stream's cancels map.
func TestWatchStreamCancelWatcherByID(t *testing.T) {
	be, path := backend.NewDefaultTmpBackend()
	kv := WatchableKV(newWatchableStore(be, &lease.FakeLessor{}, nil))
	defer cleanup(kv, be, path)

	stream := kv.NewWatchStream()
	defer stream.Close()

	wid := stream.Watch([]byte("foo"), nil, 0)

	// The cases run in order and build on each other: the second one
	// depends on the first having cancelled the watcher.
	cases := []struct {
		cancelID WatchID
		werr     error
	}{
		// cancelling the watcher that was just created must succeed.
		{cancelID: wid, werr: nil},
		// cancelling it a second time must report that it does not exist.
		{cancelID: wid, werr: ErrWatcherNotExist},
		// cancelling an id that was never handed out must report the same.
		{cancelID: wid + 1, werr: ErrWatcherNotExist},
	}

	for idx := range cases {
		c := cases[idx]
		if err := stream.Cancel(c.cancelID); err != c.werr {
			t.Errorf("#%d: err = %v, want %v", idx, err, c.werr)
		}
	}

	// Reach into the concrete stream to confirm nothing is left registered.
	if remaining := len(stream.(*watchStream).cancels); remaining != 0 {
		t.Errorf("cancels = %d, want 0", remaining)
	}
}
// TestWatcherRequestProgress ensures that RequestProgress yields a progress
// response only for a watcher that exists and has been synced: no response
// is expected for an unknown id or for a watcher that is still unsynced.
func TestWatcherRequestProgress(t *testing.T) {
	b, tmpPath := backend.NewDefaultTmpBackend()

	// manually create watchableStore instead of newWatchableStore
	// because newWatchableStore automatically calls syncWatchers
	// method to sync watchers in unsynced map. We want to keep watchers
	// in unsynced to test if syncWatchers works as expected.
	s := &watchableStore{
		store:    NewStore(b, &lease.FakeLessor{}, nil),
		unsynced: newWatcherGroup(),
		synced:   newWatcherGroup(),
	}

	defer func() {
		s.store.Close()
		os.Remove(tmpPath)
	}()

	testKey := []byte("foo")
	notTestKey := []byte("bad")
	testValue := []byte("bar")
	s.Put(testKey, testValue, lease.NoLease)

	w := s.NewWatchStream()
	// Close the stream when the test ends, as the other tests in this file
	// do. Registered after the store teardown so that it runs before it.
	defer w.Close()

	// A progress request for an id that was never handed out must not
	// produce a response.
	badID := WatchID(1000)
	w.RequestProgress(badID)
	select {
	case resp := <-w.Chan():
		t.Fatalf("unexpected %+v", resp)
	default:
	}

	// Watch a key that was never written, starting from revision 1. Until
	// syncWatchers runs, a progress request for it must stay silent.
	id := w.Watch(notTestKey, nil, 1)
	w.RequestProgress(id)
	select {
	case resp := <-w.Chan():
		t.Fatalf("unexpected %+v", resp)
	default:
	}

	s.syncWatchers()

	// After syncing, the progress response must carry the watcher's own id.
	// NOTE(review): Revision 2 presumably reflects the single put above —
	// confirm against the store implementation.
	w.RequestProgress(id)
	wrs := WatchResponse{WatchID: id, Revision: 2}
	select {
	case resp := <-w.Chan():
		if !reflect.DeepEqual(resp, wrs) {
			t.Fatalf("got %+v, expect %+v", resp, wrs)
		}
	case <-time.After(time.Second):
		t.Fatal("failed to receive progress")
	}
}
// TestWatcherWatchWithFilter tests that events for which a watcher's filter
// returns true are not delivered on the watch stream, while events the
// filter does not match still are.
func TestWatcherWatchWithFilter(t *testing.T) {
	b, tmpPath := backend.NewDefaultTmpBackend()
	s := WatchableKV(newWatchableStore(b, &lease.FakeLessor{}, nil))
	defer cleanup(s, b, tmpPath)

	w := s.NewWatchStream()
	defer w.Close()

	// filterPut returns true for PUT events; the assertions below expect
	// such events to be withheld from the watcher.
	filterPut := func(e mvccpb.Event) bool {
		return e.Type == mvccpb.PUT
	}

	w.Watch([]byte("foo"), nil, 0, filterPut)

	// Buffered with capacity 1 so the goroutine below can always hand off
	// its signal and exit, even when the test has already failed and
	// nothing reads from done any more.
	done := make(chan struct{}, 1)

	go func() {
		<-w.Chan()
		done <- struct{}{}
	}()

	// The put must be filtered out: nothing may arrive within the window.
	s.Put([]byte("foo"), []byte("bar"), lease.NoLease)

	select {
	case <-done:
		t.Fatal("failed to filter put request")
	case <-time.After(100 * time.Millisecond):
	}

	// The delete is not matched by the filter and must be delivered.
	s.DeleteRange([]byte("foo"), nil)

	select {
	case <-done:
	case <-time.After(100 * time.Millisecond):
		t.Fatal("failed to receive delete request")
	}
}