Merge pull request #7692 from heyitsanthony/upgrade-grpc

vendor: upgrade grpc to 1.2.1
This commit is contained in:
Anthony Romano 2017-04-07 16:04:50 -07:00 committed by GitHub
commit c0560be98a
22 changed files with 1436 additions and 241 deletions

View File

@ -36,12 +36,13 @@ package grpc
import (
"bytes"
"io"
"math"
"time"
"golang.org/x/net/context"
"golang.org/x/net/trace"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/stats"
"google.golang.org/grpc/transport"
)
@ -49,7 +50,8 @@ import (
// On error, it returns the error and indicates whether the call should be retried.
//
// TODO(zhaoq): Check whether the received message sequence is valid.
func recvResponse(dopts dialOptions, t transport.ClientTransport, c *callInfo, stream *transport.Stream, reply interface{}) (err error) {
// TODO ctx is used for stats collection and processing. It is the context passed from the application.
func recvResponse(ctx context.Context, dopts dialOptions, t transport.ClientTransport, c *callInfo, stream *transport.Stream, reply interface{}) (err error) {
// Try to acquire header metadata from the server if there is any.
defer func() {
if err != nil {
@ -63,20 +65,34 @@ func recvResponse(dopts dialOptions, t transport.ClientTransport, c *callInfo, s
return
}
p := &parser{r: stream}
var inPayload *stats.InPayload
if dopts.copts.StatsHandler != nil {
inPayload = &stats.InPayload{
Client: true,
}
}
for {
if err = recv(p, dopts.codec, stream, dopts.dc, reply, math.MaxInt32); err != nil {
if err = recv(p, dopts.codec, stream, dopts.dc, reply, dopts.maxMsgSize, inPayload); err != nil {
if err == io.EOF {
break
}
return
}
}
if inPayload != nil && err == io.EOF && stream.StatusCode() == codes.OK {
// TODO in the current implementation, inTrailer may be handled before inPayload in some cases.
// Fix the order if necessary.
dopts.copts.StatsHandler.HandleRPC(ctx, inPayload)
}
c.trailerMD = stream.Trailer()
if peer, ok := peer.FromContext(stream.Context()); ok {
c.peer = peer
}
return nil
}
// sendRequest writes out various information of an RPC such as Context and Message.
func sendRequest(ctx context.Context, codec Codec, compressor Compressor, callHdr *transport.CallHdr, t transport.ClientTransport, args interface{}, opts *transport.Options) (_ *transport.Stream, err error) {
func sendRequest(ctx context.Context, dopts dialOptions, compressor Compressor, callHdr *transport.CallHdr, t transport.ClientTransport, args interface{}, opts *transport.Options) (_ *transport.Stream, err error) {
stream, err := t.NewStream(ctx, callHdr)
if err != nil {
return nil, err
@ -89,15 +105,27 @@ func sendRequest(ctx context.Context, codec Codec, compressor Compressor, callHd
}
}
}()
var cbuf *bytes.Buffer
var (
cbuf *bytes.Buffer
outPayload *stats.OutPayload
)
if compressor != nil {
cbuf = new(bytes.Buffer)
}
outBuf, err := encode(codec, args, compressor, cbuf)
if dopts.copts.StatsHandler != nil {
outPayload = &stats.OutPayload{
Client: true,
}
}
outBuf, err := encode(dopts.codec, args, compressor, cbuf, outPayload)
if err != nil {
return nil, Errorf(codes.Internal, "grpc: %v", err)
}
err = t.Write(stream, outBuf, opts)
if err == nil && outPayload != nil {
outPayload.SentTime = time.Now()
dopts.copts.StatsHandler.HandleRPC(ctx, outPayload)
}
// t.NewStream(...) could lead to an early rejection of the RPC (e.g., the service/method
// does not exist.) so that t.Write could get io.EOF from wait(...). Leave the following
// recvResponse to get the final status.
@ -118,8 +146,16 @@ func Invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
return invoke(ctx, method, args, reply, cc, opts...)
}
func invoke(ctx context.Context, method string, args, reply interface{}, cc *ClientConn, opts ...CallOption) (err error) {
func invoke(ctx context.Context, method string, args, reply interface{}, cc *ClientConn, opts ...CallOption) (e error) {
c := defaultCallInfo
if mc, ok := cc.getMethodConfig(method); ok {
c.failFast = !mc.WaitForReady
if mc.Timeout > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, mc.Timeout)
defer cancel()
}
}
for _, o := range opts {
if err := o.before(&c); err != nil {
return toRPCErr(err)
@ -140,12 +176,32 @@ func invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
c.traceInfo.tr.LazyLog(&c.traceInfo.firstLine, false)
// TODO(dsymonds): Arrange for c.traceInfo.firstLine.remoteAddr to be set.
defer func() {
if err != nil {
c.traceInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
if e != nil {
c.traceInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{e}}, true)
c.traceInfo.tr.SetError()
}
}()
}
sh := cc.dopts.copts.StatsHandler
if sh != nil {
ctx = sh.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: method})
begin := &stats.Begin{
Client: true,
BeginTime: time.Now(),
FailFast: c.failFast,
}
sh.HandleRPC(ctx, begin)
}
defer func() {
if sh != nil {
end := &stats.End{
Client: true,
EndTime: time.Now(),
Error: e,
}
sh.HandleRPC(ctx, end)
}
}()
topts := &transport.Options{
Last: true,
Delay: false,
@ -167,6 +223,7 @@ func invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
if cc.dopts.cp != nil {
callHdr.SendCompress = cc.dopts.cp.Type()
}
gopts := BalancerGetOptions{
BlockingWait: !c.failFast,
}
@ -188,7 +245,7 @@ func invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
if c.traceInfo.tr != nil {
c.traceInfo.tr.LazyLog(&payload{sent: true, msg: args}, true)
}
stream, err = sendRequest(ctx, cc.dopts.codec, cc.dopts.cp, callHdr, t, args, topts)
stream, err = sendRequest(ctx, cc.dopts, cc.dopts.cp, callHdr, t, args, topts)
if err != nil {
if put != nil {
put()
@ -205,7 +262,7 @@ func invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
}
return toRPCErr(err)
}
err = recvResponse(cc.dopts, t, &c, stream, reply)
err = recvResponse(ctx, cc.dopts, t, &c, stream, reply)
if err != nil {
if put != nil {
put()

View File

@ -36,6 +36,7 @@ package grpc
import (
"errors"
"fmt"
"math"
"net"
"strings"
"sync"
@ -45,6 +46,8 @@ import (
"golang.org/x/net/trace"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/stats"
"google.golang.org/grpc/transport"
)
@ -54,6 +57,8 @@ var (
ErrClientConnClosing = errors.New("grpc: the client connection is closing")
// ErrClientConnTimeout indicates that the ClientConn cannot establish the
// underlying connections within the specified timeout.
// DEPRECATED: Please use context.DeadlineExceeded instead. This error will be
// removed in Q1 2017.
ErrClientConnTimeout = errors.New("grpc: timed out when dialing")
// errNoTransportSecurity indicates that there is no transport security
@ -83,22 +88,33 @@ var (
// dialOptions configure a Dial call. dialOptions are set by the DialOption
// values passed to Dial.
type dialOptions struct {
unaryInt UnaryClientInterceptor
streamInt StreamClientInterceptor
codec Codec
cp Compressor
dc Decompressor
bs backoffStrategy
balancer Balancer
block bool
insecure bool
timeout time.Duration
copts transport.ConnectOptions
unaryInt UnaryClientInterceptor
streamInt StreamClientInterceptor
codec Codec
cp Compressor
dc Decompressor
bs backoffStrategy
balancer Balancer
block bool
insecure bool
timeout time.Duration
scChan <-chan ServiceConfig
copts transport.ConnectOptions
maxMsgSize int
}
const defaultClientMaxMsgSize = math.MaxInt32
// DialOption configures how we set up the connection.
type DialOption func(*dialOptions)
// WithMaxMsgSize returns a DialOption which sets the maximum message size the client can receive.
func WithMaxMsgSize(s int) DialOption {
return func(o *dialOptions) {
o.maxMsgSize = s
}
}
// WithCodec returns a DialOption which sets a codec for message marshaling and unmarshaling.
func WithCodec(c Codec) DialOption {
return func(o *dialOptions) {
@ -129,6 +145,13 @@ func WithBalancer(b Balancer) DialOption {
}
}
// WithServiceConfig returns a DialOption which has a channel to read the service configuration.
func WithServiceConfig(c <-chan ServiceConfig) DialOption {
return func(o *dialOptions) {
o.scChan = c
}
}
// WithBackoffMaxDelay configures the dialer to use the provided maximum delay
// when backing off after failed connection attempts.
func WithBackoffMaxDelay(md time.Duration) DialOption {
@ -199,6 +222,8 @@ func WithTimeout(d time.Duration) DialOption {
}
// WithDialer returns a DialOption that specifies a function to use for dialing network addresses.
// If FailOnNonTempDialError() is set to true, and an error is returned by f, gRPC checks the error's
// Temporary() method to decide if it should try to reconnect to the network address.
func WithDialer(f func(string, time.Duration) (net.Conn, error)) DialOption {
return func(o *dialOptions) {
o.copts.Dialer = func(ctx context.Context, addr string) (net.Conn, error) {
@ -210,6 +235,25 @@ func WithDialer(f func(string, time.Duration) (net.Conn, error)) DialOption {
}
}
// WithStatsHandler returns a DialOption that specifies the stats handler
// for all the RPCs and underlying network connections in this ClientConn.
func WithStatsHandler(h stats.Handler) DialOption {
return func(o *dialOptions) {
o.copts.StatsHandler = h
}
}
// FailOnNonTempDialError returns a DialOption that specified if gRPC fails on non-temporary dial errors.
// If f is true, and dialer returns a non-temporary error, gRPC will fail the connection to the network
// address and won't try to reconnect.
// The default value of FailOnNonTempDialError is false.
// This is an EXPERIMENTAL API.
func FailOnNonTempDialError(f bool) DialOption {
return func(o *dialOptions) {
o.copts.FailOnNonTempDialError = f
}
}
// WithUserAgent returns a DialOption that specifies a user agent string for all the RPCs.
func WithUserAgent(s string) DialOption {
return func(o *dialOptions) {
@ -217,6 +261,13 @@ func WithUserAgent(s string) DialOption {
}
}
// WithKeepaliveParams returns a DialOption that specifies keepalive paramaters for the client transport.
func WithKeepaliveParams(kp keepalive.ClientParameters) DialOption {
return func(o *dialOptions) {
o.copts.KeepaliveParams = kp
}
}
// WithUnaryInterceptor returns a DialOption that specifies the interceptor for unary RPCs.
func WithUnaryInterceptor(f UnaryClientInterceptor) DialOption {
return func(o *dialOptions) {
@ -231,6 +282,15 @@ func WithStreamInterceptor(f StreamClientInterceptor) DialOption {
}
}
// WithAuthority returns a DialOption that specifies the value to be used as
// the :authority pseudo-header. This value only works with WithInsecure and
// has no effect if TransportCredentials are present.
func WithAuthority(a string) DialOption {
return func(o *dialOptions) {
o.copts.Authority = a
}
}
// Dial creates a client connection to the given target.
func Dial(target string, opts ...DialOption) (*ClientConn, error) {
return DialContext(context.Background(), target, opts...)
@ -247,6 +307,24 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
conns: make(map[Address]*addrConn),
}
cc.ctx, cc.cancel = context.WithCancel(context.Background())
cc.dopts.maxMsgSize = defaultClientMaxMsgSize
for _, opt := range opts {
opt(&cc.dopts)
}
grpcUA := "grpc-go/" + Version
if cc.dopts.copts.UserAgent != "" {
cc.dopts.copts.UserAgent += " " + grpcUA
} else {
cc.dopts.copts.UserAgent = grpcUA
}
if cc.dopts.timeout > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, cc.dopts.timeout)
defer cancel()
}
defer func() {
select {
case <-ctx.Done():
@ -259,10 +337,17 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
}
}()
for _, opt := range opts {
opt(&cc.dopts)
if cc.dopts.scChan != nil {
// Wait for the initial service config.
select {
case sc, ok := <-cc.dopts.scChan:
if ok {
cc.sc = sc
}
case <-ctx.Done():
return nil, ctx.Err()
}
}
// Set defaults.
if cc.dopts.codec == nil {
cc.dopts.codec = protoCodec{}
@ -273,6 +358,8 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
creds := cc.dopts.copts.TransportCredentials
if creds != nil && creds.Info().ServerName != "" {
cc.authority = creds.Info().ServerName
} else if cc.dopts.insecure && cc.dopts.copts.Authority != "" {
cc.authority = cc.dopts.copts.Authority
} else {
colonPos := strings.LastIndex(target, ":")
if colonPos == -1 {
@ -284,6 +371,9 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
waitC := make(chan error, 1)
go func() {
var addrs []Address
if cc.dopts.balancer == nil && cc.sc.LB != nil {
cc.dopts.balancer = cc.sc.LB
}
if cc.dopts.balancer == nil {
// Connect to target directly if balancer is nil.
addrs = append(addrs, Address{Addr: target})
@ -319,10 +409,6 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
}
close(waitC)
}()
var timeoutCh <-chan time.Time
if cc.dopts.timeout > 0 {
timeoutCh = time.After(cc.dopts.timeout)
}
select {
case <-ctx.Done():
return nil, ctx.Err()
@ -330,14 +416,17 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
if err != nil {
return nil, err
}
case <-timeoutCh:
return nil, ErrClientConnTimeout
}
// If balancer is nil or balancer.Notify() is nil, ok will be false here.
// The lbWatcher goroutine will not be created.
if ok {
go cc.lbWatcher()
}
if cc.dopts.scChan != nil {
go cc.scWatcher()
}
return cc, nil
}
@ -384,6 +473,7 @@ type ClientConn struct {
dopts dialOptions
mu sync.RWMutex
sc ServiceConfig
conns map[Address]*addrConn
}
@ -422,6 +512,24 @@ func (cc *ClientConn) lbWatcher() {
}
}
func (cc *ClientConn) scWatcher() {
for {
select {
case sc, ok := <-cc.dopts.scChan:
if !ok {
return
}
cc.mu.Lock()
// TODO: load balance policy runtime change is ignored.
// We may revist this decision in the future.
cc.sc = sc
cc.mu.Unlock()
case <-cc.ctx.Done():
return
}
}
}
// resetAddrConn creates an addrConn for addr and adds it to cc.conns.
// If there is an old addrConn for addr, it will be torn down, using tearDownErr as the reason.
// If tearDownErr is nil, errConnDrain will be used instead.
@ -509,6 +617,14 @@ func (cc *ClientConn) resetAddrConn(addr Address, skipWait bool, tearDownErr err
return nil
}
// TODO: Avoid the locking here.
func (cc *ClientConn) getMethodConfig(method string) (m MethodConfig, ok bool) {
cc.mu.RLock()
defer cc.mu.RUnlock()
m, ok = cc.sc.Methods[method]
return
}
func (cc *ClientConn) getTransport(ctx context.Context, opts BalancerGetOptions) (transport.ClientTransport, func(), error) {
var (
ac *addrConn
@ -689,6 +805,8 @@ func (ac *addrConn) resetTransport(closeTransport bool) error {
Metadata: ac.addr.Metadata,
}
newTransport, err := transport.NewClientTransport(ctx, sinfo, ac.dopts.copts)
// Don't call cancel in success path due to a race in Go 1.6:
// https://github.com/golang/go/issues/15078.
if err != nil {
cancel()

View File

@ -165,9 +165,7 @@ func (c *tlsCreds) ClientHandshake(ctx context.Context, addr string, rawConn net
case <-ctx.Done():
return nil, nil, ctx.Err()
}
// TODO(zhaoq): Omit the auth info for client now. It is more for
// information than anything else.
return conn, nil, nil
return conn, TLSInfo{conn.ConnectionState()}, nil
}
func (c *tlsCreds) ServerHandshake(rawConn net.Conn) (net.Conn, AuthInfo, error) {

View File

@ -1,4 +1,5 @@
// +build go1.7
// +build !go1.8
/*
*
@ -44,8 +45,6 @@ import (
// contains a mutex and must not be copied.
//
// If cfg is nil, a new zero tls.Config is returned.
//
// TODO replace this function with official clone function.
func cloneTLSConfig(cfg *tls.Config) *tls.Config {
if cfg == nil {
return &tls.Config{}

View File

@ -0,0 +1,53 @@
// +build go1.8
/*
*
* Copyright 2017, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
package credentials
import (
"crypto/tls"
)
// cloneTLSConfig returns a shallow clone of the exported
// fields of cfg, ignoring the unexported sync.Once, which
// contains a mutex and must not be copied.
//
// If cfg is nil, a new zero tls.Config is returned.
func cloneTLSConfig(cfg *tls.Config) *tls.Config {
if cfg == nil {
return &tls.Config{}
}
return cfg.Clone()
}

View File

@ -44,8 +44,6 @@ import (
// contains a mutex and must not be copied.
//
// If cfg is nil, a new zero tls.Config is returned.
//
// TODO replace this function with official clone function.
func cloneTLSConfig(cfg *tls.Config) *tls.Config {
if cfg == nil {
return &tls.Config{}

View File

@ -0,0 +1,52 @@
/*
*
* Copyright 2017, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
// Package keepalive defines configurable parameters for point-to-point healthcheck.
package keepalive
import (
"time"
)
// ClientParameters is used to set keepalive parameters on the client-side.
// These configure how the client will actively probe to notice when a connection broken
// and to cause activity so intermediaries are aware the connection is still in use.
type ClientParameters struct {
// After a duration of this time if the client doesn't see any activity it pings the server to see if the transport is still alive.
Time time.Duration // The current default value is infinity.
// After having pinged for keepalive check, the client waits for a duration of Timeout and if no activity is seen even after that
// the connection is closed.
Timeout time.Duration // The current default value is 20 seconds.
// If true, client runs keepalive checks even with no active RPCs.
PermitWithoutStream bool
}

View File

@ -32,6 +32,7 @@
*/
// Package metadata define the structure of the metadata supported by gRPC library.
// Please refer to http://www.grpc.io/docs/guides/wire.html for more information about custom-metadata.
package metadata // import "google.golang.org/grpc/metadata"
import (
@ -82,6 +83,7 @@ func DecodeKeyValue(k, v string) (string, string, error) {
type MD map[string][]string
// New creates a MD from given key-value map.
// Keys are automatically converted to lowercase. And for keys having "-bin" as suffix, their values will be applied Base64 encoding.
func New(m map[string]string) MD {
md := MD{}
for k, v := range m {
@ -93,6 +95,7 @@ func New(m map[string]string) MD {
// Pairs returns an MD formed by the mapping of key, value ...
// Pairs panics if len(kv) is odd.
// Keys are automatically converted to lowercase. And for keys having "-bin" as suffix, their values will be appplied Base64 encoding.
func Pairs(kv ...string) MD {
if len(kv)%2 == 1 {
panic(fmt.Sprintf("metadata: Pairs got the odd number of input pairs for metadata: %d", len(kv)))
@ -141,6 +144,8 @@ func NewContext(ctx context.Context, md MD) context.Context {
}
// FromContext returns the MD in ctx if it exists.
// The returned md should be immutable, writing to it may cause races.
// Modification should be made to the copies of the returned md.
func FromContext(ctx context.Context) (md MD, ok bool) {
md, ok = ctx.Value(mdKey{}).(MD)
return

View File

@ -42,11 +42,14 @@ import (
"io/ioutil"
"math"
"os"
"time"
"github.com/golang/protobuf/proto"
"golang.org/x/net/context"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/stats"
"google.golang.org/grpc/transport"
)
@ -138,6 +141,7 @@ type callInfo struct {
failFast bool
headerMD metadata.MD
trailerMD metadata.MD
peer *peer.Peer
traceInfo traceInfo // in trace.go
}
@ -181,12 +185,20 @@ func Trailer(md *metadata.MD) CallOption {
})
}
// Peer returns a CallOption that retrieves peer information for a
// unary RPC.
func Peer(peer *peer.Peer) CallOption {
return afterCall(func(c *callInfo) {
*peer = *c.peer
})
}
// FailFast configures the action to take when an RPC is attempted on broken
// connections or unreachable servers. If failfast is true, the RPC will fail
// immediately. Otherwise, the RPC client will block the call until a
// connection is available (or the call is canceled or times out) and will retry
// the call if it fails due to a transient error. Please refer to
// https://github.com/grpc/grpc/blob/master/doc/fail_fast.md
// https://github.com/grpc/grpc/blob/master/doc/fail_fast.md. Note: failFast is default to true.
func FailFast(failFast bool) CallOption {
return beforeCall(func(c *callInfo) error {
c.failFast = failFast
@ -255,9 +267,11 @@ func (p *parser) recvMsg(maxMsgSize int) (pf payloadFormat, msg []byte, err erro
// encode serializes msg and prepends the message header. If msg is nil, it
// generates the message header of 0 message length.
func encode(c Codec, msg interface{}, cp Compressor, cbuf *bytes.Buffer) ([]byte, error) {
var b []byte
var length uint
func encode(c Codec, msg interface{}, cp Compressor, cbuf *bytes.Buffer, outPayload *stats.OutPayload) ([]byte, error) {
var (
b []byte
length uint
)
if msg != nil {
var err error
// TODO(zhaoq): optimize to reduce memory alloc and copying.
@ -265,6 +279,12 @@ func encode(c Codec, msg interface{}, cp Compressor, cbuf *bytes.Buffer) ([]byte
if err != nil {
return nil, err
}
if outPayload != nil {
outPayload.Payload = msg
// TODO truncate large payload.
outPayload.Data = b
outPayload.Length = len(b)
}
if cp != nil {
if err := cp.Do(cbuf, b); err != nil {
return nil, err
@ -295,6 +315,10 @@ func encode(c Codec, msg interface{}, cp Compressor, cbuf *bytes.Buffer) ([]byte
// Copy encoded msg to buf
copy(buf[5:], b)
if outPayload != nil {
outPayload.WireLength = len(buf)
}
return buf, nil
}
@ -311,11 +335,14 @@ func checkRecvPayload(pf payloadFormat, recvCompress string, dc Decompressor) er
return nil
}
func recv(p *parser, c Codec, s *transport.Stream, dc Decompressor, m interface{}, maxMsgSize int) error {
func recv(p *parser, c Codec, s *transport.Stream, dc Decompressor, m interface{}, maxMsgSize int, inPayload *stats.InPayload) error {
pf, d, err := p.recvMsg(maxMsgSize)
if err != nil {
return err
}
if inPayload != nil {
inPayload.WireLength = len(d)
}
if err := checkRecvPayload(pf, s.RecvCompress(), dc); err != nil {
return err
}
@ -333,6 +360,13 @@ func recv(p *parser, c Codec, s *transport.Stream, dc Decompressor, m interface{
if err := c.Unmarshal(d, m); err != nil {
return Errorf(codes.Internal, "grpc: failed to unmarshal the received message %v", err)
}
if inPayload != nil {
inPayload.RecvTime = time.Now()
inPayload.Payload = m
// TODO truncate large payload.
inPayload.Data = d
inPayload.Length = len(d)
}
return nil
}
@ -343,7 +377,7 @@ type rpcError struct {
}
func (e *rpcError) Error() string {
return fmt.Sprintf("rpc error: code = %d desc = %s", e.code, e.desc)
return fmt.Sprintf("rpc error: code = %s desc = %s", e.code, e.desc)
}
// Code returns the error code for err if it was produced by the rpc system.
@ -448,6 +482,44 @@ func convertCode(err error) codes.Code {
return codes.Unknown
}
// MethodConfig defines the configuration recommended by the service providers for a
// particular method.
// This is EXPERIMENTAL and subject to change.
type MethodConfig struct {
// WaitForReady indicates whether RPCs sent to this method should wait until
// the connection is ready by default (!failfast). The value specified via the
// gRPC client API will override the value set here.
WaitForReady bool
// Timeout is the default timeout for RPCs sent to this method. The actual
// deadline used will be the minimum of the value specified here and the value
// set by the application via the gRPC client API. If either one is not set,
// then the other will be used. If neither is set, then the RPC has no deadline.
Timeout time.Duration
// MaxReqSize is the maximum allowed payload size for an individual request in a
// stream (client->server) in bytes. The size which is measured is the serialized
// payload after per-message compression (but before stream compression) in bytes.
// The actual value used is the minumum of the value specified here and the value set
// by the application via the gRPC client API. If either one is not set, then the other
// will be used. If neither is set, then the built-in default is used.
// TODO: support this.
MaxReqSize uint32
// MaxRespSize is the maximum allowed payload size for an individual response in a
// stream (server->client) in bytes.
// TODO: support this.
MaxRespSize uint32
}
// ServiceConfig is provided by the service provider and contains parameters for how
// clients that connect to the service should behave.
// This is EXPERIMENTAL and subject to change.
type ServiceConfig struct {
// LB is the load balancer the service providers recommends. The balancer specified
// via grpc.WithBalancer will override this.
LB Balancer
// Methods contains a map for the methods in this service.
Methods map[string]MethodConfig
}
// SupportPackageIsVersion4 is referenced from generated protocol buffer files
// to assert that that code is compatible with this version of the grpc package.
//
@ -455,3 +527,6 @@ func convertCode(err error) codes.Code {
// requires a synchronised update of grpc-go and protoc-gen-go. This constant
// should not be referenced from any other code.
const SupportPackageIsVersion4 = true
// Version is the current grpc version.
const Version = "1.2.1"

View File

@ -54,6 +54,8 @@ import (
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/stats"
"google.golang.org/grpc/tap"
"google.golang.org/grpc/transport"
)
@ -110,8 +112,11 @@ type options struct {
maxMsgSize int
unaryInt UnaryServerInterceptor
streamInt StreamServerInterceptor
inTapHandle tap.ServerInHandle
statsHandler stats.Handler
maxConcurrentStreams uint32
useHandlerImpl bool // use http.Handler-based server
unknownStreamDesc *StreamDesc
}
var defaultMaxMsgSize = 1024 * 1024 * 4 // use 4MB as the default message size limit
@ -186,6 +191,42 @@ func StreamInterceptor(i StreamServerInterceptor) ServerOption {
}
}
// InTapHandle returns a ServerOption that sets the tap handle for all the server
// transport to be created. Only one can be installed.
func InTapHandle(h tap.ServerInHandle) ServerOption {
return func(o *options) {
if o.inTapHandle != nil {
panic("The tap handle has been set.")
}
o.inTapHandle = h
}
}
// StatsHandler returns a ServerOption that sets the stats handler for the server.
func StatsHandler(h stats.Handler) ServerOption {
return func(o *options) {
o.statsHandler = h
}
}
// UnknownServiceHandler returns a ServerOption that allows for adding a custom
// unknown service handler. The provided method is a bidi-streaming RPC service
// handler that will be invoked instead of returning the the "unimplemented" gRPC
// error whenever a request is received for an unregistered service or method.
// The handling function has full access to the Context of the request and the
// stream, and the invocation passes through interceptors.
func UnknownServiceHandler(streamHandler StreamHandler) ServerOption {
return func(o *options) {
o.unknownStreamDesc = &StreamDesc{
StreamName: "unknown_service_handler",
Handler: streamHandler,
// We need to assume that the users of the streamHandler will want to use both.
ClientStreams: true,
ServerStreams: true,
}
}
}
// NewServer creates a gRPC server which has no service registered and has not
// started to accept requests yet.
func NewServer(opt ...ServerOption) *Server {
@ -329,6 +370,7 @@ func (s *Server) useTransportAuthenticator(rawConn net.Conn) (net.Conn, credenti
// read gRPC requests and then call the registered handlers to reply to them.
// Serve returns when lis.Accept fails with fatal errors. lis will be closed when
// this method returns.
// Serve always returns non-nil error.
func (s *Server) Serve(lis net.Listener) error {
s.mu.Lock()
s.printf("serving")
@ -412,17 +454,23 @@ func (s *Server) handleRawConn(rawConn net.Conn) {
if s.opts.useHandlerImpl {
s.serveUsingHandler(conn)
} else {
s.serveNewHTTP2Transport(conn, authInfo)
s.serveHTTP2Transport(conn, authInfo)
}
}
// serveNewHTTP2Transport sets up a new http/2 transport (using the
// serveHTTP2Transport sets up a http/2 transport (using the
// gRPC http2 server transport in transport/http2_server.go) and
// serves streams on it.
// This is run in its own goroutine (it does network I/O in
// transport.NewServerTransport).
func (s *Server) serveNewHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) {
st, err := transport.NewServerTransport("http2", c, s.opts.maxConcurrentStreams, authInfo)
func (s *Server) serveHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) {
config := &transport.ServerConfig{
MaxStreams: s.opts.maxConcurrentStreams,
AuthInfo: authInfo,
InTapHandle: s.opts.inTapHandle,
StatsHandler: s.opts.statsHandler,
}
st, err := transport.NewServerTransport("http2", c, config)
if err != nil {
s.mu.Lock()
s.errorf("NewServerTransport(%q) failed: %v", c.RemoteAddr(), err)
@ -448,6 +496,12 @@ func (s *Server) serveStreams(st transport.ServerTransport) {
defer wg.Done()
s.handleStream(st, stream, s.traceInfo(st, stream))
}()
}, func(ctx context.Context, method string) context.Context {
if !EnableTracing {
return ctx
}
tr := trace.New("grpc.Recv."+methodFamily(method), method)
return trace.NewContext(ctx, tr)
})
wg.Wait()
}
@ -497,15 +551,17 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// traceInfo returns a traceInfo and associates it with stream, if tracing is enabled.
// If tracing is not enabled, it returns nil.
func (s *Server) traceInfo(st transport.ServerTransport, stream *transport.Stream) (trInfo *traceInfo) {
if !EnableTracing {
tr, ok := trace.FromContext(stream.Context())
if !ok {
return nil
}
trInfo = &traceInfo{
tr: trace.New("grpc.Recv."+methodFamily(stream.Method()), stream.Method()),
tr: tr,
}
trInfo.firstLine.client = false
trInfo.firstLine.remoteAddr = st.RemoteAddr()
stream.TraceContext(trInfo.tr)
if dl, ok := stream.Context().Deadline(); ok {
trInfo.firstLine.deadline = dl.Sub(time.Now())
}
@ -532,11 +588,17 @@ func (s *Server) removeConn(c io.Closer) {
}
func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Stream, msg interface{}, cp Compressor, opts *transport.Options) error {
var cbuf *bytes.Buffer
var (
cbuf *bytes.Buffer
outPayload *stats.OutPayload
)
if cp != nil {
cbuf = new(bytes.Buffer)
}
p, err := encode(s.opts.codec, msg, cp, cbuf)
if s.opts.statsHandler != nil {
outPayload = &stats.OutPayload{}
}
p, err := encode(s.opts.codec, msg, cp, cbuf, outPayload)
if err != nil {
// This typically indicates a fatal issue (e.g., memory
// corruption or hardware faults) the application program
@ -547,10 +609,33 @@ func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Str
// the optimal option.
grpclog.Fatalf("grpc: Server failed to encode response %v", err)
}
return t.Write(stream, p, opts)
err = t.Write(stream, p, opts)
if err == nil && outPayload != nil {
outPayload.SentTime = time.Now()
s.opts.statsHandler.HandleRPC(stream.Context(), outPayload)
}
return err
}
func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, md *MethodDesc, trInfo *traceInfo) (err error) {
sh := s.opts.statsHandler
if sh != nil {
begin := &stats.Begin{
BeginTime: time.Now(),
}
sh.HandleRPC(stream.Context(), begin)
}
defer func() {
if sh != nil {
end := &stats.End{
EndTime: time.Now(),
}
if err != nil && err != io.EOF {
end.Error = toRPCErr(err)
}
sh.HandleRPC(stream.Context(), end)
}
}()
if trInfo != nil {
defer trInfo.tr.Finish()
trInfo.firstLine.client = false
@ -579,14 +664,14 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
if err != nil {
switch err := err.(type) {
case *rpcError:
if err := t.WriteStatus(stream, err.code, err.desc); err != nil {
grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", err)
if e := t.WriteStatus(stream, err.code, err.desc); e != nil {
grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", e)
}
case transport.ConnectionError:
// Nothing to do here.
case transport.StreamError:
if err := t.WriteStatus(stream, err.Code, err.Desc); err != nil {
grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", err)
if e := t.WriteStatus(stream, err.Code, err.Desc); e != nil {
grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", e)
}
default:
panic(fmt.Sprintf("grpc: Unexpected error (%T) from recvMsg: %v", err, err))
@ -597,20 +682,29 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
if err := checkRecvPayload(pf, stream.RecvCompress(), s.opts.dc); err != nil {
switch err := err.(type) {
case *rpcError:
if err := t.WriteStatus(stream, err.code, err.desc); err != nil {
grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", err)
if e := t.WriteStatus(stream, err.code, err.desc); e != nil {
grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", e)
}
return err
default:
if err := t.WriteStatus(stream, codes.Internal, err.Error()); err != nil {
grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", err)
if e := t.WriteStatus(stream, codes.Internal, err.Error()); e != nil {
grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", e)
}
// TODO checkRecvPayload always return RPC error. Add a return here if necessary.
}
}
var inPayload *stats.InPayload
if sh != nil {
inPayload = &stats.InPayload{
RecvTime: time.Now(),
}
return err
}
statusCode := codes.OK
statusDesc := ""
df := func(v interface{}) error {
if inPayload != nil {
inPayload.WireLength = len(req)
}
if pf == compressionMade {
var err error
req, err = s.opts.dc.Do(bytes.NewReader(req))
@ -618,7 +712,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
if err := t.WriteStatus(stream, codes.Internal, err.Error()); err != nil {
grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", err)
}
return err
return Errorf(codes.Internal, err.Error())
}
}
if len(req) > s.opts.maxMsgSize {
@ -630,6 +724,12 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
if err := s.opts.codec.Unmarshal(req, v); err != nil {
return err
}
if inPayload != nil {
inPayload.Payload = v
inPayload.Data = req
inPayload.Length = len(req)
sh.HandleRPC(stream.Context(), inPayload)
}
if trInfo != nil {
trInfo.tr.LazyLog(&payload{sent: false, msg: v}, true)
}
@ -650,9 +750,8 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
}
if err := t.WriteStatus(stream, statusCode, statusDesc); err != nil {
grpclog.Printf("grpc: Server.processUnaryRPC failed to write status: %v", err)
return err
}
return nil
return Errorf(statusCode, statusDesc)
}
if trInfo != nil {
trInfo.tr.LazyLog(stringer("OK"), false)
@ -677,23 +776,46 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
if trInfo != nil {
trInfo.tr.LazyLog(&payload{sent: true, msg: reply}, true)
}
return t.WriteStatus(stream, statusCode, statusDesc)
errWrite := t.WriteStatus(stream, statusCode, statusDesc)
if statusCode != codes.OK {
return Errorf(statusCode, statusDesc)
}
return errWrite
}
}
func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, sd *StreamDesc, trInfo *traceInfo) (err error) {
sh := s.opts.statsHandler
if sh != nil {
begin := &stats.Begin{
BeginTime: time.Now(),
}
sh.HandleRPC(stream.Context(), begin)
}
defer func() {
if sh != nil {
end := &stats.End{
EndTime: time.Now(),
}
if err != nil && err != io.EOF {
end.Error = toRPCErr(err)
}
sh.HandleRPC(stream.Context(), end)
}
}()
if s.opts.cp != nil {
stream.SetSendCompress(s.opts.cp.Type())
}
ss := &serverStream{
t: t,
s: stream,
p: &parser{r: stream},
codec: s.opts.codec,
cp: s.opts.cp,
dc: s.opts.dc,
maxMsgSize: s.opts.maxMsgSize,
trInfo: trInfo,
t: t,
s: stream,
p: &parser{r: stream},
codec: s.opts.codec,
cp: s.opts.cp,
dc: s.opts.dc,
maxMsgSize: s.opts.maxMsgSize,
trInfo: trInfo,
statsHandler: sh,
}
if ss.cp != nil {
ss.cbuf = new(bytes.Buffer)
@ -712,15 +834,19 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp
}()
}
var appErr error
var server interface{}
if srv != nil {
server = srv.server
}
if s.opts.streamInt == nil {
appErr = sd.Handler(srv.server, ss)
appErr = sd.Handler(server, ss)
} else {
info := &StreamServerInfo{
FullMethod: stream.Method(),
IsClientStream: sd.ClientStreams,
IsServerStream: sd.ServerStreams,
}
appErr = s.opts.streamInt(srv.server, ss, info, sd.Handler)
appErr = s.opts.streamInt(server, ss, info, sd.Handler)
}
if appErr != nil {
if err, ok := appErr.(*rpcError); ok {
@ -744,7 +870,11 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp
}
ss.mu.Unlock()
}
return t.WriteStatus(ss.s, ss.statusCode, ss.statusDesc)
errWrite := t.WriteStatus(ss.s, ss.statusCode, ss.statusDesc)
if ss.statusCode != codes.OK {
return Errorf(ss.statusCode, ss.statusDesc)
}
return errWrite
}
@ -759,7 +889,8 @@ func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Str
trInfo.tr.LazyLog(&fmtStringer{"Malformed method name %q", []interface{}{sm}}, true)
trInfo.tr.SetError()
}
if err := t.WriteStatus(stream, codes.InvalidArgument, fmt.Sprintf("malformed method name: %q", stream.Method())); err != nil {
errDesc := fmt.Sprintf("malformed method name: %q", stream.Method())
if err := t.WriteStatus(stream, codes.InvalidArgument, errDesc); err != nil {
if trInfo != nil {
trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
trInfo.tr.SetError()
@ -775,11 +906,16 @@ func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Str
method := sm[pos+1:]
srv, ok := s.m[service]
if !ok {
if unknownDesc := s.opts.unknownStreamDesc; unknownDesc != nil {
s.processStreamingRPC(t, stream, nil, unknownDesc, trInfo)
return
}
if trInfo != nil {
trInfo.tr.LazyLog(&fmtStringer{"Unknown service %v", []interface{}{service}}, true)
trInfo.tr.SetError()
}
if err := t.WriteStatus(stream, codes.Unimplemented, fmt.Sprintf("unknown service %v", service)); err != nil {
errDesc := fmt.Sprintf("unknown service %v", service)
if err := t.WriteStatus(stream, codes.Unimplemented, errDesc); err != nil {
if trInfo != nil {
trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
trInfo.tr.SetError()
@ -804,7 +940,12 @@ func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Str
trInfo.tr.LazyLog(&fmtStringer{"Unknown method %v", []interface{}{method}}, true)
trInfo.tr.SetError()
}
if err := t.WriteStatus(stream, codes.Unimplemented, fmt.Sprintf("unknown method %v", method)); err != nil {
if unknownDesc := s.opts.unknownStreamDesc; unknownDesc != nil {
s.processStreamingRPC(t, stream, nil, unknownDesc, trInfo)
return
}
errDesc := fmt.Sprintf("unknown method %v", method)
if err := t.WriteStatus(stream, codes.Unimplemented, errDesc); err != nil {
if trInfo != nil {
trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
trInfo.tr.SetError()

76
cmd/vendor/google.golang.org/grpc/stats/handlers.go generated vendored Normal file
View File

@ -0,0 +1,76 @@
/*
*
* Copyright 2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
package stats
import (
"net"
"golang.org/x/net/context"
)
// ConnTagInfo defines the relevant information needed by connection context tagger.
type ConnTagInfo struct {
// RemoteAddr is the remote address of the corresponding connection.
RemoteAddr net.Addr
// LocalAddr is the local address of the corresponding connection.
LocalAddr net.Addr
// TODO add QOS related fields.
}
// RPCTagInfo defines the relevant information needed by RPC context tagger.
type RPCTagInfo struct {
// FullMethodName is the RPC method in the format of /package.service/method.
FullMethodName string
}
// Handler defines the interface for the related stats handling (e.g., RPCs, connections).
type Handler interface {
// TagRPC can attach some information to the given context.
// The returned context is used in the rest lifetime of the RPC.
TagRPC(context.Context, *RPCTagInfo) context.Context
// HandleRPC processes the RPC stats.
HandleRPC(context.Context, RPCStats)
// TagConn can attach some information to the given context.
// The returned context will be used for stats handling.
// For conn stats handling, the context used in HandleConn for this
// connection will be derived from the context returned.
// For RPC stats handling,
// - On server side, the context used in HandleRPC for all RPCs on this
// connection will be derived from the context returned.
// - On client side, the context is not derived from the context returned.
TagConn(context.Context, *ConnTagInfo) context.Context
// HandleConn processes the Conn stats.
HandleConn(context.Context, ConnStats)
}

223
cmd/vendor/google.golang.org/grpc/stats/stats.go generated vendored Normal file
View File

@ -0,0 +1,223 @@
/*
*
* Copyright 2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
// Package stats is for collecting and reporting various network and RPC stats.
// This package is for monitoring purpose only. All fields are read-only.
// All APIs are experimental.
package stats // import "google.golang.org/grpc/stats"
import (
"net"
"time"
)
// RPCStats contains stats information about RPCs.
type RPCStats interface {
isRPCStats()
// IsClient returns true if this RPCStats is from client side.
IsClient() bool
}
// Begin contains stats when an RPC begins.
// FailFast are only valid if Client is true.
type Begin struct {
// Client is true if this Begin is from client side.
Client bool
// BeginTime is the time when the RPC begins.
BeginTime time.Time
// FailFast indicates if this RPC is failfast.
FailFast bool
}
// IsClient indicates if this is from client side.
func (s *Begin) IsClient() bool { return s.Client }
func (s *Begin) isRPCStats() {}
// InPayload contains the information for an incoming payload.
type InPayload struct {
// Client is true if this InPayload is from client side.
Client bool
// Payload is the payload with original type.
Payload interface{}
// Data is the serialized message payload.
Data []byte
// Length is the length of uncompressed data.
Length int
// WireLength is the length of data on wire (compressed, signed, encrypted).
WireLength int
// RecvTime is the time when the payload is received.
RecvTime time.Time
}
// IsClient indicates if this is from client side.
func (s *InPayload) IsClient() bool { return s.Client }
func (s *InPayload) isRPCStats() {}
// InHeader contains stats when a header is received.
// FullMethod, addresses and Compression are only valid if Client is false.
type InHeader struct {
// Client is true if this InHeader is from client side.
Client bool
// WireLength is the wire length of header.
WireLength int
// FullMethod is the full RPC method string, i.e., /package.service/method.
FullMethod string
// RemoteAddr is the remote address of the corresponding connection.
RemoteAddr net.Addr
// LocalAddr is the local address of the corresponding connection.
LocalAddr net.Addr
// Compression is the compression algorithm used for the RPC.
Compression string
}
// IsClient indicates if this is from client side.
func (s *InHeader) IsClient() bool { return s.Client }
func (s *InHeader) isRPCStats() {}
// InTrailer contains stats when a trailer is received.
type InTrailer struct {
// Client is true if this InTrailer is from client side.
Client bool
// WireLength is the wire length of trailer.
WireLength int
}
// IsClient indicates if this is from client side.
func (s *InTrailer) IsClient() bool { return s.Client }
func (s *InTrailer) isRPCStats() {}
// OutPayload contains the information for an outgoing payload.
type OutPayload struct {
// Client is true if this OutPayload is from client side.
Client bool
// Payload is the payload with original type.
Payload interface{}
// Data is the serialized message payload.
Data []byte
// Length is the length of uncompressed data.
Length int
// WireLength is the length of data on wire (compressed, signed, encrypted).
WireLength int
// SentTime is the time when the payload is sent.
SentTime time.Time
}
// IsClient indicates if this is from client side.
func (s *OutPayload) IsClient() bool { return s.Client }
func (s *OutPayload) isRPCStats() {}
// OutHeader contains stats when a header is sent.
// FullMethod, addresses and Compression are only valid if Client is true.
type OutHeader struct {
// Client is true if this OutHeader is from client side.
Client bool
// WireLength is the wire length of header.
WireLength int
// FullMethod is the full RPC method string, i.e., /package.service/method.
FullMethod string
// RemoteAddr is the remote address of the corresponding connection.
RemoteAddr net.Addr
// LocalAddr is the local address of the corresponding connection.
LocalAddr net.Addr
// Compression is the compression algorithm used for the RPC.
Compression string
}
// IsClient indicates if this is from client side.
func (s *OutHeader) IsClient() bool { return s.Client }
func (s *OutHeader) isRPCStats() {}
// OutTrailer contains stats when a trailer is sent.
type OutTrailer struct {
// Client is true if this OutTrailer is from client side.
Client bool
// WireLength is the wire length of trailer.
WireLength int
}
// IsClient indicates if this is from client side.
func (s *OutTrailer) IsClient() bool { return s.Client }
func (s *OutTrailer) isRPCStats() {}
// End contains stats when an RPC ends.
type End struct {
// Client is true if this End is from client side.
Client bool
// EndTime is the time when the RPC ends.
EndTime time.Time
// Error is the error just happened. Its type is gRPC error.
Error error
}
// IsClient indicates if this is from client side.
func (s *End) IsClient() bool { return s.Client }
func (s *End) isRPCStats() {}
// ConnStats contains stats information about connections.
type ConnStats interface {
isConnStats()
// IsClient returns true if this ConnStats is from client side.
IsClient() bool
}
// ConnBegin contains the stats of a connection when it is established.
type ConnBegin struct {
// Client is true if this ConnBegin is from client side.
Client bool
}
// IsClient indicates if this is from client side.
func (s *ConnBegin) IsClient() bool { return s.Client }
func (s *ConnBegin) isConnStats() {}
// ConnEnd contains the stats of a connection when it ends.
type ConnEnd struct {
// Client is true if this ConnEnd is from client side.
Client bool
}
// IsClient indicates if this is from client side.
func (s *ConnEnd) IsClient() bool { return s.Client }
func (s *ConnEnd) isConnStats() {}

View File

@ -37,7 +37,6 @@ import (
"bytes"
"errors"
"io"
"math"
"sync"
"time"
@ -45,6 +44,7 @@ import (
"golang.org/x/net/trace"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/stats"
"google.golang.org/grpc/transport"
)
@ -97,7 +97,7 @@ type ClientStream interface {
// NewClientStream creates a new Stream for the client side. This is called
// by generated code.
func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (ClientStream, error) {
func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (_ ClientStream, err error) {
if cc.dopts.streamInt != nil {
return cc.dopts.streamInt(ctx, desc, cc, method, newClientStream, opts...)
}
@ -106,11 +106,18 @@ func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (_ ClientStream, err error) {
var (
t transport.ClientTransport
s *transport.Stream
put func()
t transport.ClientTransport
s *transport.Stream
put func()
cancel context.CancelFunc
)
c := defaultCallInfo
if mc, ok := cc.getMethodConfig(method); ok {
c.failFast = !mc.WaitForReady
if mc.Timeout > 0 {
ctx, cancel = context.WithTimeout(ctx, mc.Timeout)
}
}
for _, o := range opts {
if err := o.before(&c); err != nil {
return nil, toRPCErr(err)
@ -143,6 +150,26 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
}
}()
}
sh := cc.dopts.copts.StatsHandler
if sh != nil {
ctx = sh.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: method})
begin := &stats.Begin{
Client: true,
BeginTime: time.Now(),
FailFast: c.failFast,
}
sh.HandleRPC(ctx, begin)
}
defer func() {
if err != nil && sh != nil {
// Only handle end stats if err != nil.
end := &stats.End{
Client: true,
Error: err,
}
sh.HandleRPC(ctx, end)
}
}()
gopts := BalancerGetOptions{
BlockingWait: !c.failFast,
}
@ -180,12 +207,14 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
break
}
cs := &clientStream{
opts: opts,
c: c,
desc: desc,
codec: cc.dopts.codec,
cp: cc.dopts.cp,
dc: cc.dopts.dc,
opts: opts,
c: c,
desc: desc,
codec: cc.dopts.codec,
cp: cc.dopts.cp,
dc: cc.dopts.dc,
maxMsgSize: cc.dopts.maxMsgSize,
cancel: cancel,
put: put,
t: t,
@ -194,6 +223,9 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
tracing: EnableTracing,
trInfo: trInfo,
statsCtx: ctx,
statsHandler: cc.dopts.copts.StatsHandler,
}
if cc.dopts.cp != nil {
cs.cbuf = new(bytes.Buffer)
@ -227,16 +259,18 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
// clientStream implements a client side Stream.
type clientStream struct {
opts []CallOption
c callInfo
t transport.ClientTransport
s *transport.Stream
p *parser
desc *StreamDesc
codec Codec
cp Compressor
cbuf *bytes.Buffer
dc Decompressor
opts []CallOption
c callInfo
t transport.ClientTransport
s *transport.Stream
p *parser
desc *StreamDesc
codec Codec
cp Compressor
cbuf *bytes.Buffer
dc Decompressor
maxMsgSize int
cancel context.CancelFunc
tracing bool // set to EnableTracing when the clientStream is created.
@ -246,6 +280,12 @@ type clientStream struct {
// trInfo.tr is set when the clientStream is created (if EnableTracing is true),
// and is set to nil when the clientStream's finish method is called.
trInfo traceInfo
// statsCtx keeps the user context for stats handling.
// All stats collection should use the statsCtx (instead of the stream context)
// so that all the generated stats for a particular RPC can be associated in the processing phase.
statsCtx context.Context
statsHandler stats.Handler
}
func (cs *clientStream) Context() context.Context {
@ -274,6 +314,8 @@ func (cs *clientStream) SendMsg(m interface{}) (err error) {
}
cs.mu.Unlock()
}
// TODO Investigate how to signal the stats handling party.
// generate error stats if err != nil && err != io.EOF?
defer func() {
if err != nil {
cs.finish(err)
@ -296,7 +338,13 @@ func (cs *clientStream) SendMsg(m interface{}) (err error) {
}
err = toRPCErr(err)
}()
out, err := encode(cs.codec, m, cs.cp, cs.cbuf)
var outPayload *stats.OutPayload
if cs.statsHandler != nil {
outPayload = &stats.OutPayload{
Client: true,
}
}
out, err := encode(cs.codec, m, cs.cp, cs.cbuf, outPayload)
defer func() {
if cs.cbuf != nil {
cs.cbuf.Reset()
@ -305,11 +353,37 @@ func (cs *clientStream) SendMsg(m interface{}) (err error) {
if err != nil {
return Errorf(codes.Internal, "grpc: %v", err)
}
return cs.t.Write(cs.s, out, &transport.Options{Last: false})
err = cs.t.Write(cs.s, out, &transport.Options{Last: false})
if err == nil && outPayload != nil {
outPayload.SentTime = time.Now()
cs.statsHandler.HandleRPC(cs.statsCtx, outPayload)
}
return err
}
func (cs *clientStream) RecvMsg(m interface{}) (err error) {
err = recv(cs.p, cs.codec, cs.s, cs.dc, m, math.MaxInt32)
defer func() {
if err != nil && cs.statsHandler != nil {
// Only generate End if err != nil.
// If err == nil, it's not the last RecvMsg.
// The last RecvMsg gets either an RPC error or io.EOF.
end := &stats.End{
Client: true,
EndTime: time.Now(),
}
if err != io.EOF {
end.Error = toRPCErr(err)
}
cs.statsHandler.HandleRPC(cs.statsCtx, end)
}
}()
var inPayload *stats.InPayload
if cs.statsHandler != nil {
inPayload = &stats.InPayload{
Client: true,
}
}
err = recv(cs.p, cs.codec, cs.s, cs.dc, m, cs.maxMsgSize, inPayload)
defer func() {
// err != nil indicates the termination of the stream.
if err != nil {
@ -324,11 +398,15 @@ func (cs *clientStream) RecvMsg(m interface{}) (err error) {
}
cs.mu.Unlock()
}
if inPayload != nil {
cs.statsHandler.HandleRPC(cs.statsCtx, inPayload)
}
if !cs.desc.ClientStreams || cs.desc.ServerStreams {
return
}
// Special handling for client streaming rpc.
err = recv(cs.p, cs.codec, cs.s, cs.dc, m, math.MaxInt32)
// This recv expects EOF or errors, so we don't collect inPayload.
err = recv(cs.p, cs.codec, cs.s, cs.dc, m, cs.maxMsgSize, nil)
cs.closeTransportStream(err)
if err == nil {
return toRPCErr(errors.New("grpc: client streaming protocol violation: get <nil>, want <EOF>"))
@ -384,6 +462,11 @@ func (cs *clientStream) closeTransportStream(err error) {
}
func (cs *clientStream) finish(err error) {
defer func() {
if cs.cancel != nil {
cs.cancel()
}
}()
cs.mu.Lock()
defer cs.mu.Unlock()
for _, o := range cs.opts {
@ -441,6 +524,8 @@ type serverStream struct {
statusDesc string
trInfo *traceInfo
statsHandler stats.Handler
mu sync.Mutex // protects trInfo.tr after the service handler runs.
}
@ -482,7 +567,11 @@ func (ss *serverStream) SendMsg(m interface{}) (err error) {
ss.mu.Unlock()
}
}()
out, err := encode(ss.codec, m, ss.cp, ss.cbuf)
var outPayload *stats.OutPayload
if ss.statsHandler != nil {
outPayload = &stats.OutPayload{}
}
out, err := encode(ss.codec, m, ss.cp, ss.cbuf, outPayload)
defer func() {
if ss.cbuf != nil {
ss.cbuf.Reset()
@ -495,6 +584,10 @@ func (ss *serverStream) SendMsg(m interface{}) (err error) {
if err := ss.t.Write(ss.s, out, &transport.Options{Last: false}); err != nil {
return toRPCErr(err)
}
if outPayload != nil {
outPayload.SentTime = time.Now()
ss.statsHandler.HandleRPC(ss.s.Context(), outPayload)
}
return nil
}
@ -513,7 +606,11 @@ func (ss *serverStream) RecvMsg(m interface{}) (err error) {
ss.mu.Unlock()
}
}()
if err := recv(ss.p, ss.codec, ss.s, ss.dc, m, ss.maxMsgSize); err != nil {
var inPayload *stats.InPayload
if ss.statsHandler != nil {
inPayload = &stats.InPayload{}
}
if err := recv(ss.p, ss.codec, ss.s, ss.dc, m, ss.maxMsgSize, inPayload); err != nil {
if err == io.EOF {
return err
}
@ -522,5 +619,8 @@ func (ss *serverStream) RecvMsg(m interface{}) (err error) {
}
return toRPCErr(err)
}
if inPayload != nil {
ss.statsHandler.HandleRPC(ss.s.Context(), inPayload)
}
return nil
}

54
cmd/vendor/google.golang.org/grpc/tap/tap.go generated vendored Normal file
View File

@ -0,0 +1,54 @@
/*
*
* Copyright 2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
// Package tap defines the function handles which are executed on the transport
// layer of gRPC-Go and related information. Everything here is EXPERIMENTAL.
package tap
import (
"golang.org/x/net/context"
)
// Info defines the relevant information needed by the handles.
type Info struct {
// FullMethodName is the string of grpc method (in the format of
// /package.service/method).
FullMethodName string
// TODO: More to be added.
}
// ServerInHandle defines the function which runs when a new stream is created
// on the server side. Note that it is executed in the per-connection I/O goroutine(s) instead
// of per-RPC goroutine. Therefore, users should NOT have any blocking/time-consuming
// work in this handle. Otherwise all the RPCs would slow down.
type ServerInHandle func(ctx context.Context, info *Info) (context.Context, error)

View File

@ -35,7 +35,9 @@ package transport
import (
"fmt"
"math"
"sync"
"time"
"golang.org/x/net/http2"
)
@ -44,8 +46,12 @@ const (
// The default value of flow control window size in HTTP2 spec.
defaultWindowSize = 65535
// The initial window size for flow control.
initialWindowSize = defaultWindowSize // for an RPC
initialConnWindowSize = defaultWindowSize * 16 // for a connection
initialWindowSize = defaultWindowSize // for an RPC
initialConnWindowSize = defaultWindowSize * 16 // for a connection
infinity = time.Duration(math.MaxInt64)
defaultKeepaliveTime = infinity
defaultKeepaliveTimeout = time.Duration(20 * time.Second)
defaultMaxStreamsClient = 100
)
// The following defines various control items which could flow through
@ -111,35 +117,9 @@ func newQuotaPool(q int) *quotaPool {
return qb
}
// add adds n to the available quota and tries to send it on acquire.
func (qb *quotaPool) add(n int) {
qb.mu.Lock()
defer qb.mu.Unlock()
qb.quota += n
if qb.quota <= 0 {
return
}
select {
case qb.c <- qb.quota:
qb.quota = 0
default:
}
}
// cancel cancels the pending quota sent on acquire, if any.
func (qb *quotaPool) cancel() {
qb.mu.Lock()
defer qb.mu.Unlock()
select {
case n := <-qb.c:
qb.quota += n
default:
}
}
// reset cancels the pending quota sent on acquired, incremented by v and sends
// add cancels the pending quota sent on acquired, incremented by v and sends
// it back on acquire.
func (qb *quotaPool) reset(v int) {
func (qb *quotaPool) add(v int) {
qb.mu.Lock()
defer qb.mu.Unlock()
select {
@ -151,6 +131,10 @@ func (qb *quotaPool) reset(v int) {
if qb.quota <= 0 {
return
}
// After the pool has been created, this is the only place that sends on
// the channel. Since mu is held at this point and any quota that was sent
// on the channel has been retrieved, we know that this code will always
// place any positive quota value on the channel.
select {
case qb.c <- qb.quota:
qb.quota = 0

View File

@ -268,7 +268,7 @@ func (ht *serverHandlerTransport) WriteHeader(s *Stream, md metadata.MD) error {
})
}
func (ht *serverHandlerTransport) HandleStreams(startStream func(*Stream)) {
func (ht *serverHandlerTransport) HandleStreams(startStream func(*Stream), traceCtx func(context.Context, string) context.Context) {
// With this transport type there will be exactly 1 stream: this HTTP request.
var ctx context.Context

View File

@ -41,6 +41,7 @@ import (
"net"
"strings"
"sync"
"sync/atomic"
"time"
"golang.org/x/net/context"
@ -49,18 +50,23 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/stats"
)
// http2Client implements the ClientTransport interface with HTTP2.
type http2Client struct {
target string // server name/addr
userAgent string
md interface{}
conn net.Conn // underlying communication channel
authInfo credentials.AuthInfo // auth info about the connection
nextID uint32 // the next stream ID to be used
ctx context.Context
target string // server name/addr
userAgent string
md interface{}
conn net.Conn // underlying communication channel
remoteAddr net.Addr
localAddr net.Addr
authInfo credentials.AuthInfo // auth info about the connection
nextID uint32 // the next stream ID to be used
// writableChan synchronizes write access to the transport.
// A writer acquires the write lock by sending a value on writableChan
@ -76,6 +82,8 @@ type http2Client struct {
// goAway is closed to notify the upper layer (i.e., addrConn.transportMonitor)
// that the server sent GoAway on this transport.
goAway chan struct{}
// awakenKeepalive is used to wake up keepalive when after it has gone dormant.
awakenKeepalive chan struct{}
framer *framer
hBuf *bytes.Buffer // the buffer for HPACK encoding
@ -95,6 +103,13 @@ type http2Client struct {
creds []credentials.PerRPCCredentials
// Boolean to keep track of reading activity on transport.
// 1 is true and 0 is false.
activity uint32 // Accessed atomically.
kp keepalive.ClientParameters
statsHandler stats.Handler
mu sync.Mutex // guard the following variables
state transportState // the state of underlying connection
activeStreams map[uint32]*Stream
@ -150,6 +165,9 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) (
scheme := "http"
conn, err := dial(ctx, opts.Dialer, addr.Addr)
if err != nil {
if opts.FailOnNonTempDialError {
return nil, connectionErrorf(isTemporary(err), err, "transport: %v", err)
}
return nil, connectionErrorf(true, err, "transport: %v", err)
}
// Any further errors will close the underlying connection
@ -169,23 +187,31 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) (
return nil, connectionErrorf(temp, err, "transport: %v", err)
}
}
ua := primaryUA
if opts.UserAgent != "" {
ua = opts.UserAgent + " " + ua
kp := opts.KeepaliveParams
// Validate keepalive parameters.
if kp.Time == 0 {
kp.Time = defaultKeepaliveTime
}
if kp.Timeout == 0 {
kp.Timeout = defaultKeepaliveTimeout
}
var buf bytes.Buffer
t := &http2Client{
target: addr.Addr,
userAgent: ua,
md: addr.Metadata,
conn: conn,
authInfo: authInfo,
ctx: ctx,
target: addr.Addr,
userAgent: opts.UserAgent,
md: addr.Metadata,
conn: conn,
remoteAddr: conn.RemoteAddr(),
localAddr: conn.LocalAddr(),
authInfo: authInfo,
// The client initiated stream id is odd starting from 1.
nextID: 1,
writableChan: make(chan int, 1),
shutdownChan: make(chan struct{}),
errorChan: make(chan struct{}),
goAway: make(chan struct{}),
awakenKeepalive: make(chan struct{}, 1),
framer: newFramer(conn),
hBuf: &buf,
hEnc: hpack.NewEncoder(&buf),
@ -196,8 +222,24 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) (
state: reachable,
activeStreams: make(map[uint32]*Stream),
creds: opts.PerRPCCredentials,
maxStreams: math.MaxInt32,
maxStreams: defaultMaxStreamsClient,
streamsQuota: newQuotaPool(defaultMaxStreamsClient),
streamSendQuota: defaultWindowSize,
kp: kp,
statsHandler: opts.StatsHandler,
}
// Make sure awakenKeepalive can't be written upon.
// keepalive routine will make it writable, if need be.
t.awakenKeepalive <- struct{}{}
if t.statsHandler != nil {
t.ctx = t.statsHandler.TagConn(t.ctx, &stats.ConnTagInfo{
RemoteAddr: t.remoteAddr,
LocalAddr: t.localAddr,
})
connBegin := &stats.ConnBegin{
Client: true,
}
t.statsHandler.HandleConn(t.ctx, connBegin)
}
// Start the reader goroutine for incoming message. Each transport has
// a dedicated goroutine which reads HTTP2 frame from network. Then it
@ -233,6 +275,9 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) (
}
}
go t.controller()
if t.kp.Time != infinity {
go t.keepalive()
}
t.writableChan <- 0
return t, nil
}
@ -270,12 +315,13 @@ func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream {
// streams.
func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Stream, err error) {
pr := &peer.Peer{
Addr: t.conn.RemoteAddr(),
Addr: t.remoteAddr,
}
// Attach Auth info if there is any.
if t.authInfo != nil {
pr.AuthInfo = t.authInfo
}
userCtx := ctx
ctx = peer.NewContext(ctx, pr)
authData := make(map[string]string)
for _, c := range t.creds {
@ -313,21 +359,18 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
t.mu.Unlock()
return nil, ErrConnClosing
}
checkStreamsQuota := t.streamsQuota != nil
t.mu.Unlock()
if checkStreamsQuota {
sq, err := wait(ctx, nil, nil, t.shutdownChan, t.streamsQuota.acquire())
if err != nil {
return nil, err
}
// Returns the quota balance back.
if sq > 1 {
t.streamsQuota.add(sq - 1)
}
sq, err := wait(ctx, nil, nil, t.shutdownChan, t.streamsQuota.acquire())
if err != nil {
return nil, err
}
// Returns the quota balance back.
if sq > 1 {
t.streamsQuota.add(sq - 1)
}
if _, err := wait(ctx, nil, nil, t.shutdownChan, t.writableChan); err != nil {
// Return the quota back now because there is no stream returned to the caller.
if _, ok := err.(StreamError); ok && checkStreamsQuota {
if _, ok := err.(StreamError); ok {
t.streamsQuota.add(1)
}
return nil, err
@ -335,9 +378,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
t.mu.Lock()
if t.state == draining {
t.mu.Unlock()
if checkStreamsQuota {
t.streamsQuota.add(1)
}
t.streamsQuota.add(1)
// Need to make t writable again so that the rpc in flight can still proceed.
t.writableChan <- 0
return nil, ErrStreamDrain
@ -347,18 +388,19 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
return nil, ErrConnClosing
}
s := t.newStream(ctx, callHdr)
s.clientStatsCtx = userCtx
t.activeStreams[s.id] = s
// If the number of active streams change from 0 to 1, then check if keepalive
// has gone dormant. If so, wake it up.
if len(t.activeStreams) == 1 {
select {
case t.awakenKeepalive <- struct{}{}:
t.framer.writePing(false, false, [8]byte{})
default:
}
}
// This stream is not counted when applySetings(...) initialize t.streamsQuota.
// Reset t.streamsQuota to the right value.
var reset bool
if !checkStreamsQuota && t.streamsQuota != nil {
reset = true
}
t.mu.Unlock()
if reset {
t.streamsQuota.reset(-1)
}
// HPACK encodes various headers. Note that once WriteField(...) is
// called, the corresponding headers/continuation frame has to be sent
@ -413,6 +455,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
}
}
first := true
bufLen := t.hBuf.Len()
// Sends the headers in a single batch even when they span multiple frames.
for !endHeaders {
size := t.hBuf.Len()
@ -447,6 +490,17 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
return nil, connectionErrorf(true, err, "transport: %v", err)
}
}
if t.statsHandler != nil {
outHeader := &stats.OutHeader{
Client: true,
WireLength: bufLen,
FullMethod: callHdr.Method,
RemoteAddr: t.remoteAddr,
LocalAddr: t.localAddr,
Compression: callHdr.SendCompress,
}
t.statsHandler.HandleRPC(s.clientStatsCtx, outHeader)
}
t.writableChan <- 0
return s, nil
}
@ -454,15 +508,11 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
// CloseStream clears the footprint of a stream when the stream is not needed any more.
// This must not be executed in reader's goroutine.
func (t *http2Client) CloseStream(s *Stream, err error) {
var updateStreams bool
t.mu.Lock()
if t.activeStreams == nil {
t.mu.Unlock()
return
}
if t.streamsQuota != nil {
updateStreams = true
}
delete(t.activeStreams, s.id)
if t.state == draining && len(t.activeStreams) == 0 {
// The transport is draining and s is the last live stream on t.
@ -471,10 +521,27 @@ func (t *http2Client) CloseStream(s *Stream, err error) {
return
}
t.mu.Unlock()
if updateStreams {
t.streamsQuota.add(1)
}
// rstStream is true in case the stream is being closed at the client-side
// and the server needs to be intimated about it by sending a RST_STREAM
// frame.
// To make sure this frame is written to the wire before the headers of the
// next stream waiting for streamsQuota, we add to streamsQuota pool only
// after having acquired the writableChan to send RST_STREAM out (look at
// the controller() routine).
var rstStream bool
var rstError http2.ErrCode
defer func() {
// In case, the client doesn't have to send RST_STREAM to server
// we can safely add back to streamsQuota pool now.
if !rstStream {
t.streamsQuota.add(1)
return
}
t.controlBuf.put(&resetStream{s.id, rstError})
}()
s.mu.Lock()
rstStream = s.rstStream
rstError = s.rstError
if q := s.fc.resetPendingData(); q > 0 {
if n := t.fc.onRead(q); n > 0 {
t.controlBuf.put(&windowUpdate{0, n})
@ -490,8 +557,9 @@ func (t *http2Client) CloseStream(s *Stream, err error) {
}
s.state = streamDone
s.mu.Unlock()
if se, ok := err.(StreamError); ok && se.Code != codes.DeadlineExceeded {
t.controlBuf.put(&resetStream{s.id, http2.ErrCodeCancel})
if _, ok := err.(StreamError); ok {
rstStream = true
rstError = http2.ErrCodeCancel
}
}
@ -525,6 +593,12 @@ func (t *http2Client) Close() (err error) {
s.mu.Unlock()
s.write(recvMsg{err: ErrConnClosing})
}
if t.statsHandler != nil {
connEnd := &stats.ConnEnd{
Client: true,
}
t.statsHandler.HandleConn(t.ctx, connEnd)
}
return
}
@ -582,19 +656,14 @@ func (t *http2Client) Write(s *Stream, data []byte, opts *Options) error {
var p []byte
if r.Len() > 0 {
size := http2MaxFrameLen
s.sendQuotaPool.add(0)
// Wait until the stream has some quota to send the data.
sq, err := wait(s.ctx, s.done, s.goAway, t.shutdownChan, s.sendQuotaPool.acquire())
if err != nil {
return err
}
t.sendQuotaPool.add(0)
// Wait until the transport has some quota to send the data.
tq, err := wait(s.ctx, s.done, s.goAway, t.shutdownChan, t.sendQuotaPool.acquire())
if err != nil {
if _, ok := err.(StreamError); ok || err == io.EOF {
t.sendQuotaPool.cancel()
}
return err
}
if sq < size {
@ -704,7 +773,7 @@ func (t *http2Client) updateWindow(s *Stream, n uint32) {
}
func (t *http2Client) handleData(f *http2.DataFrame) {
size := len(f.Data())
size := f.Header().Length
if err := t.fc.onData(uint32(size)); err != nil {
t.notifyError(connectionErrorf(true, err, "%v", err))
return
@ -718,6 +787,11 @@ func (t *http2Client) handleData(f *http2.DataFrame) {
return
}
if size > 0 {
if f.Header().Flags.Has(http2.FlagDataPadded) {
if w := t.fc.onRead(uint32(size) - uint32(len(f.Data()))); w > 0 {
t.controlBuf.put(&windowUpdate{0, w})
}
}
s.mu.Lock()
if s.state == streamDone {
s.mu.Unlock()
@ -731,19 +805,27 @@ func (t *http2Client) handleData(f *http2.DataFrame) {
s.state = streamDone
s.statusCode = codes.Internal
s.statusDesc = err.Error()
s.rstStream = true
s.rstError = http2.ErrCodeFlowControl
close(s.done)
s.mu.Unlock()
s.write(recvMsg{err: io.EOF})
t.controlBuf.put(&resetStream{s.id, http2.ErrCodeFlowControl})
return
}
if f.Header().Flags.Has(http2.FlagDataPadded) {
if w := s.fc.onRead(uint32(size) - uint32(len(f.Data()))); w > 0 {
t.controlBuf.put(&windowUpdate{s.id, w})
}
}
s.mu.Unlock()
// TODO(bradfitz, zhaoq): A copy is required here because there is no
// guarantee f.Data() is consumed before the arrival of next frame.
// Can this copy be eliminated?
data := make([]byte, size)
copy(data, f.Data())
s.write(recvMsg{data: data})
if len(f.Data()) > 0 {
data := make([]byte, len(f.Data()))
copy(data, f.Data())
s.write(recvMsg{data: data})
}
}
// The server has closed the stream without sending trailers. Record that
// the read direction is closed, and set the status appropriately.
@ -874,6 +956,24 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
}
endStream := frame.StreamEnded()
var isHeader bool
defer func() {
if t.statsHandler != nil {
if isHeader {
inHeader := &stats.InHeader{
Client: true,
WireLength: int(frame.Header().Length),
}
t.statsHandler.HandleRPC(s.clientStatsCtx, inHeader)
} else {
inTrailer := &stats.InTrailer{
Client: true,
WireLength: int(frame.Header().Length),
}
t.statsHandler.HandleRPC(s.clientStatsCtx, inTrailer)
}
}
}()
s.mu.Lock()
if !endStream {
@ -885,6 +985,7 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
}
close(s.headerChan)
s.headerDone = true
isHeader = true
}
if !endStream || s.state == streamDone {
s.mu.Unlock()
@ -925,6 +1026,7 @@ func (t *http2Client) reader() {
t.notifyError(err)
return
}
atomic.CompareAndSwapUint32(&t.activity, 0, 1)
sf, ok := frame.(*http2.SettingsFrame)
if !ok {
t.notifyError(err)
@ -935,6 +1037,7 @@ func (t *http2Client) reader() {
// loop to keep reading incoming messages on this transport.
for {
frame, err := t.framer.readFrame()
atomic.CompareAndSwapUint32(&t.activity, 0, 1)
if err != nil {
// Abort an active stream if the http2.Framer returns a
// http2.StreamError. This can happen only if the server's response
@ -986,21 +1089,15 @@ func (t *http2Client) applySettings(ss []http2.Setting) {
s.Val = math.MaxInt32
}
t.mu.Lock()
reset := t.streamsQuota != nil
if !reset {
t.streamsQuota = newQuotaPool(int(s.Val) - len(t.activeStreams))
}
ms := t.maxStreams
t.maxStreams = int(s.Val)
t.mu.Unlock()
if reset {
t.streamsQuota.reset(int(s.Val) - ms)
}
t.streamsQuota.add(int(s.Val) - ms)
case http2.SettingInitialWindowSize:
t.mu.Lock()
for _, stream := range t.activeStreams {
// Adjust the sending quota for each stream.
stream.sendQuotaPool.reset(int(s.Val - t.streamSendQuota))
stream.sendQuotaPool.add(int(s.Val - t.streamSendQuota))
}
t.streamSendQuota = s.Val
t.mu.Unlock()
@ -1028,6 +1125,12 @@ func (t *http2Client) controller() {
t.framer.writeSettings(true, i.ss...)
}
case *resetStream:
// If the server needs to be to intimated about stream closing,
// then we need to make sure the RST_STREAM frame is written to
// the wire before the headers of the next stream waiting on
// streamQuota. We ensure this by adding to the streamsQuota pool
// only after having acquired the writableChan to send RST_STREAM.
t.streamsQuota.add(1)
t.framer.writeRSTStream(true, i.streamID, i.code)
case *flushIO:
t.framer.flushWrite()
@ -1047,6 +1150,61 @@ func (t *http2Client) controller() {
}
}
// keepalive running in a separate goroutune makes sure the connection is alive by sending pings.
func (t *http2Client) keepalive() {
p := &ping{data: [8]byte{}}
timer := time.NewTimer(t.kp.Time)
for {
select {
case <-timer.C:
if atomic.CompareAndSwapUint32(&t.activity, 1, 0) {
timer.Reset(t.kp.Time)
continue
}
// Check if keepalive should go dormant.
t.mu.Lock()
if len(t.activeStreams) < 1 && !t.kp.PermitWithoutStream {
// Make awakenKeepalive writable.
<-t.awakenKeepalive
t.mu.Unlock()
select {
case <-t.awakenKeepalive:
// If the control gets here a ping has been sent
// need to reset the timer with keepalive.Timeout.
case <-t.shutdownChan:
return
}
} else {
t.mu.Unlock()
// Send ping.
t.controlBuf.put(p)
}
// By the time control gets here a ping has been sent one way or the other.
timer.Reset(t.kp.Timeout)
select {
case <-timer.C:
if atomic.CompareAndSwapUint32(&t.activity, 1, 0) {
timer.Reset(t.kp.Time)
continue
}
t.Close()
return
case <-t.shutdownChan:
if !timer.Stop() {
<-timer.C
}
return
}
case <-t.shutdownChan:
if !timer.Stop() {
<-timer.C
}
return
}
}
}
func (t *http2Client) Error() <-chan struct{} {
return t.errorChan
}

View File

@ -50,6 +50,8 @@ import (
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/stats"
"google.golang.org/grpc/tap"
)
// ErrIllegalHeaderWrite indicates that setting header is illegal because of
@ -58,9 +60,13 @@ var ErrIllegalHeaderWrite = errors.New("transport: the stream is done or WriteHe
// http2Server implements the ServerTransport interface with HTTP2.
type http2Server struct {
ctx context.Context
conn net.Conn
remoteAddr net.Addr
localAddr net.Addr
maxStreamID uint32 // max stream ID ever seen
authInfo credentials.AuthInfo // auth info about the connection
inTapHandle tap.ServerInHandle
// writableChan synchronizes write access to the transport.
// A writer acquires the write lock by receiving a value on writableChan
// and releases it by sending on writableChan.
@ -82,6 +88,8 @@ type http2Server struct {
// sendQuotaPool provides flow control to outbound message.
sendQuotaPool *quotaPool
stats stats.Handler
mu sync.Mutex // guard the following
state transportState
activeStreams map[uint32]*Stream
@ -91,12 +99,13 @@ type http2Server struct {
// newHTTP2Server constructs a ServerTransport based on HTTP2. ConnectionError is
// returned if something goes wrong.
func newHTTP2Server(conn net.Conn, maxStreams uint32, authInfo credentials.AuthInfo) (_ ServerTransport, err error) {
func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err error) {
framer := newFramer(conn)
// Send initial settings as connection preface to client.
var settings []http2.Setting
// TODO(zhaoq): Have a better way to signal "no limit" because 0 is
// permitted in the HTTP2 spec.
maxStreams := config.MaxStreams
if maxStreams == 0 {
maxStreams = math.MaxUint32
} else {
@ -121,12 +130,16 @@ func newHTTP2Server(conn net.Conn, maxStreams uint32, authInfo credentials.AuthI
}
var buf bytes.Buffer
t := &http2Server{
ctx: context.Background(),
conn: conn,
authInfo: authInfo,
remoteAddr: conn.RemoteAddr(),
localAddr: conn.LocalAddr(),
authInfo: config.AuthInfo,
framer: framer,
hBuf: &buf,
hEnc: hpack.NewEncoder(&buf),
maxStreams: maxStreams,
inTapHandle: config.InTapHandle,
controlBuf: newRecvBuffer(),
fc: &inFlow{limit: initialConnWindowSize},
sendQuotaPool: newQuotaPool(defaultWindowSize),
@ -135,6 +148,15 @@ func newHTTP2Server(conn net.Conn, maxStreams uint32, authInfo credentials.AuthI
shutdownChan: make(chan struct{}),
activeStreams: make(map[uint32]*Stream),
streamSendQuota: defaultWindowSize,
stats: config.StatsHandler,
}
if t.stats != nil {
t.ctx = t.stats.TagConn(t.ctx, &stats.ConnTagInfo{
RemoteAddr: t.remoteAddr,
LocalAddr: t.localAddr,
})
connBegin := &stats.ConnBegin{}
t.stats.HandleConn(t.ctx, connBegin)
}
go t.controller()
t.writableChan <- 0
@ -142,7 +164,7 @@ func newHTTP2Server(conn net.Conn, maxStreams uint32, authInfo credentials.AuthI
}
// operateHeader takes action on the decoded headers.
func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream)) (close bool) {
func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream), traceCtx func(context.Context, string) context.Context) (close bool) {
buf := newRecvBuffer()
s := &Stream{
id: frame.Header().StreamID,
@ -168,12 +190,12 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
}
s.recvCompress = state.encoding
if state.timeoutSet {
s.ctx, s.cancel = context.WithTimeout(context.TODO(), state.timeout)
s.ctx, s.cancel = context.WithTimeout(t.ctx, state.timeout)
} else {
s.ctx, s.cancel = context.WithCancel(context.TODO())
s.ctx, s.cancel = context.WithCancel(t.ctx)
}
pr := &peer.Peer{
Addr: t.conn.RemoteAddr(),
Addr: t.remoteAddr,
}
// Attach Auth info if there is any.
if t.authInfo != nil {
@ -195,6 +217,18 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
}
s.recvCompress = state.encoding
s.method = state.method
if t.inTapHandle != nil {
var err error
info := &tap.Info{
FullMethodName: state.method,
}
s.ctx, err = t.inTapHandle(s.ctx, info)
if err != nil {
// TODO: Log the real error.
t.controlBuf.put(&resetStream{s.id, http2.ErrCodeRefusedStream})
return
}
}
t.mu.Lock()
if t.state != reachable {
t.mu.Unlock()
@ -218,13 +252,26 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
s.windowHandler = func(n int) {
t.updateWindow(s, uint32(n))
}
s.ctx = traceCtx(s.ctx, s.method)
if t.stats != nil {
s.ctx = t.stats.TagRPC(s.ctx, &stats.RPCTagInfo{FullMethodName: s.method})
inHeader := &stats.InHeader{
FullMethod: s.method,
RemoteAddr: t.remoteAddr,
LocalAddr: t.localAddr,
Compression: s.recvCompress,
WireLength: int(frame.Header().Length),
}
t.stats.HandleRPC(s.ctx, inHeader)
}
handle(s)
return
}
// HandleStreams receives incoming streams using the given handler. This is
// typically run in a separate goroutine.
func (t *http2Server) HandleStreams(handle func(*Stream)) {
// traceCtx attaches trace to ctx and returns the new context.
func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.Context, string) context.Context) {
// Check the validity of client preface.
preface := make([]byte, len(clientPreface))
if _, err := io.ReadFull(t.conn, preface); err != nil {
@ -279,7 +326,7 @@ func (t *http2Server) HandleStreams(handle func(*Stream)) {
}
switch frame := frame.(type) {
case *http2.MetaHeadersFrame:
if t.operateHeaders(frame, handle) {
if t.operateHeaders(frame, handle, traceCtx) {
t.Close()
break
}
@ -334,7 +381,7 @@ func (t *http2Server) updateWindow(s *Stream, n uint32) {
}
func (t *http2Server) handleData(f *http2.DataFrame) {
size := len(f.Data())
size := f.Header().Length
if err := t.fc.onData(uint32(size)); err != nil {
grpclog.Printf("transport: http2Server %v", err)
t.Close()
@ -349,6 +396,11 @@ func (t *http2Server) handleData(f *http2.DataFrame) {
return
}
if size > 0 {
if f.Header().Flags.Has(http2.FlagDataPadded) {
if w := t.fc.onRead(uint32(size) - uint32(len(f.Data()))); w > 0 {
t.controlBuf.put(&windowUpdate{0, w})
}
}
s.mu.Lock()
if s.state == streamDone {
s.mu.Unlock()
@ -364,13 +416,20 @@ func (t *http2Server) handleData(f *http2.DataFrame) {
t.controlBuf.put(&resetStream{s.id, http2.ErrCodeFlowControl})
return
}
if f.Header().Flags.Has(http2.FlagDataPadded) {
if w := s.fc.onRead(uint32(size) - uint32(len(f.Data()))); w > 0 {
t.controlBuf.put(&windowUpdate{s.id, w})
}
}
s.mu.Unlock()
// TODO(bradfitz, zhaoq): A copy is required here because there is no
// guarantee f.Data() is consumed before the arrival of next frame.
// Can this copy be eliminated?
data := make([]byte, size)
copy(data, f.Data())
s.write(recvMsg{data: data})
if len(f.Data()) > 0 {
data := make([]byte, len(f.Data()))
copy(data, f.Data())
s.write(recvMsg{data: data})
}
}
if f.Header().Flags.Has(http2.FlagDataEndStream) {
// Received the end of stream from the client.
@ -492,9 +551,16 @@ func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
t.hEnc.WriteField(hpack.HeaderField{Name: k, Value: entry})
}
}
bufLen := t.hBuf.Len()
if err := t.writeHeaders(s, t.hBuf, false); err != nil {
return err
}
if t.stats != nil {
outHeader := &stats.OutHeader{
WireLength: bufLen,
}
t.stats.HandleRPC(s.Context(), outHeader)
}
t.writableChan <- 0
return nil
}
@ -547,10 +613,17 @@ func (t *http2Server) WriteStatus(s *Stream, statusCode codes.Code, statusDesc s
t.hEnc.WriteField(hpack.HeaderField{Name: k, Value: entry})
}
}
bufLen := t.hBuf.Len()
if err := t.writeHeaders(s, t.hBuf, true); err != nil {
t.Close()
return err
}
if t.stats != nil {
outTrailer := &stats.OutTrailer{
WireLength: bufLen,
}
t.stats.HandleRPC(s.Context(), outTrailer)
}
t.closeStream(s)
t.writableChan <- 0
return nil
@ -579,19 +652,14 @@ func (t *http2Server) Write(s *Stream, data []byte, opts *Options) error {
return nil
}
size := http2MaxFrameLen
s.sendQuotaPool.add(0)
// Wait until the stream has some quota to send the data.
sq, err := wait(s.ctx, nil, nil, t.shutdownChan, s.sendQuotaPool.acquire())
if err != nil {
return err
}
t.sendQuotaPool.add(0)
// Wait until the transport has some quota to send the data.
tq, err := wait(s.ctx, nil, nil, t.shutdownChan, t.sendQuotaPool.acquire())
if err != nil {
if _, ok := err.(StreamError); ok {
t.sendQuotaPool.cancel()
}
return err
}
if sq < size {
@ -659,7 +727,7 @@ func (t *http2Server) applySettings(ss []http2.Setting) {
t.mu.Lock()
defer t.mu.Unlock()
for _, stream := range t.activeStreams {
stream.sendQuotaPool.reset(int(s.Val - t.streamSendQuota))
stream.sendQuotaPool.add(int(s.Val - t.streamSendQuota))
}
t.streamSendQuota = s.Val
}
@ -736,6 +804,10 @@ func (t *http2Server) Close() (err error) {
for _, s := range streams {
s.cancel()
}
if t.stats != nil {
connEnd := &stats.ConnEnd{}
t.stats.HandleConn(t.ctx, connEnd)
}
return
}
@ -767,7 +839,7 @@ func (t *http2Server) closeStream(s *Stream) {
}
func (t *http2Server) RemoteAddr() net.Addr {
return t.conn.RemoteAddr()
return t.remoteAddr
}
func (t *http2Server) Drain() {

View File

@ -52,8 +52,6 @@ import (
)
const (
// The primary user agent
primaryUA = "grpc-go/1.0"
// http2MaxFrameLen specifies the max length of a HTTP2 frame.
http2MaxFrameLen = 16384 // 16KB frame
// http://http2.github.io/http2-spec/#SettingValues

View File

@ -45,10 +45,13 @@ import (
"sync"
"golang.org/x/net/context"
"golang.org/x/net/trace"
"golang.org/x/net/http2"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/stats"
"google.golang.org/grpc/tap"
)
// recvMsg represents the received msg from the transport. All transport
@ -167,6 +170,11 @@ type Stream struct {
id uint32
// nil for client side Stream.
st ServerTransport
// clientStatsCtx keeps the user context for stats handling.
// It's only valid on client side. Server side stats context is same as s.ctx.
// All client side stats collection should use the clientStatsCtx (instead of the stream context)
// so that all the generated stats for a particular RPC can be associated in the processing phase.
clientStatsCtx context.Context
// ctx is the associated context of the stream.
ctx context.Context
// cancel is always nil for client side Stream.
@ -207,6 +215,11 @@ type Stream struct {
// the status received from the server.
statusCode codes.Code
statusDesc string
// rstStream indicates whether a RST_STREAM frame needs to be sent
// to the server to signify that this stream is closing.
rstStream bool
// rstError is the error that needs to be sent along with the RST_STREAM frame.
rstError http2.ErrCode
}
// RecvCompress returns the compression algorithm applied to the inbound
@ -266,11 +279,6 @@ func (s *Stream) Context() context.Context {
return s.ctx
}
// TraceContext recreates the context of s with a trace.Trace.
func (s *Stream) TraceContext(tr trace.Trace) {
s.ctx = trace.NewContext(s.ctx, tr)
}
// Method returns the method for the stream.
func (s *Stream) Method() string {
return s.method
@ -330,6 +338,12 @@ func (s *Stream) Read(p []byte) (n int, err error) {
return
}
// GoString is implemented by Stream so context.String() won't
// race when printing %#v.
func (s *Stream) GoString() string {
return fmt.Sprintf("<stream: %p, %v>", s, s.method)
}
// The key to save transport.Stream in the context.
type streamKey struct{}
@ -355,22 +369,39 @@ const (
draining
)
// ServerConfig consists of all the configurations to establish a server transport.
type ServerConfig struct {
MaxStreams uint32
AuthInfo credentials.AuthInfo
InTapHandle tap.ServerInHandle
StatsHandler stats.Handler
}
// NewServerTransport creates a ServerTransport with conn or non-nil error
// if it fails.
func NewServerTransport(protocol string, conn net.Conn, maxStreams uint32, authInfo credentials.AuthInfo) (ServerTransport, error) {
return newHTTP2Server(conn, maxStreams, authInfo)
func NewServerTransport(protocol string, conn net.Conn, config *ServerConfig) (ServerTransport, error) {
return newHTTP2Server(conn, config)
}
// ConnectOptions covers all relevant options for communicating with the server.
type ConnectOptions struct {
// UserAgent is the application user agent.
UserAgent string
// Authority is the :authority pseudo-header to use. This field has no effect if
// TransportCredentials is set.
Authority string
// Dialer specifies how to dial a network address.
Dialer func(context.Context, string) (net.Conn, error)
// FailOnNonTempDialError specifies if gRPC fails on non-temporary dial errors.
FailOnNonTempDialError bool
// PerRPCCredentials stores the PerRPCCredentials required to issue RPCs.
PerRPCCredentials []credentials.PerRPCCredentials
// TransportCredentials stores the Authenticator required to setup a client connection.
TransportCredentials credentials.TransportCredentials
// KeepaliveParams stores the keepalive parameters.
KeepaliveParams keepalive.ClientParameters
// StatsHandler stores the handler for stats.
StatsHandler stats.Handler
}
// TargetInfo contains the information of the target such as network address and metadata.
@ -466,7 +497,7 @@ type ClientTransport interface {
// Write methods for a given Stream will be called serially.
type ServerTransport interface {
// HandleStreams receives incoming streams using the given handler.
HandleStreams(func(*Stream))
HandleStreams(func(*Stream), func(context.Context, string) context.Context)
// WriteHeader sends the header metadata for the given stream.
// WriteHeader may not be called on all streams.
@ -552,7 +583,7 @@ type StreamError struct {
}
func (e StreamError) Error() string {
return fmt.Sprintf("stream error: code = %d desc = %q", e.Code, e.Desc)
return fmt.Sprintf("stream error: code = %s desc = %q", e.Code, e.Desc)
}
// ContextErr converts the error from context package into a StreamError.

9
glide.lock generated
View File

@ -1,5 +1,5 @@
hash: c8d757181c4950cd9c7d6f2091c50da1fa7ffbed6b66ca6c6a34495c0e5c84ab
updated: 2017-03-06T19:45:05.355925876-08:00
hash: d546c79ab7f1bcc7b048995624e4353713fe1b5fad5d69a50202e309fe9dd2fc
updated: 2017-04-07T14:31:10.872688463-07:00
imports:
- name: github.com/beorn7/perks
version: 4c0e84591b9aa9e6dcfdf3e020114cd81f89d5f9
@ -137,15 +137,18 @@ imports:
subpackages:
- rate
- name: google.golang.org/grpc
version: 777daa17ff9b5daef1cfdf915088a2ada3332bf0
version: 8050b9cbc271307e5a716a9d782803d09b0d6f2d
subpackages:
- codes
- credentials
- grpclog
- internal
- keepalive
- metadata
- naming
- peer
- stats
- tap
- transport
- name: gopkg.in/cheggaaa/pb.v1
version: 226d21d43a305fac52b3a104ef83e721b15275e0

View File

@ -97,7 +97,7 @@ import:
subpackages:
- rate
- package: google.golang.org/grpc
version: v1.0.4
version: v1.2.1
subpackages:
- codes
- credentials