From 3ddcc211791950594755048595bd1c95e94f7359 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Fri, 29 Apr 2016 09:24:46 -0700 Subject: [PATCH] mvcc: fix watch deleteRange --- mvcc/watchable_store.go | 4 ++-- mvcc/watcher_test.go | 39 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 41 insertions(+), 2 deletions(-) diff --git a/mvcc/watchable_store.go b/mvcc/watchable_store.go index 2815e7c77..1ef571512 100644 --- a/mvcc/watchable_store.go +++ b/mvcc/watchable_store.go @@ -112,10 +112,10 @@ func (s *watchableStore) DeleteRange(key, end []byte) (n, rev int64) { } evs := make([]mvccpb.Event, n) - for i, change := range changes { + for i := range changes { evs[i] = mvccpb.Event{ Type: mvccpb.DELETE, - Kv: &change} + Kv: &changes[i]} evs[i].Kv.ModRevision = rev } s.notify(rev, evs) diff --git a/mvcc/watcher_test.go b/mvcc/watcher_test.go index ad5bac7c6..e673f0ec6 100644 --- a/mvcc/watcher_test.go +++ b/mvcc/watcher_test.go @@ -16,6 +16,7 @@ package mvcc import ( "bytes" + "fmt" "os" "reflect" "testing" @@ -23,6 +24,7 @@ import ( "github.com/coreos/etcd/lease" "github.com/coreos/etcd/mvcc/backend" + "github.com/coreos/etcd/mvcc/mvccpb" ) // TestWatcherWatchID tests that each watcher provides unique watchID, @@ -151,6 +153,43 @@ func TestWatcherWatchPrefix(t *testing.T) { } } +func TestWatchDeleteRange(t *testing.T) { + b, tmpPath := backend.NewDefaultTmpBackend() + s := newWatchableStore(b, &lease.FakeLessor{}, nil) + + defer func() { + s.store.Close() + os.Remove(tmpPath) + }() + + testKeyPrefix := []byte("foo") + + for i := 0; i < 3; i++ { + s.Put([]byte(fmt.Sprintf("%s_%d", testKeyPrefix, i)), []byte("bar"), lease.NoLease) + } + + w := s.NewWatchStream() + from, to := []byte(testKeyPrefix), []byte(fmt.Sprintf("%s_%d", testKeyPrefix, 99)) + w.Watch(from, to, 0) + + s.DeleteRange(from, to) + + we := []mvccpb.Event{ + {Type: mvccpb.DELETE, Kv: &mvccpb.KeyValue{Key: []byte("foo_0"), ModRevision: 5}}, + {Type: mvccpb.DELETE, Kv: &mvccpb.KeyValue{Key: []byte("foo_1"), ModRevision: 5}}, + {Type: mvccpb.DELETE, Kv: &mvccpb.KeyValue{Key: []byte("foo_2"), ModRevision: 5}}, + } + + select { + case r := <-w.Chan(): + if !reflect.DeepEqual(r.Events, we) { + t.Errorf("event = %v, want %v", r.Events, we) + } + case <-time.After(10 * time.Second): + t.Fatal("failed to receive event after 10 seconds!") + } +} + // TestWatchStreamCancelWatcherByID ensures cancel calls the cancel func of the watcher // with given id inside watchStream. func TestWatchStreamCancelWatcherByID(t *testing.T) {