diff --git a/etcdserver/api/v3rpc/watch.go b/etcdserver/api/v3rpc/watch.go index dba064679..a109314b9 100644 --- a/etcdserver/api/v3rpc/watch.go +++ b/etcdserver/api/v3rpc/watch.go @@ -32,7 +32,7 @@ type watchServer struct { clusterID int64 memberID int64 raftTimer etcdserver.RaftTimer - watchable mvcc.Watchable + watchable mvcc.WatchableKV } func NewWatchServer(s *etcdserver.EtcdServer) pb.WatchServer { @@ -82,6 +82,8 @@ type serverWatchStream struct { memberID int64 raftTimer etcdserver.RaftTimer + watchable mvcc.WatchableKV + gRPCStream pb.Watch_WatchServer watchStream mvcc.WatchStream ctrlStream chan *pb.WatchResponse @@ -91,6 +93,7 @@ type serverWatchStream struct { // progress tracks the watchID that stream might need to send // progress to. progress map[mvcc.WatchID]bool + prevKV map[mvcc.WatchID]bool // closec indicates the stream is closed. closec chan struct{} @@ -101,14 +104,18 @@ type serverWatchStream struct { func (ws *watchServer) Watch(stream pb.Watch_WatchServer) (err error) { sws := serverWatchStream{ - clusterID: ws.clusterID, - memberID: ws.memberID, - raftTimer: ws.raftTimer, + clusterID: ws.clusterID, + memberID: ws.memberID, + raftTimer: ws.raftTimer, + + watchable: ws.watchable, + gRPCStream: stream, watchStream: ws.watchable.NewWatchStream(), // chan for sending control response like watcher created and canceled. ctrlStream: make(chan *pb.WatchResponse, ctrlStreamBufLen), progress: make(map[mvcc.WatchID]bool), + prevKV: make(map[mvcc.WatchID]bool), closec: make(chan struct{}), } @@ -170,10 +177,17 @@ func (sws *serverWatchStream) recvLoop() error { rev = wsrev + 1 } id := sws.watchStream.Watch(creq.Key, creq.RangeEnd, rev) - if id != -1 && creq.ProgressNotify { - sws.mu.Lock() - sws.progress[id] = true - sws.mu.Unlock() + if id != -1 { + if creq.ProgressNotify { + sws.mu.Lock() + sws.progress[id] = true + sws.mu.Unlock() + } + if creq.PrevKv { + sws.mu.Lock() + sws.prevKV[id] = true + sws.mu.Unlock() + } } wr := &pb.WatchResponse{ Header: sws.newResponseHeader(wsrev), @@ -198,6 +212,7 @@ func (sws *serverWatchStream) recvLoop() error { } sws.mu.Lock() delete(sws.progress, mvcc.WatchID(id)) + delete(sws.prevKV, mvcc.WatchID(id)) sws.mu.Unlock() } } @@ -244,8 +259,17 @@ func (sws *serverWatchStream) sendLoop() { // or define protocol buffer with []mvccpb.Event. evs := wresp.Events events := make([]*mvccpb.Event, len(evs)) + needPrevKV := sws.prevKV[wresp.WatchID] for i := range evs { events[i] = &evs[i] + + if needPrevKV { + opt := mvcc.RangeOptions{Rev: evs[i].Kv.ModRevision - 1} + r, err := sws.watchable.Range(evs[i].Kv.Key, nil, opt) + if err == nil && len(r.KVs) != 0 { + events[i].PrevKv = &(r.KVs[0]) + } + } } wr := &pb.WatchResponse{