v3rpc: lock progress and prevKV map correctly

This commit is contained in:
Xiang Li 2016-07-07 15:01:05 -07:00 committed by Gyu-Ho Lee
parent b837feffe4
commit d50c487132

View File

@ -86,11 +86,11 @@ type serverWatchStream struct {
watchStream mvcc.WatchStream watchStream mvcc.WatchStream
ctrlStream chan *pb.WatchResponse ctrlStream chan *pb.WatchResponse
// mu protects progress, prevKV
mu sync.Mutex
// progress tracks the watchID that stream might need to send // progress tracks the watchID that stream might need to send
// progress to. // progress to.
progress map[mvcc.WatchID]bool progress map[mvcc.WatchID]bool
// mu protects progress
mu sync.Mutex
// closec indicates the stream is closed. // closec indicates the stream is closed.
closec chan struct{} closec chan struct{}
@ -171,7 +171,9 @@ func (sws *serverWatchStream) recvLoop() error {
} }
id := sws.watchStream.Watch(creq.Key, creq.RangeEnd, rev) id := sws.watchStream.Watch(creq.Key, creq.RangeEnd, rev)
if id != -1 && creq.ProgressNotify { if id != -1 && creq.ProgressNotify {
sws.mu.Lock()
sws.progress[id] = true sws.progress[id] = true
sws.mu.Unlock()
} }
wr := &pb.WatchResponse{ wr := &pb.WatchResponse{
Header: sws.newResponseHeader(wsrev), Header: sws.newResponseHeader(wsrev),
@ -298,12 +300,14 @@ func (sws *serverWatchStream) sendLoop() {
delete(pending, wid) delete(pending, wid)
} }
case <-progressTicker.C: case <-progressTicker.C:
sws.mu.Lock()
for id, ok := range sws.progress { for id, ok := range sws.progress {
if ok { if ok {
sws.watchStream.RequestProgress(id) sws.watchStream.RequestProgress(id)
} }
sws.progress[id] = true sws.progress[id] = true
} }
sws.mu.Unlock()
case <-sws.closec: case <-sws.closec:
return return
} }