mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
Clientv3 (retry) logs should use the configured logger.
clientv3 logs (especially tests) were poluted with unattributed to testing.T log lines: ``` {"level":"warn","ts":"2021-04-29T12:42:11.055+0200","logger":"etcd-client","caller":"v3/retry_interceptor.go:64","msg":"retrying of unary invoker failed","target":"etcd-endpoints://0xc0000fafc0/#initially=[unix://localhost:m10]","attempt":0,"error":"rpc error: code = ResourceExhausted desc = etcdserver: mvcc: database space exceeded"} ``` The reasons were 2 fold: - Interceptors were copying logger before "WithLogger" could modify it. - We were not propagating the loggers in a few testing contexts.
This commit is contained in:
parent
ed4a87d541
commit
bc8d3f6639
@ -215,8 +215,8 @@ func (c *Client) dialSetupOpts(creds grpccredentials.TransportCredentials, dopts
|
|||||||
opts = append(opts,
|
opts = append(opts,
|
||||||
// Disable stream retry by default since go-grpc-middleware/retry does not support client streams.
|
// Disable stream retry by default since go-grpc-middleware/retry does not support client streams.
|
||||||
// Streams that are safe to retry are enabled individually.
|
// Streams that are safe to retry are enabled individually.
|
||||||
grpc.WithStreamInterceptor(c.streamClientInterceptor(c.lg, withMax(0), rrBackoff)),
|
grpc.WithStreamInterceptor(c.streamClientInterceptor(withMax(0), rrBackoff)),
|
||||||
grpc.WithUnaryInterceptor(c.unaryClientInterceptor(c.lg, withMax(defaultUnaryMaxRetries), rrBackoff)),
|
grpc.WithUnaryInterceptor(c.unaryClientInterceptor(withMax(defaultUnaryMaxRetries), rrBackoff)),
|
||||||
)
|
)
|
||||||
|
|
||||||
return opts, nil
|
return opts, nil
|
||||||
|
@ -35,7 +35,7 @@ import (
|
|||||||
//
|
//
|
||||||
// The default configuration of the interceptor is to not retry *at all*. This behaviour can be
|
// The default configuration of the interceptor is to not retry *at all*. This behaviour can be
|
||||||
// changed through options (e.g. WithMax) on creation of the interceptor or on call (through grpc.CallOptions).
|
// changed through options (e.g. WithMax) on creation of the interceptor or on call (through grpc.CallOptions).
|
||||||
func (c *Client) unaryClientInterceptor(logger *zap.Logger, optFuncs ...retryOption) grpc.UnaryClientInterceptor {
|
func (c *Client) unaryClientInterceptor(optFuncs ...retryOption) grpc.UnaryClientInterceptor {
|
||||||
intOpts := reuseOrNewWithCallOptions(defaultOptions, optFuncs)
|
intOpts := reuseOrNewWithCallOptions(defaultOptions, optFuncs)
|
||||||
return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
|
return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
|
||||||
ctx = withVersion(ctx)
|
ctx = withVersion(ctx)
|
||||||
@ -50,7 +50,7 @@ func (c *Client) unaryClientInterceptor(logger *zap.Logger, optFuncs ...retryOpt
|
|||||||
if err := waitRetryBackoff(ctx, attempt, callOpts); err != nil {
|
if err := waitRetryBackoff(ctx, attempt, callOpts); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
logger.Debug(
|
c.GetLogger().Debug(
|
||||||
"retrying of unary invoker",
|
"retrying of unary invoker",
|
||||||
zap.String("target", cc.Target()),
|
zap.String("target", cc.Target()),
|
||||||
zap.Uint("attempt", attempt),
|
zap.Uint("attempt", attempt),
|
||||||
@ -59,7 +59,7 @@ func (c *Client) unaryClientInterceptor(logger *zap.Logger, optFuncs ...retryOpt
|
|||||||
if lastErr == nil {
|
if lastErr == nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
logger.Warn(
|
c.GetLogger().Warn(
|
||||||
"retrying of unary invoker failed",
|
"retrying of unary invoker failed",
|
||||||
zap.String("target", cc.Target()),
|
zap.String("target", cc.Target()),
|
||||||
zap.Uint("attempt", attempt),
|
zap.Uint("attempt", attempt),
|
||||||
@ -82,7 +82,7 @@ func (c *Client) unaryClientInterceptor(logger *zap.Logger, optFuncs ...retryOpt
|
|||||||
|
|
||||||
gterr := c.getToken(ctx)
|
gterr := c.getToken(ctx)
|
||||||
if gterr != nil {
|
if gterr != nil {
|
||||||
logger.Warn(
|
c.GetLogger().Warn(
|
||||||
"retrying of unary invoker failed to fetch new auth token",
|
"retrying of unary invoker failed to fetch new auth token",
|
||||||
zap.String("target", cc.Target()),
|
zap.String("target", cc.Target()),
|
||||||
zap.Error(gterr),
|
zap.Error(gterr),
|
||||||
@ -107,7 +107,7 @@ func (c *Client) unaryClientInterceptor(logger *zap.Logger, optFuncs ...retryOpt
|
|||||||
// Retry logic is available *only for ServerStreams*, i.e. 1:n streams, as the internal logic needs
|
// Retry logic is available *only for ServerStreams*, i.e. 1:n streams, as the internal logic needs
|
||||||
// to buffer the messages sent by the client. If retry is enabled on any other streams (ClientStreams,
|
// to buffer the messages sent by the client. If retry is enabled on any other streams (ClientStreams,
|
||||||
// BidiStreams), the retry interceptor will fail the call.
|
// BidiStreams), the retry interceptor will fail the call.
|
||||||
func (c *Client) streamClientInterceptor(logger *zap.Logger, optFuncs ...retryOption) grpc.StreamClientInterceptor {
|
func (c *Client) streamClientInterceptor(optFuncs ...retryOption) grpc.StreamClientInterceptor {
|
||||||
intOpts := reuseOrNewWithCallOptions(defaultOptions, optFuncs)
|
intOpts := reuseOrNewWithCallOptions(defaultOptions, optFuncs)
|
||||||
return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
|
return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
|
||||||
ctx = withVersion(ctx)
|
ctx = withVersion(ctx)
|
||||||
@ -117,7 +117,7 @@ func (c *Client) streamClientInterceptor(logger *zap.Logger, optFuncs ...retryOp
|
|||||||
// equal to c.Username != "" && c.Password != ""
|
// equal to c.Username != "" && c.Password != ""
|
||||||
err := c.getToken(ctx)
|
err := c.getToken(ctx)
|
||||||
if err != nil && rpctypes.Error(err) != rpctypes.ErrAuthNotEnabled {
|
if err != nil && rpctypes.Error(err) != rpctypes.ErrAuthNotEnabled {
|
||||||
logger.Error("clientv3/retry_interceptor: getToken failed", zap.Error(err))
|
c.GetLogger().Error("clientv3/retry_interceptor: getToken failed", zap.Error(err))
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -132,7 +132,7 @@ func (c *Client) streamClientInterceptor(logger *zap.Logger, optFuncs ...retryOp
|
|||||||
}
|
}
|
||||||
newStreamer, err := streamer(ctx, desc, cc, method, grpcOpts...)
|
newStreamer, err := streamer(ctx, desc, cc, method, grpcOpts...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Error("streamer failed to create ClientStream", zap.Error(err))
|
c.GetLogger().Error("streamer failed to create ClientStream", zap.Error(err))
|
||||||
return nil, err // TODO(mwitkow): Maybe dial and transport errors should be retriable?
|
return nil, err // TODO(mwitkow): Maybe dial and transport errors should be retriable?
|
||||||
}
|
}
|
||||||
retryingStreamer := &serverStreamingRetryingStream{
|
retryingStreamer := &serverStreamingRetryingStream{
|
||||||
|
@ -56,6 +56,8 @@ func Save(ctx context.Context, lg *zap.Logger, cfg clientv3.Config, dbPath strin
|
|||||||
}
|
}
|
||||||
defer cli.Close()
|
defer cli.Close()
|
||||||
|
|
||||||
|
cli = cli.WithLogger(lg.Named("client"))
|
||||||
|
|
||||||
partpath := dbPath + ".part"
|
partpath := dbPath + ".part"
|
||||||
defer os.RemoveAll(partpath)
|
defer os.RemoveAll(partpath)
|
||||||
|
|
||||||
|
@ -120,6 +120,7 @@ func epHealthCommandFunc(cmd *cobra.Command, args []string) {
|
|||||||
hch <- epHealth{Ep: ep, Health: false, Error: err.Error()}
|
hch <- epHealth{Ep: ep, Health: false, Error: err.Error()}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
cli = cli.WithLogger(lg.Named("client"))
|
||||||
st := time.Now()
|
st := time.Now()
|
||||||
// get a random key. As long as we can get the response without an error, the
|
// get a random key. As long as we can get the response without an error, the
|
||||||
// endpoint is health.
|
// endpoint is health.
|
||||||
|
@ -288,7 +288,7 @@ func mustNewClient(lg *zap.Logger) *clientv3.Client {
|
|||||||
fmt.Fprintln(os.Stderr, err)
|
fmt.Fprintln(os.Stderr, err)
|
||||||
os.Exit(1)
|
os.Exit(1)
|
||||||
}
|
}
|
||||||
return client
|
return client.WithLogger(lg.Named("client"))
|
||||||
}
|
}
|
||||||
|
|
||||||
func mustNewProxyClient(lg *zap.Logger, tls *transport.TLSInfo) *clientv3.Client {
|
func mustNewProxyClient(lg *zap.Logger, tls *transport.TLSInfo) *clientv3.Client {
|
||||||
@ -304,7 +304,7 @@ func mustNewProxyClient(lg *zap.Logger, tls *transport.TLSInfo) *clientv3.Client
|
|||||||
os.Exit(1)
|
os.Exit(1)
|
||||||
}
|
}
|
||||||
lg.Info("create proxy client", zap.String("grpcProxyAdvertiseClientURL", grpcProxyAdvertiseClientURL))
|
lg.Info("create proxy client", zap.String("grpcProxyAdvertiseClientURL", grpcProxyAdvertiseClientURL))
|
||||||
return client
|
return client.WithLogger(lg.Named("client"))
|
||||||
}
|
}
|
||||||
|
|
||||||
func newProxyClientCfg(lg *zap.Logger, eps []string, tls *transport.TLSInfo) (*clientv3.Config, error) {
|
func newProxyClientCfg(lg *zap.Logger, eps []string, tls *transport.TLSInfo) (*clientv3.Config, error) {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user