From 58aa3483c30af2c6e359d2567aef40cbd31e9745 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Sat, 16 Jul 2016 21:14:30 -0700 Subject: [PATCH] grpcproxy: add filter to watcher --- etcdserver/api/v3rpc/watch.go | 25 +++++++++++++++---------- proxy/grpcproxy/watch.go | 2 ++ proxy/grpcproxy/watcher.go | 33 ++++++++++++++++++++++++++++----- 3 files changed, 45 insertions(+), 15 deletions(-) diff --git a/etcdserver/api/v3rpc/watch.go b/etcdserver/api/v3rpc/watch.go index ffe722e72..0630a7480 100644 --- a/etcdserver/api/v3rpc/watch.go +++ b/etcdserver/api/v3rpc/watch.go @@ -172,16 +172,7 @@ func (sws *serverWatchStream) recvLoop() error { // support >= key queries creq.RangeEnd = []byte{} } - filters := make([]mvcc.FilterFunc, 0, len(creq.Filters)) - for _, ft := range creq.Filters { - switch ft { - case pb.WatchCreateRequest_NOPUT: - filters = append(filters, filterNoPut) - case pb.WatchCreateRequest_NODELETE: - filters = append(filters, filterNoDelete) - default: - } - } + filters := FiltersFromRequest(creq) wsrev := sws.watchStream.Rev() rev := creq.StartRevision @@ -372,3 +363,17 @@ func filterNoDelete(e mvccpb.Event) bool { func filterNoPut(e mvccpb.Event) bool { return e.Type == mvccpb.PUT } + +func FiltersFromRequest(creq *pb.WatchCreateRequest) []mvcc.FilterFunc { + filters := make([]mvcc.FilterFunc, 0, len(creq.Filters)) + for _, ft := range creq.Filters { + switch ft { + case pb.WatchCreateRequest_NOPUT: + filters = append(filters, filterNoPut) + case pb.WatchCreateRequest_NODELETE: + filters = append(filters, filterNoDelete) + default: + } + } + return filters +} diff --git a/proxy/grpcproxy/watch.go b/proxy/grpcproxy/watch.go index 42c3fdf1a..f1dcc8a8d 100644 --- a/proxy/grpcproxy/watch.go +++ b/proxy/grpcproxy/watch.go @@ -21,6 +21,7 @@ import ( "golang.org/x/net/context" "github.com/coreos/etcd/clientv3" + "github.com/coreos/etcd/etcdserver/api/v3rpc" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" ) @@ -104,6 +105,7 @@ func (sws *serverWatchStream) recvLoop() error { ch: sws.watchCh, progress: cr.ProgressNotify, + filters: v3rpc.FiltersFromRequest(cr), } if cr.StartRevision != 0 { sws.addDedicatedWatcher(watcher, cr.StartRevision) diff --git a/proxy/grpcproxy/watcher.go b/proxy/grpcproxy/watcher.go index 4049f11d2..a6e5fc4b2 100644 --- a/proxy/grpcproxy/watcher.go +++ b/proxy/grpcproxy/watcher.go @@ -17,6 +17,7 @@ package grpcproxy import ( "github.com/coreos/etcd/clientv3" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" + "github.com/coreos/etcd/mvcc" "github.com/coreos/etcd/mvcc/mvccpb" ) @@ -27,7 +28,8 @@ type watchRange struct { type watcher struct { id int64 wr watchRange - // TODO: support filter + + filters []mvcc.FilterFunc progress bool ch chan<- *pb.WatchResponse } @@ -39,11 +41,32 @@ func (w *watcher) send(wr clientv3.WatchResponse) { // todo: filter out the events that this watcher already seen. - evs := wr.Events - events := make([]*mvccpb.Event, len(evs)) - for i := range evs { - events[i] = (*mvccpb.Event)(evs[i]) + events := make([]*mvccpb.Event, 0, len(wr.Events)) + + for i := range wr.Events { + filtered := false + + ev := (*mvccpb.Event)(wr.Events[i]) + + if len(w.filters) != 0 { + for _, filter := range w.filters { + if filter(*ev) { + filtered = true + break + } + } + } + + if !filtered { + events = append(events, ev) + } } + + // all events are filtered out? + if !wr.IsProgressNotify() && len(events) == 0 { + return + } + pbwr := &pb.WatchResponse{ Header: &wr.Header, WatchId: w.id,