From ee8c577fc0c0ec3eb5ab8ddcfdec47e3f14af012 Mon Sep 17 00:00:00 2001 From: Gyu-Ho Lee Date: Tue, 16 Aug 2016 12:09:16 -0700 Subject: [PATCH] vendor: update grpc --- cmd/Godeps/Godeps.json | 38 +-- cmd/vendor/google.golang.org/grpc/.travis.yml | 16 +- cmd/vendor/google.golang.org/grpc/call.go | 25 +- .../google.golang.org/grpc/clientconn.go | 302 ++++++++++++------ .../grpc/credentials/credentials.go | 53 ++- .../grpc/credentials/credentials_util_go17.go | 76 +++++ .../credentials/credentials_util_pre_go17.go | 74 +++++ .../grpc/metadata/metadata.go | 14 +- cmd/vendor/google.golang.org/grpc/rpc_util.go | 18 +- cmd/vendor/google.golang.org/grpc/server.go | 95 ++++-- cmd/vendor/google.golang.org/grpc/stream.go | 111 ++++--- .../grpc/transport/control.go | 5 + .../google.golang.org/grpc/transport/go16.go | 46 +++ .../google.golang.org/grpc/transport/go17.go | 46 +++ .../grpc/transport/handler_server.go | 8 +- .../grpc/transport/http2_client.go | 208 ++++++++---- .../grpc/transport/http2_server.go | 76 +++-- .../grpc/transport/http_util.go | 83 ++++- .../grpc/transport/pre_go16.go | 51 +++ .../grpc/transport/transport.go | 93 +++++- 20 files changed, 1097 insertions(+), 341 deletions(-) create mode 100644 cmd/vendor/google.golang.org/grpc/credentials/credentials_util_go17.go create mode 100644 cmd/vendor/google.golang.org/grpc/credentials/credentials_util_pre_go17.go create mode 100644 cmd/vendor/google.golang.org/grpc/transport/go16.go create mode 100644 cmd/vendor/google.golang.org/grpc/transport/go17.go create mode 100644 cmd/vendor/google.golang.org/grpc/transport/pre_go16.go diff --git a/cmd/Godeps/Godeps.json b/cmd/Godeps/Godeps.json index f0048efe3..c8a72762b 100644 --- a/cmd/Godeps/Godeps.json +++ b/cmd/Godeps/Godeps.json @@ -1,6 +1,6 @@ { "ImportPath": "github.com/coreos/etcd", - "GoVersion": "devel-a6dbfc1", + "GoVersion": "go1.7", "GodepVersion": "v74", "Packages": [ "./..." @@ -237,48 +237,48 @@ }, { "ImportPath": "google.golang.org/grpc", - "Comment": "v1.0.0-6-g02fca89", - "Rev": "02fca896ff5f50c6bbbee0860345a49344b37a03" + "Comment": "v1.0.0-174-gc278196", + "Rev": "c2781963b3af261a37e0f14fdcb7c1fa13259e1f" }, { "ImportPath": "google.golang.org/grpc/codes", - "Comment": "v1.0.0-6-g02fca89", - "Rev": "02fca896ff5f50c6bbbee0860345a49344b37a03" + "Comment": "v1.0.0-174-gc278196", + "Rev": "c2781963b3af261a37e0f14fdcb7c1fa13259e1f" }, { "ImportPath": "google.golang.org/grpc/credentials", - "Comment": "v1.0.0-6-g02fca89", - "Rev": "02fca896ff5f50c6bbbee0860345a49344b37a03" + "Comment": "v1.0.0-174-gc278196", + "Rev": "c2781963b3af261a37e0f14fdcb7c1fa13259e1f" }, { "ImportPath": "google.golang.org/grpc/grpclog", - "Comment": "v1.0.0-6-g02fca89", - "Rev": "02fca896ff5f50c6bbbee0860345a49344b37a03" + "Comment": "v1.0.0-174-gc278196", + "Rev": "c2781963b3af261a37e0f14fdcb7c1fa13259e1f" }, { "ImportPath": "google.golang.org/grpc/internal", - "Comment": "v1.0.0-6-g02fca89", - "Rev": "02fca896ff5f50c6bbbee0860345a49344b37a03" + "Comment": "v1.0.0-174-gc278196", + "Rev": "c2781963b3af261a37e0f14fdcb7c1fa13259e1f" }, { "ImportPath": "google.golang.org/grpc/metadata", - "Comment": "v1.0.0-6-g02fca89", - "Rev": "02fca896ff5f50c6bbbee0860345a49344b37a03" + "Comment": "v1.0.0-174-gc278196", + "Rev": "c2781963b3af261a37e0f14fdcb7c1fa13259e1f" }, { "ImportPath": "google.golang.org/grpc/naming", - "Comment": "v1.0.0-6-g02fca89", - "Rev": "02fca896ff5f50c6bbbee0860345a49344b37a03" + "Comment": "v1.0.0-174-gc278196", + "Rev": "c2781963b3af261a37e0f14fdcb7c1fa13259e1f" }, { "ImportPath": "google.golang.org/grpc/peer", - "Comment": "v1.0.0-6-g02fca89", - "Rev": "02fca896ff5f50c6bbbee0860345a49344b37a03" + "Comment": "v1.0.0-174-gc278196", + "Rev": "c2781963b3af261a37e0f14fdcb7c1fa13259e1f" }, { "ImportPath": "google.golang.org/grpc/transport", - "Comment": "v1.0.0-6-g02fca89", - "Rev": "02fca896ff5f50c6bbbee0860345a49344b37a03" + "Comment": "v1.0.0-174-gc278196", + "Rev": "c2781963b3af261a37e0f14fdcb7c1fa13259e1f" }, { "ImportPath": "gopkg.in/cheggaaa/pb.v1", diff --git a/cmd/vendor/google.golang.org/grpc/.travis.yml b/cmd/vendor/google.golang.org/grpc/.travis.yml index 9bc2c1279..d7108cd64 100644 --- a/cmd/vendor/google.golang.org/grpc/.travis.yml +++ b/cmd/vendor/google.golang.org/grpc/.travis.yml @@ -1,17 +1,21 @@ language: go go: - - 1.5.3 - - 1.6 + - 1.5.4 + - 1.6.3 + +go_import_path: google.golang.org/grpc before_install: + - go get golang.org/x/tools/cmd/goimports + - go get github.com/golang/lint/golint - go get github.com/axw/gocov/gocov - go get github.com/mattn/goveralls - go get golang.org/x/tools/cmd/cover -install: - - mkdir -p "$GOPATH/src/google.golang.org" - - mv "$TRAVIS_BUILD_DIR" "$GOPATH/src/google.golang.org/grpc" - script: + - '! gofmt -s -d -l . 2>&1 | read' + - '! goimports -l . | read' + - '! golint ./... | grep -vE "(_string|\.pb)\.go:"' + - '! go tool vet -all . 2>&1 | grep -vE "constant [0-9]+ not a string in call to Errorf"' - make test testrace diff --git a/cmd/vendor/google.golang.org/grpc/call.go b/cmd/vendor/google.golang.org/grpc/call.go index 84ac178c4..5fba11eb0 100644 --- a/cmd/vendor/google.golang.org/grpc/call.go +++ b/cmd/vendor/google.golang.org/grpc/call.go @@ -36,6 +36,7 @@ package grpc import ( "bytes" "io" + "math" "time" "golang.org/x/net/context" @@ -51,13 +52,20 @@ import ( func recvResponse(dopts dialOptions, t transport.ClientTransport, c *callInfo, stream *transport.Stream, reply interface{}) error { // Try to acquire header metadata from the server if there is any. var err error + defer func() { + if err != nil { + if _, ok := err.(transport.ConnectionError); !ok { + t.CloseStream(stream, err) + } + } + }() c.headerMD, err = stream.Header() if err != nil { return err } p := &parser{r: stream} for { - if err = recv(p, dopts.codec, stream, dopts.dc, reply); err != nil { + if err = recv(p, dopts.codec, stream, dopts.dc, reply, math.MaxInt32); err != nil { if err == io.EOF { break } @@ -76,6 +84,7 @@ func sendRequest(ctx context.Context, codec Codec, compressor Compressor, callHd } defer func() { if err != nil { + // If err is connection error, t will be closed, no need to close stream here. if _, ok := err.(transport.ConnectionError); !ok { t.CloseStream(stream, err) } @@ -90,7 +99,10 @@ func sendRequest(ctx context.Context, codec Codec, compressor Compressor, callHd return nil, transport.StreamErrorf(codes.Internal, "grpc: %v", err) } err = t.Write(stream, outBuf, opts) - if err != nil { + // t.NewStream(...) could lead to an early rejection of the RPC (e.g., the service/method + // does not exist.) so that t.Write could get io.EOF from wait(...). Leave the following + // recvResponse to get the final status. + if err != nil && err != io.EOF { return nil, err } // Sent successfully. @@ -176,7 +188,10 @@ func Invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli put() put = nil } - if _, ok := err.(transport.ConnectionError); ok { + // Retry a non-failfast RPC when + // i) there is a connection error; or + // ii) the server started to drain before this RPC was initiated. + if _, ok := err.(transport.ConnectionError); ok || err == transport.ErrStreamDrain { if c.failFast { return toRPCErr(err) } @@ -184,20 +199,18 @@ func Invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli } return toRPCErr(err) } - // Receive the response err = recvResponse(cc.dopts, t, &c, stream, reply) if err != nil { if put != nil { put() put = nil } - if _, ok := err.(transport.ConnectionError); ok { + if _, ok := err.(transport.ConnectionError); ok || err == transport.ErrStreamDrain { if c.failFast { return toRPCErr(err) } continue } - t.CloseStream(stream, err) return toRPCErr(err) } if c.traceInfo.tr != nil { diff --git a/cmd/vendor/google.golang.org/grpc/clientconn.go b/cmd/vendor/google.golang.org/grpc/clientconn.go index c3c7691dc..d16ea2016 100644 --- a/cmd/vendor/google.golang.org/grpc/clientconn.go +++ b/cmd/vendor/google.golang.org/grpc/clientconn.go @@ -43,7 +43,6 @@ import ( "golang.org/x/net/context" "golang.org/x/net/trace" - "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" "google.golang.org/grpc/grpclog" "google.golang.org/grpc/transport" @@ -68,7 +67,7 @@ var ( // errCredentialsConflict indicates that grpc.WithTransportCredentials() // and grpc.WithInsecure() are both called for a connection. errCredentialsConflict = errors.New("grpc: transport credentials are set for an insecure connection (grpc.WithTransportCredentials() and grpc.WithInsecure() are both called)") - // errNetworkIP indicates that the connection is down due to some network I/O error. + // errNetworkIO indicates that the connection is down due to some network I/O error. errNetworkIO = errors.New("grpc: failed with network I/O error") // errConnDrain indicates that the connection starts to be drained and does not accept any new RPCs. errConnDrain = errors.New("grpc: the connection is drained") @@ -196,9 +195,14 @@ func WithTimeout(d time.Duration) DialOption { } // WithDialer returns a DialOption that specifies a function to use for dialing network addresses. -func WithDialer(f func(addr string, timeout time.Duration) (net.Conn, error)) DialOption { +func WithDialer(f func(string, time.Duration) (net.Conn, error)) DialOption { return func(o *dialOptions) { - o.copts.Dialer = f + o.copts.Dialer = func(ctx context.Context, addr string) (net.Conn, error) { + if deadline, ok := ctx.Deadline(); ok { + return f(addr, deadline.Sub(time.Now())) + } + return f(addr, 0) + } } } @@ -211,10 +215,12 @@ func WithUserAgent(s string) DialOption { // Dial creates a client connection the given target. func Dial(target string, opts ...DialOption) (*ClientConn, error) { + ctx := context.Background() cc := &ClientConn{ target: target, conns: make(map[Address]*addrConn), } + cc.ctx, cc.cancel = context.WithCancel(ctx) for _, opt := range opts { opt(&cc.dopts) } @@ -226,31 +232,33 @@ func Dial(target string, opts ...DialOption) (*ClientConn, error) { if cc.dopts.bs == nil { cc.dopts.bs = DefaultBackoffConfig } - if cc.dopts.balancer == nil { - cc.dopts.balancer = RoundRobin(nil) - } - if err := cc.dopts.balancer.Start(target); err != nil { - return nil, err - } var ( ok bool addrs []Address ) - ch := cc.dopts.balancer.Notify() - if ch == nil { - // There is no name resolver installed. + if cc.dopts.balancer == nil { + // Connect to target directly if balancer is nil. addrs = append(addrs, Address{Addr: target}) } else { - addrs, ok = <-ch - if !ok || len(addrs) == 0 { - return nil, errNoAddr + if err := cc.dopts.balancer.Start(target); err != nil { + return nil, err + } + ch := cc.dopts.balancer.Notify() + if ch == nil { + // There is no name resolver installed. + addrs = append(addrs, Address{Addr: target}) + } else { + addrs, ok = <-ch + if !ok || len(addrs) == 0 { + return nil, errNoAddr + } } } waitC := make(chan error, 1) go func() { for _, a := range addrs { - if err := cc.newAddrConn(a, false); err != nil { + if err := cc.resetAddrConn(a, false, nil); err != nil { waitC <- err return } @@ -267,10 +275,15 @@ func Dial(target string, opts ...DialOption) (*ClientConn, error) { cc.Close() return nil, err } + case <-cc.ctx.Done(): + cc.Close() + return nil, cc.ctx.Err() case <-timeoutCh: cc.Close() return nil, ErrClientConnTimeout } + // If balancer is nil or balancer.Notify() is nil, ok will be false here. + // The lbWatcher goroutine will not be created. if ok { go cc.lbWatcher() } @@ -317,6 +330,9 @@ func (s ConnectivityState) String() string { // ClientConn represents a client connection to an RPC server. type ClientConn struct { + ctx context.Context + cancel context.CancelFunc + target string authority string dopts dialOptions @@ -347,11 +363,12 @@ func (cc *ClientConn) lbWatcher() { } if !keep { del = append(del, c) + delete(cc.conns, c.addr) } } cc.mu.Unlock() for _, a := range add { - cc.newAddrConn(a, true) + cc.resetAddrConn(a, true, nil) } for _, c := range del { c.tearDown(errConnDrain) @@ -359,13 +376,17 @@ func (cc *ClientConn) lbWatcher() { } } -func (cc *ClientConn) newAddrConn(addr Address, skipWait bool) error { +// resetAddrConn creates an addrConn for addr and adds it to cc.conns. +// If there is an old addrConn for addr, it will be torn down, using tearDownErr as the reason. +// If tearDownErr is nil, errConnDrain will be used instead. +func (cc *ClientConn) resetAddrConn(addr Address, skipWait bool, tearDownErr error) error { ac := &addrConn{ - cc: cc, - addr: addr, - dopts: cc.dopts, - shutdownChan: make(chan struct{}), + cc: cc, + addr: addr, + dopts: cc.dopts, } + ac.ctx, ac.cancel = context.WithCancel(cc.ctx) + ac.stateCV = sync.NewCond(&ac.mu) if EnableTracing { ac.events = trace.NewEventLog("grpc.ClientConn", ac.addr.Addr) } @@ -383,26 +404,44 @@ func (cc *ClientConn) newAddrConn(addr Address, skipWait bool) error { } } } - // Insert ac into ac.cc.conns. This needs to be done before any getTransport(...) is called. - ac.cc.mu.Lock() - if ac.cc.conns == nil { - ac.cc.mu.Unlock() + // Track ac in cc. This needs to be done before any getTransport(...) is called. + cc.mu.Lock() + if cc.conns == nil { + cc.mu.Unlock() return ErrClientConnClosing } - stale := ac.cc.conns[ac.addr] - ac.cc.conns[ac.addr] = ac - ac.cc.mu.Unlock() + stale := cc.conns[ac.addr] + cc.conns[ac.addr] = ac + cc.mu.Unlock() if stale != nil { // There is an addrConn alive on ac.addr already. This could be due to - // i) stale's Close is undergoing; - // ii) a buggy Balancer notifies duplicated Addresses. - stale.tearDown(errConnDrain) + // 1) a buggy Balancer notifies duplicated Addresses; + // 2) goaway was received, a new ac will replace the old ac. + // The old ac should be deleted from cc.conns, but the + // underlying transport should drain rather than close. + if tearDownErr == nil { + // tearDownErr is nil if resetAddrConn is called by + // 1) Dial + // 2) lbWatcher + // In both cases, the stale ac should drain, not close. + stale.tearDown(errConnDrain) + } else { + stale.tearDown(tearDownErr) + } } - ac.stateCV = sync.NewCond(&ac.mu) // skipWait may overwrite the decision in ac.dopts.block. if ac.dopts.block && !skipWait { if err := ac.resetTransport(false); err != nil { - ac.tearDown(err) + if err != errConnClosing { + // Tear down ac and delete it from cc.conns. + cc.mu.Lock() + delete(cc.conns, ac.addr) + cc.mu.Unlock() + ac.tearDown(err) + } + if e, ok := err.(transport.ConnectionError); ok && !e.Temporary() { + return e.Origin() + } return err } // Start to monitor the error status of transport. @@ -412,7 +451,10 @@ func (cc *ClientConn) newAddrConn(addr Address, skipWait bool) error { go func() { if err := ac.resetTransport(false); err != nil { grpclog.Printf("Failed to dial %s: %v; please retry.", ac.addr.Addr, err) - ac.tearDown(err) + if err != errConnClosing { + // Keep this ac in cc.conns, to get the reason it's torn down. + ac.tearDown(err) + } return } ac.transportMonitor() @@ -422,24 +464,48 @@ func (cc *ClientConn) newAddrConn(addr Address, skipWait bool) error { } func (cc *ClientConn) getTransport(ctx context.Context, opts BalancerGetOptions) (transport.ClientTransport, func(), error) { - addr, put, err := cc.dopts.balancer.Get(ctx, opts) - if err != nil { - return nil, nil, toRPCErr(err) - } - cc.mu.RLock() - if cc.conns == nil { + var ( + ac *addrConn + ok bool + put func() + ) + if cc.dopts.balancer == nil { + // If balancer is nil, there should be only one addrConn available. + cc.mu.RLock() + for _, ac = range cc.conns { + // Break after the first iteration to get the first addrConn. + ok = true + break + } + cc.mu.RUnlock() + } else { + var ( + addr Address + err error + ) + addr, put, err = cc.dopts.balancer.Get(ctx, opts) + if err != nil { + return nil, nil, toRPCErr(err) + } + cc.mu.RLock() + if cc.conns == nil { + cc.mu.RUnlock() + return nil, nil, toRPCErr(ErrClientConnClosing) + } + ac, ok = cc.conns[addr] cc.mu.RUnlock() - return nil, nil, toRPCErr(ErrClientConnClosing) } - ac, ok := cc.conns[addr] - cc.mu.RUnlock() if !ok { if put != nil { put() } - return nil, nil, Errorf(codes.Internal, "grpc: failed to find the transport to send the rpc") + return nil, nil, errConnClosing } - t, err := ac.wait(ctx, !opts.BlockingWait) + // ac.wait should block on transient failure only if balancer is nil and RPC is non-failfast. + // - If RPC is failfast, ac.wait should not block. + // - If balancer is not nil, ac.wait should return errConnClosing on transient failure + // so that non-failfast RPCs will try to get a new transport instead of waiting on ac. + t, err := ac.wait(ctx, cc.dopts.balancer == nil && opts.BlockingWait) if err != nil { if put != nil { put() @@ -451,6 +517,8 @@ func (cc *ClientConn) getTransport(ctx context.Context, opts BalancerGetOptions) // Close tears down the ClientConn and all underlying connections. func (cc *ClientConn) Close() error { + cc.cancel() + cc.mu.Lock() if cc.conns == nil { cc.mu.Unlock() @@ -459,7 +527,9 @@ func (cc *ClientConn) Close() error { conns := cc.conns cc.conns = nil cc.mu.Unlock() - cc.dopts.balancer.Close() + if cc.dopts.balancer != nil { + cc.dopts.balancer.Close() + } for _, ac := range conns { ac.tearDown(ErrClientConnClosing) } @@ -468,11 +538,13 @@ func (cc *ClientConn) Close() error { // addrConn is a network connection to a given address. type addrConn struct { - cc *ClientConn - addr Address - dopts dialOptions - shutdownChan chan struct{} - events trace.EventLog + ctx context.Context + cancel context.CancelFunc + + cc *ClientConn + addr Address + dopts dialOptions + events trace.EventLog mu sync.Mutex state ConnectivityState @@ -482,6 +554,9 @@ type addrConn struct { // due to timeout. ready chan struct{} transport transport.ClientTransport + + // The reason this addrConn is torn down. + tearDownErr error } // printf records an event in ac's event log, unless ac has been closed. @@ -537,8 +612,7 @@ func (ac *addrConn) waitForStateChange(ctx context.Context, sourceState Connecti } func (ac *addrConn) resetTransport(closeTransport bool) error { - var retries int - for { + for retries := 0; ; retries++ { ac.mu.Lock() ac.printf("connecting") if ac.state == Shutdown { @@ -558,13 +632,20 @@ func (ac *addrConn) resetTransport(closeTransport bool) error { t.Close() } sleepTime := ac.dopts.bs.backoff(retries) - ac.dopts.copts.Timeout = sleepTime - if sleepTime < minConnectTimeout { - ac.dopts.copts.Timeout = minConnectTimeout + timeout := minConnectTimeout + if timeout < sleepTime { + timeout = sleepTime } + ctx, cancel := context.WithTimeout(ac.ctx, timeout) connectTime := time.Now() - newTransport, err := transport.NewClientTransport(ac.addr.Addr, &ac.dopts.copts) + newTransport, err := transport.NewClientTransport(ctx, ac.addr.Addr, ac.dopts.copts) if err != nil { + cancel() + + if e, ok := err.(transport.ConnectionError); ok && !e.Temporary() { + return err + } + grpclog.Printf("grpc: addrConn.resetTransport failed to create client transport: %v; Reconnecting to %q", err, ac.addr) ac.mu.Lock() if ac.state == Shutdown { // ac.tearDown(...) has been invoked. @@ -579,17 +660,12 @@ func (ac *addrConn) resetTransport(closeTransport bool) error { ac.ready = nil } ac.mu.Unlock() - sleepTime -= time.Since(connectTime) - if sleepTime < 0 { - sleepTime = 0 - } closeTransport = false select { - case <-time.After(sleepTime): - case <-ac.shutdownChan: + case <-time.After(sleepTime - time.Since(connectTime)): + case <-ac.ctx.Done(): + return ac.ctx.Err() } - retries++ - grpclog.Printf("grpc: addrConn.resetTransport failed to create client transport: %v; Reconnecting to %q", err, ac.addr) continue } ac.mu.Lock() @@ -607,7 +683,9 @@ func (ac *addrConn) resetTransport(closeTransport bool) error { close(ac.ready) ac.ready = nil } - ac.down = ac.cc.dopts.balancer.Up(ac.addr) + if ac.cc.dopts.balancer != nil { + ac.down = ac.cc.dopts.balancer.Up(ac.addr) + } ac.mu.Unlock() return nil } @@ -621,14 +699,42 @@ func (ac *addrConn) transportMonitor() { t := ac.transport ac.mu.Unlock() select { - // shutdownChan is needed to detect the teardown when + // This is needed to detect the teardown when // the addrConn is idle (i.e., no RPC in flight). - case <-ac.shutdownChan: + case <-ac.ctx.Done(): + select { + case <-t.Error(): + t.Close() + default: + } + return + case <-t.GoAway(): + // If GoAway happens without any network I/O error, ac is closed without shutting down the + // underlying transport (the transport will be closed when all the pending RPCs finished or + // failed.). + // If GoAway and some network I/O error happen concurrently, ac and its underlying transport + // are closed. + // In both cases, a new ac is created. + select { + case <-t.Error(): + ac.cc.resetAddrConn(ac.addr, true, errNetworkIO) + default: + ac.cc.resetAddrConn(ac.addr, true, errConnDrain) + } return case <-t.Error(): + select { + case <-ac.ctx.Done(): + t.Close() + return + case <-t.GoAway(): + ac.cc.resetAddrConn(ac.addr, true, errNetworkIO) + return + default: + } ac.mu.Lock() if ac.state == Shutdown { - // ac.tearDown(...) has been invoked. + // ac has been shutdown. ac.mu.Unlock() return } @@ -640,6 +746,10 @@ func (ac *addrConn) transportMonitor() { ac.printf("transport exiting: %v", err) ac.mu.Unlock() grpclog.Printf("grpc: addrConn.transportMonitor exits due to: %v", err) + if err != errConnClosing { + // Keep this ac in cc.conns, to get the reason it's torn down. + ac.tearDown(err) + } return } } @@ -647,21 +757,22 @@ func (ac *addrConn) transportMonitor() { } // wait blocks until i) the new transport is up or ii) ctx is done or iii) ac is closed or -// iv) transport is in TransientFailure and the RPC is fail-fast. -func (ac *addrConn) wait(ctx context.Context, failFast bool) (transport.ClientTransport, error) { +// iv) transport is in TransientFailure and blocking is false. +func (ac *addrConn) wait(ctx context.Context, blocking bool) (transport.ClientTransport, error) { for { ac.mu.Lock() switch { case ac.state == Shutdown: + err := ac.tearDownErr ac.mu.Unlock() - return nil, errConnClosing + return nil, err case ac.state == Ready: ct := ac.transport ac.mu.Unlock() return ct, nil - case ac.state == TransientFailure && failFast: + case ac.state == TransientFailure && !blocking: ac.mu.Unlock() - return nil, Errorf(codes.Unavailable, "grpc: RPC failed fast due to transport failure") + return nil, errConnClosing default: ready := ac.ready if ready == nil { @@ -683,24 +794,28 @@ func (ac *addrConn) wait(ctx context.Context, failFast bool) (transport.ClientTr // TODO(zhaoq): Make this synchronous to avoid unbounded memory consumption in // some edge cases (e.g., the caller opens and closes many addrConn's in a // tight loop. +// tearDown doesn't remove ac from ac.cc.conns. func (ac *addrConn) tearDown(err error) { + ac.cancel() + ac.mu.Lock() - defer func() { - ac.mu.Unlock() - ac.cc.mu.Lock() - if ac.cc.conns != nil { - delete(ac.cc.conns, ac.addr) - } - ac.cc.mu.Unlock() - }() - if ac.state == Shutdown { - return - } - ac.state = Shutdown + defer ac.mu.Unlock() if ac.down != nil { ac.down(downErrorf(false, false, "%v", err)) ac.down = nil } + if err == errConnDrain && ac.transport != nil { + // GracefulClose(...) may be executed multiple times when + // i) receiving multiple GoAway frames from the server; or + // ii) there are concurrent name resolver/Balancer triggered + // address removal and GoAway. + ac.transport.GracefulClose() + } + if ac.state == Shutdown { + return + } + ac.state = Shutdown + ac.tearDownErr = err ac.stateCV.Broadcast() if ac.events != nil { ac.events.Finish() @@ -710,15 +825,8 @@ func (ac *addrConn) tearDown(err error) { close(ac.ready) ac.ready = nil } - if ac.transport != nil { - if err == errConnDrain { - ac.transport.GracefulClose() - } else { - ac.transport.Close() - } - } - if ac.shutdownChan != nil { - close(ac.shutdownChan) + if ac.transport != nil && err != errConnDrain { + ac.transport.Close() } return } diff --git a/cmd/vendor/google.golang.org/grpc/credentials/credentials.go b/cmd/vendor/google.golang.org/grpc/credentials/credentials.go index b021e1eb7..001f134d7 100644 --- a/cmd/vendor/google.golang.org/grpc/credentials/credentials.go +++ b/cmd/vendor/google.golang.org/grpc/credentials/credentials.go @@ -44,7 +44,6 @@ import ( "io/ioutil" "net" "strings" - "time" "golang.org/x/net/context" ) @@ -93,11 +92,12 @@ type TransportCredentials interface { // ClientHandshake does the authentication handshake specified by the corresponding // authentication protocol on rawConn for clients. It returns the authenticated // connection and the corresponding auth information about the connection. - ClientHandshake(addr string, rawConn net.Conn, timeout time.Duration) (net.Conn, AuthInfo, error) + // Implementations must use the provided context to implement timely cancellation. + ClientHandshake(context.Context, string, net.Conn) (net.Conn, AuthInfo, error) // ServerHandshake does the authentication handshake for servers. It returns // the authenticated connection and the corresponding auth information about // the connection. - ServerHandshake(rawConn net.Conn) (net.Conn, AuthInfo, error) + ServerHandshake(net.Conn) (net.Conn, AuthInfo, error) // Info provides the ProtocolInfo of this TransportCredentials. Info() ProtocolInfo } @@ -136,42 +136,28 @@ func (c *tlsCreds) RequireTransportSecurity() bool { return true } -type timeoutError struct{} - -func (timeoutError) Error() string { return "credentials: Dial timed out" } -func (timeoutError) Timeout() bool { return true } -func (timeoutError) Temporary() bool { return true } - -func (c *tlsCreds) ClientHandshake(addr string, rawConn net.Conn, timeout time.Duration) (_ net.Conn, _ AuthInfo, err error) { - // borrow some code from tls.DialWithDialer - var errChannel chan error - if timeout != 0 { - errChannel = make(chan error, 2) - time.AfterFunc(timeout, func() { - errChannel <- timeoutError{} - }) - } +func (c *tlsCreds) ClientHandshake(ctx context.Context, addr string, rawConn net.Conn) (_ net.Conn, _ AuthInfo, err error) { // use local cfg to avoid clobbering ServerName if using multiple endpoints - cfg := *c.config - if c.config.ServerName == "" { + cfg := cloneTLSConfig(c.config) + if cfg.ServerName == "" { colonPos := strings.LastIndex(addr, ":") if colonPos == -1 { colonPos = len(addr) } cfg.ServerName = addr[:colonPos] } - conn := tls.Client(rawConn, &cfg) - if timeout == 0 { - err = conn.Handshake() - } else { - go func() { - errChannel <- conn.Handshake() - }() - err = <-errChannel - } - if err != nil { - rawConn.Close() - return nil, nil, err + conn := tls.Client(rawConn, cfg) + errChannel := make(chan error, 1) + go func() { + errChannel <- conn.Handshake() + }() + select { + case err := <-errChannel: + if err != nil { + return nil, nil, err + } + case <-ctx.Done(): + return nil, nil, ctx.Err() } // TODO(zhaoq): Omit the auth info for client now. It is more for // information than anything else. @@ -181,7 +167,6 @@ func (c *tlsCreds) ClientHandshake(addr string, rawConn net.Conn, timeout time.D func (c *tlsCreds) ServerHandshake(rawConn net.Conn) (net.Conn, AuthInfo, error) { conn := tls.Server(rawConn, c.config) if err := conn.Handshake(); err != nil { - rawConn.Close() return nil, nil, err } return conn, TLSInfo{conn.ConnectionState()}, nil @@ -189,7 +174,7 @@ func (c *tlsCreds) ServerHandshake(rawConn net.Conn) (net.Conn, AuthInfo, error) // NewTLS uses c to construct a TransportCredentials based on TLS. func NewTLS(c *tls.Config) TransportCredentials { - tc := &tlsCreds{c} + tc := &tlsCreds{cloneTLSConfig(c)} tc.config.NextProtos = alpnProtoStr return tc } diff --git a/cmd/vendor/google.golang.org/grpc/credentials/credentials_util_go17.go b/cmd/vendor/google.golang.org/grpc/credentials/credentials_util_go17.go new file mode 100644 index 000000000..9647b9ec8 --- /dev/null +++ b/cmd/vendor/google.golang.org/grpc/credentials/credentials_util_go17.go @@ -0,0 +1,76 @@ +// +build go1.7 + +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +package credentials + +import ( + "crypto/tls" +) + +// cloneTLSConfig returns a shallow clone of the exported +// fields of cfg, ignoring the unexported sync.Once, which +// contains a mutex and must not be copied. +// +// If cfg is nil, a new zero tls.Config is returned. +// +// TODO replace this function with official clone function. +func cloneTLSConfig(cfg *tls.Config) *tls.Config { + if cfg == nil { + return &tls.Config{} + } + return &tls.Config{ + Rand: cfg.Rand, + Time: cfg.Time, + Certificates: cfg.Certificates, + NameToCertificate: cfg.NameToCertificate, + GetCertificate: cfg.GetCertificate, + RootCAs: cfg.RootCAs, + NextProtos: cfg.NextProtos, + ServerName: cfg.ServerName, + ClientAuth: cfg.ClientAuth, + ClientCAs: cfg.ClientCAs, + InsecureSkipVerify: cfg.InsecureSkipVerify, + CipherSuites: cfg.CipherSuites, + PreferServerCipherSuites: cfg.PreferServerCipherSuites, + SessionTicketsDisabled: cfg.SessionTicketsDisabled, + SessionTicketKey: cfg.SessionTicketKey, + ClientSessionCache: cfg.ClientSessionCache, + MinVersion: cfg.MinVersion, + MaxVersion: cfg.MaxVersion, + CurvePreferences: cfg.CurvePreferences, + DynamicRecordSizingDisabled: cfg.DynamicRecordSizingDisabled, + Renegotiation: cfg.Renegotiation, + } +} diff --git a/cmd/vendor/google.golang.org/grpc/credentials/credentials_util_pre_go17.go b/cmd/vendor/google.golang.org/grpc/credentials/credentials_util_pre_go17.go new file mode 100644 index 000000000..09b8d12c7 --- /dev/null +++ b/cmd/vendor/google.golang.org/grpc/credentials/credentials_util_pre_go17.go @@ -0,0 +1,74 @@ +// +build !go1.7 + +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +package credentials + +import ( + "crypto/tls" +) + +// cloneTLSConfig returns a shallow clone of the exported +// fields of cfg, ignoring the unexported sync.Once, which +// contains a mutex and must not be copied. +// +// If cfg is nil, a new zero tls.Config is returned. +// +// TODO replace this function with official clone function. +func cloneTLSConfig(cfg *tls.Config) *tls.Config { + if cfg == nil { + return &tls.Config{} + } + return &tls.Config{ + Rand: cfg.Rand, + Time: cfg.Time, + Certificates: cfg.Certificates, + NameToCertificate: cfg.NameToCertificate, + GetCertificate: cfg.GetCertificate, + RootCAs: cfg.RootCAs, + NextProtos: cfg.NextProtos, + ServerName: cfg.ServerName, + ClientAuth: cfg.ClientAuth, + ClientCAs: cfg.ClientCAs, + InsecureSkipVerify: cfg.InsecureSkipVerify, + CipherSuites: cfg.CipherSuites, + PreferServerCipherSuites: cfg.PreferServerCipherSuites, + SessionTicketsDisabled: cfg.SessionTicketsDisabled, + SessionTicketKey: cfg.SessionTicketKey, + ClientSessionCache: cfg.ClientSessionCache, + MinVersion: cfg.MinVersion, + MaxVersion: cfg.MaxVersion, + CurvePreferences: cfg.CurvePreferences, + } +} diff --git a/cmd/vendor/google.golang.org/grpc/metadata/metadata.go b/cmd/vendor/google.golang.org/grpc/metadata/metadata.go index 58469ddd3..b32001568 100644 --- a/cmd/vendor/google.golang.org/grpc/metadata/metadata.go +++ b/cmd/vendor/google.golang.org/grpc/metadata/metadata.go @@ -60,15 +60,21 @@ func encodeKeyValue(k, v string) (string, string) { // DecodeKeyValue returns the original key and value corresponding to the // encoded data in k, v. +// If k is a binary header and v contains comma, v is split on comma before decoded, +// and the decoded v will be joined with comma before returned. func DecodeKeyValue(k, v string) (string, string, error) { if !strings.HasSuffix(k, binHdrSuffix) { return k, v, nil } - val, err := base64.StdEncoding.DecodeString(v) - if err != nil { - return "", "", err + vvs := strings.Split(v, ",") + for i, vv := range vvs { + val, err := base64.StdEncoding.DecodeString(vv) + if err != nil { + return "", "", err + } + vvs[i] = string(val) } - return k, string(val), nil + return k, strings.Join(vvs, ","), nil } // MD is a mapping from metadata keys to values. Users should use the following diff --git a/cmd/vendor/google.golang.org/grpc/rpc_util.go b/cmd/vendor/google.golang.org/grpc/rpc_util.go index d62871756..35ac9cc7b 100644 --- a/cmd/vendor/google.golang.org/grpc/rpc_util.go +++ b/cmd/vendor/google.golang.org/grpc/rpc_util.go @@ -227,7 +227,7 @@ type parser struct { // No other error values or types must be returned, which also means // that the underlying io.Reader must not return an incompatible // error. -func (p *parser) recvMsg() (pf payloadFormat, msg []byte, err error) { +func (p *parser) recvMsg(maxMsgSize int) (pf payloadFormat, msg []byte, err error) { if _, err := io.ReadFull(p.r, p.header[:]); err != nil { return 0, nil, err } @@ -238,6 +238,9 @@ func (p *parser) recvMsg() (pf payloadFormat, msg []byte, err error) { if length == 0 { return pf, nil, nil } + if length > uint32(maxMsgSize) { + return 0, nil, Errorf(codes.Internal, "grpc: received message length %d exceeding the max size %d", length, maxMsgSize) + } // TODO(bradfitz,zhaoq): garbage. reuse buffer after proto decoding instead // of making it for each message: msg = make([]byte, int(length)) @@ -308,8 +311,8 @@ func checkRecvPayload(pf payloadFormat, recvCompress string, dc Decompressor) er return nil } -func recv(p *parser, c Codec, s *transport.Stream, dc Decompressor, m interface{}) error { - pf, d, err := p.recvMsg() +func recv(p *parser, c Codec, s *transport.Stream, dc Decompressor, m interface{}, maxMsgSize int) error { + pf, d, err := p.recvMsg(maxMsgSize) if err != nil { return err } @@ -319,11 +322,16 @@ func recv(p *parser, c Codec, s *transport.Stream, dc Decompressor, m interface{ if pf == compressionMade { d, err = dc.Do(bytes.NewReader(d)) if err != nil { - return transport.StreamErrorf(codes.Internal, "grpc: failed to decompress the received message %v", err) + return Errorf(codes.Internal, "grpc: failed to decompress the received message %v", err) } } + if len(d) > maxMsgSize { + // TODO: Revisit the error code. Currently keep it consistent with java + // implementation. + return Errorf(codes.Internal, "grpc: received a message of %d bytes exceeding %d limit", len(d), maxMsgSize) + } if err := c.Unmarshal(d, m); err != nil { - return transport.StreamErrorf(codes.Internal, "grpc: failed to unmarshal the received message %v", err) + return Errorf(codes.Internal, "grpc: failed to unmarshal the received message %v", err) } return nil } diff --git a/cmd/vendor/google.golang.org/grpc/server.go b/cmd/vendor/google.golang.org/grpc/server.go index a2b2b94d4..1ed8aac9e 100644 --- a/cmd/vendor/google.golang.org/grpc/server.go +++ b/cmd/vendor/google.golang.org/grpc/server.go @@ -89,9 +89,13 @@ type service struct { type Server struct { opts options - mu sync.Mutex // guards following - lis map[net.Listener]bool - conns map[io.Closer]bool + mu sync.Mutex // guards following + lis map[net.Listener]bool + conns map[io.Closer]bool + drain bool + // A CondVar to let GracefulStop() blocks until all the pending RPCs are finished + // and all the transport goes away. + cv *sync.Cond m map[string]*service // service name -> service info events trace.EventLog } @@ -101,12 +105,15 @@ type options struct { codec Codec cp Compressor dc Decompressor + maxMsgSize int unaryInt UnaryServerInterceptor streamInt StreamServerInterceptor maxConcurrentStreams uint32 useHandlerImpl bool // use http.Handler-based server } +var defaultMaxMsgSize = 1024 * 1024 * 4 // use 4MB as the default message size limit + // A ServerOption sets options. type ServerOption func(*options) @@ -117,20 +124,28 @@ func CustomCodec(codec Codec) ServerOption { } } -// RPCCompressor returns a ServerOption that sets a compressor for outbound message. +// RPCCompressor returns a ServerOption that sets a compressor for outbound messages. func RPCCompressor(cp Compressor) ServerOption { return func(o *options) { o.cp = cp } } -// RPCDecompressor returns a ServerOption that sets a decompressor for inbound message. +// RPCDecompressor returns a ServerOption that sets a decompressor for inbound messages. func RPCDecompressor(dc Decompressor) ServerOption { return func(o *options) { o.dc = dc } } +// MaxMsgSize returns a ServerOption to set the max message size in bytes for inbound mesages. +// If this is not set, gRPC uses the default 4MB. +func MaxMsgSize(m int) ServerOption { + return func(o *options) { + o.maxMsgSize = m + } +} + // MaxConcurrentStreams returns a ServerOption that will apply a limit on the number // of concurrent streams to each ServerTransport. func MaxConcurrentStreams(n uint32) ServerOption { @@ -173,6 +188,7 @@ func StreamInterceptor(i StreamServerInterceptor) ServerOption { // started to accept requests yet. func NewServer(opt ...ServerOption) *Server { var opts options + opts.maxMsgSize = defaultMaxMsgSize for _, o := range opt { o(&opts) } @@ -186,6 +202,7 @@ func NewServer(opt ...ServerOption) *Server { conns: make(map[io.Closer]bool), m: make(map[string]*service), } + s.cv = sync.NewCond(&s.mu) if EnableTracing { _, file, line, _ := runtime.Caller(1) s.events = trace.NewEventLog("grpc.Server", fmt.Sprintf("%s:%d", file, line)) @@ -264,8 +281,8 @@ type ServiceInfo struct { // GetServiceInfo returns a map from service names to ServiceInfo. // Service names include the package names, in the form of .. -func (s *Server) GetServiceInfo() map[string]*ServiceInfo { - ret := make(map[string]*ServiceInfo) +func (s *Server) GetServiceInfo() map[string]ServiceInfo { + ret := make(map[string]ServiceInfo) for n, srv := range s.m { methods := make([]MethodInfo, 0, len(srv.md)+len(srv.sd)) for m := range srv.md { @@ -283,7 +300,7 @@ func (s *Server) GetServiceInfo() map[string]*ServiceInfo { }) } - ret[n] = &ServiceInfo{ + ret[n] = ServiceInfo{ Methods: methods, Metadata: srv.mdata, } @@ -468,7 +485,7 @@ func (s *Server) traceInfo(st transport.ServerTransport, stream *transport.Strea func (s *Server) addConn(c io.Closer) bool { s.mu.Lock() defer s.mu.Unlock() - if s.conns == nil { + if s.conns == nil || s.drain { return false } s.conns[c] = true @@ -480,6 +497,7 @@ func (s *Server) removeConn(c io.Closer) { defer s.mu.Unlock() if s.conns != nil { delete(s.conns, c) + s.cv.Signal() } } @@ -520,7 +538,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport. } p := &parser{r: stream} for { - pf, req, err := p.recvMsg() + pf, req, err := p.recvMsg(s.opts.maxMsgSize) if err == io.EOF { // The entire stream is done (for unary RPC only). return err @@ -530,6 +548,10 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport. } if err != nil { switch err := err.(type) { + case *rpcError: + if err := t.WriteStatus(stream, err.code, err.desc); err != nil { + grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", err) + } case transport.ConnectionError: // Nothing to do here. case transport.StreamError: @@ -569,6 +591,12 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport. return err } } + if len(req) > s.opts.maxMsgSize { + // TODO: Revisit the error code. Currently keep it consistent with + // java implementation. + statusCode = codes.Internal + statusDesc = fmt.Sprintf("grpc: server received a message of %d bytes exceeding %d limit", len(req), s.opts.maxMsgSize) + } if err := s.opts.codec.Unmarshal(req, v); err != nil { return err } @@ -628,13 +656,14 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp stream.SetSendCompress(s.opts.cp.Type()) } ss := &serverStream{ - t: t, - s: stream, - p: &parser{r: stream}, - codec: s.opts.codec, - cp: s.opts.cp, - dc: s.opts.dc, - trInfo: trInfo, + t: t, + s: stream, + p: &parser{r: stream}, + codec: s.opts.codec, + cp: s.opts.cp, + dc: s.opts.dc, + maxMsgSize: s.opts.maxMsgSize, + trInfo: trInfo, } if ss.cp != nil { ss.cbuf = new(bytes.Buffer) @@ -766,14 +795,16 @@ func (s *Server) Stop() { s.mu.Lock() listeners := s.lis s.lis = nil - cs := s.conns + st := s.conns s.conns = nil + // interrupt GracefulStop if Stop and GracefulStop are called concurrently. + s.cv.Signal() s.mu.Unlock() for lis := range listeners { lis.Close() } - for c := range cs { + for c := range st { c.Close() } @@ -785,6 +816,32 @@ func (s *Server) Stop() { s.mu.Unlock() } +// GracefulStop stops the gRPC server gracefully. It stops the server to accept new +// connections and RPCs and blocks until all the pending RPCs are finished. +func (s *Server) GracefulStop() { + s.mu.Lock() + defer s.mu.Unlock() + if s.drain == true || s.conns == nil { + return + } + s.drain = true + for lis := range s.lis { + lis.Close() + } + s.lis = nil + for c := range s.conns { + c.(transport.ServerTransport).Drain() + } + for len(s.conns) != 0 { + s.cv.Wait() + } + s.conns = nil + if s.events != nil { + s.events.Finish() + s.events = nil + } +} + func init() { internal.TestingCloseConns = func(arg interface{}) { arg.(*Server).testingCloseConns() diff --git a/cmd/vendor/google.golang.org/grpc/stream.go b/cmd/vendor/google.golang.org/grpc/stream.go index 7a3bef511..c1b07e89e 100644 --- a/cmd/vendor/google.golang.org/grpc/stream.go +++ b/cmd/vendor/google.golang.org/grpc/stream.go @@ -37,6 +37,7 @@ import ( "bytes" "errors" "io" + "math" "sync" "time" @@ -84,12 +85,9 @@ type ClientStream interface { // Header returns the header metadata received from the server if there // is any. It blocks if the metadata is not ready to read. Header() (metadata.MD, error) - // Trailer returns the trailer metadata from the server. It must be called - // after stream.Recv() returns non-nil error (including io.EOF) for - // bi-directional streaming and server streaming or stream.CloseAndRecv() - // returns for client streaming in order to receive trailer metadata if - // present. Otherwise, it could returns an empty MD even though trailer - // is present. + // Trailer returns the trailer metadata from the server, if there is any. + // It must only be called after stream.CloseAndRecv has returned, or + // stream.Recv has returned a non-nil error (including io.EOF). Trailer() metadata.MD // CloseSend closes the send direction of the stream. It closes the stream // when non-nil error is met. @@ -99,11 +97,10 @@ type ClientStream interface { // NewClientStream creates a new Stream for the client side. This is called // by generated code. -func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (ClientStream, error) { +func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (_ ClientStream, err error) { var ( t transport.ClientTransport s *transport.Stream - err error put func() ) c := defaultCallInfo @@ -120,27 +117,24 @@ func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth if cc.dopts.cp != nil { callHdr.SendCompress = cc.dopts.cp.Type() } - cs := &clientStream{ - opts: opts, - c: c, - desc: desc, - codec: cc.dopts.codec, - cp: cc.dopts.cp, - dc: cc.dopts.dc, - tracing: EnableTracing, - } - if cc.dopts.cp != nil { - callHdr.SendCompress = cc.dopts.cp.Type() - cs.cbuf = new(bytes.Buffer) - } - if cs.tracing { - cs.trInfo.tr = trace.New("grpc.Sent."+methodFamily(method), method) - cs.trInfo.firstLine.client = true + var trInfo traceInfo + if EnableTracing { + trInfo.tr = trace.New("grpc.Sent."+methodFamily(method), method) + trInfo.firstLine.client = true if deadline, ok := ctx.Deadline(); ok { - cs.trInfo.firstLine.deadline = deadline.Sub(time.Now()) + trInfo.firstLine.deadline = deadline.Sub(time.Now()) } - cs.trInfo.tr.LazyLog(&cs.trInfo.firstLine, false) - ctx = trace.NewContext(ctx, cs.trInfo.tr) + trInfo.tr.LazyLog(&trInfo.firstLine, false) + ctx = trace.NewContext(ctx, trInfo.tr) + defer func() { + if err != nil { + // Need to call tr.finish() if error is returned. + // Because tr will not be returned to caller. + trInfo.tr.LazyPrintf("RPC: [%v]", err) + trInfo.tr.SetError() + trInfo.tr.Finish() + } + }() } gopts := BalancerGetOptions{ BlockingWait: !c.failFast, @@ -168,9 +162,8 @@ func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth put() put = nil } - if _, ok := err.(transport.ConnectionError); ok { + if _, ok := err.(transport.ConnectionError); ok || err == transport.ErrStreamDrain { if c.failFast { - cs.finish(err) return nil, toRPCErr(err) } continue @@ -179,16 +172,43 @@ func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth } break } - cs.put = put - cs.t = t - cs.s = s - cs.p = &parser{r: s} - // Listen on ctx.Done() to detect cancellation when there is no pending - // I/O operations on this stream. + cs := &clientStream{ + opts: opts, + c: c, + desc: desc, + codec: cc.dopts.codec, + cp: cc.dopts.cp, + dc: cc.dopts.dc, + + put: put, + t: t, + s: s, + p: &parser{r: s}, + + tracing: EnableTracing, + trInfo: trInfo, + } + if cc.dopts.cp != nil { + cs.cbuf = new(bytes.Buffer) + } + // Listen on ctx.Done() to detect cancellation and s.Done() to detect normal termination + // when there is no pending I/O operations on this stream. go func() { select { case <-t.Error(): // Incur transport error, simply exit. + case <-s.Done(): + // TODO: The trace of the RPC is terminated here when there is no pending + // I/O, which is probably not the optimal solution. + if s.StatusCode() == codes.OK { + cs.finish(nil) + } else { + cs.finish(Errorf(s.StatusCode(), "%s", s.StatusDesc())) + } + cs.closeTransportStream(nil) + case <-s.GoAway(): + cs.finish(errConnDrain) + cs.closeTransportStream(errConnDrain) case <-s.Context().Done(): err := s.Context().Err() cs.finish(err) @@ -251,7 +271,17 @@ func (cs *clientStream) SendMsg(m interface{}) (err error) { if err != nil { cs.finish(err) } - if err == nil || err == io.EOF { + if err == nil { + return + } + if err == io.EOF { + // Specialize the process for server streaming. SendMesg is only called + // once when creating the stream object. io.EOF needs to be skipped when + // the rpc is early finished (before the stream object is created.). + // TODO: It is probably better to move this into the generated code. + if !cs.desc.ClientStreams && cs.desc.ServerStreams { + err = nil + } return } if _, ok := err.(transport.ConnectionError); !ok { @@ -272,7 +302,7 @@ func (cs *clientStream) SendMsg(m interface{}) (err error) { } func (cs *clientStream) RecvMsg(m interface{}) (err error) { - err = recv(cs.p, cs.codec, cs.s, cs.dc, m) + err = recv(cs.p, cs.codec, cs.s, cs.dc, m, math.MaxInt32) defer func() { // err != nil indicates the termination of the stream. if err != nil { @@ -291,7 +321,7 @@ func (cs *clientStream) RecvMsg(m interface{}) (err error) { return } // Special handling for client streaming rpc. - err = recv(cs.p, cs.codec, cs.s, cs.dc, m) + err = recv(cs.p, cs.codec, cs.s, cs.dc, m, math.MaxInt32) cs.closeTransportStream(err) if err == nil { return toRPCErr(errors.New("grpc: client streaming protocol violation: get , want ")) @@ -326,7 +356,7 @@ func (cs *clientStream) CloseSend() (err error) { } }() if err == nil || err == io.EOF { - return + return nil } if _, ok := err.(transport.ConnectionError); !ok { cs.closeTransportStream(err) @@ -392,6 +422,7 @@ type serverStream struct { cp Compressor dc Decompressor cbuf *bytes.Buffer + maxMsgSize int statusCode codes.Code statusDesc string trInfo *traceInfo @@ -458,5 +489,5 @@ func (ss *serverStream) RecvMsg(m interface{}) (err error) { ss.mu.Unlock() } }() - return recv(ss.p, ss.codec, ss.s, ss.dc, m) + return recv(ss.p, ss.codec, ss.s, ss.dc, m, ss.maxMsgSize) } diff --git a/cmd/vendor/google.golang.org/grpc/transport/control.go b/cmd/vendor/google.golang.org/grpc/transport/control.go index 7e9bdf335..4ef0830b5 100644 --- a/cmd/vendor/google.golang.org/grpc/transport/control.go +++ b/cmd/vendor/google.golang.org/grpc/transport/control.go @@ -72,6 +72,11 @@ type resetStream struct { func (*resetStream) item() {} +type goAway struct { +} + +func (*goAway) item() {} + type flushIO struct { } diff --git a/cmd/vendor/google.golang.org/grpc/transport/go16.go b/cmd/vendor/google.golang.org/grpc/transport/go16.go new file mode 100644 index 000000000..ee1c46bad --- /dev/null +++ b/cmd/vendor/google.golang.org/grpc/transport/go16.go @@ -0,0 +1,46 @@ +// +build go1.6,!go1.7 + +/* + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +package transport + +import ( + "net" + + "golang.org/x/net/context" +) + +// dialContext connects to the address on the named network. +func dialContext(ctx context.Context, network, address string) (net.Conn, error) { + return (&net.Dialer{Cancel: ctx.Done()}).Dial(network, address) +} diff --git a/cmd/vendor/google.golang.org/grpc/transport/go17.go b/cmd/vendor/google.golang.org/grpc/transport/go17.go new file mode 100644 index 000000000..356f13ff1 --- /dev/null +++ b/cmd/vendor/google.golang.org/grpc/transport/go17.go @@ -0,0 +1,46 @@ +// +build go1.7 + +/* + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +package transport + +import ( + "net" + + "golang.org/x/net/context" +) + +// dialContext connects to the address on the named network. +func dialContext(ctx context.Context, network, address string) (net.Conn, error) { + return (&net.Dialer{}).DialContext(ctx, network, address) +} diff --git a/cmd/vendor/google.golang.org/grpc/transport/handler_server.go b/cmd/vendor/google.golang.org/grpc/transport/handler_server.go index 4b0d52524..30e21ac0f 100644 --- a/cmd/vendor/google.golang.org/grpc/transport/handler_server.go +++ b/cmd/vendor/google.golang.org/grpc/transport/handler_server.go @@ -83,7 +83,7 @@ func NewServerHandlerTransport(w http.ResponseWriter, r *http.Request) (ServerTr } if v := r.Header.Get("grpc-timeout"); v != "" { - to, err := timeoutDecode(v) + to, err := decodeTimeout(v) if err != nil { return nil, StreamErrorf(codes.Internal, "malformed time-out: %v", err) } @@ -194,7 +194,7 @@ func (ht *serverHandlerTransport) WriteStatus(s *Stream, statusCode codes.Code, h := ht.rw.Header() h.Set("Grpc-Status", fmt.Sprintf("%d", statusCode)) if statusDesc != "" { - h.Set("Grpc-Message", statusDesc) + h.Set("Grpc-Message", encodeGrpcMessage(statusDesc)) } if md := s.Trailer(); len(md) > 0 { for k, vv := range md { @@ -370,6 +370,10 @@ func (ht *serverHandlerTransport) runStream() { } } +func (ht *serverHandlerTransport) Drain() { + panic("Drain() is not implemented") +} + // mapRecvMsgError returns the non-nil err into the appropriate // error value as expected by callers of *grpc.parser.recvMsg. // In particular, in can only be: diff --git a/cmd/vendor/google.golang.org/grpc/transport/http2_client.go b/cmd/vendor/google.golang.org/grpc/transport/http2_client.go index f66435fdb..5819cb8a4 100644 --- a/cmd/vendor/google.golang.org/grpc/transport/http2_client.go +++ b/cmd/vendor/google.golang.org/grpc/transport/http2_client.go @@ -35,6 +35,7 @@ package transport import ( "bytes" + "fmt" "io" "math" "net" @@ -71,6 +72,9 @@ type http2Client struct { shutdownChan chan struct{} // errorChan is closed to notify the I/O error to the caller. errorChan chan struct{} + // goAway is closed to notify the upper layer (i.e., addrConn.transportMonitor) + // that the server sent GoAway on this transport. + goAway chan struct{} framer *framer hBuf *bytes.Buffer // the buffer for HPACK encoding @@ -97,41 +101,44 @@ type http2Client struct { maxStreams int // the per-stream outbound flow control window size set by the peer. streamSendQuota uint32 + // goAwayID records the Last-Stream-ID in the GoAway frame from the server. + goAwayID uint32 + // prevGoAway ID records the Last-Stream-ID in the previous GOAway frame. + prevGoAwayID uint32 +} + +func dial(fn func(context.Context, string) (net.Conn, error), ctx context.Context, addr string) (net.Conn, error) { + if fn != nil { + return fn(ctx, addr) + } + return dialContext(ctx, "tcp", addr) } // newHTTP2Client constructs a connected ClientTransport to addr based on HTTP2 // and starts to receive messages on it. Non-nil error returns if construction // fails. -func newHTTP2Client(addr string, opts *ConnectOptions) (_ ClientTransport, err error) { - if opts.Dialer == nil { - // Set the default Dialer. - opts.Dialer = func(addr string, timeout time.Duration) (net.Conn, error) { - return net.DialTimeout("tcp", addr, timeout) - } - } +func newHTTP2Client(ctx context.Context, addr string, opts ConnectOptions) (_ ClientTransport, err error) { scheme := "http" - startT := time.Now() - timeout := opts.Timeout - conn, connErr := opts.Dialer(addr, timeout) + conn, connErr := dial(opts.Dialer, ctx, addr) if connErr != nil { - return nil, ConnectionErrorf("transport: %v", connErr) + return nil, ConnectionErrorf(true, connErr, "transport: %v", connErr) } - var authInfo credentials.AuthInfo - if opts.TransportCredentials != nil { - scheme = "https" - if timeout > 0 { - timeout -= time.Since(startT) - } - conn, authInfo, connErr = opts.TransportCredentials.ClientHandshake(addr, conn, timeout) - } - if connErr != nil { - return nil, ConnectionErrorf("transport: %v", connErr) - } - defer func() { + // Any further errors will close the underlying connection + defer func(conn net.Conn) { if err != nil { conn.Close() } - }() + }(conn) + var authInfo credentials.AuthInfo + if creds := opts.TransportCredentials; creds != nil { + scheme = "https" + conn, authInfo, connErr = creds.ClientHandshake(ctx, addr, conn) + } + if connErr != nil { + // Credentials handshake error is not a temporary error (unless the error + // was the connection closing). + return nil, ConnectionErrorf(connErr == io.EOF, connErr, "transport: %v", connErr) + } ua := primaryUA if opts.UserAgent != "" { ua = opts.UserAgent + " " + ua @@ -147,6 +154,7 @@ func newHTTP2Client(addr string, opts *ConnectOptions) (_ ClientTransport, err e writableChan: make(chan int, 1), shutdownChan: make(chan struct{}), errorChan: make(chan struct{}), + goAway: make(chan struct{}), framer: newFramer(conn), hBuf: &buf, hEnc: hpack.NewEncoder(&buf), @@ -168,11 +176,11 @@ func newHTTP2Client(addr string, opts *ConnectOptions) (_ ClientTransport, err e n, err := t.conn.Write(clientPreface) if err != nil { t.Close() - return nil, ConnectionErrorf("transport: %v", err) + return nil, ConnectionErrorf(true, err, "transport: %v", err) } if n != len(clientPreface) { t.Close() - return nil, ConnectionErrorf("transport: preface mismatch, wrote %d bytes; want %d", n, len(clientPreface)) + return nil, ConnectionErrorf(true, err, "transport: preface mismatch, wrote %d bytes; want %d", n, len(clientPreface)) } if initialWindowSize != defaultWindowSize { err = t.framer.writeSettings(true, http2.Setting{ @@ -184,13 +192,13 @@ func newHTTP2Client(addr string, opts *ConnectOptions) (_ ClientTransport, err e } if err != nil { t.Close() - return nil, ConnectionErrorf("transport: %v", err) + return nil, ConnectionErrorf(true, err, "transport: %v", err) } // Adjust the connection flow control window if needed. if delta := uint32(initialConnWindowSize - defaultWindowSize); delta > 0 { if err := t.framer.writeWindowUpdate(true, 0, delta); err != nil { t.Close() - return nil, ConnectionErrorf("transport: %v", err) + return nil, ConnectionErrorf(true, err, "transport: %v", err) } } go t.controller() @@ -202,6 +210,8 @@ func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream { // TODO(zhaoq): Handle uint32 overflow of Stream.id. s := &Stream{ id: t.nextID, + done: make(chan struct{}), + goAway: make(chan struct{}), method: callHdr.Method, sendCompress: callHdr.SendCompress, buf: newRecvBuffer(), @@ -216,8 +226,9 @@ func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream { // Make a stream be able to cancel the pending operations by itself. s.ctx, s.cancel = context.WithCancel(ctx) s.dec = &recvBufferReader{ - ctx: s.ctx, - recv: s.buf, + ctx: s.ctx, + goAway: s.goAway, + recv: s.buf, } return s } @@ -271,6 +282,10 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea t.mu.Unlock() return nil, ErrConnClosing } + if t.state == draining { + t.mu.Unlock() + return nil, ErrStreamDrain + } if t.state != reachable { t.mu.Unlock() return nil, ErrConnClosing @@ -278,7 +293,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea checkStreamsQuota := t.streamsQuota != nil t.mu.Unlock() if checkStreamsQuota { - sq, err := wait(ctx, t.shutdownChan, t.streamsQuota.acquire()) + sq, err := wait(ctx, nil, nil, t.shutdownChan, t.streamsQuota.acquire()) if err != nil { return nil, err } @@ -287,7 +302,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea t.streamsQuota.add(sq - 1) } } - if _, err := wait(ctx, t.shutdownChan, t.writableChan); err != nil { + if _, err := wait(ctx, nil, nil, t.shutdownChan, t.writableChan); err != nil { // Return the quota back now because there is no stream returned to the caller. if _, ok := err.(StreamError); ok && checkStreamsQuota { t.streamsQuota.add(1) @@ -295,6 +310,15 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea return nil, err } t.mu.Lock() + if t.state == draining { + t.mu.Unlock() + if checkStreamsQuota { + t.streamsQuota.add(1) + } + // Need to make t writable again so that the rpc in flight can still proceed. + t.writableChan <- 0 + return nil, ErrStreamDrain + } if t.state != reachable { t.mu.Unlock() return nil, ErrConnClosing @@ -329,7 +353,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea t.hEnc.WriteField(hpack.HeaderField{Name: "grpc-encoding", Value: callHdr.SendCompress}) } if timeout > 0 { - t.hEnc.WriteField(hpack.HeaderField{Name: "grpc-timeout", Value: timeoutEncode(timeout)}) + t.hEnc.WriteField(hpack.HeaderField{Name: "grpc-timeout", Value: encodeTimeout(timeout)}) } for k, v := range authData { // Capital header names are illegal in HTTP/2. @@ -384,7 +408,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea } if err != nil { t.notifyError(err) - return nil, ConnectionErrorf("transport: %v", err) + return nil, ConnectionErrorf(true, err, "transport: %v", err) } } t.writableChan <- 0 @@ -403,22 +427,17 @@ func (t *http2Client) CloseStream(s *Stream, err error) { if t.streamsQuota != nil { updateStreams = true } - if t.state == draining && len(t.activeStreams) == 1 { + delete(t.activeStreams, s.id) + if t.state == draining && len(t.activeStreams) == 0 { // The transport is draining and s is the last live stream on t. t.mu.Unlock() t.Close() return } - delete(t.activeStreams, s.id) t.mu.Unlock() if updateStreams { t.streamsQuota.add(1) } - // In case stream sending and receiving are invoked in separate - // goroutines (e.g., bi-directional streaming), the caller needs - // to call cancel on the stream to interrupt the blocking on - // other goroutines. - s.cancel() s.mu.Lock() if q := s.fc.resetPendingData(); q > 0 { if n := t.fc.onRead(q); n > 0 { @@ -445,13 +464,13 @@ func (t *http2Client) CloseStream(s *Stream, err error) { // accessed any more. func (t *http2Client) Close() (err error) { t.mu.Lock() - if t.state == reachable { - close(t.errorChan) - } if t.state == closing { t.mu.Unlock() return } + if t.state == reachable || t.state == draining { + close(t.errorChan) + } t.state = closing t.mu.Unlock() close(t.shutdownChan) @@ -475,10 +494,35 @@ func (t *http2Client) Close() (err error) { func (t *http2Client) GracefulClose() error { t.mu.Lock() - if t.state == closing { + switch t.state { + case unreachable: + // The server may close the connection concurrently. t is not available for + // any streams. Close it now. + t.mu.Unlock() + t.Close() + return nil + case closing: t.mu.Unlock() return nil } + // Notify the streams which were initiated after the server sent GOAWAY. + select { + case <-t.goAway: + n := t.prevGoAwayID + if n == 0 && t.nextID > 1 { + n = t.nextID - 2 + } + m := t.goAwayID + 2 + if m == 2 { + m = 1 + } + for i := m; i <= n; i += 2 { + if s, ok := t.activeStreams[i]; ok { + close(s.goAway) + } + } + default: + } if t.state == draining { t.mu.Unlock() return nil @@ -504,15 +548,15 @@ func (t *http2Client) Write(s *Stream, data []byte, opts *Options) error { size := http2MaxFrameLen s.sendQuotaPool.add(0) // Wait until the stream has some quota to send the data. - sq, err := wait(s.ctx, t.shutdownChan, s.sendQuotaPool.acquire()) + sq, err := wait(s.ctx, s.done, s.goAway, t.shutdownChan, s.sendQuotaPool.acquire()) if err != nil { return err } t.sendQuotaPool.add(0) // Wait until the transport has some quota to send the data. - tq, err := wait(s.ctx, t.shutdownChan, t.sendQuotaPool.acquire()) + tq, err := wait(s.ctx, s.done, s.goAway, t.shutdownChan, t.sendQuotaPool.acquire()) if err != nil { - if _, ok := err.(StreamError); ok { + if _, ok := err.(StreamError); ok || err == io.EOF { t.sendQuotaPool.cancel() } return err @@ -544,8 +588,8 @@ func (t *http2Client) Write(s *Stream, data []byte, opts *Options) error { // Indicate there is a writer who is about to write a data frame. t.framer.adjustNumWriters(1) // Got some quota. Try to acquire writing privilege on the transport. - if _, err := wait(s.ctx, t.shutdownChan, t.writableChan); err != nil { - if _, ok := err.(StreamError); ok { + if _, err := wait(s.ctx, s.done, s.goAway, t.shutdownChan, t.writableChan); err != nil { + if _, ok := err.(StreamError); ok || err == io.EOF { // Return the connection quota back. t.sendQuotaPool.add(len(p)) } @@ -578,7 +622,7 @@ func (t *http2Client) Write(s *Stream, data []byte, opts *Options) error { // invoked. if err := t.framer.writeData(forceFlush, s.id, endStream, p); err != nil { t.notifyError(err) - return ConnectionErrorf("transport: %v", err) + return ConnectionErrorf(true, err, "transport: %v", err) } if t.framer.adjustNumWriters(-1) == 0 { t.framer.flushWrite() @@ -593,11 +637,7 @@ func (t *http2Client) Write(s *Stream, data []byte, opts *Options) error { } s.mu.Lock() if s.state != streamDone { - if s.state == streamReadDone { - s.state = streamDone - } else { - s.state = streamWriteDone - } + s.state = streamWriteDone } s.mu.Unlock() return nil @@ -630,7 +670,7 @@ func (t *http2Client) updateWindow(s *Stream, n uint32) { func (t *http2Client) handleData(f *http2.DataFrame) { size := len(f.Data()) if err := t.fc.onData(uint32(size)); err != nil { - t.notifyError(ConnectionErrorf("%v", err)) + t.notifyError(ConnectionErrorf(true, err, "%v", err)) return } // Select the right stream to dispatch. @@ -655,6 +695,7 @@ func (t *http2Client) handleData(f *http2.DataFrame) { s.state = streamDone s.statusCode = codes.Internal s.statusDesc = err.Error() + close(s.done) s.mu.Unlock() s.write(recvMsg{err: io.EOF}) t.controlBuf.put(&resetStream{s.id, http2.ErrCodeFlowControl}) @@ -672,13 +713,14 @@ func (t *http2Client) handleData(f *http2.DataFrame) { // the read direction is closed, and set the status appropriately. if f.FrameHeader.Flags.Has(http2.FlagDataEndStream) { s.mu.Lock() - if s.state == streamWriteDone { - s.state = streamDone - } else { - s.state = streamReadDone + if s.state == streamDone { + s.mu.Unlock() + return } + s.state = streamDone s.statusCode = codes.Internal s.statusDesc = "server closed the stream without sending trailers" + close(s.done) s.mu.Unlock() s.write(recvMsg{err: io.EOF}) } @@ -704,6 +746,8 @@ func (t *http2Client) handleRSTStream(f *http2.RSTStreamFrame) { grpclog.Println("transport: http2Client.handleRSTStream found no mapped gRPC status for the received http2 error ", f.ErrCode) s.statusCode = codes.Unknown } + s.statusDesc = fmt.Sprintf("stream terminated by RST_STREAM with error code: %d", f.ErrCode) + close(s.done) s.mu.Unlock() s.write(recvMsg{err: io.EOF}) } @@ -728,7 +772,32 @@ func (t *http2Client) handlePing(f *http2.PingFrame) { } func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) { - // TODO(zhaoq): GoAwayFrame handler to be implemented + t.mu.Lock() + if t.state == reachable || t.state == draining { + if f.LastStreamID > 0 && f.LastStreamID%2 != 1 { + t.mu.Unlock() + t.notifyError(ConnectionErrorf(true, nil, "received illegal http2 GOAWAY frame: stream ID %d is even", f.LastStreamID)) + return + } + select { + case <-t.goAway: + id := t.goAwayID + // t.goAway has been closed (i.e.,multiple GoAways). + if id < f.LastStreamID { + t.mu.Unlock() + t.notifyError(ConnectionErrorf(true, nil, "received illegal http2 GOAWAY frame: previously recv GOAWAY frame with LastStramID %d, currently recv %d", id, f.LastStreamID)) + return + } + t.prevGoAwayID = id + t.goAwayID = f.LastStreamID + t.mu.Unlock() + return + default: + } + t.goAwayID = f.LastStreamID + close(t.goAway) + } + t.mu.Unlock() } func (t *http2Client) handleWindowUpdate(f *http2.WindowUpdateFrame) { @@ -780,11 +849,11 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) { if len(state.mdata) > 0 { s.trailer = state.mdata } - s.state = streamDone s.statusCode = state.statusCode s.statusDesc = state.statusDesc + close(s.done) + s.state = streamDone s.mu.Unlock() - s.write(recvMsg{err: io.EOF}) } @@ -937,13 +1006,22 @@ func (t *http2Client) Error() <-chan struct{} { return t.errorChan } +func (t *http2Client) GoAway() <-chan struct{} { + return t.goAway +} + func (t *http2Client) notifyError(err error) { t.mu.Lock() - defer t.mu.Unlock() // make sure t.errorChan is closed only once. + if t.state == draining { + t.mu.Unlock() + t.Close() + return + } if t.state == reachable { t.state = unreachable close(t.errorChan) grpclog.Printf("transport: http2Client.notifyError got notified that the client transport was broken %v.", err) } + t.mu.Unlock() } diff --git a/cmd/vendor/google.golang.org/grpc/transport/http2_server.go b/cmd/vendor/google.golang.org/grpc/transport/http2_server.go index cee154297..16010d55f 100644 --- a/cmd/vendor/google.golang.org/grpc/transport/http2_server.go +++ b/cmd/vendor/google.golang.org/grpc/transport/http2_server.go @@ -111,12 +111,12 @@ func newHTTP2Server(conn net.Conn, maxStreams uint32, authInfo credentials.AuthI Val: uint32(initialWindowSize)}) } if err := framer.writeSettings(true, settings...); err != nil { - return nil, ConnectionErrorf("transport: %v", err) + return nil, ConnectionErrorf(true, err, "transport: %v", err) } // Adjust the connection flow control window if needed. if delta := uint32(initialConnWindowSize - defaultWindowSize); delta > 0 { if err := framer.writeWindowUpdate(true, 0, delta); err != nil { - return nil, ConnectionErrorf("transport: %v", err) + return nil, ConnectionErrorf(true, err, "transport: %v", err) } } var buf bytes.Buffer @@ -142,7 +142,7 @@ func newHTTP2Server(conn net.Conn, maxStreams uint32, authInfo credentials.AuthI } // operateHeader takes action on the decoded headers. -func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream)) { +func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream)) (close bool) { buf := newRecvBuffer() s := &Stream{ id: frame.Header().StreamID, @@ -205,6 +205,13 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func( t.controlBuf.put(&resetStream{s.id, http2.ErrCodeRefusedStream}) return } + if s.id%2 != 1 || s.id <= t.maxStreamID { + t.mu.Unlock() + // illegal gRPC stream id. + grpclog.Println("transport: http2Server.HandleStreams received an illegal stream id: ", s.id) + return true + } + t.maxStreamID = s.id s.sendQuotaPool = newQuotaPool(int(t.streamSendQuota)) t.activeStreams[s.id] = s t.mu.Unlock() @@ -212,6 +219,7 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func( t.updateWindow(s, uint32(n)) } handle(s) + return } // HandleStreams receives incoming streams using the given handler. This is @@ -231,6 +239,10 @@ func (t *http2Server) HandleStreams(handle func(*Stream)) { } frame, err := t.framer.readFrame() + if err == io.EOF || err == io.ErrUnexpectedEOF { + t.Close() + return + } if err != nil { grpclog.Printf("transport: http2Server.HandleStreams failed to read frame: %v", err) t.Close() @@ -257,20 +269,20 @@ func (t *http2Server) HandleStreams(handle func(*Stream)) { t.controlBuf.put(&resetStream{se.StreamID, se.Code}) continue } + if err == io.EOF || err == io.ErrUnexpectedEOF { + t.Close() + return + } + grpclog.Printf("transport: http2Server.HandleStreams failed to read frame: %v", err) t.Close() return } switch frame := frame.(type) { case *http2.MetaHeadersFrame: - id := frame.Header().StreamID - if id%2 != 1 || id <= t.maxStreamID { - // illegal gRPC stream id. - grpclog.Println("transport: http2Server.HandleStreams received an illegal stream id: ", id) + if t.operateHeaders(frame, handle) { t.Close() break } - t.maxStreamID = id - t.operateHeaders(frame, handle) case *http2.DataFrame: t.handleData(frame) case *http2.RSTStreamFrame: @@ -282,7 +294,7 @@ func (t *http2Server) HandleStreams(handle func(*Stream)) { case *http2.WindowUpdateFrame: t.handleWindowUpdate(frame) case *http2.GoAwayFrame: - break + // TODO: Handle GoAway from the client appropriately. default: grpclog.Printf("transport: http2Server.HandleStreams found unhandled frame type %v.", frame) } @@ -364,11 +376,7 @@ func (t *http2Server) handleData(f *http2.DataFrame) { // Received the end of stream from the client. s.mu.Lock() if s.state != streamDone { - if s.state == streamWriteDone { - s.state = streamDone - } else { - s.state = streamReadDone - } + s.state = streamReadDone } s.mu.Unlock() s.write(recvMsg{err: io.EOF}) @@ -440,7 +448,7 @@ func (t *http2Server) writeHeaders(s *Stream, b *bytes.Buffer, endStream bool) e } if err != nil { t.Close() - return ConnectionErrorf("transport: %v", err) + return ConnectionErrorf(true, err, "transport: %v", err) } } return nil @@ -455,7 +463,7 @@ func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error { } s.headerOk = true s.mu.Unlock() - if _, err := wait(s.ctx, t.shutdownChan, t.writableChan); err != nil { + if _, err := wait(s.ctx, nil, nil, t.shutdownChan, t.writableChan); err != nil { return err } t.hBuf.Reset() @@ -495,7 +503,7 @@ func (t *http2Server) WriteStatus(s *Stream, statusCode codes.Code, statusDesc s headersSent = true } s.mu.Unlock() - if _, err := wait(s.ctx, t.shutdownChan, t.writableChan); err != nil { + if _, err := wait(s.ctx, nil, nil, t.shutdownChan, t.writableChan); err != nil { return err } t.hBuf.Reset() @@ -508,7 +516,7 @@ func (t *http2Server) WriteStatus(s *Stream, statusCode codes.Code, statusDesc s Name: "grpc-status", Value: strconv.Itoa(int(statusCode)), }) - t.hEnc.WriteField(hpack.HeaderField{Name: "grpc-message", Value: statusDesc}) + t.hEnc.WriteField(hpack.HeaderField{Name: "grpc-message", Value: encodeGrpcMessage(statusDesc)}) // Attach the trailer metadata. for k, v := range s.trailer { // Clients don't tolerate reading restricted headers after some non restricted ones were sent. @@ -544,7 +552,7 @@ func (t *http2Server) Write(s *Stream, data []byte, opts *Options) error { } s.mu.Unlock() if writeHeaderFrame { - if _, err := wait(s.ctx, t.shutdownChan, t.writableChan); err != nil { + if _, err := wait(s.ctx, nil, nil, t.shutdownChan, t.writableChan); err != nil { return err } t.hBuf.Reset() @@ -560,7 +568,7 @@ func (t *http2Server) Write(s *Stream, data []byte, opts *Options) error { } if err := t.framer.writeHeaders(false, p); err != nil { t.Close() - return ConnectionErrorf("transport: %v", err) + return ConnectionErrorf(true, err, "transport: %v", err) } t.writableChan <- 0 } @@ -572,13 +580,13 @@ func (t *http2Server) Write(s *Stream, data []byte, opts *Options) error { size := http2MaxFrameLen s.sendQuotaPool.add(0) // Wait until the stream has some quota to send the data. - sq, err := wait(s.ctx, t.shutdownChan, s.sendQuotaPool.acquire()) + sq, err := wait(s.ctx, nil, nil, t.shutdownChan, s.sendQuotaPool.acquire()) if err != nil { return err } t.sendQuotaPool.add(0) // Wait until the transport has some quota to send the data. - tq, err := wait(s.ctx, t.shutdownChan, t.sendQuotaPool.acquire()) + tq, err := wait(s.ctx, nil, nil, t.shutdownChan, t.sendQuotaPool.acquire()) if err != nil { if _, ok := err.(StreamError); ok { t.sendQuotaPool.cancel() @@ -604,7 +612,7 @@ func (t *http2Server) Write(s *Stream, data []byte, opts *Options) error { t.framer.adjustNumWriters(1) // Got some quota. Try to acquire writing privilege on the // transport. - if _, err := wait(s.ctx, t.shutdownChan, t.writableChan); err != nil { + if _, err := wait(s.ctx, nil, nil, t.shutdownChan, t.writableChan); err != nil { if _, ok := err.(StreamError); ok { // Return the connection quota back. t.sendQuotaPool.add(ps) @@ -634,7 +642,7 @@ func (t *http2Server) Write(s *Stream, data []byte, opts *Options) error { } if err := t.framer.writeData(forceFlush, s.id, false, p); err != nil { t.Close() - return ConnectionErrorf("transport: %v", err) + return ConnectionErrorf(true, err, "transport: %v", err) } if t.framer.adjustNumWriters(-1) == 0 { t.framer.flushWrite() @@ -679,6 +687,17 @@ func (t *http2Server) controller() { } case *resetStream: t.framer.writeRSTStream(true, i.streamID, i.code) + case *goAway: + t.mu.Lock() + if t.state == closing { + t.mu.Unlock() + // The transport is closing. + return + } + sid := t.maxStreamID + t.state = draining + t.mu.Unlock() + t.framer.writeGoAway(true, sid, http2.ErrCodeNo, nil) case *flushIO: t.framer.flushWrite() case *ping: @@ -724,6 +743,9 @@ func (t *http2Server) Close() (err error) { func (t *http2Server) closeStream(s *Stream) { t.mu.Lock() delete(t.activeStreams, s.id) + if t.state == draining && len(t.activeStreams) == 0 { + defer t.Close() + } t.mu.Unlock() // In case stream sending and receiving are invoked in separate // goroutines (e.g., bi-directional streaming), cancel needs to be @@ -746,3 +768,7 @@ func (t *http2Server) closeStream(s *Stream) { func (t *http2Server) RemoteAddr() net.Addr { return t.conn.RemoteAddr() } + +func (t *http2Server) Drain() { + t.controlBuf.put(&goAway{}) +} diff --git a/cmd/vendor/google.golang.org/grpc/transport/http_util.go b/cmd/vendor/google.golang.org/grpc/transport/http_util.go index f2e23dcec..3e16e4df4 100644 --- a/cmd/vendor/google.golang.org/grpc/transport/http_util.go +++ b/cmd/vendor/google.golang.org/grpc/transport/http_util.go @@ -35,6 +35,7 @@ package transport import ( "bufio" + "bytes" "fmt" "io" "net" @@ -174,11 +175,11 @@ func (d *decodeState) processHeaderField(f hpack.HeaderField) { } d.statusCode = codes.Code(code) case "grpc-message": - d.statusDesc = f.Value + d.statusDesc = decodeGrpcMessage(f.Value) case "grpc-timeout": d.timeoutSet = true var err error - d.timeout, err = timeoutDecode(f.Value) + d.timeout, err = decodeTimeout(f.Value) if err != nil { d.setErr(StreamErrorf(codes.Internal, "transport: malformed time-out: %v", err)) return @@ -251,7 +252,7 @@ func div(d, r time.Duration) int64 { } // TODO(zhaoq): It is the simplistic and not bandwidth efficient. Improve it. -func timeoutEncode(t time.Duration) string { +func encodeTimeout(t time.Duration) string { if d := div(t, time.Nanosecond); d <= maxTimeoutValue { return strconv.FormatInt(d, 10) + "n" } @@ -271,7 +272,7 @@ func timeoutEncode(t time.Duration) string { return strconv.FormatInt(div(t, time.Hour), 10) + "H" } -func timeoutDecode(s string) (time.Duration, error) { +func decodeTimeout(s string) (time.Duration, error) { size := len(s) if size < 2 { return 0, fmt.Errorf("transport: timeout string is too short: %q", s) @@ -288,6 +289,80 @@ func timeoutDecode(s string) (time.Duration, error) { return d * time.Duration(t), nil } +const ( + spaceByte = ' ' + tildaByte = '~' + percentByte = '%' +) + +// encodeGrpcMessage is used to encode status code in header field +// "grpc-message". +// It checks to see if each individual byte in msg is an +// allowable byte, and then either percent encoding or passing it through. +// When percent encoding, the byte is converted into hexadecimal notation +// with a '%' prepended. +func encodeGrpcMessage(msg string) string { + if msg == "" { + return "" + } + lenMsg := len(msg) + for i := 0; i < lenMsg; i++ { + c := msg[i] + if !(c >= spaceByte && c < tildaByte && c != percentByte) { + return encodeGrpcMessageUnchecked(msg) + } + } + return msg +} + +func encodeGrpcMessageUnchecked(msg string) string { + var buf bytes.Buffer + lenMsg := len(msg) + for i := 0; i < lenMsg; i++ { + c := msg[i] + if c >= spaceByte && c < tildaByte && c != percentByte { + buf.WriteByte(c) + } else { + buf.WriteString(fmt.Sprintf("%%%02X", c)) + } + } + return buf.String() +} + +// decodeGrpcMessage decodes the msg encoded by encodeGrpcMessage. +func decodeGrpcMessage(msg string) string { + if msg == "" { + return "" + } + lenMsg := len(msg) + for i := 0; i < lenMsg; i++ { + if msg[i] == percentByte && i+2 < lenMsg { + return decodeGrpcMessageUnchecked(msg) + } + } + return msg +} + +func decodeGrpcMessageUnchecked(msg string) string { + var buf bytes.Buffer + lenMsg := len(msg) + for i := 0; i < lenMsg; i++ { + c := msg[i] + if c == percentByte && i+2 < lenMsg { + parsed, err := strconv.ParseInt(msg[i+1:i+3], 16, 8) + if err != nil { + buf.WriteByte(c) + } else { + buf.WriteByte(byte(parsed)) + i += 2 + } + } else { + buf.WriteByte(c) + } + } + return buf.String() +} + type framer struct { numWriters int32 reader io.Reader diff --git a/cmd/vendor/google.golang.org/grpc/transport/pre_go16.go b/cmd/vendor/google.golang.org/grpc/transport/pre_go16.go new file mode 100644 index 000000000..33d91c17c --- /dev/null +++ b/cmd/vendor/google.golang.org/grpc/transport/pre_go16.go @@ -0,0 +1,51 @@ +// +build !go1.6 + +/* + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +package transport + +import ( + "net" + "time" + + "golang.org/x/net/context" +) + +// dialContext connects to the address on the named network. +func dialContext(ctx context.Context, network, address string) (net.Conn, error) { + var dialer net.Dialer + if deadline, ok := ctx.Deadline(); ok { + dialer.Timeout = deadline.Sub(time.Now()) + } + return dialer.Dial(network, address) +} diff --git a/cmd/vendor/google.golang.org/grpc/transport/transport.go b/cmd/vendor/google.golang.org/grpc/transport/transport.go index 599f25d17..f0885da23 100644 --- a/cmd/vendor/google.golang.org/grpc/transport/transport.go +++ b/cmd/vendor/google.golang.org/grpc/transport/transport.go @@ -44,7 +44,6 @@ import ( "io" "net" "sync" - "time" "golang.org/x/net/context" "golang.org/x/net/trace" @@ -120,10 +119,11 @@ func (b *recvBuffer) get() <-chan item { // recvBufferReader implements io.Reader interface to read the data from // recvBuffer. type recvBufferReader struct { - ctx context.Context - recv *recvBuffer - last *bytes.Reader // Stores the remaining data in the previous calls. - err error + ctx context.Context + goAway chan struct{} + recv *recvBuffer + last *bytes.Reader // Stores the remaining data in the previous calls. + err error } // Read reads the next len(p) bytes from last. If last is drained, it tries to @@ -141,6 +141,8 @@ func (r *recvBufferReader) Read(p []byte) (n int, err error) { select { case <-r.ctx.Done(): return 0, ContextErr(r.ctx.Err()) + case <-r.goAway: + return 0, ErrStreamDrain case i := <-r.recv.get(): r.recv.load() m := i.(*recvMsg) @@ -158,7 +160,7 @@ const ( streamActive streamState = iota streamWriteDone // EndStream sent streamReadDone // EndStream received - streamDone // sendDone and recvDone or RSTStreamFrame is sent or received. + streamDone // the entire stream is finished. ) // Stream represents an RPC in the transport layer. @@ -169,6 +171,10 @@ type Stream struct { // ctx is the associated context of the stream. ctx context.Context cancel context.CancelFunc + // done is closed when the final status arrives. + done chan struct{} + // goAway is closed when the server sent GoAways signal before this stream was initiated. + goAway chan struct{} // method records the associated RPC method of the stream. method string recvCompress string @@ -214,6 +220,18 @@ func (s *Stream) SetSendCompress(str string) { s.sendCompress = str } +// Done returns a chanel which is closed when it receives the final status +// from the server. +func (s *Stream) Done() <-chan struct{} { + return s.done +} + +// GoAway returns a channel which is closed when the server sent GoAways signal +// before this stream was initiated. +func (s *Stream) GoAway() <-chan struct{} { + return s.goAway +} + // Header acquires the key-value pairs of header metadata once it // is available. It blocks until i) the metadata is ready or ii) there is no // header metadata or iii) the stream is cancelled/expired. @@ -221,6 +239,8 @@ func (s *Stream) Header() (metadata.MD, error) { select { case <-s.ctx.Done(): return nil, ContextErr(s.ctx.Err()) + case <-s.goAway: + return nil, ErrStreamDrain case <-s.headerChan: return s.header.Copy(), nil } @@ -335,19 +355,17 @@ type ConnectOptions struct { // UserAgent is the application user agent. UserAgent string // Dialer specifies how to dial a network address. - Dialer func(string, time.Duration) (net.Conn, error) + Dialer func(context.Context, string) (net.Conn, error) // PerRPCCredentials stores the PerRPCCredentials required to issue RPCs. PerRPCCredentials []credentials.PerRPCCredentials // TransportCredentials stores the Authenticator required to setup a client connection. TransportCredentials credentials.TransportCredentials - // Timeout specifies the timeout for dialing a ClientTransport. - Timeout time.Duration } // NewClientTransport establishes the transport with the required ConnectOptions // and returns it to the caller. -func NewClientTransport(target string, opts *ConnectOptions) (ClientTransport, error) { - return newHTTP2Client(target, opts) +func NewClientTransport(ctx context.Context, target string, opts ConnectOptions) (ClientTransport, error) { + return newHTTP2Client(ctx, target, opts) } // Options provides additional hints and information for message @@ -417,6 +435,11 @@ type ClientTransport interface { // and create a new one) in error case. It should not return nil // once the transport is initiated. Error() <-chan struct{} + + // GoAway returns a channel that is closed when ClientTranspor + // receives the draining signal from the server (e.g., GOAWAY frame in + // HTTP/2). + GoAway() <-chan struct{} } // ServerTransport is the common interface for all gRPC server-side transport @@ -448,6 +471,9 @@ type ServerTransport interface { // RemoteAddr returns the remote network address. RemoteAddr() net.Addr + + // Drain notifies the client this ServerTransport stops accepting new RPCs. + Drain() } // StreamErrorf creates an StreamError with the specified error code and description. @@ -459,9 +485,11 @@ func StreamErrorf(c codes.Code, format string, a ...interface{}) StreamError { } // ConnectionErrorf creates an ConnectionError with the specified error description. -func ConnectionErrorf(format string, a ...interface{}) ConnectionError { +func ConnectionErrorf(temp bool, e error, format string, a ...interface{}) ConnectionError { return ConnectionError{ Desc: fmt.Sprintf(format, a...), + temp: temp, + err: e, } } @@ -469,14 +497,36 @@ func ConnectionErrorf(format string, a ...interface{}) ConnectionError { // entire connection and the retry of all the active streams. type ConnectionError struct { Desc string + temp bool + err error } func (e ConnectionError) Error() string { return fmt.Sprintf("connection error: desc = %q", e.Desc) } -// ErrConnClosing indicates that the transport is closing. -var ErrConnClosing = ConnectionError{Desc: "transport is closing"} +// Temporary indicates if this connection error is temporary or fatal. +func (e ConnectionError) Temporary() bool { + return e.temp +} + +// Origin returns the original error of this connection error. +func (e ConnectionError) Origin() error { + // Never return nil error here. + // If the original error is nil, return itself. + if e.err == nil { + return e + } + return e.err +} + +var ( + // ErrConnClosing indicates that the transport is closing. + ErrConnClosing = ConnectionError{Desc: "transport is closing", temp: true} + // ErrStreamDrain indicates that the stream is rejected by the server because + // the server stops accepting new RPCs. + ErrStreamDrain = StreamErrorf(codes.Unavailable, "the server stops accepting new RPCs") +) // StreamError is an error that only affects one stream within a connection. type StreamError struct { @@ -501,12 +551,25 @@ func ContextErr(err error) StreamError { // wait blocks until it can receive from ctx.Done, closing, or proceed. // If it receives from ctx.Done, it returns 0, the StreamError for ctx.Err. +// If it receives from done, it returns 0, io.EOF if ctx is not done; otherwise +// it return the StreamError for ctx.Err. +// If it receives from goAway, it returns 0, ErrStreamDrain. // If it receives from closing, it returns 0, ErrConnClosing. // If it receives from proceed, it returns the received integer, nil. -func wait(ctx context.Context, closing <-chan struct{}, proceed <-chan int) (int, error) { +func wait(ctx context.Context, done, goAway, closing <-chan struct{}, proceed <-chan int) (int, error) { select { case <-ctx.Done(): return 0, ContextErr(ctx.Err()) + case <-done: + // User cancellation has precedence. + select { + case <-ctx.Done(): + return 0, ContextErr(ctx.Err()) + default: + } + return 0, io.EOF + case <-goAway: + return 0, ErrStreamDrain case <-closing: return 0, ErrConnClosing case i := <-proceed: