From 33c732b97c16856efb3e8ff2b48a0ae139934d08 Mon Sep 17 00:00:00 2001
From: Gyuho Lee <gyuhox@gmail.com>
Date: Fri, 22 Dec 2017 14:15:54 -0800
Subject: [PATCH] api/v3rpc: add watch ID to "watchStream.Watch"

Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
---
 etcdserver/api/v3rpc/watch.go | 11 +++++++----
 1 file changed, 7 insertions(+), 4 deletions(-)

diff --git a/etcdserver/api/v3rpc/watch.go b/etcdserver/api/v3rpc/watch.go
index e29c4210b..7c79dace2 100644
--- a/etcdserver/api/v3rpc/watch.go
+++ b/etcdserver/api/v3rpc/watch.go
@@ -205,7 +205,7 @@ func (sws *serverWatchStream) recvLoop() error {
 			if !sws.isWatchPermitted(creq) {
 				wr := &pb.WatchResponse{
 					Header:       sws.newResponseHeader(sws.watchStream.Rev()),
-					WatchId:      -1,
+					WatchId:      creq.WatchId,
 					Canceled:     true,
 					Created:      true,
 					CancelReason: rpctypes.ErrGRPCPermissionDenied.Error(),
@@ -225,8 +225,8 @@ func (sws *serverWatchStream) recvLoop() error {
 			if rev == 0 {
 				rev = wsrev + 1
 			}
-			id := sws.watchStream.Watch(creq.Key, creq.RangeEnd, rev, filters...)
-			if id != -1 {
+			id, err := sws.watchStream.Watch(mvcc.WatchID(creq.WatchId), creq.Key, creq.RangeEnd, rev, filters...)
+			if err == nil {
 				sws.mu.Lock()
 				if creq.ProgressNotify {
 					sws.progress[id] = true
@@ -240,7 +240,10 @@ func (sws *serverWatchStream) recvLoop() error {
 				Header:   sws.newResponseHeader(wsrev),
 				WatchId:  int64(id),
 				Created:  true,
-				Canceled: id == -1,
+				Canceled: err != nil,
+			}
+			if err != nil {
+				wr.CancelReason = err.Error()
 			}
 			select {
 			case sws.ctrlStream <- wr: