Merge pull request #12908 from ptabor/20210429-client-retry-logging

Clientv3 (retry interceptor) logs should use the configured logger
This commit is contained in:
Piotr Tabor 2021-04-29 19:25:04 +02:00 committed by GitHub
commit 451f65d661
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
40 changed files with 117 additions and 83 deletions

View File

@ -4,8 +4,9 @@
package transport package transport
import ( import (
"golang.org/x/sys/unix"
"syscall" "syscall"
"golang.org/x/sys/unix"
) )
func setReusePort(network, address string, conn syscall.RawConn) error { func setReusePort(network, address string, conn syscall.RawConn) error {

View File

@ -15,9 +15,10 @@
package types package types
import ( import (
"go.etcd.io/etcd/client/pkg/v3/testutil"
"reflect" "reflect"
"testing" "testing"
"go.etcd.io/etcd/client/pkg/v3/testutil"
) )
func TestParseInitialCluster(t *testing.T) { func TestParseInitialCluster(t *testing.T) {

View File

@ -15,10 +15,11 @@
package client package client
import ( import (
"github.com/json-iterator/go"
"github.com/modern-go/reflect2"
"strconv" "strconv"
"unsafe" "unsafe"
"github.com/json-iterator/go"
"github.com/modern-go/reflect2"
) )
type customNumberExtension struct { type customNumberExtension struct {

View File

@ -19,12 +19,13 @@ import (
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
"go.etcd.io/etcd/client/pkg/v3/pathutil"
"net/http" "net/http"
"net/url" "net/url"
"strconv" "strconv"
"strings" "strings"
"time" "time"
"go.etcd.io/etcd/client/pkg/v3/pathutil"
) )
const ( const (

View File

@ -215,8 +215,8 @@ func (c *Client) dialSetupOpts(creds grpccredentials.TransportCredentials, dopts
opts = append(opts, opts = append(opts,
// Disable stream retry by default since go-grpc-middleware/retry does not support client streams. // Disable stream retry by default since go-grpc-middleware/retry does not support client streams.
// Streams that are safe to retry are enabled individually. // Streams that are safe to retry are enabled individually.
grpc.WithStreamInterceptor(c.streamClientInterceptor(c.lg, withMax(0), rrBackoff)), grpc.WithStreamInterceptor(c.streamClientInterceptor(withMax(0), rrBackoff)),
grpc.WithUnaryInterceptor(c.unaryClientInterceptor(c.lg, withMax(defaultUnaryMaxRetries), rrBackoff)), grpc.WithUnaryInterceptor(c.unaryClientInterceptor(withMax(defaultUnaryMaxRetries), rrBackoff)),
) )
return opts, nil return opts, nil

View File

@ -23,10 +23,19 @@ import (
"go.etcd.io/etcd/api/v3/v3rpc/rpctypes" "go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
"go.etcd.io/etcd/client/pkg/v3/testutil" "go.etcd.io/etcd/client/pkg/v3/testutil"
"go.uber.org/zap/zaptest"
"google.golang.org/grpc" "google.golang.org/grpc"
) )
func NewClient(t *testing.T, cfg Config) (*Client, error) {
client, err := New(cfg)
if err != nil {
return nil, err
}
return client.WithLogger(zaptest.NewLogger(t)), nil
}
func TestDialCancel(t *testing.T) { func TestDialCancel(t *testing.T) {
testutil.BeforeTest(t) testutil.BeforeTest(t)
@ -41,7 +50,7 @@ func TestDialCancel(t *testing.T) {
cfg := Config{ cfg := Config{
Endpoints: []string{ep}, Endpoints: []string{ep},
DialTimeout: 30 * time.Second} DialTimeout: 30 * time.Second}
c, err := New(cfg) c, err := NewClient(t, cfg)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -104,7 +113,7 @@ func TestDialTimeout(t *testing.T) {
donec := make(chan error, 1) donec := make(chan error, 1)
go func(cfg Config) { go func(cfg Config) {
// without timeout, dial continues forever on ipv4 black hole // without timeout, dial continues forever on ipv4 black hole
c, err := New(cfg) c, err := NewClient(t, cfg)
if c != nil || err == nil { if c != nil || err == nil {
t.Errorf("#%d: new client should fail", i) t.Errorf("#%d: new client should fail", i)
} }
@ -132,7 +141,7 @@ func TestDialTimeout(t *testing.T) {
func TestDialNoTimeout(t *testing.T) { func TestDialNoTimeout(t *testing.T) {
cfg := Config{Endpoints: []string{"127.0.0.1:12345"}} cfg := Config{Endpoints: []string{"127.0.0.1:12345"}}
c, err := New(cfg) c, err := NewClient(t, cfg)
if c == nil || err != nil { if c == nil || err != nil {
t.Fatalf("new client with DialNoWait should succeed, got %v", err) t.Fatalf("new client with DialNoWait should succeed, got %v", err)
} }

View File

@ -35,7 +35,7 @@ import (
// //
// The default configuration of the interceptor is to not retry *at all*. This behaviour can be // The default configuration of the interceptor is to not retry *at all*. This behaviour can be
// changed through options (e.g. WithMax) on creation of the interceptor or on call (through grpc.CallOptions). // changed through options (e.g. WithMax) on creation of the interceptor or on call (through grpc.CallOptions).
func (c *Client) unaryClientInterceptor(logger *zap.Logger, optFuncs ...retryOption) grpc.UnaryClientInterceptor { func (c *Client) unaryClientInterceptor(optFuncs ...retryOption) grpc.UnaryClientInterceptor {
intOpts := reuseOrNewWithCallOptions(defaultOptions, optFuncs) intOpts := reuseOrNewWithCallOptions(defaultOptions, optFuncs)
return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
ctx = withVersion(ctx) ctx = withVersion(ctx)
@ -50,7 +50,7 @@ func (c *Client) unaryClientInterceptor(logger *zap.Logger, optFuncs ...retryOpt
if err := waitRetryBackoff(ctx, attempt, callOpts); err != nil { if err := waitRetryBackoff(ctx, attempt, callOpts); err != nil {
return err return err
} }
logger.Debug( c.GetLogger().Debug(
"retrying of unary invoker", "retrying of unary invoker",
zap.String("target", cc.Target()), zap.String("target", cc.Target()),
zap.Uint("attempt", attempt), zap.Uint("attempt", attempt),
@ -59,7 +59,7 @@ func (c *Client) unaryClientInterceptor(logger *zap.Logger, optFuncs ...retryOpt
if lastErr == nil { if lastErr == nil {
return nil return nil
} }
logger.Warn( c.GetLogger().Warn(
"retrying of unary invoker failed", "retrying of unary invoker failed",
zap.String("target", cc.Target()), zap.String("target", cc.Target()),
zap.Uint("attempt", attempt), zap.Uint("attempt", attempt),
@ -82,7 +82,7 @@ func (c *Client) unaryClientInterceptor(logger *zap.Logger, optFuncs ...retryOpt
gterr := c.getToken(ctx) gterr := c.getToken(ctx)
if gterr != nil { if gterr != nil {
logger.Warn( c.GetLogger().Warn(
"retrying of unary invoker failed to fetch new auth token", "retrying of unary invoker failed to fetch new auth token",
zap.String("target", cc.Target()), zap.String("target", cc.Target()),
zap.Error(gterr), zap.Error(gterr),
@ -107,7 +107,7 @@ func (c *Client) unaryClientInterceptor(logger *zap.Logger, optFuncs ...retryOpt
// Retry logic is available *only for ServerStreams*, i.e. 1:n streams, as the internal logic needs // Retry logic is available *only for ServerStreams*, i.e. 1:n streams, as the internal logic needs
// to buffer the messages sent by the client. If retry is enabled on any other streams (ClientStreams, // to buffer the messages sent by the client. If retry is enabled on any other streams (ClientStreams,
// BidiStreams), the retry interceptor will fail the call. // BidiStreams), the retry interceptor will fail the call.
func (c *Client) streamClientInterceptor(logger *zap.Logger, optFuncs ...retryOption) grpc.StreamClientInterceptor { func (c *Client) streamClientInterceptor(optFuncs ...retryOption) grpc.StreamClientInterceptor {
intOpts := reuseOrNewWithCallOptions(defaultOptions, optFuncs) intOpts := reuseOrNewWithCallOptions(defaultOptions, optFuncs)
return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) { return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
ctx = withVersion(ctx) ctx = withVersion(ctx)
@ -117,7 +117,7 @@ func (c *Client) streamClientInterceptor(logger *zap.Logger, optFuncs ...retryOp
// equal to c.Username != "" && c.Password != "" // equal to c.Username != "" && c.Password != ""
err := c.getToken(ctx) err := c.getToken(ctx)
if err != nil && rpctypes.Error(err) != rpctypes.ErrAuthNotEnabled { if err != nil && rpctypes.Error(err) != rpctypes.ErrAuthNotEnabled {
logger.Error("clientv3/retry_interceptor: getToken failed", zap.Error(err)) c.GetLogger().Error("clientv3/retry_interceptor: getToken failed", zap.Error(err))
return nil, err return nil, err
} }
} }
@ -132,7 +132,7 @@ func (c *Client) streamClientInterceptor(logger *zap.Logger, optFuncs ...retryOp
} }
newStreamer, err := streamer(ctx, desc, cc, method, grpcOpts...) newStreamer, err := streamer(ctx, desc, cc, method, grpcOpts...)
if err != nil { if err != nil {
logger.Error("streamer failed to create ClientStream", zap.Error(err)) c.GetLogger().Error("streamer failed to create ClientStream", zap.Error(err))
return nil, err // TODO(mwitkow): Maybe dial and transport errors should be retriable? return nil, err // TODO(mwitkow): Maybe dial and transport errors should be retriable?
} }
retryingStreamer := &serverStreamingRetryingStream{ retryingStreamer := &serverStreamingRetryingStream{

View File

@ -56,6 +56,8 @@ func Save(ctx context.Context, lg *zap.Logger, cfg clientv3.Config, dbPath strin
} }
defer cli.Close() defer cli.Close()
cli = cli.WithLogger(lg.Named("client"))
partpath := dbPath + ".part" partpath := dbPath + ".part"
defer os.RemoveAll(partpath) defer os.RemoveAll(partpath)

View File

@ -26,14 +26,15 @@ import (
"bytes" "bytes"
"encoding/json" "encoding/json"
"fmt" "fmt"
"go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/concurrency"
"io/ioutil" "io/ioutil"
"net/http" "net/http"
"os" "os"
"runtime" "runtime"
"strconv" "strconv"
"time" "time"
"go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/concurrency"
) )
type node struct { type node struct {

View File

@ -120,6 +120,7 @@ func epHealthCommandFunc(cmd *cobra.Command, args []string) {
hch <- epHealth{Ep: ep, Health: false, Error: err.Error()} hch <- epHealth{Ep: ep, Health: false, Error: err.Error()}
return return
} }
cli = cli.WithLogger(lg.Named("client"))
st := time.Now() st := time.Now()
// get a random key. As long as we can get the response without an error, the // get a random key. As long as we can get the response without an error, the
// endpoint is health. // endpoint is health.

View File

@ -18,11 +18,12 @@ import (
"context" "context"
"errors" "errors"
"fmt" "fmt"
"github.com/bgentry/speakeasy"
"strings" "strings"
"sync/atomic" "sync/atomic"
"time" "time"
"github.com/bgentry/speakeasy"
"go.etcd.io/etcd/api/v3/mvccpb" "go.etcd.io/etcd/api/v3/mvccpb"
"go.etcd.io/etcd/api/v3/v3rpc/rpctypes" "go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
"go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/client/v3"

View File

@ -15,8 +15,9 @@
package auth package auth
import ( import (
"github.com/prometheus/client_golang/prometheus"
"sync" "sync"
"github.com/prometheus/client_golang/prometheus"
) )
var ( var (

View File

@ -288,7 +288,7 @@ func mustNewClient(lg *zap.Logger) *clientv3.Client {
fmt.Fprintln(os.Stderr, err) fmt.Fprintln(os.Stderr, err)
os.Exit(1) os.Exit(1)
} }
return client return client.WithLogger(lg.Named("client"))
} }
func mustNewProxyClient(lg *zap.Logger, tls *transport.TLSInfo) *clientv3.Client { func mustNewProxyClient(lg *zap.Logger, tls *transport.TLSInfo) *clientv3.Client {
@ -304,7 +304,7 @@ func mustNewProxyClient(lg *zap.Logger, tls *transport.TLSInfo) *clientv3.Client
os.Exit(1) os.Exit(1)
} }
lg.Info("create proxy client", zap.String("grpcProxyAdvertiseClientURL", grpcProxyAdvertiseClientURL)) lg.Info("create proxy client", zap.String("grpcProxyAdvertiseClientURL", grpcProxyAdvertiseClientURL))
return client return client.WithLogger(lg.Named("client"))
} }
func newProxyClientCfg(lg *zap.Logger, eps []string, tls *transport.TLSInfo) (*clientv3.Config, error) { func newProxyClientCfg(lg *zap.Logger, eps []string, tls *transport.TLSInfo) (*clientv3.Config, error) {

View File

@ -15,11 +15,10 @@
package v2http package v2http
import ( import (
"net/http"
"strconv" "strconv"
"time" "time"
"net/http"
"go.etcd.io/etcd/api/v3/etcdserverpb" "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/server/v3/etcdserver/api/v2error" "go.etcd.io/etcd/server/v3/etcdserver/api/v2error"
"go.etcd.io/etcd/server/v3/etcdserver/api/v2http/httptypes" "go.etcd.io/etcd/server/v3/etcdserver/api/v2http/httptypes"

View File

@ -17,15 +17,16 @@ package etcdserver
import ( import (
"bytes" "bytes"
"fmt" "fmt"
"go.etcd.io/etcd/client/pkg/v3/logutil"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"io/ioutil" "io/ioutil"
"os" "os"
"path/filepath" "path/filepath"
"strings" "strings"
"testing" "testing"
"time" "time"
"go.etcd.io/etcd/client/pkg/v3/logutil"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
) )
func TestNewRaftLogger(t *testing.T) { func TestNewRaftLogger(t *testing.T) {

View File

@ -23,12 +23,13 @@ import (
"go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/concurrency" "go.etcd.io/etcd/client/v3/concurrency"
"go.etcd.io/etcd/tests/v3/integration"
) )
func TestResumeElection(t *testing.T) { func TestResumeElection(t *testing.T) {
const prefix = "/resume-election/" const prefix = "/resume-election/"
cli, err := clientv3.New(clientv3.Config{Endpoints: exampleEndpoints()}) cli, err := integration.NewClient(t, clientv3.Config{Endpoints: exampleEndpoints()})
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }

View File

@ -20,10 +20,11 @@ import (
"go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/concurrency" "go.etcd.io/etcd/client/v3/concurrency"
"go.etcd.io/etcd/tests/v3/integration"
) )
func TestMutexLockSessionExpired(t *testing.T) { func TestMutexLockSessionExpired(t *testing.T) {
cli, err := clientv3.New(clientv3.Config{Endpoints: exampleEndpoints()}) cli, err := integration.NewClient(t, clientv3.Config{Endpoints: exampleEndpoints()})
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }

View File

@ -59,7 +59,7 @@ func TestBalancerUnderBlackholeKeepAliveWatch(t *testing.T) {
// then we can reduce 3s to 1s. // then we can reduce 3s to 1s.
timeout := pingInterval + integration.RequestWaitTimeout timeout := pingInterval + integration.RequestWaitTimeout
cli, err := clientv3.New(ccfg) cli, err := integration.NewClient(t, ccfg)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -180,7 +180,7 @@ func testBalancerUnderBlackholeNoKeepAlive(t *testing.T, op func(*clientv3.Clien
DialTimeout: 1 * time.Second, DialTimeout: 1 * time.Second,
DialOptions: []grpc.DialOption{grpc.WithBlock()}, DialOptions: []grpc.DialOption{grpc.WithBlock()},
} }
cli, err := clientv3.New(ccfg) cli, err := integration.NewClient(t, ccfg)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }

View File

@ -56,7 +56,7 @@ func TestDialTLSExpired(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
// expect remote errors "tls: bad certificate" // expect remote errors "tls: bad certificate"
_, err = clientv3.New(clientv3.Config{ _, err = integration.NewClient(t, clientv3.Config{
Endpoints: []string{clus.Members[0].GRPCAddr()}, Endpoints: []string{clus.Members[0].GRPCAddr()},
DialTimeout: 3 * time.Second, DialTimeout: 3 * time.Second,
DialOptions: []grpc.DialOption{grpc.WithBlock()}, DialOptions: []grpc.DialOption{grpc.WithBlock()},
@ -74,7 +74,7 @@ func TestDialTLSNoConfig(t *testing.T) {
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, ClientTLS: &testTLSInfo, SkipCreatingClient: true}) clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, ClientTLS: &testTLSInfo, SkipCreatingClient: true})
defer clus.Terminate(t) defer clus.Terminate(t)
// expect "signed by unknown authority" // expect "signed by unknown authority"
c, err := clientv3.New(clientv3.Config{ c, err := integration.NewClient(t, clientv3.Config{
Endpoints: []string{clus.Members[0].GRPCAddr()}, Endpoints: []string{clus.Members[0].GRPCAddr()},
DialTimeout: time.Second, DialTimeout: time.Second,
DialOptions: []grpc.DialOption{grpc.WithBlock()}, DialOptions: []grpc.DialOption{grpc.WithBlock()},
@ -117,7 +117,7 @@ func testDialSetEndpoints(t *testing.T, setBefore bool) {
DialTimeout: 1 * time.Second, DialTimeout: 1 * time.Second,
DialOptions: []grpc.DialOption{grpc.WithBlock()}, DialOptions: []grpc.DialOption{grpc.WithBlock()},
} }
cli, err := clientv3.New(cfg) cli, err := integration.NewClient(t, cfg)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -175,7 +175,7 @@ func TestRejectOldCluster(t *testing.T) {
DialOptions: []grpc.DialOption{grpc.WithBlock()}, DialOptions: []grpc.DialOption{grpc.WithBlock()},
RejectOldCluster: true, RejectOldCluster: true,
} }
cli, err := clientv3.New(cfg) cli, err := integration.NewClient(t, cfg)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }

View File

@ -120,7 +120,7 @@ func testBalancerUnderNetworkPartition(t *testing.T, op func(*clientv3.Client, c
DialTimeout: 3 * time.Second, DialTimeout: 3 * time.Second,
DialOptions: []grpc.DialOption{grpc.WithBlock()}, DialOptions: []grpc.DialOption{grpc.WithBlock()},
} }
cli, err := clientv3.New(ccfg) cli, err := integration.NewClient(t, ccfg)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -174,7 +174,7 @@ func TestBalancerUnderNetworkPartitionLinearizableGetLeaderElection(t *testing.T
timeout := 3 * clus.Members[(lead+1)%2].ServerConfig.ReqTimeout() timeout := 3 * clus.Members[(lead+1)%2].ServerConfig.ReqTimeout()
cli, err := clientv3.New(clientv3.Config{ cli, err := integration.NewClient(t, clientv3.Config{
Endpoints: []string{eps[(lead+1)%2]}, Endpoints: []string{eps[(lead+1)%2]},
DialTimeout: 2 * time.Second, DialTimeout: 2 * time.Second,
DialOptions: []grpc.DialOption{grpc.WithBlock()}, DialOptions: []grpc.DialOption{grpc.WithBlock()},
@ -232,7 +232,7 @@ func testBalancerUnderNetworkPartitionWatch(t *testing.T, isolateLeader bool) {
} }
// pin eps[target] // pin eps[target]
watchCli, err := clientv3.New(clientv3.Config{Endpoints: []string{eps[target]}}) watchCli, err := integration.NewClient(t, clientv3.Config{Endpoints: []string{eps[target]}})
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -291,7 +291,7 @@ func TestDropReadUnderNetworkPartition(t *testing.T) {
DialTimeout: 10 * time.Second, DialTimeout: 10 * time.Second,
DialOptions: []grpc.DialOption{grpc.WithBlock()}, DialOptions: []grpc.DialOption{grpc.WithBlock()},
} }
cli, err := clientv3.New(ccfg) cli, err := integration.NewClient(t, ccfg)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }

View File

@ -42,7 +42,7 @@ func TestBalancerUnderServerShutdownWatch(t *testing.T) {
lead := clus.WaitLeader(t) lead := clus.WaitLeader(t)
// pin eps[lead] // pin eps[lead]
watchCli, err := clientv3.New(clientv3.Config{Endpoints: []string{eps[lead]}}) watchCli, err := integration.NewClient(t, clientv3.Config{Endpoints: []string{eps[lead]}})
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -88,7 +88,7 @@ func TestBalancerUnderServerShutdownWatch(t *testing.T) {
clus.Members[lead].Terminate(t) clus.Members[lead].Terminate(t)
// writes to eps[lead+1] // writes to eps[lead+1]
putCli, err := clientv3.New(clientv3.Config{Endpoints: []string{eps[(lead+1)%3]}}) putCli, err := integration.NewClient(t, clientv3.Config{Endpoints: []string{eps[(lead+1)%3]}})
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -152,7 +152,7 @@ func testBalancerUnderServerShutdownMutable(t *testing.T, op func(*clientv3.Clie
eps := []string{clus.Members[0].GRPCAddr(), clus.Members[1].GRPCAddr(), clus.Members[2].GRPCAddr()} eps := []string{clus.Members[0].GRPCAddr(), clus.Members[1].GRPCAddr(), clus.Members[2].GRPCAddr()}
// pin eps[0] // pin eps[0]
cli, err := clientv3.New(clientv3.Config{Endpoints: []string{eps[0]}}) cli, err := integration.NewClient(t, clientv3.Config{Endpoints: []string{eps[0]}})
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -210,7 +210,7 @@ func testBalancerUnderServerShutdownImmutable(t *testing.T, op func(*clientv3.Cl
eps := []string{clus.Members[0].GRPCAddr(), clus.Members[1].GRPCAddr(), clus.Members[2].GRPCAddr()} eps := []string{clus.Members[0].GRPCAddr(), clus.Members[1].GRPCAddr(), clus.Members[2].GRPCAddr()}
// pin eps[0] // pin eps[0]
cli, err := clientv3.New(clientv3.Config{Endpoints: []string{eps[0]}}) cli, err := integration.NewClient(t, clientv3.Config{Endpoints: []string{eps[0]}})
if err != nil { if err != nil {
t.Errorf("failed to create client: %v", err) t.Errorf("failed to create client: %v", err)
} }
@ -293,7 +293,7 @@ func testBalancerUnderServerStopInflightRangeOnRestart(t *testing.T, linearizabl
} }
// pin eps[target] // pin eps[target]
cli, err := clientv3.New(clientv3.Config{Endpoints: []string{eps[target]}}) cli, err := integration.NewClient(t, clientv3.Config{Endpoints: []string{eps[target]}})
if err != nil { if err != nil {
t.Errorf("failed to create client: %v", err) t.Errorf("failed to create client: %v", err)
} }

View File

@ -1034,7 +1034,7 @@ func TestKVForLearner(t *testing.T) {
DialOptions: []grpc.DialOption{grpc.WithBlock()}, DialOptions: []grpc.DialOption{grpc.WithBlock()},
} }
// this client only has endpoint of the learner member // this client only has endpoint of the learner member
cli, err := clientv3.New(cfg) cli, err := integration.NewClient(t, cfg)
if err != nil { if err != nil {
t.Fatalf("failed to create clientv3: %v", err) t.Fatalf("failed to create clientv3: %v", err)
} }
@ -1106,7 +1106,7 @@ func TestBalancerSupportLearner(t *testing.T) {
DialTimeout: 5 * time.Second, DialTimeout: 5 * time.Second,
DialOptions: []grpc.DialOption{grpc.WithBlock()}, DialOptions: []grpc.DialOption{grpc.WithBlock()},
} }
cli, err := clientv3.New(cfg) cli, err := integration.NewClient(t, cfg)
if err != nil { if err != nil {
t.Fatalf("failed to create clientv3: %v", err) t.Fatalf("failed to create clientv3: %v", err)
} }

View File

@ -209,7 +209,7 @@ func TestMaintenanceStatus(t *testing.T) {
eps[i] = clus.Members[i].GRPCAddr() eps[i] = clus.Members[i].GRPCAddr()
} }
cli, err := clientv3.New(clientv3.Config{Endpoints: eps, DialOptions: []grpc.DialOption{grpc.WithBlock()}}) cli, err := integration.NewClient(t, clientv3.Config{Endpoints: eps, DialOptions: []grpc.DialOption{grpc.WithBlock()}})
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }

View File

@ -81,7 +81,7 @@ func TestV3ClientMetrics(t *testing.T) {
grpc.WithStreamInterceptor(grpcprom.StreamClientInterceptor), grpc.WithStreamInterceptor(grpcprom.StreamClientInterceptor),
}, },
} }
cli, cerr := clientv3.New(cfg) cli, cerr := integration.NewClient(t, cfg)
if cerr != nil { if cerr != nil {
t.Fatal(cerr) t.Fatal(cerr)
} }

View File

@ -40,7 +40,7 @@ func TestDetectKvOrderViolation(t *testing.T) {
clus.Members[2].GRPCAddr(), clus.Members[2].GRPCAddr(),
}, },
} }
cli, err := clientv3.New(cfg) cli, err := integration.NewClient(t, cfg)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -107,7 +107,7 @@ func TestDetectTxnOrderViolation(t *testing.T) {
clus.Members[2].GRPCAddr(), clus.Members[2].GRPCAddr(),
}, },
} }
cli, err := clientv3.New(cfg) cli, err := integration.NewClient(t, cfg)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }

View File

@ -34,7 +34,7 @@ func TestEndpointSwitchResolvesViolation(t *testing.T) {
clus.Members[2].GRPCAddr(), clus.Members[2].GRPCAddr(),
} }
cfg := clientv3.Config{Endpoints: []string{clus.Members[0].GRPCAddr()}} cfg := clientv3.Config{Endpoints: []string{clus.Members[0].GRPCAddr()}}
cli, err := clientv3.New(cfg) cli, err := integration.NewClient(t, cfg)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -91,7 +91,7 @@ func TestUnresolvableOrderViolation(t *testing.T) {
clus.Members[4].GRPCAddr(), clus.Members[4].GRPCAddr(),
}, },
} }
cli, err := clientv3.New(cfg) cli, err := integration.NewClient(t, cfg)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }

View File

@ -83,7 +83,7 @@ func createSnapshotFile(t *testing.T, kvs []kv) string {
} }
ccfg := clientv3.Config{Endpoints: []string{cfg.ACUrls[0].String()}} ccfg := clientv3.Config{Endpoints: []string{cfg.ACUrls[0].String()}}
cli, err := clientv3.New(ccfg) cli, err := integration.NewClient(t, ccfg)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }

View File

@ -75,16 +75,16 @@ func TestUserErrorAuth(t *testing.T) {
DialOptions: []grpc.DialOption{grpc.WithBlock()}, DialOptions: []grpc.DialOption{grpc.WithBlock()},
} }
cfg.Username, cfg.Password = "wrong-id", "123" cfg.Username, cfg.Password = "wrong-id", "123"
if _, err := clientv3.New(cfg); err != rpctypes.ErrAuthFailed { if _, err := integration.NewClient(t, cfg); err != rpctypes.ErrAuthFailed {
t.Fatalf("expected %v, got %v", rpctypes.ErrAuthFailed, err) t.Fatalf("expected %v, got %v", rpctypes.ErrAuthFailed, err)
} }
cfg.Username, cfg.Password = "root", "wrong-pass" cfg.Username, cfg.Password = "root", "wrong-pass"
if _, err := clientv3.New(cfg); err != rpctypes.ErrAuthFailed { if _, err := integration.NewClient(t, cfg); err != rpctypes.ErrAuthFailed {
t.Fatalf("expected %v, got %v", rpctypes.ErrAuthFailed, err) t.Fatalf("expected %v, got %v", rpctypes.ErrAuthFailed, err)
} }
cfg.Username, cfg.Password = "root", "123" cfg.Username, cfg.Password = "root", "123"
authed, err := clientv3.New(cfg) authed, err := integration.NewClient(t, cfg)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -135,7 +135,7 @@ func TestGetTokenWithoutAuth(t *testing.T) {
Password: "123", Password: "123",
} }
client, err = clientv3.New(cfg) client, err = integration.NewClient(t, cfg)
if err == nil { if err == nil {
defer client.Close() defer client.Close()
} }

View File

@ -35,6 +35,7 @@ import (
"go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/server/v3/embed" "go.etcd.io/etcd/server/v3/embed"
"go.etcd.io/etcd/tests/v3/integration" "go.etcd.io/etcd/tests/v3/integration"
"go.uber.org/zap/zaptest"
) )
var ( var (
@ -160,12 +161,14 @@ func testEmbedEtcdGracefulStop(t *testing.T, secure bool) {
t.Fatal(err) t.Fatal(err)
} }
} }
cli, err := clientv3.New(clientCfg) cli, err := integration.NewClient(t, clientCfg)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
defer cli.Close() defer cli.Close()
cli = cli.WithLogger(zaptest.NewLogger(t))
// open watch connection // open watch connection
cli.Watch(context.Background(), "foo") cli.Watch(context.Background(), "foo")

View File

@ -42,7 +42,7 @@ func TestClusterProxyMemberList(t *testing.T) {
Endpoints: []string{cts.caddr}, Endpoints: []string{cts.caddr},
DialTimeout: 5 * time.Second, DialTimeout: 5 * time.Second,
} }
client, err := clientv3.New(cfg) client, err := integration.NewClient(t, cfg)
if err != nil { if err != nil {
t.Fatalf("err %v, want nil", err) t.Fatalf("err %v, want nil", err)
} }
@ -94,7 +94,7 @@ func newClusterProxyServer(lg *zap.Logger, endpoints []string, t *testing.T) *cl
Endpoints: endpoints, Endpoints: endpoints,
DialTimeout: 5 * time.Second, DialTimeout: 5 * time.Second,
} }
client, err := clientv3.New(cfg) client, err := integration.NewClient(t, cfg)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }

View File

@ -42,7 +42,7 @@ func TestKVProxyRange(t *testing.T) {
Endpoints: []string{kvts.l.Addr().String()}, Endpoints: []string{kvts.l.Addr().String()},
DialTimeout: 5 * time.Second, DialTimeout: 5 * time.Second,
} }
client, err := clientv3.New(cfg) client, err := integration.NewClient(t, cfg)
if err != nil { if err != nil {
t.Fatalf("err = %v, want nil", err) t.Fatalf("err = %v, want nil", err)
} }
@ -71,7 +71,7 @@ func newKVProxyServer(endpoints []string, t *testing.T) *kvproxyTestServer {
Endpoints: endpoints, Endpoints: endpoints,
DialTimeout: 5 * time.Second, DialTimeout: 5 * time.Second,
} }
client, err := clientv3.New(cfg) client, err := integration.NewClient(t, cfg)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }

View File

@ -48,7 +48,7 @@ func TestSnapshotV3RestoreMultiMemberAdd(t *testing.T) {
// wait for health interval + leader election // wait for health interval + leader election
time.Sleep(etcdserver.HealthInterval + 2*time.Second) time.Sleep(etcdserver.HealthInterval + 2*time.Second)
cli, err := clientv3.New(clientv3.Config{Endpoints: []string{cURLs[0].String()}}) cli, err := integration.NewClient(t, clientv3.Config{Endpoints: []string{cURLs[0].String()}})
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -88,7 +88,7 @@ func TestSnapshotV3RestoreMultiMemberAdd(t *testing.T) {
t.Fatalf("failed to start the newly added etcd member") t.Fatalf("failed to start the newly added etcd member")
} }
cli2, err := clientv3.New(clientv3.Config{Endpoints: []string{newCURLs[0].String()}}) cli2, err := integration.NewClient(t, clientv3.Config{Endpoints: []string{newCURLs[0].String()}})
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }

View File

@ -82,7 +82,7 @@ func TestSnapshotV3RestoreSingle(t *testing.T) {
} }
var cli *clientv3.Client var cli *clientv3.Client
cli, err = clientv3.New(clientv3.Config{Endpoints: []string{cfg.ACUrls[0].String()}}) cli, err = integration.NewClient(t, clientv3.Config{Endpoints: []string{cfg.ACUrls[0].String()}})
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -119,7 +119,7 @@ func TestSnapshotV3RestoreMulti(t *testing.T) {
time.Sleep(time.Second) time.Sleep(time.Second)
for i := 0; i < clusterN; i++ { for i := 0; i < clusterN; i++ {
cli, err := clientv3.New(clientv3.Config{Endpoints: []string{cURLs[i].String()}}) cli, err := integration.NewClient(t, clientv3.Config{Endpoints: []string{cURLs[i].String()}})
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -194,7 +194,7 @@ func createSnapshotFile(t *testing.T, kvs []kv) string {
} }
ccfg := clientv3.Config{Endpoints: []string{cfg.ACUrls[0].String()}} ccfg := clientv3.Config{Endpoints: []string{cfg.ACUrls[0].String()}}
cli, err := clientv3.New(ccfg) cli, err := integration.NewClient(t, ccfg)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }

View File

@ -21,6 +21,7 @@ import (
grpc_logsettable "github.com/grpc-ecosystem/go-grpc-middleware/logging/settable" grpc_logsettable "github.com/grpc-ecosystem/go-grpc-middleware/logging/settable"
"go.etcd.io/etcd/client/pkg/v3/testutil" "go.etcd.io/etcd/client/pkg/v3/testutil"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/server/v3/embed" "go.etcd.io/etcd/server/v3/embed"
"go.etcd.io/etcd/server/v3/verify" "go.etcd.io/etcd/server/v3/verify"
"go.uber.org/zap/zapcore" "go.uber.org/zap/zapcore"
@ -70,3 +71,11 @@ func NewEmbedConfig(t testing.TB, name string) *embed.Config {
cfg.Dir = t.TempDir() cfg.Dir = t.TempDir()
return cfg return cfg
} }
func NewClient(t testing.TB, cfg clientv3.Config) (*clientv3.Client, error) {
client, err := clientv3.New(cfg)
if err != nil {
return nil, err
}
return client.WithLogger(zaptest.NewLogger(t)), nil
}

View File

@ -54,7 +54,7 @@ func testCreateKV(t testing.TB, endpoints []string) {
//{key: "hello", value: "3", unique: true, wantKeyMatch: false}, //{key: "hello", value: "3", unique: true, wantKeyMatch: false},
} }
cli, err := clientv3.New(clientv3.Config{Endpoints: endpoints}) cli, err := integration.NewClient(t, clientv3.Config{Endpoints: endpoints})
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -102,7 +102,7 @@ func testSetKV(t testing.TB, endpoints []string) {
{key: "/sdir/set", value: "4", wantIndexMatch: false}, {key: "/sdir/set", value: "4", wantIndexMatch: false},
} }
cli, err := clientv3.New(clientv3.Config{Endpoints: endpoints}) cli, err := integration.NewClient(t, clientv3.Config{Endpoints: endpoints})
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -138,7 +138,7 @@ func testCreateSetDir(t testing.TB, endpoints []string) {
{dir: "/ddir/1/2/3"}, {dir: "/ddir/1/2/3"},
} }
cli, err := clientv3.New(clientv3.Config{Endpoints: endpoints}) cli, err := integration.NewClient(t, clientv3.Config{Endpoints: endpoints})
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }

View File

@ -55,7 +55,7 @@ func TestV3AuthTokenWithDisable(t *testing.T) {
authSetupRoot(t, toGRPC(clus.Client(0)).Auth) authSetupRoot(t, toGRPC(clus.Client(0)).Auth)
c, cerr := clientv3.New(clientv3.Config{Endpoints: clus.Client(0).Endpoints(), Username: "root", Password: "123"}) c, cerr := NewClient(t, clientv3.Config{Endpoints: clus.Client(0).Endpoints(), Username: "root", Password: "123"})
if cerr != nil { if cerr != nil {
t.Fatal(cerr) t.Fatal(cerr)
} }
@ -127,7 +127,7 @@ func testV3AuthWithLeaseRevokeWithRoot(t *testing.T, ccfg ClusterConfig) {
api := toGRPC(clus.Client(0)) api := toGRPC(clus.Client(0))
authSetupRoot(t, api.Auth) authSetupRoot(t, api.Auth)
rootc, cerr := clientv3.New(clientv3.Config{ rootc, cerr := NewClient(t, clientv3.Config{
Endpoints: clus.Client(0).Endpoints(), Endpoints: clus.Client(0).Endpoints(),
Username: "root", Username: "root",
Password: "123", Password: "123",
@ -194,7 +194,7 @@ func TestV3AuthWithLeaseRevoke(t *testing.T) {
authSetupRoot(t, toGRPC(clus.Client(0)).Auth) authSetupRoot(t, toGRPC(clus.Client(0)).Auth)
rootc, cerr := clientv3.New(clientv3.Config{Endpoints: clus.Client(0).Endpoints(), Username: "root", Password: "123"}) rootc, cerr := NewClient(t, clientv3.Config{Endpoints: clus.Client(0).Endpoints(), Username: "root", Password: "123"})
if cerr != nil { if cerr != nil {
t.Fatal(cerr) t.Fatal(cerr)
} }
@ -211,7 +211,7 @@ func TestV3AuthWithLeaseRevoke(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
userc, cerr := clientv3.New(clientv3.Config{Endpoints: clus.Client(0).Endpoints(), Username: "user1", Password: "user1-123"}) userc, cerr := NewClient(t, clientv3.Config{Endpoints: clus.Client(0).Endpoints(), Username: "user1", Password: "user1-123"})
if cerr != nil { if cerr != nil {
t.Fatal(cerr) t.Fatal(cerr)
} }
@ -247,13 +247,13 @@ func TestV3AuthWithLeaseAttach(t *testing.T) {
authSetupRoot(t, toGRPC(clus.Client(0)).Auth) authSetupRoot(t, toGRPC(clus.Client(0)).Auth)
user1c, cerr := clientv3.New(clientv3.Config{Endpoints: clus.Client(0).Endpoints(), Username: "user1", Password: "user1-123"}) user1c, cerr := NewClient(t, clientv3.Config{Endpoints: clus.Client(0).Endpoints(), Username: "user1", Password: "user1-123"})
if cerr != nil { if cerr != nil {
t.Fatal(cerr) t.Fatal(cerr)
} }
defer user1c.Close() defer user1c.Close()
user2c, cerr := clientv3.New(clientv3.Config{Endpoints: clus.Client(0).Endpoints(), Username: "user2", Password: "user2-123"}) user2c, cerr := NewClient(t, clientv3.Config{Endpoints: clus.Client(0).Endpoints(), Username: "user2", Password: "user2-123"})
if cerr != nil { if cerr != nil {
t.Fatal(cerr) t.Fatal(cerr)
} }
@ -364,7 +364,7 @@ func TestV3AuthOldRevConcurrent(t *testing.T) {
authSetupRoot(t, toGRPC(clus.Client(0)).Auth) authSetupRoot(t, toGRPC(clus.Client(0)).Auth)
c, cerr := clientv3.New(clientv3.Config{ c, cerr := NewClient(t, clientv3.Config{
Endpoints: clus.Client(0).Endpoints(), Endpoints: clus.Client(0).Endpoints(),
DialTimeout: 5 * time.Second, DialTimeout: 5 * time.Second,
Username: "root", Username: "root",

View File

@ -1758,7 +1758,7 @@ func testTLSReload(
t.Log(err) t.Log(err)
continue continue
} }
cli, cerr := clientv3.New(clientv3.Config{ cli, cerr := NewClient(t, clientv3.Config{
DialOptions: []grpc.DialOption{grpc.WithBlock()}, DialOptions: []grpc.DialOption{grpc.WithBlock()},
Endpoints: []string{clus.Members[0].GRPCAddr()}, Endpoints: []string{clus.Members[0].GRPCAddr()},
DialTimeout: time.Second, DialTimeout: time.Second,
@ -1793,7 +1793,7 @@ func testTLSReload(
if terr != nil { if terr != nil {
t.Fatal(terr) t.Fatal(terr)
} }
cl, cerr := clientv3.New(clientv3.Config{ cl, cerr := NewClient(t, clientv3.Config{
Endpoints: []string{clus.Members[0].GRPCAddr()}, Endpoints: []string{clus.Members[0].GRPCAddr()},
DialTimeout: 5 * time.Second, DialTimeout: 5 * time.Second,
TLS: tls, TLS: tls,

View File

@ -54,7 +54,7 @@ func testTLSCipherSuites(t *testing.T, valid bool) {
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
cli, cerr := clientv3.New(clientv3.Config{ cli, cerr := NewClient(t, clientv3.Config{
Endpoints: []string{clus.Members[0].GRPCAddr()}, Endpoints: []string{clus.Members[0].GRPCAddr()},
DialTimeout: time.Second, DialTimeout: time.Second,
DialOptions: []grpc.DialOption{grpc.WithBlock()}, DialOptions: []grpc.DialOption{grpc.WithBlock()},

View File

@ -17,9 +17,10 @@ package main
import ( import (
"encoding/binary" "encoding/binary"
"fmt" "fmt"
"go.etcd.io/etcd/api/v3/authpb"
"path/filepath" "path/filepath"
"go.etcd.io/etcd/api/v3/authpb"
"go.etcd.io/etcd/api/v3/mvccpb" "go.etcd.io/etcd/api/v3/mvccpb"
"go.etcd.io/etcd/server/v3/lease/leasepb" "go.etcd.io/etcd/server/v3/lease/leasepb"
"go.etcd.io/etcd/server/v3/mvcc" "go.etcd.io/etcd/server/v3/mvcc"

View File

@ -31,8 +31,8 @@ import (
_ "github.com/hexfusion/schwag" _ "github.com/hexfusion/schwag"
_ "github.com/mdempsky/unconvert" _ "github.com/mdempsky/unconvert"
_ "github.com/mgechev/revive" _ "github.com/mgechev/revive"
_ "github.com/mikefarah/yq/v3"
_ "go.etcd.io/protodoc" _ "go.etcd.io/protodoc"
_ "honnef.co/go/tools/cmd/staticcheck" _ "honnef.co/go/tools/cmd/staticcheck"
_ "mvdan.cc/unparam" _ "mvdan.cc/unparam"
_ "github.com/mikefarah/yq/v3"
) )