mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
Merge pull request #7605 from gyuho/wrap-adapter
proxy/grpcproxy: add chanStream helper
This commit is contained in:
commit
4c7ffe4442
@ -136,3 +136,30 @@ func (s *chanStream) RecvMsg(m interface{}) error {
|
||||
}
|
||||
return s.ctx.Err()
|
||||
}
|
||||
|
||||
func newPipeStream(ctx context.Context, ssHandler func(chanServerStream) error) chanClientStream {
|
||||
// ch1 is buffered so server can send error on close
|
||||
ch1, ch2 := make(chan interface{}, 1), make(chan interface{})
|
||||
headerc, trailerc := make(chan metadata.MD, 1), make(chan metadata.MD, 1)
|
||||
|
||||
cctx, ccancel := context.WithCancel(ctx)
|
||||
cli := &chanStream{recvc: ch1, sendc: ch2, ctx: cctx, cancel: ccancel}
|
||||
cs := chanClientStream{headerc, trailerc, cli}
|
||||
|
||||
sctx, scancel := context.WithCancel(ctx)
|
||||
srv := &chanStream{recvc: ch2, sendc: ch1, ctx: sctx, cancel: scancel}
|
||||
ss := chanServerStream{headerc, trailerc, srv, nil}
|
||||
|
||||
go func() {
|
||||
if err := ssHandler(ss); err != nil {
|
||||
select {
|
||||
case srv.sendc <- err:
|
||||
case <-sctx.Done():
|
||||
case <-cctx.Done():
|
||||
}
|
||||
}
|
||||
scancel()
|
||||
ccancel()
|
||||
}()
|
||||
return cs
|
||||
}
|
||||
|
@ -19,7 +19,6 @@ import (
|
||||
|
||||
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/metadata"
|
||||
)
|
||||
|
||||
type ls2lc struct {
|
||||
@ -39,29 +38,10 @@ func (c *ls2lc) LeaseRevoke(ctx context.Context, in *pb.LeaseRevokeRequest, opts
|
||||
}
|
||||
|
||||
func (c *ls2lc) LeaseKeepAlive(ctx context.Context, opts ...grpc.CallOption) (pb.Lease_LeaseKeepAliveClient, error) {
|
||||
// ch1 is buffered so server can send error on close
|
||||
ch1, ch2 := make(chan interface{}, 1), make(chan interface{})
|
||||
headerc, trailerc := make(chan metadata.MD, 1), make(chan metadata.MD, 1)
|
||||
|
||||
cctx, ccancel := context.WithCancel(ctx)
|
||||
cli := &chanStream{recvc: ch1, sendc: ch2, ctx: cctx, cancel: ccancel}
|
||||
lclient := &ls2lcClientStream{chanClientStream{headerc, trailerc, cli}}
|
||||
|
||||
sctx, scancel := context.WithCancel(ctx)
|
||||
srv := &chanStream{recvc: ch2, sendc: ch1, ctx: sctx, cancel: scancel}
|
||||
lserver := &ls2lcServerStream{chanServerStream{headerc, trailerc, srv, nil}}
|
||||
go func() {
|
||||
if err := c.leaseServer.LeaseKeepAlive(lserver); err != nil {
|
||||
select {
|
||||
case srv.sendc <- err:
|
||||
case <-sctx.Done():
|
||||
case <-cctx.Done():
|
||||
}
|
||||
}
|
||||
scancel()
|
||||
ccancel()
|
||||
}()
|
||||
return lclient, nil
|
||||
cs := newPipeStream(ctx, func(ss chanServerStream) error {
|
||||
return c.leaseServer.LeaseKeepAlive(&ls2lcServerStream{ss})
|
||||
})
|
||||
return &ls2lcClientStream{cs}, nil
|
||||
}
|
||||
|
||||
func (c *ls2lc) LeaseTimeToLive(ctx context.Context, in *pb.LeaseTimeToLiveRequest, opts ...grpc.CallOption) (*pb.LeaseTimeToLiveResponse, error) {
|
||||
|
@ -20,7 +20,6 @@ import (
|
||||
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
|
||||
"golang.org/x/net/context"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/metadata"
|
||||
)
|
||||
|
||||
var errAlreadySentHeader = errors.New("adapter: already sent header")
|
||||
@ -32,29 +31,10 @@ func WatchServerToWatchClient(wserv pb.WatchServer) pb.WatchClient {
|
||||
}
|
||||
|
||||
func (s *ws2wc) Watch(ctx context.Context, opts ...grpc.CallOption) (pb.Watch_WatchClient, error) {
|
||||
// ch1 is buffered so server can send error on close
|
||||
ch1, ch2 := make(chan interface{}, 1), make(chan interface{})
|
||||
headerc, trailerc := make(chan metadata.MD, 1), make(chan metadata.MD, 1)
|
||||
|
||||
cctx, ccancel := context.WithCancel(ctx)
|
||||
cli := &chanStream{recvc: ch1, sendc: ch2, ctx: cctx, cancel: ccancel}
|
||||
wclient := &ws2wcClientStream{chanClientStream{headerc, trailerc, cli}}
|
||||
|
||||
sctx, scancel := context.WithCancel(ctx)
|
||||
srv := &chanStream{recvc: ch2, sendc: ch1, ctx: sctx, cancel: scancel}
|
||||
wserver := &ws2wcServerStream{chanServerStream{headerc, trailerc, srv, nil}}
|
||||
go func() {
|
||||
if err := s.wserv.Watch(wserver); err != nil {
|
||||
select {
|
||||
case srv.sendc <- err:
|
||||
case <-sctx.Done():
|
||||
case <-cctx.Done():
|
||||
}
|
||||
}
|
||||
scancel()
|
||||
ccancel()
|
||||
}()
|
||||
return wclient, nil
|
||||
cs := newPipeStream(ctx, func(ss chanServerStream) error {
|
||||
return s.wserv.Watch(&ws2wcServerStream{ss})
|
||||
})
|
||||
return &ws2wcClientStream{cs}, nil
|
||||
}
|
||||
|
||||
// ws2wcClientStream implements Watch_WatchClient
|
||||
|
Loading…
x
Reference in New Issue
Block a user