From 810f489017ab94bb34062fed0e93cbc03cd5105d Mon Sep 17 00:00:00 2001 From: Lili Cosic Date: Mon, 26 Jul 2021 15:06:03 +0200 Subject: [PATCH 1/6] server: Add sampling rate to distributed tracing ExperimentalDistributedTracingSamplingRatePerMillion is the number of samples to collect per million spans. Defaults to 0. --- server/embed/config.go | 10 +++ server/embed/config_tracing.go | 111 ++++++++++++++++++++++++++++ server/embed/config_tracing_test.go | 83 +++++++++++++++++++++ server/embed/etcd.go | 62 ++-------------- server/etcdmain/config.go | 1 + server/etcdmain/help.go | 2 + 6 files changed, 212 insertions(+), 57 deletions(-) create mode 100644 server/embed/config_tracing.go create mode 100644 server/embed/config_tracing_test.go diff --git a/server/embed/config.go b/server/embed/config.go index fb4f9aee1..dcb28d9d1 100644 --- a/server/embed/config.go +++ b/server/embed/config.go @@ -348,6 +348,9 @@ type Config struct { // that exist at the same time. // Can only be used if ExperimentalEnableDistributedTracing is true. ExperimentalDistributedTracingServiceInstanceID string `json:"experimental-distributed-tracing-instance-id"` + // ExperimentalDistributedTracingSamplingRatePerMillion is the number of samples to collect per million spans. + // Defaults to 0. + ExperimentalDistributedTracingSamplingRatePerMillion int `json:"experimental-distributed-tracing-sampling-rate"` // Logger is logger options: currently only supports "zap". // "capnslog" is removed in v3.5. @@ -681,6 +684,13 @@ func (cfg *Config) Validate() error { return fmt.Errorf("unknown auto-compaction-mode %q", cfg.AutoCompactionMode) } + // Validate distributed tracing configuration but only if enabled. + if cfg.ExperimentalEnableDistributedTracing { + if err := validateTracingConfig(cfg.ExperimentalDistributedTracingSamplingRatePerMillion); err != nil { + return fmt.Errorf("distributed tracing configurition is not valid: (%v)", err) + } + } + return nil } diff --git a/server/embed/config_tracing.go b/server/embed/config_tracing.go new file mode 100644 index 000000000..b3a5279ce --- /dev/null +++ b/server/embed/config_tracing.go @@ -0,0 +1,111 @@ +// Copyright 2021 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package embed + +import ( + "context" + "fmt" + + "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" + "go.opentelemetry.io/otel/exporters/otlp" + "go.opentelemetry.io/otel/exporters/otlp/otlpgrpc" + "go.opentelemetry.io/otel/propagation" + "go.opentelemetry.io/otel/sdk/resource" + tracesdk "go.opentelemetry.io/otel/sdk/trace" + "go.opentelemetry.io/otel/semconv" + "go.uber.org/zap" +) + +const maxSamplingRatePerMillion = 1000000 + +func validateTracingConfig(samplingRate int) error { + if samplingRate < 0 { + return fmt.Errorf("tracing sampling rate must be positive") + } + if samplingRate > maxSamplingRatePerMillion { + return fmt.Errorf("tracing sampling rate must be less than %d", maxSamplingRatePerMillion) + } + + return nil +} + +func setupTracingExporter(ctx context.Context, cfg *Config) (exporter tracesdk.SpanExporter, options []otelgrpc.Option, err error) { + exporter, err = otlp.NewExporter(ctx, + otlpgrpc.NewDriver( + otlpgrpc.WithEndpoint(cfg.ExperimentalDistributedTracingAddress), + otlpgrpc.WithInsecure(), + )) + if err != nil { + return nil, nil, err + } + + res := resource.NewWithAttributes( + semconv.ServiceNameKey.String(cfg.ExperimentalDistributedTracingServiceName), + ) + + if resWithIDKey := determineResourceWithIDKey(cfg.ExperimentalDistributedTracingServiceInstanceID); resWithIDKey != nil { + // Merge resources into a new + // resource in case of duplicates. + res = resource.Merge(res, resWithIDKey) + } + + options = append(options, + otelgrpc.WithPropagators( + propagation.NewCompositeTextMapPropagator( + propagation.TraceContext{}, + propagation.Baggage{}, + ), + ), + otelgrpc.WithTracerProvider( + tracesdk.NewTracerProvider( + tracesdk.WithBatcher(exporter), + tracesdk.WithResource(res), + tracesdk.WithSampler( + tracesdk.ParentBased(determineSampler(cfg.ExperimentalDistributedTracingSamplingRatePerMillion)), + ), + ), + ), + ) + + cfg.logger.Debug( + "distributed tracing enabled", + zap.String("address", cfg.ExperimentalDistributedTracingAddress), + zap.String("service-name", cfg.ExperimentalDistributedTracingServiceName), + zap.String("service-instance-id", cfg.ExperimentalDistributedTracingServiceInstanceID), + zap.Int("sampling-rate", cfg.ExperimentalDistributedTracingSamplingRatePerMillion), + ) + + return exporter, options, err +} + +func determineSampler(samplingRate int) tracesdk.Sampler { + sampler := tracesdk.NeverSample() + if samplingRate == 0 { + return sampler + } + return tracesdk.TraceIDRatioBased(float64(samplingRate) / float64(maxSamplingRatePerMillion)) +} + +// As Tracing service Instance ID must be unique, it should +// never use the empty default string value, it's set if +// if it's a non empty string. +func determineResourceWithIDKey(serviceInstanceID string) *resource.Resource { + if serviceInstanceID != "" { + return resource.NewWithAttributes( + (semconv.ServiceInstanceIDKey.String(serviceInstanceID)), + ) + } + return nil +} diff --git a/server/embed/config_tracing_test.go b/server/embed/config_tracing_test.go new file mode 100644 index 000000000..0abbe4d1d --- /dev/null +++ b/server/embed/config_tracing_test.go @@ -0,0 +1,83 @@ +// Copyright 2021 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package embed + +import ( + "testing" +) + +const neverSampleDescription = "AlwaysOffSampler" + +func TestDetermineSampler(t *testing.T) { + tests := []struct { + name string + sampleRate int + wantSamplerDescription string + }{ + { + name: "sample rate is disabled", + sampleRate: 0, + wantSamplerDescription: neverSampleDescription, + }, + { + name: "sample rate is 100", + sampleRate: 100, + wantSamplerDescription: "TraceIDRatioBased{0.0001}", + }, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + sampler := determineSampler(tc.sampleRate) + if tc.wantSamplerDescription != sampler.Description() { + t.Errorf("tracing sampler was not as expected; expected sampler: %#+v, got sampler: %#+v", tc.wantSamplerDescription, sampler.Description()) + } + }) + } +} + +func TestTracingConfig(t *testing.T) { + tests := []struct { + name string + sampleRate int + wantErr bool + }{ + { + name: "invalid - sample rate is less than 0", + sampleRate: -1, + wantErr: true, + }, + { + name: "invalid - sample rate is more than allowed value", + sampleRate: maxSamplingRatePerMillion + 1, + wantErr: true, + }, + { + name: "valid - sample rate is 100", + sampleRate: 100, + wantErr: false, + }, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + err := validateTracingConfig(tc.sampleRate) + if err == nil && tc.wantErr { + t.Errorf("expected error got (%v) error", err) + } + if err != nil && !tc.wantErr { + t.Errorf("expected no errors, got error: (%v)", err) + } + }) + } +} diff --git a/server/embed/etcd.go b/server/embed/etcd.go index 8e5ac151a..b0872eeb4 100644 --- a/server/embed/etcd.go +++ b/server/embed/etcd.go @@ -46,13 +46,6 @@ import ( grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" "github.com/soheilhy/cmux" - "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" - "go.opentelemetry.io/otel/exporters/otlp" - "go.opentelemetry.io/otel/exporters/otlp/otlpgrpc" - "go.opentelemetry.io/otel/propagation" - "go.opentelemetry.io/otel/sdk/resource" - tracesdk "go.opentelemetry.io/otel/sdk/trace" - "go.opentelemetry.io/otel/semconv" "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/keepalive" @@ -229,7 +222,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) { if srvcfg.ExperimentalEnableDistributedTracing { tctx := context.Background() - tracingExporter, opts, err := e.setupTracing(tctx) + tracingExporter, opts, err := setupTracingExporter(tctx, cfg) if err != nil { return e, err } @@ -238,6 +231,10 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) { } e.tracingExporterShutdown = func() { tracingExporter.Shutdown(tctx) } srvcfg.ExperimentalTracerOptions = opts + + e.cfg.logger.Info( + "distributed tracing setup enabled", + ) } print(e.cfg.logger, *cfg, srvcfg, memberInitialized) @@ -809,52 +806,3 @@ func parseCompactionRetention(mode, retention string) (ret time.Duration, err er } return ret, nil } - -func (e *Etcd) setupTracing(ctx context.Context) (exporter tracesdk.SpanExporter, options []otelgrpc.Option, err error) { - exporter, err = otlp.NewExporter(ctx, - otlpgrpc.NewDriver( - otlpgrpc.WithEndpoint(e.cfg.ExperimentalDistributedTracingAddress), - otlpgrpc.WithInsecure(), - )) - if err != nil { - return nil, nil, err - } - res := resource.NewWithAttributes( - semconv.ServiceNameKey.String(e.cfg.ExperimentalDistributedTracingServiceName), - ) - // As Tracing service Instance ID must be unique, it should - // never use the empty default string value, so we only set it - // if it's a non empty string. - if e.cfg.ExperimentalDistributedTracingServiceInstanceID != "" { - resWithIDKey := resource.NewWithAttributes( - (semconv.ServiceInstanceIDKey.String(e.cfg.ExperimentalDistributedTracingServiceInstanceID)), - ) - // Merge resources to combine into a new - // resource in case of duplicates. - res = resource.Merge(res, resWithIDKey) - } - - options = append(options, - otelgrpc.WithPropagators( - propagation.NewCompositeTextMapPropagator( - propagation.TraceContext{}, - propagation.Baggage{}, - ), - ), - otelgrpc.WithTracerProvider( - tracesdk.NewTracerProvider( - tracesdk.WithBatcher(exporter), - tracesdk.WithResource(res), - ), - ), - ) - - e.cfg.logger.Info( - "distributed tracing enabled", - zap.String("distributed-tracing-address", e.cfg.ExperimentalDistributedTracingAddress), - zap.String("distributed-tracing-service-name", e.cfg.ExperimentalDistributedTracingServiceName), - zap.String("distributed-tracing-service-instance-id", e.cfg.ExperimentalDistributedTracingServiceInstanceID), - ) - - return exporter, options, err -} diff --git a/server/etcdmain/config.go b/server/etcdmain/config.go index 5dadbf6c8..50eed2868 100644 --- a/server/etcdmain/config.go +++ b/server/etcdmain/config.go @@ -267,6 +267,7 @@ func newConfig() *config { fs.StringVar(&cfg.ec.ExperimentalDistributedTracingAddress, "experimental-distributed-tracing-address", embed.ExperimentalDistributedTracingAddress, "Address for distributed tracing used for OpenTelemetry Tracing (if enabled with experimental-enable-distributed-tracing flag).") fs.StringVar(&cfg.ec.ExperimentalDistributedTracingServiceName, "experimental-distributed-tracing-service-name", embed.ExperimentalDistributedTracingServiceName, "Configures service name for distributed tracing to be used to define service name for OpenTelemetry Tracing (if enabled with experimental-enable-distributed-tracing flag). 'etcd' is the default service name. Use the same service name for all instances of etcd.") fs.StringVar(&cfg.ec.ExperimentalDistributedTracingServiceInstanceID, "experimental-distributed-tracing-instance-id", "", "Configures service instance ID for distributed tracing to be used to define service instance ID key for OpenTelemetry Tracing (if enabled with experimental-enable-distributed-tracing flag). There is no default value set. This ID must be unique per etcd instance.") + fs.IntVar(&cfg.ec.ExperimentalDistributedTracingSamplingRatePerMillion, "experimental-distributed-tracing-sampling-rate", 0, "Number of samples to collect per million spans for OpenTelemetry Tracing (if enabled with experimental-enable-distributed-tracing flag).") // auth fs.StringVar(&cfg.ec.AuthToken, "auth-token", cfg.ec.AuthToken, "Specify auth token specific options.") diff --git a/server/etcdmain/help.go b/server/etcdmain/help.go index dc5b55fae..3c4999815 100644 --- a/server/etcdmain/help.go +++ b/server/etcdmain/help.go @@ -210,6 +210,8 @@ Experimental distributed tracing: Distributed tracing service name, must be same across all etcd instances. --experimental-distributed-tracing-instance-id '' Distributed tracing instance ID, must be unique per each etcd instance. + --experimental-distributed-tracing-sampling-rate '0' + Number of samples to collect per million spans for distributed tracing. Disabled by default. v2 Proxy (to be deprecated in v3.6): --proxy 'off' From af10e8791fc062f565bbce780e384aec884790bb Mon Sep 17 00:00:00 2001 From: ben1009 Date: Wed, 11 Aug 2021 17:11:27 +0800 Subject: [PATCH 2/6] fix typo in wal doc, log: - fix logs in wal Repair - unify broken file name to val --- server/storage/wal/doc.go | 12 ++++++------ server/storage/wal/repair.go | 7 ++++--- 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/server/storage/wal/doc.go b/server/storage/wal/doc.go index 7ea348e4a..32fa6162a 100644 --- a/server/storage/wal/doc.go +++ b/server/storage/wal/doc.go @@ -13,11 +13,11 @@ // limitations under the License. /* -Package wal provides an implementation of a write ahead log that is used by +Package wal provides an implementation of write ahead log that is used by etcd. A WAL is created at a particular directory and is made up of a number of -segmented WAL files. Inside of each file the raft state and entries are appended +segmented WAL files. Inside each file the raft state and entries are appended to it with the Save method: metadata := []byte{} @@ -41,18 +41,18 @@ protobuf. The record protobuf contains a CRC, a type, and a data payload. The le record is 8-byte aligned so that the length field is never torn. The CRC contains the CRC32 value of all record protobufs preceding the current record. -WAL files are placed inside of the directory in the following format: +WAL files are placed inside the directory in the following format: $seq-$index.wal The first WAL file to be created will be 0000000000000000-0000000000000000.wal indicating an initial sequence of 0 and an initial raft index of 0. The first entry written to WAL MUST have raft index 0. -WAL will cut its current tail wal file if its size exceeds 64MB. This will increment an internal +WAL will cut its current tail wal file if its size exceeds 64 MB. This will increment an internal sequence number and cause a new file to be created. If the last raft index saved was 0x20 and this is the first time cut has been called on this WAL then the sequence will increment from 0x0 to 0x1. The new file will be: 0000000000000001-0000000000000021.wal. -If a second cut issues 0x10 entries with incremental index later then the file will be called: +If a second cut issues 0x10 entries with incremental index later, then the file will be called: 0000000000000002-0000000000000031.wal. At a later time a WAL can be opened at a particular snapshot. If there is no @@ -63,7 +63,7 @@ snapshot, an empty snapshot should be passed in. The snapshot must have been written to the WAL. -Additional items cannot be Saved to this WAL until all of the items from the given +Additional items cannot be Saved to this WAL until all the items from the given snapshot to the end of the WAL are read first: metadata, state, ents, err := w.ReadAll() diff --git a/server/storage/wal/repair.go b/server/storage/wal/repair.go index b6b9b49f2..c007763de 100644 --- a/server/storage/wal/repair.go +++ b/server/storage/wal/repair.go @@ -64,9 +64,10 @@ func Repair(lg *zap.Logger, dirpath string) bool { return true case io.ErrUnexpectedEOF: - bf, bferr := os.Create(f.Name() + ".broken") + brokenName := f.Name() + ".broken" + bf, bferr := os.Create(brokenName) if bferr != nil { - lg.Warn("failed to create backup file", zap.String("path", f.Name()+".broken"), zap.Error(bferr)) + lg.Warn("failed to create backup file", zap.String("path", brokenName), zap.Error(bferr)) return false } defer bf.Close() @@ -77,7 +78,7 @@ func Repair(lg *zap.Logger, dirpath string) bool { } if _, err = io.Copy(bf, f); err != nil { - lg.Warn("failed to copy", zap.String("from", f.Name()+".broken"), zap.String("to", f.Name()), zap.Error(err)) + lg.Warn("failed to copy", zap.String("from", f.Name()), zap.String("to", brokenName), zap.Error(err)) return false } From ceb23c9d9b07efb10c49a5ed73edc50ad40dd9b0 Mon Sep 17 00:00:00 2001 From: "leoyang.yl" Date: Thu, 2 Sep 2021 21:11:19 +0800 Subject: [PATCH 3/6] rw benchmark add val-size --- tools/rw-heatmaps/rw-benchmark.sh | 1 + 1 file changed, 1 insertion(+) diff --git a/tools/rw-heatmaps/rw-benchmark.sh b/tools/rw-heatmaps/rw-benchmark.sh index f9957ff70..aeec18549 100755 --- a/tools/rw-heatmaps/rw-benchmark.sh +++ b/tools/rw-heatmaps/rw-benchmark.sh @@ -182,6 +182,7 @@ for RATIO_STR in ${RATIO_LIST}; do --total=${RUN_COUNT} \ --endpoints "http://127.0.0.1:${CLIENT_PORT}" \ --rw-ratio ${RATIO} --limit ${RANGE_RESULT_LIMIT} \ + --val-size ${VALUE_SIZE} \ 2>/dev/null | grep "Requests/sec" | awk "{print \$2}") if [ $? -ne 0 ]; then echo "benchmark command failed: $?" From 87f1dc7e40d8448da4db02f079830bf749272ae0 Mon Sep 17 00:00:00 2001 From: Eduardo Patrocinio Date: Fri, 3 Sep 2021 15:34:15 -0400 Subject: [PATCH 4/6] Fix a few typos --- server/etcdserver/server.go | 2 +- tests/integration/clientv3/snapshot/v3_snapshot_test.go | 2 +- tests/integration/snapshot/v3_snapshot_test.go | 2 +- tests/integration/utl_wal_version_test.go | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index 590c98c8e..dac10ad5a 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -1874,7 +1874,7 @@ func (s *EtcdServer) apply( return appliedt, appliedi, shouldStop } -// applyEntryNormal apples an EntryNormal type raftpb request to the EtcdServer +// applyEntryNormal applies an EntryNormal type raftpb request to the EtcdServer func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) { shouldApplyV3 := membership.ApplyV2storeOnly index := s.consistIndex.ConsistentIndex() diff --git a/tests/integration/clientv3/snapshot/v3_snapshot_test.go b/tests/integration/clientv3/snapshot/v3_snapshot_test.go index 9222a37cb..c61188981 100644 --- a/tests/integration/clientv3/snapshot/v3_snapshot_test.go +++ b/tests/integration/clientv3/snapshot/v3_snapshot_test.go @@ -89,7 +89,7 @@ func newEmbedConfig(t *testing.T) *embed.Config { // creates a snapshot file and returns the file path. func createSnapshotFile(t *testing.T, cfg *embed.Config, kvs []kv) (version string, dbPath string) { testutil.SkipTestIfShortMode(t, - "Snapshot creation tests are depending on embedded etcServer so are integration-level tests.") + "Snapshot creation tests are depending on embedded etcd server so are integration-level tests.") srv, err := embed.StartEtcd(cfg) if err != nil { diff --git a/tests/integration/snapshot/v3_snapshot_test.go b/tests/integration/snapshot/v3_snapshot_test.go index dd1ee5226..1cad1e1e6 100644 --- a/tests/integration/snapshot/v3_snapshot_test.go +++ b/tests/integration/snapshot/v3_snapshot_test.go @@ -170,7 +170,7 @@ type kv struct { // creates a snapshot file and returns the file path. func createSnapshotFile(t *testing.T, kvs []kv) string { testutil.SkipTestIfShortMode(t, - "Snapshot creation tests are depending on embedded etcServer so are integration-level tests.") + "Snapshot creation tests are depending on embedded etcd server so are integration-level tests.") clusterN := 1 urls := newEmbedURLs(clusterN * 2) cURLs, pURLs := urls[:clusterN], urls[clusterN:] diff --git a/tests/integration/utl_wal_version_test.go b/tests/integration/utl_wal_version_test.go index 1290b76ae..cbed561bd 100644 --- a/tests/integration/utl_wal_version_test.go +++ b/tests/integration/utl_wal_version_test.go @@ -33,7 +33,7 @@ import ( func TestEtcdVersionFromWAL(t *testing.T) { testutil.SkipTestIfShortMode(t, - "Wal creation tests are depending on embedded etcServer so are integration-level tests.") + "Wal creation tests are depending on embedded etcd server so are integration-level tests.") cfg := NewEmbedConfig(t, "default") srv, err := embed.StartEtcd(cfg) if err != nil { From 2a750a8dbaee89e4627e1038a752aa613ef83594 Mon Sep 17 00:00:00 2001 From: Hitoshi Mitake Date: Sat, 28 Aug 2021 23:47:14 +0900 Subject: [PATCH 5/6] *: implement a retry logic for auth old revision in the client --- api/v3rpc/rpctypes/error.go | 3 + client/v3/retry_interceptor.go | 4 +- client/v3/retry_interceptor_test.go | 124 ++++++++++++++++++++++++++++ server/etcdserver/api/v3rpc/util.go | 1 + 4 files changed, 131 insertions(+), 1 deletion(-) create mode 100644 client/v3/retry_interceptor_test.go diff --git a/api/v3rpc/rpctypes/error.go b/api/v3rpc/rpctypes/error.go index 72a58b9ee..e3ffae96a 100644 --- a/api/v3rpc/rpctypes/error.go +++ b/api/v3rpc/rpctypes/error.go @@ -65,6 +65,7 @@ var ( ErrGRPCAuthNotEnabled = status.New(codes.FailedPrecondition, "etcdserver: authentication is not enabled").Err() ErrGRPCInvalidAuthToken = status.New(codes.Unauthenticated, "etcdserver: invalid auth token").Err() ErrGRPCInvalidAuthMgmt = status.New(codes.InvalidArgument, "etcdserver: invalid auth management").Err() + ErrGRPCAuthOldRevision = status.New(codes.InvalidArgument, "etcdserver: revision of auth store is old").Err() ErrGRPCNoLeader = status.New(codes.Unavailable, "etcdserver: no leader").Err() ErrGRPCNotLeader = status.New(codes.FailedPrecondition, "etcdserver: not leader").Err() @@ -131,6 +132,7 @@ var ( ErrorDesc(ErrGRPCAuthNotEnabled): ErrGRPCAuthNotEnabled, ErrorDesc(ErrGRPCInvalidAuthToken): ErrGRPCInvalidAuthToken, ErrorDesc(ErrGRPCInvalidAuthMgmt): ErrGRPCInvalidAuthMgmt, + ErrorDesc(ErrGRPCAuthOldRevision): ErrGRPCAuthOldRevision, ErrorDesc(ErrGRPCNoLeader): ErrGRPCNoLeader, ErrorDesc(ErrGRPCNotLeader): ErrGRPCNotLeader, @@ -195,6 +197,7 @@ var ( ErrPermissionNotGranted = Error(ErrGRPCPermissionNotGranted) ErrAuthNotEnabled = Error(ErrGRPCAuthNotEnabled) ErrInvalidAuthToken = Error(ErrGRPCInvalidAuthToken) + ErrAuthOldRevision = Error(ErrGRPCAuthOldRevision) ErrInvalidAuthMgmt = Error(ErrGRPCInvalidAuthMgmt) ErrNoLeader = Error(ErrGRPCNoLeader) diff --git a/client/v3/retry_interceptor.go b/client/v3/retry_interceptor.go index 218381d93..2b5d26c8f 100644 --- a/client/v3/retry_interceptor.go +++ b/client/v3/retry_interceptor.go @@ -157,7 +157,9 @@ func (c *Client) shouldRefreshToken(err error, callOpts *options) bool { // which is possible when the client token is cleared somehow return c.authTokenBundle != nil // equal to c.Username != "" && c.Password != "" } - return callOpts.retryAuth && rpctypes.Error(err) == rpctypes.ErrInvalidAuthToken + + return callOpts.retryAuth && + (rpctypes.Error(err) == rpctypes.ErrInvalidAuthToken || rpctypes.Error(err) == rpctypes.ErrAuthOldRevision) } // type serverStreamingRetryingStream is the implementation of grpc.ClientStream that acts as a diff --git a/client/v3/retry_interceptor_test.go b/client/v3/retry_interceptor_test.go new file mode 100644 index 000000000..b850b56e0 --- /dev/null +++ b/client/v3/retry_interceptor_test.go @@ -0,0 +1,124 @@ +package clientv3 + +import ( + "go.etcd.io/etcd/api/v3/v3rpc/rpctypes" + "go.etcd.io/etcd/client/v3/credentials" + grpccredentials "google.golang.org/grpc/credentials" + "testing" +) + +type dummyAuthTokenBundle struct{} + +func (d dummyAuthTokenBundle) TransportCredentials() grpccredentials.TransportCredentials { + return nil +} + +func (d dummyAuthTokenBundle) PerRPCCredentials() grpccredentials.PerRPCCredentials { + return nil +} + +func (d dummyAuthTokenBundle) NewWithMode(mode string) (grpccredentials.Bundle, error) { + return nil, nil +} + +func (d dummyAuthTokenBundle) UpdateAuthToken(token string) { +} + +func TestClientShouldRefreshToken(t *testing.T) { + type fields struct { + authTokenBundle credentials.Bundle + } + type args struct { + err error + callOpts *options + } + + optsWithTrue := &options{ + retryAuth: true, + } + optsWithFalse := &options{ + retryAuth: false, + } + + tests := []struct { + name string + fields fields + args args + want bool + }{ + { + name: "ErrUserEmpty and non nil authTokenBundle", + fields: fields{ + authTokenBundle: &dummyAuthTokenBundle{}, + }, + args: args{rpctypes.ErrGRPCUserEmpty, optsWithTrue}, + want: true, + }, + { + name: "ErrUserEmpty and nil authTokenBundle", + fields: fields{ + authTokenBundle: nil, + }, + args: args{rpctypes.ErrGRPCUserEmpty, optsWithTrue}, + want: false, + }, + { + name: "ErrGRPCInvalidAuthToken and retryAuth", + fields: fields{ + authTokenBundle: nil, + }, + args: args{rpctypes.ErrGRPCInvalidAuthToken, optsWithTrue}, + want: true, + }, + { + name: "ErrGRPCInvalidAuthToken and !retryAuth", + fields: fields{ + authTokenBundle: nil, + }, + args: args{rpctypes.ErrGRPCInvalidAuthToken, optsWithFalse}, + want: false, + }, + { + name: "ErrGRPCAuthOldRevision and retryAuth", + fields: fields{ + authTokenBundle: nil, + }, + args: args{rpctypes.ErrGRPCAuthOldRevision, optsWithTrue}, + want: true, + }, + { + name: "ErrGRPCAuthOldRevision and !retryAuth", + fields: fields{ + authTokenBundle: nil, + }, + args: args{rpctypes.ErrGRPCAuthOldRevision, optsWithFalse}, + want: false, + }, + { + name: "Other error and retryAuth", + fields: fields{ + authTokenBundle: nil, + }, + args: args{rpctypes.ErrGRPCAuthFailed, optsWithTrue}, + want: false, + }, + { + name: "Other error and !retryAuth", + fields: fields{ + authTokenBundle: nil, + }, + args: args{rpctypes.ErrGRPCAuthFailed, optsWithFalse}, + want: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + c := &Client{ + authTokenBundle: tt.fields.authTokenBundle, + } + if got := c.shouldRefreshToken(tt.args.err, tt.args.callOpts); got != tt.want { + t.Errorf("shouldRefreshToken() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/server/etcdserver/api/v3rpc/util.go b/server/etcdserver/api/v3rpc/util.go index 373d430e2..f054f29c8 100644 --- a/server/etcdserver/api/v3rpc/util.go +++ b/server/etcdserver/api/v3rpc/util.go @@ -84,6 +84,7 @@ var toGRPCErrorMap = map[error]error{ auth.ErrAuthNotEnabled: rpctypes.ErrGRPCAuthNotEnabled, auth.ErrInvalidAuthToken: rpctypes.ErrGRPCInvalidAuthToken, auth.ErrInvalidAuthMgmt: rpctypes.ErrGRPCInvalidAuthMgmt, + auth.ErrAuthOldRevision: rpctypes.ErrGRPCAuthOldRevision, // In sync with status.FromContextError context.Canceled: rpctypes.ErrGRPCCanceled, From ad69fe0f4016fa23bcc8d2d40d1748e550aaab25 Mon Sep 17 00:00:00 2001 From: "leoyang.yl" Date: Thu, 9 Sep 2021 19:32:08 +0800 Subject: [PATCH 6/6] fix IsOptsWithFromKey fix IsOptsWithFromKey fix IsOptsWithFromKey fix IsOptsWithFromKey fix IsOptsWithFromKey fix IsOptsWithFromKey --- client/v3/op.go | 27 +++++++++++++++++++++++++-- client/v3/utils.go | 18 ------------------ tests/integration/testing_test.go | 8 ++++---- 3 files changed, 29 insertions(+), 24 deletions(-) diff --git a/client/v3/op.go b/client/v3/op.go index bd0f1f2f2..e8c0c1e08 100644 --- a/client/v3/op.go +++ b/client/v3/op.go @@ -77,6 +77,9 @@ type Op struct { cmps []Cmp thenOps []Op elseOps []Op + + isOptsWithFromKey bool + isOptsWithPrefix bool } // accessors / mutators @@ -216,6 +219,10 @@ func (op Op) isWrite() bool { return op.t != tRange } +func NewOp() *Op { + return &Op{key: []byte("")} +} + // OpGet returns "get" operation based on given key and operation options. func OpGet(key string, opts ...OpOption) Op { // WithPrefix and WithFromKey are not supported together @@ -387,6 +394,7 @@ func WithPrefix() OpOption { return } op.end = getPrefix(op.key) + op.isOptsWithPrefix = true } } @@ -406,6 +414,7 @@ func WithFromKey() OpOption { op.key = []byte{0} } op.end = []byte("\x00") + op.isOptsWithFromKey = true } } @@ -554,7 +563,21 @@ func toLeaseTimeToLiveRequest(id LeaseID, opts ...LeaseOption) *pb.LeaseTimeToLi } // IsOptsWithPrefix returns true if WithPrefix option is called in the given opts. -func IsOptsWithPrefix(opts []OpOption) bool { return isOpFuncCalled("WithPrefix", opts) } +func IsOptsWithPrefix(opts []OpOption) bool { + ret := NewOp() + for _, opt := range opts { + opt(ret) + } + + return ret.isOptsWithPrefix +} // IsOptsWithFromKey returns true if WithFromKey option is called in the given opts. -func IsOptsWithFromKey(opts []OpOption) bool { return isOpFuncCalled("WithFromKey", opts) } +func IsOptsWithFromKey(opts []OpOption) bool { + ret := NewOp() + for _, opt := range opts { + opt(ret) + } + + return ret.isOptsWithFromKey +} diff --git a/client/v3/utils.go b/client/v3/utils.go index b998c41b9..850275877 100644 --- a/client/v3/utils.go +++ b/client/v3/utils.go @@ -16,9 +16,6 @@ package clientv3 import ( "math/rand" - "reflect" - "runtime" - "strings" "time" ) @@ -32,18 +29,3 @@ func jitterUp(duration time.Duration, jitter float64) time.Duration { multiplier := jitter * (rand.Float64()*2 - 1) return time.Duration(float64(duration) * (1 + multiplier)) } - -// Check if the provided function is being called in the op options. -func isOpFuncCalled(op string, opts []OpOption) bool { - for _, opt := range opts { - v := reflect.ValueOf(opt) - if v.Kind() == reflect.Func { - if opFunc := runtime.FuncForPC(v.Pointer()); opFunc != nil { - if strings.Contains(opFunc.Name(), op) { - return true - } - } - } - } - return false -} diff --git a/tests/integration/testing_test.go b/tests/integration/testing_test.go index f9afb8ded..a225063b1 100644 --- a/tests/integration/testing_test.go +++ b/tests/integration/testing_test.go @@ -15,14 +15,14 @@ package integration_test import ( - "testing" - "time" - "go.etcd.io/etcd/tests/v3/integration" + "testing" ) func TestBeforeTestWithoutLeakDetection(t *testing.T) { integration.BeforeTest(t, integration.WithoutGoLeakDetection(), integration.WithoutSkipInShort()) // Intentional leak that should get ignored - go time.Sleep(2 * time.Second) + go func() { + + }() }