From d84811aecf1538e60dee4bebe2ae428a428f91a5 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Sat, 5 Mar 2016 14:45:43 -0800 Subject: [PATCH 1/2] *: fix watch full key range --- clientv3/op.go | 7 +++++-- etcdserver/api/v3rpc/watch.go | 4 ++++ 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/clientv3/op.go b/clientv3/op.go index 135ff6751..0f20db5df 100644 --- a/clientv3/op.go +++ b/clientv3/op.go @@ -28,6 +28,10 @@ const ( tDeleteRange ) +var ( + noPrefixEnd = []byte{0} +) + // Op represents an Operation that kv can execute. type Op struct { t opType @@ -175,8 +179,7 @@ func getPrefix(key []byte) []byte { } // next prefix does not exist (e.g., 0xffff); // default to WithFromKey policy - end = []byte{0} - return end + return noPrefixEnd } // WithPrefix enables 'Get', 'Delete', or 'Watch' requests to operate diff --git a/etcdserver/api/v3rpc/watch.go b/etcdserver/api/v3rpc/watch.go index 55e5dbfe4..e88e0793b 100644 --- a/etcdserver/api/v3rpc/watch.go +++ b/etcdserver/api/v3rpc/watch.go @@ -111,6 +111,10 @@ func (sws *serverWatchStream) recvLoop() error { } creq := uv.CreateRequest + if len(creq.Key) == 0 { + // \x00 is the smallest key + creq.Key = []byte{0} + } if len(creq.RangeEnd) == 1 && creq.RangeEnd[0] == 0 { // support >= key queries creq.RangeEnd = []byte{} From 633a0bdf5562e4cef05c7573adebbb58c0be81fd Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Sat, 5 Mar 2016 18:52:41 -0800 Subject: [PATCH 2/2] integration: add test for full range watching --- integration/v3_watch_test.go | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/integration/v3_watch_test.go b/integration/v3_watch_test.go index 16ae7dcc9..7a0e123df 100644 --- a/integration/v3_watch_test.go +++ b/integration/v3_watch_test.go @@ -98,6 +98,27 @@ func TestV3WatchFromCurrentRevision(t *testing.T) { []*pb.WatchResponse{}, }, + // watch full range, matching + { + []string{"fooLong"}, + &pb.WatchRequest{RequestUnion: &pb.WatchRequest_CreateRequest{ + CreateRequest: &pb.WatchCreateRequest{ + Key: []byte(""), + RangeEnd: []byte("\x00")}}}, + + []*pb.WatchResponse{ + { + Header: &pb.ResponseHeader{Revision: 2}, + Created: false, + Events: []*storagepb.Event{ + { + Type: storagepb.PUT, + Kv: &storagepb.KeyValue{Key: []byte("fooLong"), Value: []byte("bar"), CreateRevision: 2, ModRevision: 2, Version: 1}, + }, + }, + }, + }, + }, // multiple puts, one watcher with matching key { []string{"foo", "foo", "foo"},