From ec12686233e299ff2c6b8289fe608e7d4e920000 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Sat, 2 Jan 2016 20:54:37 -0800 Subject: [PATCH] v3api: add support for sending watcher control response --- etcdctlv3/command/watch_command.go | 12 +++-- etcdserver/api/v3rpc/watch.go | 74 ++++++++++++++++++++++++------ 2 files changed, 70 insertions(+), 16 deletions(-) diff --git a/etcdctlv3/command/watch_command.go b/etcdctlv3/command/watch_command.go index 31c663b4f..e5290c32d 100644 --- a/etcdctlv3/command/watch_command.go +++ b/etcdctlv3/command/watch_command.go @@ -98,9 +98,15 @@ func recvLoop(wStream pb.Watch_WatchClient) { if err != nil { ExitWithError(ExitError, err) } - evs := resp.Events - for _, ev := range evs { - fmt.Printf("%s: %s %s\n", ev.Type, string(ev.Kv.Key), string(ev.Kv.Value)) + + switch { + // TODO: handle canceled/compacted and other control response types + case resp.Created: + fmt.Printf("watcher created: id %08x\n", resp.WatchId) + default: + for _, ev := range resp.Events { + fmt.Printf("%s: %s %s\n", ev.Type, string(ev.Kv.Key), string(ev.Kv.Value)) + } } } } diff --git a/etcdserver/api/v3rpc/watch.go b/etcdserver/api/v3rpc/watch.go index f369838eb..db8c8ab9d 100644 --- a/etcdserver/api/v3rpc/watch.go +++ b/etcdserver/api/v3rpc/watch.go @@ -30,17 +30,45 @@ func NewWatchServer(w storage.Watchable) pb.WatchServer { return &watchServer{w} } +const ( + // We send ctrl response inside the read loop. We do not want + // send to block read, but we still want ctrl response we sent to + // be serialized. Thus we use a buffered chan to solve the problem. + // A small buffer should be OK for most cases, since we expect the + // ctrl requests are infrequent. + ctrlStreamBufLen = 16 +) + +// serverWatchStream is an etcd server side stream. It receives requests +// from client side gRPC stream. It receives watch events from storage.WatchStream, +// and creates responses that forwarded to gRPC stream. +// It also forwards control message like watch created and canceled. +type serverWatchStream struct { + gRPCStream pb.Watch_WatchServer + watchStream storage.WatchStream + ctrlStream chan *pb.WatchResponse + + // closec indicates the stream is closed. + closec chan struct{} +} + func (ws *watchServer) Watch(stream pb.Watch_WatchServer) error { - closec := make(chan struct{}) - defer close(closec) + sws := serverWatchStream{ + gRPCStream: stream, + watchStream: ws.watchable.NewWatchStream(), + // chan for sending control response like watcher created and canceled. + ctrlStream: make(chan *pb.WatchResponse, ctrlStreamBufLen), + closec: make(chan struct{}), + } + defer sws.close() - watchStream := ws.watchable.NewWatchStream() - defer watchStream.Close() - - go sendLoop(stream, watchStream, closec) + go sws.sendLoop() + return sws.recvLoop() +} +func (sws *serverWatchStream) recvLoop() error { for { - req, err := stream.Recv() + req, err := sws.gRPCStream.Recv() if err == io.EOF { return nil } @@ -57,7 +85,12 @@ func (ws *watchServer) Watch(stream pb.Watch_WatchServer) error { toWatch = creq.Prefix prefix = true } - watchStream.Watch(toWatch, prefix, creq.StartRevision) + id, _ := sws.watchStream.Watch(toWatch, prefix, creq.StartRevision) + sws.ctrlStream <- &pb.WatchResponse{ + // TODO: fill in response header. + WatchId: id, + Created: true, + } default: // TODO: support cancellation panic("not implemented") @@ -65,10 +98,10 @@ func (ws *watchServer) Watch(stream pb.Watch_WatchServer) error { } } -func sendLoop(stream pb.Watch_WatchServer, watchStream storage.WatchStream, closec chan struct{}) { +func (sws *serverWatchStream) sendLoop() { for { select { - case evs, ok := <-watchStream.Chan(): + case evs, ok := <-sws.watchStream.Chan(): if !ok { return } @@ -81,16 +114,25 @@ func sendLoop(stream pb.Watch_WatchServer, watchStream storage.WatchStream, clos events[i] = &evs[i] } - err := stream.Send(&pb.WatchResponse{Events: events}) + err := sws.gRPCStream.Send(&pb.WatchResponse{Events: events}) storage.ReportEventReceived() if err != nil { return } - case <-closec: + case c, ok := <-sws.ctrlStream: + if !ok { + return + } + + if err := sws.gRPCStream.Send(c); err != nil { + return + } + + case <-sws.closec: // drain the chan to clean up pending events for { - _, ok := <-watchStream.Chan() + _, ok := <-sws.watchStream.Chan() if !ok { return } @@ -99,3 +141,9 @@ func sendLoop(stream pb.Watch_WatchServer, watchStream storage.WatchStream, clos } } } + +func (sws *serverWatchStream) close() { + sws.watchStream.Close() + close(sws.closec) + close(sws.ctrlStream) +}