mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
Merge pull request #6331 from xiang90/fix_proxy
grpcproxy: fix stream closing issue
This commit is contained in:
commit
26999db927
@ -32,7 +32,11 @@ func (s *ws2wc) Watch(ctx context.Context, opts ...grpc.CallOption) (pb.Watch_Wa
|
||||
headerc, trailerc := make(chan metadata.MD, 1), make(chan metadata.MD, 1)
|
||||
wclient := &ws2wcClientStream{chanClientStream{headerc, trailerc, &chanStream{ch1, ch2, ctx}}}
|
||||
wserver := &ws2wcServerStream{chanServerStream{headerc, trailerc, &chanStream{ch2, ch1, ctx}}}
|
||||
go s.wserv.Watch(wserver)
|
||||
go func() {
|
||||
s.wserv.Watch(wserver)
|
||||
// close the server side sender
|
||||
close(ch1)
|
||||
}()
|
||||
return wclient, nil
|
||||
}
|
||||
|
||||
@ -88,7 +92,7 @@ func (ss *chanServerStream) SetTrailer(md metadata.MD) {
|
||||
type chanClientStream struct {
|
||||
headerc <-chan metadata.MD
|
||||
trailerc <-chan metadata.MD
|
||||
grpc.Stream
|
||||
*chanStream
|
||||
}
|
||||
|
||||
func (cs *chanClientStream) Header() (metadata.MD, error) {
|
||||
@ -109,7 +113,10 @@ func (cs *chanClientStream) Trailer() metadata.MD {
|
||||
}
|
||||
}
|
||||
|
||||
func (s *chanClientStream) CloseSend() error { return nil }
|
||||
func (s *chanClientStream) CloseSend() error {
|
||||
close(s.chanStream.sendc)
|
||||
return nil
|
||||
}
|
||||
|
||||
// chanStream implements grpc.Stream using channels
|
||||
type chanStream struct {
|
||||
@ -132,8 +139,11 @@ func (s *chanStream) SendMsg(m interface{}) error {
|
||||
func (s *chanStream) RecvMsg(m interface{}) error {
|
||||
v := m.(*interface{})
|
||||
select {
|
||||
case m = <-s.recvc:
|
||||
*v = m
|
||||
case msg, ok := <-s.recvc:
|
||||
if !ok {
|
||||
return grpc.ErrClientConnClosing
|
||||
}
|
||||
*v = msg
|
||||
return nil
|
||||
case <-s.ctx.Done():
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user