v3rpc: implement 'prev-kv' watch

This commit is contained in:
Gyu-Ho Lee 2016-10-07 14:22:19 -07:00
parent 6f89fbf8b5
commit 2862c4fa12

View File

@ -32,7 +32,7 @@ type watchServer struct {
clusterID int64
memberID int64
raftTimer etcdserver.RaftTimer
watchable mvcc.Watchable
watchable mvcc.WatchableKV
}
func NewWatchServer(s *etcdserver.EtcdServer) pb.WatchServer {
@ -82,6 +82,8 @@ type serverWatchStream struct {
memberID int64
raftTimer etcdserver.RaftTimer
watchable mvcc.WatchableKV
gRPCStream pb.Watch_WatchServer
watchStream mvcc.WatchStream
ctrlStream chan *pb.WatchResponse
@ -91,6 +93,7 @@ type serverWatchStream struct {
// progress tracks the watchID that stream might need to send
// progress to.
progress map[mvcc.WatchID]bool
prevKV map[mvcc.WatchID]bool
// closec indicates the stream is closed.
closec chan struct{}
@ -101,14 +104,18 @@ type serverWatchStream struct {
func (ws *watchServer) Watch(stream pb.Watch_WatchServer) (err error) {
sws := serverWatchStream{
clusterID: ws.clusterID,
memberID: ws.memberID,
raftTimer: ws.raftTimer,
clusterID: ws.clusterID,
memberID: ws.memberID,
raftTimer: ws.raftTimer,
watchable: ws.watchable,
gRPCStream: stream,
watchStream: ws.watchable.NewWatchStream(),
// chan for sending control response like watcher created and canceled.
ctrlStream: make(chan *pb.WatchResponse, ctrlStreamBufLen),
progress: make(map[mvcc.WatchID]bool),
prevKV: make(map[mvcc.WatchID]bool),
closec: make(chan struct{}),
}
@ -170,10 +177,17 @@ func (sws *serverWatchStream) recvLoop() error {
rev = wsrev + 1
}
id := sws.watchStream.Watch(creq.Key, creq.RangeEnd, rev)
if id != -1 && creq.ProgressNotify {
sws.mu.Lock()
sws.progress[id] = true
sws.mu.Unlock()
if id != -1 {
if creq.ProgressNotify {
sws.mu.Lock()
sws.progress[id] = true
sws.mu.Unlock()
}
if creq.PrevKv {
sws.mu.Lock()
sws.prevKV[id] = true
sws.mu.Unlock()
}
}
wr := &pb.WatchResponse{
Header: sws.newResponseHeader(wsrev),
@ -198,6 +212,7 @@ func (sws *serverWatchStream) recvLoop() error {
}
sws.mu.Lock()
delete(sws.progress, mvcc.WatchID(id))
delete(sws.prevKV, mvcc.WatchID(id))
sws.mu.Unlock()
}
}
@ -244,8 +259,17 @@ func (sws *serverWatchStream) sendLoop() {
// or define protocol buffer with []mvccpb.Event.
evs := wresp.Events
events := make([]*mvccpb.Event, len(evs))
needPrevKV := sws.prevKV[wresp.WatchID]
for i := range evs {
events[i] = &evs[i]
if needPrevKV {
opt := mvcc.RangeOptions{Rev: evs[i].Kv.ModRevision - 1}
r, err := sws.watchable.Range(evs[i].Kv.Key, nil, opt)
if err == nil && len(r.KVs) != 0 {
events[i].PrevKv = &(r.KVs[0])
}
}
}
wr := &pb.WatchResponse{