clientv3: combine Watch, WatchPrefix with variadic

For https://github.com/coreos/etcd/issues/4598.
This commit is contained in:
Gyu-Ho Lee 2016-02-23 17:47:56 -08:00
parent 53f94c22b3
commit a24d276891
6 changed files with 93 additions and 56 deletions

View File

@ -35,7 +35,7 @@ func ExampleWatcher_watch() {
wc := clientv3.NewWatcher(cli) wc := clientv3.NewWatcher(cli)
defer wc.Close() defer wc.Close()
rch := wc.Watch(context.Background(), "foo", 0) rch := wc.Watch(context.Background(), "foo")
for wresp := range rch { for wresp := range rch {
for _, ev := range wresp.Events { for _, ev := range wresp.Events {
fmt.Printf("%s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value) fmt.Printf("%s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
@ -57,7 +57,7 @@ func ExampleWatcher_watchPrefix() {
wc := clientv3.NewWatcher(cli) wc := clientv3.NewWatcher(cli)
defer wc.Close() defer wc.Close()
rch := wc.WatchPrefix(context.Background(), "foo", 0) rch := wc.Watch(context.Background(), "foo", clientv3.WithPrefix())
for wresp := range rch { for wresp := range rch {
for _, ev := range wresp.Events { for _, ev := range wresp.Events {
fmt.Printf("%s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value) fmt.Printf("%s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)

View File

@ -350,7 +350,7 @@ func TestKVCompact(t *testing.T) {
wc := clientv3.NewWatcher(clus.RandClient()) wc := clientv3.NewWatcher(clus.RandClient())
defer wc.Close() defer wc.Close()
wchan := wc.Watch(ctx, "foo", 3) wchan := wc.Watch(ctx, "foo", clientv3.WithRev(3))
if wr := <-wchan; wr.CompactRevision != 7 { if wr := <-wchan; wr.CompactRevision != 7 {
t.Fatalf("wchan CompactRevision got %v, want 7", wr.CompactRevision) t.Fatalf("wchan CompactRevision got %v, want 7", wr.CompactRevision)

View File

@ -73,7 +73,7 @@ func testWatchMultiWatcher(t *testing.T, wctx *watchctx) {
for _, k := range keys { for _, k := range keys {
// key watcher // key watcher
go func(key string) { go func(key string) {
ch := wctx.w.Watch(context.TODO(), key, 0) ch := wctx.w.Watch(context.TODO(), key)
if ch == nil { if ch == nil {
t.Fatalf("expected watcher channel, got nil") t.Fatalf("expected watcher channel, got nil")
} }
@ -94,7 +94,7 @@ func testWatchMultiWatcher(t *testing.T, wctx *watchctx) {
} }
// prefix watcher on "b" (bar and baz) // prefix watcher on "b" (bar and baz)
go func() { go func() {
prefixc := wctx.w.WatchPrefix(context.TODO(), "b", 0) prefixc := wctx.w.Watch(context.TODO(), "b", clientv3.WithPrefix())
if prefixc == nil { if prefixc == nil {
t.Fatalf("expected watcher channel, got nil") t.Fatalf("expected watcher channel, got nil")
} }
@ -181,7 +181,7 @@ func testWatchReconnRequest(t *testing.T, wctx *watchctx) {
} }
}() }()
// should reconnect when requesting watch // should reconnect when requesting watch
if wctx.ch = wctx.w.Watch(context.TODO(), "a", 0); wctx.ch == nil { if wctx.ch = wctx.w.Watch(context.TODO(), "a"); wctx.ch == nil {
t.Fatalf("expected non-nil channel") t.Fatalf("expected non-nil channel")
} }
@ -200,7 +200,7 @@ func TestWatchReconnInit(t *testing.T) {
} }
func testWatchReconnInit(t *testing.T, wctx *watchctx) { func testWatchReconnInit(t *testing.T, wctx *watchctx) {
if wctx.ch = wctx.w.Watch(context.TODO(), "a", 0); wctx.ch == nil { if wctx.ch = wctx.w.Watch(context.TODO(), "a"); wctx.ch == nil {
t.Fatalf("expected non-nil channel") t.Fatalf("expected non-nil channel")
} }
// take down watcher connection // take down watcher connection
@ -216,7 +216,7 @@ func TestWatchReconnRunning(t *testing.T) {
} }
func testWatchReconnRunning(t *testing.T, wctx *watchctx) { func testWatchReconnRunning(t *testing.T, wctx *watchctx) {
if wctx.ch = wctx.w.Watch(context.TODO(), "a", 0); wctx.ch == nil { if wctx.ch = wctx.w.Watch(context.TODO(), "a"); wctx.ch == nil {
t.Fatalf("expected non-nil channel") t.Fatalf("expected non-nil channel")
} }
putAndWatch(t, wctx, "a", "a") putAndWatch(t, wctx, "a", "a")
@ -233,7 +233,7 @@ func TestWatchCancelInit(t *testing.T) {
func testWatchCancelInit(t *testing.T, wctx *watchctx) { func testWatchCancelInit(t *testing.T, wctx *watchctx) {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
if wctx.ch = wctx.w.Watch(ctx, "a", 0); wctx.ch == nil { if wctx.ch = wctx.w.Watch(ctx, "a"); wctx.ch == nil {
t.Fatalf("expected non-nil watcher channel") t.Fatalf("expected non-nil watcher channel")
} }
cancel() cancel()
@ -254,7 +254,7 @@ func TestWatchCancelRunning(t *testing.T) {
func testWatchCancelRunning(t *testing.T, wctx *watchctx) { func testWatchCancelRunning(t *testing.T, wctx *watchctx) {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
if wctx.ch = wctx.w.Watch(ctx, "a", 0); wctx.ch == nil { if wctx.ch = wctx.w.Watch(ctx, "a"); wctx.ch == nil {
t.Fatalf("expected non-nil watcher channel") t.Fatalf("expected non-nil watcher channel")
} }
if _, err := wctx.kv.Put(ctx, "a", "a"); err != nil { if _, err := wctx.kv.Put(ctx, "a", "a"); err != nil {

View File

@ -15,6 +15,8 @@
package clientv3 package clientv3
import ( import (
"reflect"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb" pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/lease" "github.com/coreos/etcd/lease"
) )
@ -36,10 +38,12 @@ type Op struct {
// for range // for range
limit int64 limit int64
rev int64
sort *SortOption sort *SortOption
serializable bool serializable bool
// for range, watch
rev int64
// for put // for put
val []byte val []byte
leaseID lease.LeaseID leaseID lease.LeaseID
@ -65,6 +69,27 @@ func (op Op) toRequestUnion() *pb.RequestUnion {
} }
} }
func (op Op) toWatchRequest() *watchRequest {
switch op.t {
case tRange:
key := string(op.key)
prefix := ""
if op.end != nil {
prefix = key
key = ""
}
wr := &watchRequest{
key: key,
prefix: prefix,
rev: op.rev,
}
return wr
default:
panic("Only for tRange")
}
}
func (op Op) isWrite() bool { func (op Op) isWrite() bool {
return op.t != tRange return op.t != tRange
} }
@ -111,6 +136,24 @@ func OpPut(key, val string, opts ...OpOption) Op {
return ret return ret
} }
func opWatch(key string, opts ...OpOption) Op {
ret := Op{t: tRange, key: []byte(key)}
ret.applyOpts(opts)
switch {
case ret.end != nil && !reflect.DeepEqual(ret.end, getPrefix(ret.key)):
panic("only supports single keys or prefixes")
case ret.leaseID != 0:
panic("unexpected lease in watch")
case ret.limit != 0:
panic("unexpected limit in watch")
case ret.sort != nil:
panic("unexpected sort in watch")
case ret.serializable != false:
panic("unexpected serializable in watch")
}
return ret
}
func (op *Op) applyOpts(opts []OpOption) { func (op *Op) applyOpts(opts []OpOption) {
for _, opt := range opts { for _, opt := range opts {
opt(op) opt(op)
@ -129,8 +172,7 @@ func WithLease(leaseID lease.LeaseID) OpOption {
func WithLimit(n int64) OpOption { return func(op *Op) { op.limit = n } } func WithLimit(n int64) OpOption { return func(op *Op) { op.limit = n } }
// WithRev specifies the store revision for 'Get' request. // WithRev specifies the store revision for 'Get' request.
// // Or the start revision of 'Watch' request.
// TODO: support Watch API
func WithRev(rev int64) OpOption { return func(op *Op) { op.rev = rev } } func WithRev(rev int64) OpOption { return func(op *Op) { op.rev = rev } }
// WithSort specifies the ordering in 'Get' request. It requires // WithSort specifies the ordering in 'Get' request. It requires
@ -143,25 +185,28 @@ func WithSort(target SortTarget, order SortOrder) OpOption {
} }
} }
// WithPrefix enables 'Get' or 'Delete' requests to operate on the func getPrefix(key []byte) []byte {
// keys with matching prefix. For example, 'Get(foo, WithPrefix())' end := make([]byte, len(key))
// can return 'foo1', 'foo2', and so on. copy(end, key)
// for i := len(end) - 1; i >= 0; i-- {
// TODO: support Watch API if end[i] < 0xff {
func WithPrefix() OpOption { end[i] = end[i] + 1
return func(op *Op) { end = end[:i+1]
op.end = make([]byte, len(op.key)) return end
copy(op.end, op.key)
for i := len(op.end) - 1; i >= 0; i-- {
if op.end[i] < 0xff {
op.end[i] = op.end[i] + 1
op.end = op.end[:i+1]
return
} }
} }
// next prefix does not exist (e.g., 0xffff); // next prefix does not exist (e.g., 0xffff);
// default to WithFromKey policy // default to WithFromKey policy
op.end = []byte{0} end = []byte{0}
return end
}
// WithPrefix enables 'Get', 'Delete', or 'Watch' requests to operate
// on the keys with matching prefix. For example, 'Get(foo, WithPrefix())'
// can return 'foo1', 'foo2', and so on.
func WithPrefix() OpOption {
return func(op *Op) {
op.end = getPrefix(op.key)
} }
} }

View File

@ -116,7 +116,7 @@ func (s *syncer) SyncUpdates(ctx context.Context) clientv3.WatchChan {
// get all events since revision (or get non-compacted revision, if // get all events since revision (or get non-compacted revision, if
// rev is too far behind) // rev is too far behind)
wch := wapi.WatchPrefix(ctx, s.prefix, s.rev) wch := wapi.Watch(ctx, s.prefix, clientv3.WithPrefix(), clientv3.WithRev(s.rev))
for wr := range wch { for wr := range wch {
respchan <- wr respchan <- wr
} }

View File

@ -27,17 +27,11 @@ import (
type WatchChan <-chan WatchResponse type WatchChan <-chan WatchResponse
type Watcher interface { type Watcher interface {
// Watch watches on a single key. The watched events will be returned // Watch watches on a key or prefix. The watched events will be returned
// through the returned channel. // through the returned channel.
// If the watch is slow or the required rev is compacted, the watch request // If the watch is slow or the required rev is compacted, the watch request
// might be canceled from the server-side and the chan will be closed. // might be canceled from the server-side and the chan will be closed.
Watch(ctx context.Context, key string, rev int64) WatchChan Watch(ctx context.Context, key string, opts ...OpOption) WatchChan
// WatchPrefix watches on a prefix. The watched events will be returned
// through the returned channel.
// If the watch is slow or the required rev is compacted, the watch request
// might be canceled from the server-side and the chan will be closed.
WatchPrefix(ctx context.Context, prefix string, rev int64) WatchChan
// Close closes the watcher and cancels all watch requests. // Close closes the watcher and cancels all watch requests.
Close() error Close() error
@ -127,27 +121,16 @@ func NewWatcher(c *Client) Watcher {
return w return w
} }
func (w *watcher) Watch(ctx context.Context, key string, rev int64) WatchChan { // Watch posts a watch request to run() and waits for a new watcher channel
return w.watch(ctx, key, "", rev) func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) WatchChan {
} ow := opWatch(key, opts...)
func (w *watcher) WatchPrefix(ctx context.Context, prefix string, rev int64) WatchChan { wr := ow.toWatchRequest()
return w.watch(ctx, "", prefix, rev) wr.ctx = ctx
}
func (w *watcher) Close() error {
select {
case w.stopc <- struct{}{}:
case <-w.donec:
}
<-w.donec
return <-w.errc
}
// watch posts a watch request to run() and waits for a new watcher channel
func (w *watcher) watch(ctx context.Context, key, prefix string, rev int64) WatchChan {
retc := make(chan chan WatchResponse, 1) retc := make(chan chan WatchResponse, 1)
wr := &watchRequest{ctx: ctx, key: key, prefix: prefix, rev: rev, retc: retc} wr.retc = retc
// submit request // submit request
select { select {
case w.reqc <- wr: case w.reqc <- wr:
@ -167,6 +150,15 @@ func (w *watcher) watch(ctx context.Context, key, prefix string, rev int64) Watc
} }
} }
func (w *watcher) Close() error {
select {
case w.stopc <- struct{}{}:
case <-w.donec:
}
<-w.donec
return <-w.errc
}
func (w *watcher) addStream(resp *pb.WatchResponse, pendingReq *watchRequest) { func (w *watcher) addStream(resp *pb.WatchResponse, pendingReq *watchRequest) {
if pendingReq == nil { if pendingReq == nil {
// no pending request; ignore // no pending request; ignore