From 59e7be4a2aa5be42f31d7cf563833c2298ec53aa Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Tue, 16 Feb 2016 17:45:10 -0800 Subject: [PATCH] v3api: send watch events only after sending watchid creation If events show up before the watch id, the client won't be able to match the event with the requested watcher. --- etcdserver/api/v3rpc/watch.go | 47 +++++++++++++++++++++++++++++------ 1 file changed, 39 insertions(+), 8 deletions(-) diff --git a/etcdserver/api/v3rpc/watch.go b/etcdserver/api/v3rpc/watch.go index 408590993..fd16d44bc 100644 --- a/etcdserver/api/v3rpc/watch.go +++ b/etcdserver/api/v3rpc/watch.go @@ -129,6 +129,11 @@ func (sws *serverWatchStream) recvLoop() error { } func (sws *serverWatchStream) sendLoop() { + // watch ids that are currently active + ids := make(map[storage.WatchID]struct{}) + // watch responses pending on a watch id creation message + pending := make(map[storage.WatchID][]*pb.WatchResponse) + for { select { case wresp, ok := <-sws.watchStream.Chan(): @@ -145,14 +150,22 @@ func (sws *serverWatchStream) sendLoop() { events[i] = &evs[i] } - err := sws.gRPCStream.Send(&pb.WatchResponse{ + wr := &pb.WatchResponse{ Header: sws.newResponseHeader(wresp.Revision), WatchId: int64(wresp.WatchID), Events: events, CompactRevision: wresp.CompactRevision, - }) + } + + if _, hasId := ids[wresp.WatchID]; !hasId { + // buffer if id not yet announced + wrs := append(pending[wresp.WatchID], wr) + pending[wresp.WatchID] = wrs + continue + } + storage.ReportEventReceived() - if err != nil { + if err := sws.gRPCStream.Send(wr); err != nil { return } @@ -165,15 +178,33 @@ func (sws *serverWatchStream) sendLoop() { return } + // track id creation + wid := storage.WatchID(c.WatchId) + if c.Canceled { + delete(ids, wid) + continue + } + if c.Created { + // flush buffered events + ids[wid] = struct{}{} + for _, v := range pending[wid] { + storage.ReportEventReceived() + if err := sws.gRPCStream.Send(v); err != nil { + return + } + } + delete(pending, wid) + } case <-sws.closec: // drain the chan to clean up pending events - for { - _, ok := <-sws.watchStream.Chan() - if !ok { - return - } + for range sws.watchStream.Chan() { storage.ReportEventReceived() } + for _, wrs := range pending { + for range wrs { + storage.ReportEventReceived() + } + } } } }