From caa73c176f1f998bfac9b736c9816f261d8fd0d6 Mon Sep 17 00:00:00 2001 From: Gyu-Ho Lee Date: Fri, 24 Mar 2017 18:07:20 -0700 Subject: [PATCH] proxy/grpcproxy: add chanStream helper Prelimiary work for maintenance API in adapter Signed-off-by: Gyu-Ho Lee --- proxy/grpcproxy/adapter/chan_stream.go | 27 ++++++++++++++++++ .../grpcproxy/adapter/lease_client_adapter.go | 28 +++---------------- .../grpcproxy/adapter/watch_client_adapter.go | 28 +++---------------- 3 files changed, 35 insertions(+), 48 deletions(-) diff --git a/proxy/grpcproxy/adapter/chan_stream.go b/proxy/grpcproxy/adapter/chan_stream.go index 0b8509872..3aa01f205 100644 --- a/proxy/grpcproxy/adapter/chan_stream.go +++ b/proxy/grpcproxy/adapter/chan_stream.go @@ -136,3 +136,30 @@ func (s *chanStream) RecvMsg(m interface{}) error { } return s.ctx.Err() } + +func newPipeStream(ctx context.Context, ssHandler func(chanServerStream) error) chanClientStream { + // ch1 is buffered so server can send error on close + ch1, ch2 := make(chan interface{}, 1), make(chan interface{}) + headerc, trailerc := make(chan metadata.MD, 1), make(chan metadata.MD, 1) + + cctx, ccancel := context.WithCancel(ctx) + cli := &chanStream{recvc: ch1, sendc: ch2, ctx: cctx, cancel: ccancel} + cs := chanClientStream{headerc, trailerc, cli} + + sctx, scancel := context.WithCancel(ctx) + srv := &chanStream{recvc: ch2, sendc: ch1, ctx: sctx, cancel: scancel} + ss := chanServerStream{headerc, trailerc, srv, nil} + + go func() { + if err := ssHandler(ss); err != nil { + select { + case srv.sendc <- err: + case <-sctx.Done(): + case <-cctx.Done(): + } + } + scancel() + ccancel() + }() + return cs +} diff --git a/proxy/grpcproxy/adapter/lease_client_adapter.go b/proxy/grpcproxy/adapter/lease_client_adapter.go index ea36a3d4d..d471fd914 100644 --- a/proxy/grpcproxy/adapter/lease_client_adapter.go +++ b/proxy/grpcproxy/adapter/lease_client_adapter.go @@ -19,7 +19,6 @@ import ( pb "github.com/coreos/etcd/etcdserver/etcdserverpb" "google.golang.org/grpc" - "google.golang.org/grpc/metadata" ) type ls2lc struct { @@ -39,29 +38,10 @@ func (c *ls2lc) LeaseRevoke(ctx context.Context, in *pb.LeaseRevokeRequest, opts } func (c *ls2lc) LeaseKeepAlive(ctx context.Context, opts ...grpc.CallOption) (pb.Lease_LeaseKeepAliveClient, error) { - // ch1 is buffered so server can send error on close - ch1, ch2 := make(chan interface{}, 1), make(chan interface{}) - headerc, trailerc := make(chan metadata.MD, 1), make(chan metadata.MD, 1) - - cctx, ccancel := context.WithCancel(ctx) - cli := &chanStream{recvc: ch1, sendc: ch2, ctx: cctx, cancel: ccancel} - lclient := &ls2lcClientStream{chanClientStream{headerc, trailerc, cli}} - - sctx, scancel := context.WithCancel(ctx) - srv := &chanStream{recvc: ch2, sendc: ch1, ctx: sctx, cancel: scancel} - lserver := &ls2lcServerStream{chanServerStream{headerc, trailerc, srv, nil}} - go func() { - if err := c.leaseServer.LeaseKeepAlive(lserver); err != nil { - select { - case srv.sendc <- err: - case <-sctx.Done(): - case <-cctx.Done(): - } - } - scancel() - ccancel() - }() - return lclient, nil + cs := newPipeStream(ctx, func(ss chanServerStream) error { + return c.leaseServer.LeaseKeepAlive(&ls2lcServerStream{ss}) + }) + return &ls2lcClientStream{cs}, nil } func (c *ls2lc) LeaseTimeToLive(ctx context.Context, in *pb.LeaseTimeToLiveRequest, opts ...grpc.CallOption) (*pb.LeaseTimeToLiveResponse, error) { diff --git a/proxy/grpcproxy/adapter/watch_client_adapter.go b/proxy/grpcproxy/adapter/watch_client_adapter.go index 5bb712091..af4a13c41 100644 --- a/proxy/grpcproxy/adapter/watch_client_adapter.go +++ b/proxy/grpcproxy/adapter/watch_client_adapter.go @@ -20,7 +20,6 @@ import ( pb "github.com/coreos/etcd/etcdserver/etcdserverpb" "golang.org/x/net/context" "google.golang.org/grpc" - "google.golang.org/grpc/metadata" ) var errAlreadySentHeader = errors.New("adapter: already sent header") @@ -32,29 +31,10 @@ func WatchServerToWatchClient(wserv pb.WatchServer) pb.WatchClient { } func (s *ws2wc) Watch(ctx context.Context, opts ...grpc.CallOption) (pb.Watch_WatchClient, error) { - // ch1 is buffered so server can send error on close - ch1, ch2 := make(chan interface{}, 1), make(chan interface{}) - headerc, trailerc := make(chan metadata.MD, 1), make(chan metadata.MD, 1) - - cctx, ccancel := context.WithCancel(ctx) - cli := &chanStream{recvc: ch1, sendc: ch2, ctx: cctx, cancel: ccancel} - wclient := &ws2wcClientStream{chanClientStream{headerc, trailerc, cli}} - - sctx, scancel := context.WithCancel(ctx) - srv := &chanStream{recvc: ch2, sendc: ch1, ctx: sctx, cancel: scancel} - wserver := &ws2wcServerStream{chanServerStream{headerc, trailerc, srv, nil}} - go func() { - if err := s.wserv.Watch(wserver); err != nil { - select { - case srv.sendc <- err: - case <-sctx.Done(): - case <-cctx.Done(): - } - } - scancel() - ccancel() - }() - return wclient, nil + cs := newPipeStream(ctx, func(ss chanServerStream) error { + return s.wserv.Watch(&ws2wcServerStream{ss}) + }) + return &ws2wcClientStream{cs}, nil } // ws2wcClientStream implements Watch_WatchClient