mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
Merge pull request #6820 from gyuho/watcher
mvcc: return -1 for wrong watcher range key >= end
This commit is contained in:
commit
dbb692e50f
@ -223,7 +223,7 @@ func WithSort(target SortTarget, order SortOrder) OpOption {
|
||||
// If order != SortNone, server fetches the entire key-space,
|
||||
// and then applies the sort and limit, if provided.
|
||||
// Since current mvcc.Range implementation returns results
|
||||
// sorted by keys in lexiographically ascending order,
|
||||
// sorted by keys in lexicographically ascending order,
|
||||
// client should ignore SortOrder if the target is SortByKey.
|
||||
order = SortNone
|
||||
}
|
||||
@ -261,14 +261,15 @@ func WithPrefix() OpOption {
|
||||
}
|
||||
}
|
||||
|
||||
// WithRange specifies the range of 'Get' or 'Delete' requests.
|
||||
// WithRange specifies the range of 'Get', 'Delete', 'Watch' requests.
|
||||
// For example, 'Get' requests with 'WithRange(end)' returns
|
||||
// the keys in the range [key, end).
|
||||
// endKey must be lexicographically greater than start key.
|
||||
func WithRange(endKey string) OpOption {
|
||||
return func(op *Op) { op.end = []byte(endKey) }
|
||||
}
|
||||
|
||||
// WithFromKey specifies the range of 'Get' or 'Delete' requests
|
||||
// WithFromKey specifies the range of 'Get', 'Delete', 'Watch' requests
|
||||
// to be equal or greater than the key in the argument.
|
||||
func WithFromKey() OpOption { return WithRange("\x00") }
|
||||
|
||||
|
@ -329,7 +329,7 @@ put key2 "some extra key"
|
||||
|
||||
### WATCH [options] [key or prefix] [range_end]
|
||||
|
||||
Watch watches events stream on keys or prefixes, [key or prefix, range_end) if `range-end` is given. The watch command runs until it encounters an error or is terminated by the user.
|
||||
Watch watches events stream on keys or prefixes, [key or prefix, range_end) if `range-end` is given. The watch command runs until it encounters an error or is terminated by the user. If range_end is given, it must be lexicographically greater than key or "\x00".
|
||||
|
||||
#### Options
|
||||
|
||||
|
@ -348,6 +348,51 @@ func TestV3WatchFutureRevision(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// TestV3WatchWrongRange tests wrong range does not create watchers.
|
||||
func TestV3WatchWrongRange(t *testing.T) {
|
||||
defer testutil.AfterTest(t)
|
||||
|
||||
clus := NewClusterV3(t, &ClusterConfig{Size: 1})
|
||||
defer clus.Terminate(t)
|
||||
|
||||
wAPI := toGRPC(clus.RandClient()).Watch
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
||||
defer cancel()
|
||||
wStream, err := wAPI.Watch(ctx)
|
||||
if err != nil {
|
||||
t.Fatalf("wAPI.Watch error: %v", err)
|
||||
}
|
||||
|
||||
tests := []struct {
|
||||
key []byte
|
||||
end []byte
|
||||
canceled bool
|
||||
}{
|
||||
{[]byte("a"), []byte("a"), true}, // wrong range end
|
||||
{[]byte("b"), []byte("a"), true}, // wrong range end
|
||||
{[]byte("foo"), []byte{0}, false}, // watch request with 'WithFromKey'
|
||||
}
|
||||
for i, tt := range tests {
|
||||
if err := wStream.Send(&pb.WatchRequest{RequestUnion: &pb.WatchRequest_CreateRequest{
|
||||
CreateRequest: &pb.WatchCreateRequest{Key: tt.key, RangeEnd: tt.end, StartRevision: 1}}}); err != nil {
|
||||
t.Fatalf("#%d: wStream.Send error: %v", i, err)
|
||||
}
|
||||
cresp, err := wStream.Recv()
|
||||
if err != nil {
|
||||
t.Fatalf("#%d: wStream.Recv error: %v", i, err)
|
||||
}
|
||||
if !cresp.Created {
|
||||
t.Fatalf("#%d: create %v, want %v", i, cresp.Created, true)
|
||||
}
|
||||
if cresp.Canceled != tt.canceled {
|
||||
t.Fatalf("#%d: canceled %v, want %v", i, tt.canceled, cresp.Canceled)
|
||||
}
|
||||
if tt.canceled && cresp.WatchId != -1 {
|
||||
t.Fatalf("#%d: canceled watch ID %d, want -1", i, cresp.WatchId)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TestV3WatchCancelSynced tests Watch APIs cancellation from synced map.
|
||||
func TestV3WatchCancelSynced(t *testing.T) {
|
||||
defer testutil.AfterTest(t)
|
||||
|
@ -15,6 +15,7 @@
|
||||
package mvcc
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"errors"
|
||||
"sync"
|
||||
|
||||
@ -99,6 +100,12 @@ type watchStream struct {
|
||||
// Watch creates a new watcher in the stream and returns its WatchID.
|
||||
// TODO: return error if ws is closed?
|
||||
func (ws *watchStream) Watch(key, end []byte, startRev int64, fcs ...FilterFunc) WatchID {
|
||||
// prevent wrong range where key >= end lexicographically
|
||||
// watch request with 'WithFromKey' has empty-byte range end
|
||||
if len(end) != 0 && bytes.Compare(key, end) != -1 {
|
||||
return -1
|
||||
}
|
||||
|
||||
ws.mu.Lock()
|
||||
defer ws.mu.Unlock()
|
||||
if ws.closed {
|
||||
|
@ -153,6 +153,28 @@ func TestWatcherWatchPrefix(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// TestWatcherWatchWrongRange ensures that watcher with wrong 'end' range
|
||||
// does not create watcher, which panics when canceling in range tree.
|
||||
func TestWatcherWatchWrongRange(t *testing.T) {
|
||||
b, tmpPath := backend.NewDefaultTmpBackend()
|
||||
s := WatchableKV(newWatchableStore(b, &lease.FakeLessor{}, nil))
|
||||
defer cleanup(s, b, tmpPath)
|
||||
|
||||
w := s.NewWatchStream()
|
||||
defer w.Close()
|
||||
|
||||
if id := w.Watch([]byte("foa"), []byte("foa"), 1); id != -1 {
|
||||
t.Fatalf("key == end range given; id expected -1, got %d", id)
|
||||
}
|
||||
if id := w.Watch([]byte("fob"), []byte("foa"), 1); id != -1 {
|
||||
t.Fatalf("key > end range given; id expected -1, got %d", id)
|
||||
}
|
||||
// watch request with 'WithFromKey' has empty-byte range end
|
||||
if id := w.Watch([]byte("foo"), []byte{}, 1); id != 0 {
|
||||
t.Fatalf("\x00 is range given; id expected 0, got %d", id)
|
||||
}
|
||||
}
|
||||
|
||||
func TestWatchDeleteRange(t *testing.T) {
|
||||
b, tmpPath := backend.NewDefaultTmpBackend()
|
||||
s := newWatchableStore(b, &lease.FakeLessor{}, nil)
|
||||
|
Loading…
x
Reference in New Issue
Block a user