diff --git a/etcdserver/api/v3rpc/watch.go b/etcdserver/api/v3rpc/watch.go index 117b1f0fe..bee12e385 100644 --- a/etcdserver/api/v3rpc/watch.go +++ b/etcdserver/api/v3rpc/watch.go @@ -19,6 +19,7 @@ import ( pb "github.com/coreos/etcd/etcdserver/etcdserverpb" "github.com/coreos/etcd/storage" + "github.com/coreos/etcd/storage/storagepb" ) type watchServer struct { @@ -61,15 +62,25 @@ func (ws *watchServer) Watch(stream pb.Watch_WatchServer) error { func sendLoop(stream pb.Watch_WatchServer, watcher storage.Watcher, closec chan struct{}) { for { select { - case e, ok := <-watcher.Chan(): + case evs, ok := <-watcher.Chan(): if !ok { return } - err := stream.Send(&pb.WatchResponse{Event: &e}) + + // TODO: evs is []storagepb.Event type + // either return []*storagepb.Event from storage package + // or define protocol buffer with []storagepb.Event. + events := make([]*storagepb.Event, len(evs)) + for i := range evs { + events[i] = &evs[i] + } + + err := stream.Send(&pb.WatchResponse{Events: events}) storage.ReportEventReceived() if err != nil { return } + case <-closec: // drain the chan to clean up pending events for {