From 9ef0f5ef8a9af7b1aacc8990a644746c46c6c1aa Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Thu, 1 Sep 2016 09:30:43 -0700 Subject: [PATCH] grpcproxy: fix stream closing issue --- proxy/grpcproxy/watch_client_adapter.go | 20 +++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/proxy/grpcproxy/watch_client_adapter.go b/proxy/grpcproxy/watch_client_adapter.go index bf21142ae..8e3a6ed75 100644 --- a/proxy/grpcproxy/watch_client_adapter.go +++ b/proxy/grpcproxy/watch_client_adapter.go @@ -32,7 +32,11 @@ func (s *ws2wc) Watch(ctx context.Context, opts ...grpc.CallOption) (pb.Watch_Wa headerc, trailerc := make(chan metadata.MD, 1), make(chan metadata.MD, 1) wclient := &ws2wcClientStream{chanClientStream{headerc, trailerc, &chanStream{ch1, ch2, ctx}}} wserver := &ws2wcServerStream{chanServerStream{headerc, trailerc, &chanStream{ch2, ch1, ctx}}} - go s.wserv.Watch(wserver) + go func() { + s.wserv.Watch(wserver) + // close the server side sender + close(ch1) + }() return wclient, nil } @@ -88,7 +92,7 @@ func (ss *chanServerStream) SetTrailer(md metadata.MD) { type chanClientStream struct { headerc <-chan metadata.MD trailerc <-chan metadata.MD - grpc.Stream + *chanStream } func (cs *chanClientStream) Header() (metadata.MD, error) { @@ -109,7 +113,10 @@ func (cs *chanClientStream) Trailer() metadata.MD { } } -func (s *chanClientStream) CloseSend() error { return nil } +func (s *chanClientStream) CloseSend() error { + close(s.chanStream.sendc) + return nil +} // chanStream implements grpc.Stream using channels type chanStream struct { @@ -132,8 +139,11 @@ func (s *chanStream) SendMsg(m interface{}) error { func (s *chanStream) RecvMsg(m interface{}) error { v := m.(*interface{}) select { - case m = <-s.recvc: - *v = m + case msg, ok := <-s.recvc: + if !ok { + return grpc.ErrClientConnClosing + } + *v = msg return nil case <-s.ctx.Done(): }