grpcproxy: add SetHeader support to ServerStream

Fixes #6726
This commit is contained in:
Anthony Romano 2016-10-31 17:00:15 -07:00
parent 94c804b81a
commit 7ef17d3e97

View File

@ -15,12 +15,16 @@
package grpcproxy
import (
"errors"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
)
var errAlreadySentHeader = errors.New("grpcproxy: already send header")
type ws2wc struct{ wserv pb.WatchServer }
func WatchServerToWatchClient(wserv pb.WatchServer) pb.WatchClient {
@ -31,7 +35,7 @@ func (s *ws2wc) Watch(ctx context.Context, opts ...grpc.CallOption) (pb.Watch_Wa
ch1, ch2 := make(chan interface{}), make(chan interface{})
headerc, trailerc := make(chan metadata.MD, 1), make(chan metadata.MD, 1)
wclient := &ws2wcClientStream{chanClientStream{headerc, trailerc, &chanStream{ch1, ch2, ctx}}}
wserver := &ws2wcServerStream{chanServerStream{headerc, trailerc, &chanStream{ch2, ch1, ctx}}}
wserver := &ws2wcServerStream{chanServerStream{headerc, trailerc, &chanStream{ch2, ch1, ctx}, nil}}
go func() {
s.wserv.Watch(wserver)
// close the server side sender
@ -73,17 +77,38 @@ type chanServerStream struct {
headerc chan<- metadata.MD
trailerc chan<- metadata.MD
grpc.Stream
headers []metadata.MD
}
func (ss *chanServerStream) SendHeader(md metadata.MD) error {
if ss.headerc == nil {
return errAlreadySentHeader
}
outmd := make(map[string][]string)
for _, h := range append(ss.headers, md) {
for k, v := range h {
outmd[k] = v
}
}
select {
case ss.headerc <- md:
case ss.headerc <- outmd:
ss.headerc = nil
ss.headers = nil
return nil
case <-ss.Context().Done():
}
return ss.Context().Err()
}
func (ss *chanServerStream) SetHeader(md metadata.MD) error {
if ss.headerc == nil {
return errAlreadySentHeader
}
ss.headers = append(ss.headers, md)
return nil
}
func (ss *chanServerStream) SetTrailer(md metadata.MD) {
ss.trailerc <- md
}