v3api: add support for sending watcher control response

This commit is contained in:
Xiang Li 2016-01-02 20:54:37 -08:00
parent 34187a4fbe
commit ec12686233
2 changed files with 70 additions and 16 deletions

View File

@ -98,9 +98,15 @@ func recvLoop(wStream pb.Watch_WatchClient) {
if err != nil {
ExitWithError(ExitError, err)
}
evs := resp.Events
for _, ev := range evs {
fmt.Printf("%s: %s %s\n", ev.Type, string(ev.Kv.Key), string(ev.Kv.Value))
switch {
// TODO: handle canceled/compacted and other control response types
case resp.Created:
fmt.Printf("watcher created: id %08x\n", resp.WatchId)
default:
for _, ev := range resp.Events {
fmt.Printf("%s: %s %s\n", ev.Type, string(ev.Kv.Key), string(ev.Kv.Value))
}
}
}
}

View File

@ -30,17 +30,45 @@ func NewWatchServer(w storage.Watchable) pb.WatchServer {
return &watchServer{w}
}
const (
// We send ctrl response inside the read loop. We do not want
// send to block read, but we still want ctrl response we sent to
// be serialized. Thus we use a buffered chan to solve the problem.
// A small buffer should be OK for most cases, since we expect the
// ctrl requests are infrequent.
ctrlStreamBufLen = 16
)
// serverWatchStream is an etcd server side stream. It receives requests
// from client side gRPC stream. It receives watch events from storage.WatchStream,
// and creates responses that forwarded to gRPC stream.
// It also forwards control message like watch created and canceled.
type serverWatchStream struct {
gRPCStream pb.Watch_WatchServer
watchStream storage.WatchStream
ctrlStream chan *pb.WatchResponse
// closec indicates the stream is closed.
closec chan struct{}
}
func (ws *watchServer) Watch(stream pb.Watch_WatchServer) error {
closec := make(chan struct{})
defer close(closec)
sws := serverWatchStream{
gRPCStream: stream,
watchStream: ws.watchable.NewWatchStream(),
// chan for sending control response like watcher created and canceled.
ctrlStream: make(chan *pb.WatchResponse, ctrlStreamBufLen),
closec: make(chan struct{}),
}
defer sws.close()
watchStream := ws.watchable.NewWatchStream()
defer watchStream.Close()
go sendLoop(stream, watchStream, closec)
go sws.sendLoop()
return sws.recvLoop()
}
func (sws *serverWatchStream) recvLoop() error {
for {
req, err := stream.Recv()
req, err := sws.gRPCStream.Recv()
if err == io.EOF {
return nil
}
@ -57,7 +85,12 @@ func (ws *watchServer) Watch(stream pb.Watch_WatchServer) error {
toWatch = creq.Prefix
prefix = true
}
watchStream.Watch(toWatch, prefix, creq.StartRevision)
id, _ := sws.watchStream.Watch(toWatch, prefix, creq.StartRevision)
sws.ctrlStream <- &pb.WatchResponse{
// TODO: fill in response header.
WatchId: id,
Created: true,
}
default:
// TODO: support cancellation
panic("not implemented")
@ -65,10 +98,10 @@ func (ws *watchServer) Watch(stream pb.Watch_WatchServer) error {
}
}
func sendLoop(stream pb.Watch_WatchServer, watchStream storage.WatchStream, closec chan struct{}) {
func (sws *serverWatchStream) sendLoop() {
for {
select {
case evs, ok := <-watchStream.Chan():
case evs, ok := <-sws.watchStream.Chan():
if !ok {
return
}
@ -81,16 +114,25 @@ func sendLoop(stream pb.Watch_WatchServer, watchStream storage.WatchStream, clos
events[i] = &evs[i]
}
err := stream.Send(&pb.WatchResponse{Events: events})
err := sws.gRPCStream.Send(&pb.WatchResponse{Events: events})
storage.ReportEventReceived()
if err != nil {
return
}
case <-closec:
case c, ok := <-sws.ctrlStream:
if !ok {
return
}
if err := sws.gRPCStream.Send(c); err != nil {
return
}
case <-sws.closec:
// drain the chan to clean up pending events
for {
_, ok := <-watchStream.Chan()
_, ok := <-sws.watchStream.Chan()
if !ok {
return
}
@ -99,3 +141,9 @@ func sendLoop(stream pb.Watch_WatchServer, watchStream storage.WatchStream, clos
}
}
}
func (sws *serverWatchStream) close() {
sws.watchStream.Close()
close(sws.closec)
close(sws.ctrlStream)
}