Merge pull request #5963 from xiang90/p_filter

grpcproxy: add filter to watcher
This commit is contained in:
Xiang Li
2016-07-18 14:56:10 -07:00
committed by GitHub
3 changed files with 45 additions and 15 deletions

View File

@@ -172,16 +172,7 @@ func (sws *serverWatchStream) recvLoop() error {
// support >= key queries
creq.RangeEnd = []byte{}
}
filters := make([]mvcc.FilterFunc, 0, len(creq.Filters))
for _, ft := range creq.Filters {
switch ft {
case pb.WatchCreateRequest_NOPUT:
filters = append(filters, filterNoPut)
case pb.WatchCreateRequest_NODELETE:
filters = append(filters, filterNoDelete)
default:
}
}
filters := FiltersFromRequest(creq)
wsrev := sws.watchStream.Rev()
rev := creq.StartRevision
@@ -372,3 +363,17 @@ func filterNoDelete(e mvccpb.Event) bool {
func filterNoPut(e mvccpb.Event) bool {
return e.Type == mvccpb.PUT
}
func FiltersFromRequest(creq *pb.WatchCreateRequest) []mvcc.FilterFunc {
filters := make([]mvcc.FilterFunc, 0, len(creq.Filters))
for _, ft := range creq.Filters {
switch ft {
case pb.WatchCreateRequest_NOPUT:
filters = append(filters, filterNoPut)
case pb.WatchCreateRequest_NODELETE:
filters = append(filters, filterNoDelete)
default:
}
}
return filters
}

View File

@@ -21,6 +21,7 @@ import (
"golang.org/x/net/context"
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/etcdserver/api/v3rpc"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
)
@@ -104,6 +105,7 @@ func (sws *serverWatchStream) recvLoop() error {
ch: sws.watchCh,
progress: cr.ProgressNotify,
filters: v3rpc.FiltersFromRequest(cr),
}
if cr.StartRevision != 0 {
sws.addDedicatedWatcher(watcher, cr.StartRevision)

View File

@@ -17,6 +17,7 @@ package grpcproxy
import (
"github.com/coreos/etcd/clientv3"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/mvcc"
"github.com/coreos/etcd/mvcc/mvccpb"
)
@@ -27,7 +28,8 @@ type watchRange struct {
type watcher struct {
id int64
wr watchRange
// TODO: support filter
filters []mvcc.FilterFunc
progress bool
ch chan<- *pb.WatchResponse
}
@@ -39,11 +41,32 @@ func (w *watcher) send(wr clientv3.WatchResponse) {
// todo: filter out the events that this watcher already seen.
evs := wr.Events
events := make([]*mvccpb.Event, len(evs))
for i := range evs {
events[i] = (*mvccpb.Event)(evs[i])
events := make([]*mvccpb.Event, 0, len(wr.Events))
for i := range wr.Events {
filtered := false
ev := (*mvccpb.Event)(wr.Events[i])
if len(w.filters) != 0 {
for _, filter := range w.filters {
if filter(*ev) {
filtered = true
break
}
}
}
if !filtered {
events = append(events, ev)
}
}
// all events are filtered out?
if !wr.IsProgressNotify() && len(events) == 0 {
return
}
pbwr := &pb.WatchResponse{
Header: &wr.Header,
WatchId: w.id,