mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
*: support watch with filters
Now user can filter events with types. The API is also extensible. It might make sense for the proxy to filter out events based on more expensive/customized filter.
This commit is contained in:
@@ -164,12 +164,23 @@ func (sws *serverWatchStream) recvLoop() error {
|
||||
// support >= key queries
|
||||
creq.RangeEnd = []byte{}
|
||||
}
|
||||
filters := make([]mvcc.FilterFunc, 0, len(creq.Filters))
|
||||
for _, ft := range creq.Filters {
|
||||
switch ft {
|
||||
case pb.WatchCreateRequest_NOPUT:
|
||||
filters = append(filters, filterNoPut)
|
||||
case pb.WatchCreateRequest_NODELETE:
|
||||
filters = append(filters, filterNoDelete)
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
wsrev := sws.watchStream.Rev()
|
||||
rev := creq.StartRevision
|
||||
if rev == 0 {
|
||||
rev = wsrev + 1
|
||||
}
|
||||
id := sws.watchStream.Watch(creq.Key, creq.RangeEnd, rev)
|
||||
id := sws.watchStream.Watch(creq.Key, creq.RangeEnd, rev, filters...)
|
||||
if id != -1 && creq.ProgressNotify {
|
||||
sws.progress[id] = true
|
||||
}
|
||||
@@ -322,3 +333,11 @@ func (sws *serverWatchStream) newResponseHeader(rev int64) *pb.ResponseHeader {
|
||||
RaftTerm: sws.raftTimer.Term(),
|
||||
}
|
||||
}
|
||||
|
||||
func filterNoDelete(e mvccpb.Event) bool {
|
||||
return e.Type == mvccpb.DELETE
|
||||
}
|
||||
|
||||
func filterNoPut(e mvccpb.Event) bool {
|
||||
return e.Type == mvccpb.PUT
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user