diff --git a/etcdserver/api/v3rpc/watch.go b/etcdserver/api/v3rpc/watch.go index 8665b2b91..ffe722e72 100644 --- a/etcdserver/api/v3rpc/watch.go +++ b/etcdserver/api/v3rpc/watch.go @@ -88,15 +88,14 @@ type serverWatchStream struct { watchStream mvcc.WatchStream ctrlStream chan *pb.WatchResponse + // mu protects progress, prevKV + mu sync.Mutex // progress tracks the watchID that stream might need to send // progress to. // TOOD: combine progress and prevKV into a single struct? progress map[mvcc.WatchID]bool prevKV map[mvcc.WatchID]bool - // mu protects progress - mu sync.Mutex - // closec indicates the stream is closed. closec chan struct{} @@ -191,12 +190,14 @@ func (sws *serverWatchStream) recvLoop() error { } id := sws.watchStream.Watch(creq.Key, creq.RangeEnd, rev, filters...) if id != -1 { + sws.mu.Lock() if creq.ProgressNotify { sws.progress[id] = true } if creq.PrevKv { sws.prevKV[id] = true } + sws.mu.Unlock() } wr := &pb.WatchResponse{ Header: sws.newResponseHeader(wsrev), @@ -268,7 +269,9 @@ func (sws *serverWatchStream) sendLoop() { // or define protocol buffer with []mvccpb.Event. evs := wresp.Events events := make([]*mvccpb.Event, len(evs)) + sws.mu.Lock() needPrevKV := sws.prevKV[wresp.WatchID] + sws.mu.Unlock() for i := range evs { events[i] = &evs[i] @@ -333,12 +336,14 @@ func (sws *serverWatchStream) sendLoop() { delete(pending, wid) } case <-progressTicker.C: + sws.mu.Lock() for id, ok := range sws.progress { if ok { sws.watchStream.RequestProgress(id) } sws.progress[id] = true } + sws.mu.Unlock() case <-sws.closec: return }