From 943fe70178bd5e73a2fe19ff84b8467f21e60c05 Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Wed, 27 Jul 2016 21:24:52 -0700 Subject: [PATCH 1/2] clientv3: support watch filters --- clientv3/op.go | 17 +++++++++++++++++ clientv3/watch.go | 13 +++++++++++++ 2 files changed, 30 insertions(+) diff --git a/clientv3/op.go b/clientv3/op.go index 2fc803cbb..1a176b345 100644 --- a/clientv3/op.go +++ b/clientv3/op.go @@ -50,6 +50,9 @@ type Op struct { // progressNotify is for progress updates. progressNotify bool + // filters for watchers + filterPut bool + filterDelete bool // for put val []byte @@ -111,6 +114,8 @@ func OpDelete(key string, opts ...OpOption) Op { panic("unexpected serializable in delete") case ret.countOnly: panic("unexpected countOnly in delete") + case ret.filterDelete, ret.filterPut: + panic("unexpected filter in delete") } return ret } @@ -131,6 +136,8 @@ func OpPut(key, val string, opts ...OpOption) Op { panic("unexpected serializable in put") case ret.countOnly: panic("unexpected countOnly in put") + case ret.filterDelete, ret.filterPut: + panic("unexpected filter in put") } return ret } @@ -274,6 +281,16 @@ func WithProgressNotify() OpOption { } } +// WithFilterPut discards PUT events from the watcher. +func WithFilterPut() OpOption { + return func(op *Op) { op.filterPut = true } +} + +// WithFilterDelete discards DELETE events from the watcher. +func WithFilterDelete() OpOption { + return func(op *Op) { op.filterDelete = true } +} + // WithPrevKV gets the previous key-value pair before the event happens. If the previous KV is already compacted, // nothing will be returned. func WithPrevKV() OpOption { diff --git a/clientv3/watch.go b/clientv3/watch.go index fbef36370..05f022c38 100644 --- a/clientv3/watch.go +++ b/clientv3/watch.go @@ -140,6 +140,8 @@ type watchRequest struct { rev int64 // progressNotify is for progress updates progressNotify bool + // filters is the list of events to filter out + filters []pb.WatchCreateRequest_FilterType // get the previous key-value pair before the event happens prevKV bool // retc receives a chan WatchResponse once the watcher is established @@ -210,12 +212,22 @@ func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) Watch ow := opWatch(key, opts...) retc := make(chan chan WatchResponse, 1) + + var filters []pb.WatchCreateRequest_FilterType + if ow.filterPut { + filters = append(filters, pb.WatchCreateRequest_NOPUT) + } + if ow.filterDelete { + filters = append(filters, pb.WatchCreateRequest_NODELETE) + } + wr := &watchRequest{ ctx: ctx, key: string(ow.key), end: string(ow.end), rev: ow.rev, progressNotify: ow.progressNotify, + filters: filters, prevKV: ow.prevKV, retc: retc, } @@ -690,6 +702,7 @@ func (wr *watchRequest) toPB() *pb.WatchRequest { Key: []byte(wr.key), RangeEnd: []byte(wr.end), ProgressNotify: wr.progressNotify, + Filters: wr.filters, PrevKv: wr.prevKV, } cr := &pb.WatchRequest_CreateRequest{CreateRequest: req} From 4c9a2a65c92928324feed2666ac9f843119cd526 Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Wed, 27 Jul 2016 21:25:06 -0700 Subject: [PATCH 2/2] integration: test clientv3 watch filters --- clientv3/integration/watch_test.go | 36 ++++++++++++++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/clientv3/integration/watch_test.go b/clientv3/integration/watch_test.go index efd2ef65c..816c73fb7 100644 --- a/clientv3/integration/watch_test.go +++ b/clientv3/integration/watch_test.go @@ -673,3 +673,39 @@ func TestWatchWithRequireLeader(t *testing.T) { t.Fatalf("expected response, got closed channel") } } + +// TestWatchWithFilter checks that watch filtering works. +func TestWatchWithFilter(t *testing.T) { + cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + defer cluster.Terminate(t) + + client := cluster.RandClient() + ctx := context.Background() + + wcNoPut := client.Watch(ctx, "a", clientv3.WithFilterPut()) + wcNoDel := client.Watch(ctx, "a", clientv3.WithFilterDelete()) + + if _, err := client.Put(ctx, "a", "abc"); err != nil { + t.Fatal(err) + } + if _, err := client.Delete(ctx, "a"); err != nil { + t.Fatal(err) + } + + npResp := <-wcNoPut + if len(npResp.Events) != 1 || npResp.Events[0].Type != clientv3.EventTypeDelete { + t.Fatalf("expected delete event, got %+v", npResp.Events) + } + ndResp := <-wcNoDel + if len(ndResp.Events) != 1 || ndResp.Events[0].Type != clientv3.EventTypePut { + t.Fatalf("expected put event, got %+v", ndResp.Events) + } + + select { + case resp := <-wcNoPut: + t.Fatalf("unexpected event on filtered put (%+v)", resp) + case resp := <-wcNoDel: + t.Fatalf("unexpected event on filtered delete (%+v)", resp) + case <-time.After(100 * time.Millisecond): + } +}