api/v3rpc: add watch ID to "watchStream.Watch"

Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
This commit is contained in:
Gyuho Lee 2017-12-22 14:15:54 -08:00
parent 82a164e3b9
commit 33c732b97c

View File

@ -205,7 +205,7 @@ func (sws *serverWatchStream) recvLoop() error {
if !sws.isWatchPermitted(creq) { if !sws.isWatchPermitted(creq) {
wr := &pb.WatchResponse{ wr := &pb.WatchResponse{
Header: sws.newResponseHeader(sws.watchStream.Rev()), Header: sws.newResponseHeader(sws.watchStream.Rev()),
WatchId: -1, WatchId: creq.WatchId,
Canceled: true, Canceled: true,
Created: true, Created: true,
CancelReason: rpctypes.ErrGRPCPermissionDenied.Error(), CancelReason: rpctypes.ErrGRPCPermissionDenied.Error(),
@ -225,8 +225,8 @@ func (sws *serverWatchStream) recvLoop() error {
if rev == 0 { if rev == 0 {
rev = wsrev + 1 rev = wsrev + 1
} }
id := sws.watchStream.Watch(creq.Key, creq.RangeEnd, rev, filters...) id, err := sws.watchStream.Watch(mvcc.WatchID(creq.WatchId), creq.Key, creq.RangeEnd, rev, filters...)
if id != -1 { if err == nil {
sws.mu.Lock() sws.mu.Lock()
if creq.ProgressNotify { if creq.ProgressNotify {
sws.progress[id] = true sws.progress[id] = true
@ -240,7 +240,10 @@ func (sws *serverWatchStream) recvLoop() error {
Header: sws.newResponseHeader(wsrev), Header: sws.newResponseHeader(wsrev),
WatchId: int64(id), WatchId: int64(id),
Created: true, Created: true,
Canceled: id == -1, Canceled: err != nil,
}
if err != nil {
wr.CancelReason = err.Error()
} }
select { select {
case sws.ctrlStream <- wr: case sws.ctrlStream <- wr: