mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
Merge branch 'main' into etcd-client-solaris-build
This commit is contained in:
@@ -65,6 +65,7 @@ var (
|
||||
ErrGRPCAuthNotEnabled = status.New(codes.FailedPrecondition, "etcdserver: authentication is not enabled").Err()
|
||||
ErrGRPCInvalidAuthToken = status.New(codes.Unauthenticated, "etcdserver: invalid auth token").Err()
|
||||
ErrGRPCInvalidAuthMgmt = status.New(codes.InvalidArgument, "etcdserver: invalid auth management").Err()
|
||||
ErrGRPCAuthOldRevision = status.New(codes.InvalidArgument, "etcdserver: revision of auth store is old").Err()
|
||||
|
||||
ErrGRPCNoLeader = status.New(codes.Unavailable, "etcdserver: no leader").Err()
|
||||
ErrGRPCNotLeader = status.New(codes.FailedPrecondition, "etcdserver: not leader").Err()
|
||||
@@ -131,6 +132,7 @@ var (
|
||||
ErrorDesc(ErrGRPCAuthNotEnabled): ErrGRPCAuthNotEnabled,
|
||||
ErrorDesc(ErrGRPCInvalidAuthToken): ErrGRPCInvalidAuthToken,
|
||||
ErrorDesc(ErrGRPCInvalidAuthMgmt): ErrGRPCInvalidAuthMgmt,
|
||||
ErrorDesc(ErrGRPCAuthOldRevision): ErrGRPCAuthOldRevision,
|
||||
|
||||
ErrorDesc(ErrGRPCNoLeader): ErrGRPCNoLeader,
|
||||
ErrorDesc(ErrGRPCNotLeader): ErrGRPCNotLeader,
|
||||
@@ -195,6 +197,7 @@ var (
|
||||
ErrPermissionNotGranted = Error(ErrGRPCPermissionNotGranted)
|
||||
ErrAuthNotEnabled = Error(ErrGRPCAuthNotEnabled)
|
||||
ErrInvalidAuthToken = Error(ErrGRPCInvalidAuthToken)
|
||||
ErrAuthOldRevision = Error(ErrGRPCAuthOldRevision)
|
||||
ErrInvalidAuthMgmt = Error(ErrGRPCInvalidAuthMgmt)
|
||||
|
||||
ErrNoLeader = Error(ErrGRPCNoLeader)
|
||||
|
||||
@@ -77,6 +77,9 @@ type Op struct {
|
||||
cmps []Cmp
|
||||
thenOps []Op
|
||||
elseOps []Op
|
||||
|
||||
isOptsWithFromKey bool
|
||||
isOptsWithPrefix bool
|
||||
}
|
||||
|
||||
// accessors / mutators
|
||||
@@ -216,6 +219,10 @@ func (op Op) isWrite() bool {
|
||||
return op.t != tRange
|
||||
}
|
||||
|
||||
func NewOp() *Op {
|
||||
return &Op{key: []byte("")}
|
||||
}
|
||||
|
||||
// OpGet returns "get" operation based on given key and operation options.
|
||||
func OpGet(key string, opts ...OpOption) Op {
|
||||
// WithPrefix and WithFromKey are not supported together
|
||||
@@ -387,6 +394,7 @@ func WithPrefix() OpOption {
|
||||
return
|
||||
}
|
||||
op.end = getPrefix(op.key)
|
||||
op.isOptsWithPrefix = true
|
||||
}
|
||||
}
|
||||
|
||||
@@ -406,6 +414,7 @@ func WithFromKey() OpOption {
|
||||
op.key = []byte{0}
|
||||
}
|
||||
op.end = []byte("\x00")
|
||||
op.isOptsWithFromKey = true
|
||||
}
|
||||
}
|
||||
|
||||
@@ -554,7 +563,21 @@ func toLeaseTimeToLiveRequest(id LeaseID, opts ...LeaseOption) *pb.LeaseTimeToLi
|
||||
}
|
||||
|
||||
// IsOptsWithPrefix returns true if WithPrefix option is called in the given opts.
|
||||
func IsOptsWithPrefix(opts []OpOption) bool { return isOpFuncCalled("WithPrefix", opts) }
|
||||
func IsOptsWithPrefix(opts []OpOption) bool {
|
||||
ret := NewOp()
|
||||
for _, opt := range opts {
|
||||
opt(ret)
|
||||
}
|
||||
|
||||
return ret.isOptsWithPrefix
|
||||
}
|
||||
|
||||
// IsOptsWithFromKey returns true if WithFromKey option is called in the given opts.
|
||||
func IsOptsWithFromKey(opts []OpOption) bool { return isOpFuncCalled("WithFromKey", opts) }
|
||||
func IsOptsWithFromKey(opts []OpOption) bool {
|
||||
ret := NewOp()
|
||||
for _, opt := range opts {
|
||||
opt(ret)
|
||||
}
|
||||
|
||||
return ret.isOptsWithFromKey
|
||||
}
|
||||
|
||||
@@ -157,7 +157,9 @@ func (c *Client) shouldRefreshToken(err error, callOpts *options) bool {
|
||||
// which is possible when the client token is cleared somehow
|
||||
return c.authTokenBundle != nil // equal to c.Username != "" && c.Password != ""
|
||||
}
|
||||
return callOpts.retryAuth && rpctypes.Error(err) == rpctypes.ErrInvalidAuthToken
|
||||
|
||||
return callOpts.retryAuth &&
|
||||
(rpctypes.Error(err) == rpctypes.ErrInvalidAuthToken || rpctypes.Error(err) == rpctypes.ErrAuthOldRevision)
|
||||
}
|
||||
|
||||
// type serverStreamingRetryingStream is the implementation of grpc.ClientStream that acts as a
|
||||
|
||||
124
client/v3/retry_interceptor_test.go
Normal file
124
client/v3/retry_interceptor_test.go
Normal file
@@ -0,0 +1,124 @@
|
||||
package clientv3
|
||||
|
||||
import (
|
||||
"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
|
||||
"go.etcd.io/etcd/client/v3/credentials"
|
||||
grpccredentials "google.golang.org/grpc/credentials"
|
||||
"testing"
|
||||
)
|
||||
|
||||
type dummyAuthTokenBundle struct{}
|
||||
|
||||
func (d dummyAuthTokenBundle) TransportCredentials() grpccredentials.TransportCredentials {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d dummyAuthTokenBundle) PerRPCCredentials() grpccredentials.PerRPCCredentials {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d dummyAuthTokenBundle) NewWithMode(mode string) (grpccredentials.Bundle, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (d dummyAuthTokenBundle) UpdateAuthToken(token string) {
|
||||
}
|
||||
|
||||
func TestClientShouldRefreshToken(t *testing.T) {
|
||||
type fields struct {
|
||||
authTokenBundle credentials.Bundle
|
||||
}
|
||||
type args struct {
|
||||
err error
|
||||
callOpts *options
|
||||
}
|
||||
|
||||
optsWithTrue := &options{
|
||||
retryAuth: true,
|
||||
}
|
||||
optsWithFalse := &options{
|
||||
retryAuth: false,
|
||||
}
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
fields fields
|
||||
args args
|
||||
want bool
|
||||
}{
|
||||
{
|
||||
name: "ErrUserEmpty and non nil authTokenBundle",
|
||||
fields: fields{
|
||||
authTokenBundle: &dummyAuthTokenBundle{},
|
||||
},
|
||||
args: args{rpctypes.ErrGRPCUserEmpty, optsWithTrue},
|
||||
want: true,
|
||||
},
|
||||
{
|
||||
name: "ErrUserEmpty and nil authTokenBundle",
|
||||
fields: fields{
|
||||
authTokenBundle: nil,
|
||||
},
|
||||
args: args{rpctypes.ErrGRPCUserEmpty, optsWithTrue},
|
||||
want: false,
|
||||
},
|
||||
{
|
||||
name: "ErrGRPCInvalidAuthToken and retryAuth",
|
||||
fields: fields{
|
||||
authTokenBundle: nil,
|
||||
},
|
||||
args: args{rpctypes.ErrGRPCInvalidAuthToken, optsWithTrue},
|
||||
want: true,
|
||||
},
|
||||
{
|
||||
name: "ErrGRPCInvalidAuthToken and !retryAuth",
|
||||
fields: fields{
|
||||
authTokenBundle: nil,
|
||||
},
|
||||
args: args{rpctypes.ErrGRPCInvalidAuthToken, optsWithFalse},
|
||||
want: false,
|
||||
},
|
||||
{
|
||||
name: "ErrGRPCAuthOldRevision and retryAuth",
|
||||
fields: fields{
|
||||
authTokenBundle: nil,
|
||||
},
|
||||
args: args{rpctypes.ErrGRPCAuthOldRevision, optsWithTrue},
|
||||
want: true,
|
||||
},
|
||||
{
|
||||
name: "ErrGRPCAuthOldRevision and !retryAuth",
|
||||
fields: fields{
|
||||
authTokenBundle: nil,
|
||||
},
|
||||
args: args{rpctypes.ErrGRPCAuthOldRevision, optsWithFalse},
|
||||
want: false,
|
||||
},
|
||||
{
|
||||
name: "Other error and retryAuth",
|
||||
fields: fields{
|
||||
authTokenBundle: nil,
|
||||
},
|
||||
args: args{rpctypes.ErrGRPCAuthFailed, optsWithTrue},
|
||||
want: false,
|
||||
},
|
||||
{
|
||||
name: "Other error and !retryAuth",
|
||||
fields: fields{
|
||||
authTokenBundle: nil,
|
||||
},
|
||||
args: args{rpctypes.ErrGRPCAuthFailed, optsWithFalse},
|
||||
want: false,
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
c := &Client{
|
||||
authTokenBundle: tt.fields.authTokenBundle,
|
||||
}
|
||||
if got := c.shouldRefreshToken(tt.args.err, tt.args.callOpts); got != tt.want {
|
||||
t.Errorf("shouldRefreshToken() = %v, want %v", got, tt.want)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -16,9 +16,6 @@ package clientv3
|
||||
|
||||
import (
|
||||
"math/rand"
|
||||
"reflect"
|
||||
"runtime"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
@@ -32,18 +29,3 @@ func jitterUp(duration time.Duration, jitter float64) time.Duration {
|
||||
multiplier := jitter * (rand.Float64()*2 - 1)
|
||||
return time.Duration(float64(duration) * (1 + multiplier))
|
||||
}
|
||||
|
||||
// Check if the provided function is being called in the op options.
|
||||
func isOpFuncCalled(op string, opts []OpOption) bool {
|
||||
for _, opt := range opts {
|
||||
v := reflect.ValueOf(opt)
|
||||
if v.Kind() == reflect.Func {
|
||||
if opFunc := runtime.FuncForPC(v.Pointer()); opFunc != nil {
|
||||
if strings.Contains(opFunc.Name(), op) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
@@ -352,6 +352,9 @@ type Config struct {
|
||||
// that exist at the same time.
|
||||
// Can only be used if ExperimentalEnableDistributedTracing is true.
|
||||
ExperimentalDistributedTracingServiceInstanceID string `json:"experimental-distributed-tracing-instance-id"`
|
||||
// ExperimentalDistributedTracingSamplingRatePerMillion is the number of samples to collect per million spans.
|
||||
// Defaults to 0.
|
||||
ExperimentalDistributedTracingSamplingRatePerMillion int `json:"experimental-distributed-tracing-sampling-rate"`
|
||||
|
||||
// Logger is logger options: currently only supports "zap".
|
||||
// "capnslog" is removed in v3.5.
|
||||
@@ -689,6 +692,13 @@ func (cfg *Config) Validate() error {
|
||||
return fmt.Errorf("unknown auto-compaction-mode %q", cfg.AutoCompactionMode)
|
||||
}
|
||||
|
||||
// Validate distributed tracing configuration but only if enabled.
|
||||
if cfg.ExperimentalEnableDistributedTracing {
|
||||
if err := validateTracingConfig(cfg.ExperimentalDistributedTracingSamplingRatePerMillion); err != nil {
|
||||
return fmt.Errorf("distributed tracing configurition is not valid: (%v)", err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
111
server/embed/config_tracing.go
Normal file
111
server/embed/config_tracing.go
Normal file
@@ -0,0 +1,111 @@
|
||||
// Copyright 2021 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package embed
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
|
||||
"go.opentelemetry.io/otel/exporters/otlp"
|
||||
"go.opentelemetry.io/otel/exporters/otlp/otlpgrpc"
|
||||
"go.opentelemetry.io/otel/propagation"
|
||||
"go.opentelemetry.io/otel/sdk/resource"
|
||||
tracesdk "go.opentelemetry.io/otel/sdk/trace"
|
||||
"go.opentelemetry.io/otel/semconv"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
const maxSamplingRatePerMillion = 1000000
|
||||
|
||||
func validateTracingConfig(samplingRate int) error {
|
||||
if samplingRate < 0 {
|
||||
return fmt.Errorf("tracing sampling rate must be positive")
|
||||
}
|
||||
if samplingRate > maxSamplingRatePerMillion {
|
||||
return fmt.Errorf("tracing sampling rate must be less than %d", maxSamplingRatePerMillion)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func setupTracingExporter(ctx context.Context, cfg *Config) (exporter tracesdk.SpanExporter, options []otelgrpc.Option, err error) {
|
||||
exporter, err = otlp.NewExporter(ctx,
|
||||
otlpgrpc.NewDriver(
|
||||
otlpgrpc.WithEndpoint(cfg.ExperimentalDistributedTracingAddress),
|
||||
otlpgrpc.WithInsecure(),
|
||||
))
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
res := resource.NewWithAttributes(
|
||||
semconv.ServiceNameKey.String(cfg.ExperimentalDistributedTracingServiceName),
|
||||
)
|
||||
|
||||
if resWithIDKey := determineResourceWithIDKey(cfg.ExperimentalDistributedTracingServiceInstanceID); resWithIDKey != nil {
|
||||
// Merge resources into a new
|
||||
// resource in case of duplicates.
|
||||
res = resource.Merge(res, resWithIDKey)
|
||||
}
|
||||
|
||||
options = append(options,
|
||||
otelgrpc.WithPropagators(
|
||||
propagation.NewCompositeTextMapPropagator(
|
||||
propagation.TraceContext{},
|
||||
propagation.Baggage{},
|
||||
),
|
||||
),
|
||||
otelgrpc.WithTracerProvider(
|
||||
tracesdk.NewTracerProvider(
|
||||
tracesdk.WithBatcher(exporter),
|
||||
tracesdk.WithResource(res),
|
||||
tracesdk.WithSampler(
|
||||
tracesdk.ParentBased(determineSampler(cfg.ExperimentalDistributedTracingSamplingRatePerMillion)),
|
||||
),
|
||||
),
|
||||
),
|
||||
)
|
||||
|
||||
cfg.logger.Debug(
|
||||
"distributed tracing enabled",
|
||||
zap.String("address", cfg.ExperimentalDistributedTracingAddress),
|
||||
zap.String("service-name", cfg.ExperimentalDistributedTracingServiceName),
|
||||
zap.String("service-instance-id", cfg.ExperimentalDistributedTracingServiceInstanceID),
|
||||
zap.Int("sampling-rate", cfg.ExperimentalDistributedTracingSamplingRatePerMillion),
|
||||
)
|
||||
|
||||
return exporter, options, err
|
||||
}
|
||||
|
||||
func determineSampler(samplingRate int) tracesdk.Sampler {
|
||||
sampler := tracesdk.NeverSample()
|
||||
if samplingRate == 0 {
|
||||
return sampler
|
||||
}
|
||||
return tracesdk.TraceIDRatioBased(float64(samplingRate) / float64(maxSamplingRatePerMillion))
|
||||
}
|
||||
|
||||
// As Tracing service Instance ID must be unique, it should
|
||||
// never use the empty default string value, it's set if
|
||||
// if it's a non empty string.
|
||||
func determineResourceWithIDKey(serviceInstanceID string) *resource.Resource {
|
||||
if serviceInstanceID != "" {
|
||||
return resource.NewWithAttributes(
|
||||
(semconv.ServiceInstanceIDKey.String(serviceInstanceID)),
|
||||
)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
83
server/embed/config_tracing_test.go
Normal file
83
server/embed/config_tracing_test.go
Normal file
@@ -0,0 +1,83 @@
|
||||
// Copyright 2021 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package embed
|
||||
|
||||
import (
|
||||
"testing"
|
||||
)
|
||||
|
||||
const neverSampleDescription = "AlwaysOffSampler"
|
||||
|
||||
func TestDetermineSampler(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
sampleRate int
|
||||
wantSamplerDescription string
|
||||
}{
|
||||
{
|
||||
name: "sample rate is disabled",
|
||||
sampleRate: 0,
|
||||
wantSamplerDescription: neverSampleDescription,
|
||||
},
|
||||
{
|
||||
name: "sample rate is 100",
|
||||
sampleRate: 100,
|
||||
wantSamplerDescription: "TraceIDRatioBased{0.0001}",
|
||||
},
|
||||
}
|
||||
for _, tc := range tests {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
sampler := determineSampler(tc.sampleRate)
|
||||
if tc.wantSamplerDescription != sampler.Description() {
|
||||
t.Errorf("tracing sampler was not as expected; expected sampler: %#+v, got sampler: %#+v", tc.wantSamplerDescription, sampler.Description())
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestTracingConfig(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
sampleRate int
|
||||
wantErr bool
|
||||
}{
|
||||
{
|
||||
name: "invalid - sample rate is less than 0",
|
||||
sampleRate: -1,
|
||||
wantErr: true,
|
||||
},
|
||||
{
|
||||
name: "invalid - sample rate is more than allowed value",
|
||||
sampleRate: maxSamplingRatePerMillion + 1,
|
||||
wantErr: true,
|
||||
},
|
||||
{
|
||||
name: "valid - sample rate is 100",
|
||||
sampleRate: 100,
|
||||
wantErr: false,
|
||||
},
|
||||
}
|
||||
for _, tc := range tests {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
err := validateTracingConfig(tc.sampleRate)
|
||||
if err == nil && tc.wantErr {
|
||||
t.Errorf("expected error got (%v) error", err)
|
||||
}
|
||||
if err != nil && !tc.wantErr {
|
||||
t.Errorf("expected no errors, got error: (%v)", err)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -47,13 +47,6 @@ import (
|
||||
|
||||
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
|
||||
"github.com/soheilhy/cmux"
|
||||
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
|
||||
"go.opentelemetry.io/otel/exporters/otlp"
|
||||
"go.opentelemetry.io/otel/exporters/otlp/otlpgrpc"
|
||||
"go.opentelemetry.io/otel/propagation"
|
||||
"go.opentelemetry.io/otel/sdk/resource"
|
||||
tracesdk "go.opentelemetry.io/otel/sdk/trace"
|
||||
"go.opentelemetry.io/otel/semconv"
|
||||
"go.uber.org/zap"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/keepalive"
|
||||
@@ -231,7 +224,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
|
||||
|
||||
if srvcfg.ExperimentalEnableDistributedTracing {
|
||||
tctx := context.Background()
|
||||
tracingExporter, opts, err := e.setupTracing(tctx)
|
||||
tracingExporter, opts, err := setupTracingExporter(tctx, cfg)
|
||||
if err != nil {
|
||||
return e, err
|
||||
}
|
||||
@@ -240,6 +233,10 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
|
||||
}
|
||||
e.tracingExporterShutdown = func() { tracingExporter.Shutdown(tctx) }
|
||||
srvcfg.ExperimentalTracerOptions = opts
|
||||
|
||||
e.cfg.logger.Info(
|
||||
"distributed tracing setup enabled",
|
||||
)
|
||||
}
|
||||
|
||||
print(e.cfg.logger, *cfg, srvcfg, memberInitialized)
|
||||
@@ -811,52 +808,3 @@ func parseCompactionRetention(mode, retention string) (ret time.Duration, err er
|
||||
}
|
||||
return ret, nil
|
||||
}
|
||||
|
||||
func (e *Etcd) setupTracing(ctx context.Context) (exporter tracesdk.SpanExporter, options []otelgrpc.Option, err error) {
|
||||
exporter, err = otlp.NewExporter(ctx,
|
||||
otlpgrpc.NewDriver(
|
||||
otlpgrpc.WithEndpoint(e.cfg.ExperimentalDistributedTracingAddress),
|
||||
otlpgrpc.WithInsecure(),
|
||||
))
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
res := resource.NewWithAttributes(
|
||||
semconv.ServiceNameKey.String(e.cfg.ExperimentalDistributedTracingServiceName),
|
||||
)
|
||||
// As Tracing service Instance ID must be unique, it should
|
||||
// never use the empty default string value, so we only set it
|
||||
// if it's a non empty string.
|
||||
if e.cfg.ExperimentalDistributedTracingServiceInstanceID != "" {
|
||||
resWithIDKey := resource.NewWithAttributes(
|
||||
(semconv.ServiceInstanceIDKey.String(e.cfg.ExperimentalDistributedTracingServiceInstanceID)),
|
||||
)
|
||||
// Merge resources to combine into a new
|
||||
// resource in case of duplicates.
|
||||
res = resource.Merge(res, resWithIDKey)
|
||||
}
|
||||
|
||||
options = append(options,
|
||||
otelgrpc.WithPropagators(
|
||||
propagation.NewCompositeTextMapPropagator(
|
||||
propagation.TraceContext{},
|
||||
propagation.Baggage{},
|
||||
),
|
||||
),
|
||||
otelgrpc.WithTracerProvider(
|
||||
tracesdk.NewTracerProvider(
|
||||
tracesdk.WithBatcher(exporter),
|
||||
tracesdk.WithResource(res),
|
||||
),
|
||||
),
|
||||
)
|
||||
|
||||
e.cfg.logger.Info(
|
||||
"distributed tracing enabled",
|
||||
zap.String("distributed-tracing-address", e.cfg.ExperimentalDistributedTracingAddress),
|
||||
zap.String("distributed-tracing-service-name", e.cfg.ExperimentalDistributedTracingServiceName),
|
||||
zap.String("distributed-tracing-service-instance-id", e.cfg.ExperimentalDistributedTracingServiceInstanceID),
|
||||
)
|
||||
|
||||
return exporter, options, err
|
||||
}
|
||||
|
||||
@@ -267,6 +267,7 @@ func newConfig() *config {
|
||||
fs.StringVar(&cfg.ec.ExperimentalDistributedTracingAddress, "experimental-distributed-tracing-address", embed.ExperimentalDistributedTracingAddress, "Address for distributed tracing used for OpenTelemetry Tracing (if enabled with experimental-enable-distributed-tracing flag).")
|
||||
fs.StringVar(&cfg.ec.ExperimentalDistributedTracingServiceName, "experimental-distributed-tracing-service-name", embed.ExperimentalDistributedTracingServiceName, "Configures service name for distributed tracing to be used to define service name for OpenTelemetry Tracing (if enabled with experimental-enable-distributed-tracing flag). 'etcd' is the default service name. Use the same service name for all instances of etcd.")
|
||||
fs.StringVar(&cfg.ec.ExperimentalDistributedTracingServiceInstanceID, "experimental-distributed-tracing-instance-id", "", "Configures service instance ID for distributed tracing to be used to define service instance ID key for OpenTelemetry Tracing (if enabled with experimental-enable-distributed-tracing flag). There is no default value set. This ID must be unique per etcd instance.")
|
||||
fs.IntVar(&cfg.ec.ExperimentalDistributedTracingSamplingRatePerMillion, "experimental-distributed-tracing-sampling-rate", 0, "Number of samples to collect per million spans for OpenTelemetry Tracing (if enabled with experimental-enable-distributed-tracing flag).")
|
||||
|
||||
// auth
|
||||
fs.StringVar(&cfg.ec.AuthToken, "auth-token", cfg.ec.AuthToken, "Specify auth token specific options.")
|
||||
|
||||
@@ -210,6 +210,8 @@ Experimental distributed tracing:
|
||||
Distributed tracing service name, must be same across all etcd instances.
|
||||
--experimental-distributed-tracing-instance-id ''
|
||||
Distributed tracing instance ID, must be unique per each etcd instance.
|
||||
--experimental-distributed-tracing-sampling-rate '0'
|
||||
Number of samples to collect per million spans for distributed tracing. Disabled by default.
|
||||
|
||||
v2 Proxy (to be deprecated in v3.6):
|
||||
--proxy 'off'
|
||||
|
||||
@@ -84,6 +84,7 @@ var toGRPCErrorMap = map[error]error{
|
||||
auth.ErrAuthNotEnabled: rpctypes.ErrGRPCAuthNotEnabled,
|
||||
auth.ErrInvalidAuthToken: rpctypes.ErrGRPCInvalidAuthToken,
|
||||
auth.ErrInvalidAuthMgmt: rpctypes.ErrGRPCInvalidAuthMgmt,
|
||||
auth.ErrAuthOldRevision: rpctypes.ErrGRPCAuthOldRevision,
|
||||
|
||||
// In sync with status.FromContextError
|
||||
context.Canceled: rpctypes.ErrGRPCCanceled,
|
||||
|
||||
@@ -1874,7 +1874,7 @@ func (s *EtcdServer) apply(
|
||||
return appliedt, appliedi, shouldStop
|
||||
}
|
||||
|
||||
// applyEntryNormal apples an EntryNormal type raftpb request to the EtcdServer
|
||||
// applyEntryNormal applies an EntryNormal type raftpb request to the EtcdServer
|
||||
func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
|
||||
shouldApplyV3 := membership.ApplyV2storeOnly
|
||||
index := s.consistIndex.ConsistentIndex()
|
||||
|
||||
@@ -13,11 +13,11 @@
|
||||
// limitations under the License.
|
||||
|
||||
/*
|
||||
Package wal provides an implementation of a write ahead log that is used by
|
||||
Package wal provides an implementation of write ahead log that is used by
|
||||
etcd.
|
||||
|
||||
A WAL is created at a particular directory and is made up of a number of
|
||||
segmented WAL files. Inside of each file the raft state and entries are appended
|
||||
segmented WAL files. Inside each file the raft state and entries are appended
|
||||
to it with the Save method:
|
||||
|
||||
metadata := []byte{}
|
||||
@@ -41,18 +41,18 @@ protobuf. The record protobuf contains a CRC, a type, and a data payload. The le
|
||||
record is 8-byte aligned so that the length field is never torn. The CRC contains the CRC32
|
||||
value of all record protobufs preceding the current record.
|
||||
|
||||
WAL files are placed inside of the directory in the following format:
|
||||
WAL files are placed inside the directory in the following format:
|
||||
$seq-$index.wal
|
||||
|
||||
The first WAL file to be created will be 0000000000000000-0000000000000000.wal
|
||||
indicating an initial sequence of 0 and an initial raft index of 0. The first
|
||||
entry written to WAL MUST have raft index 0.
|
||||
|
||||
WAL will cut its current tail wal file if its size exceeds 64MB. This will increment an internal
|
||||
WAL will cut its current tail wal file if its size exceeds 64 MB. This will increment an internal
|
||||
sequence number and cause a new file to be created. If the last raft index saved
|
||||
was 0x20 and this is the first time cut has been called on this WAL then the sequence will
|
||||
increment from 0x0 to 0x1. The new file will be: 0000000000000001-0000000000000021.wal.
|
||||
If a second cut issues 0x10 entries with incremental index later then the file will be called:
|
||||
If a second cut issues 0x10 entries with incremental index later, then the file will be called:
|
||||
0000000000000002-0000000000000031.wal.
|
||||
|
||||
At a later time a WAL can be opened at a particular snapshot. If there is no
|
||||
@@ -63,7 +63,7 @@ snapshot, an empty snapshot should be passed in.
|
||||
|
||||
The snapshot must have been written to the WAL.
|
||||
|
||||
Additional items cannot be Saved to this WAL until all of the items from the given
|
||||
Additional items cannot be Saved to this WAL until all the items from the given
|
||||
snapshot to the end of the WAL are read first:
|
||||
|
||||
metadata, state, ents, err := w.ReadAll()
|
||||
|
||||
@@ -64,9 +64,10 @@ func Repair(lg *zap.Logger, dirpath string) bool {
|
||||
return true
|
||||
|
||||
case io.ErrUnexpectedEOF:
|
||||
bf, bferr := os.Create(f.Name() + ".broken")
|
||||
brokenName := f.Name() + ".broken"
|
||||
bf, bferr := os.Create(brokenName)
|
||||
if bferr != nil {
|
||||
lg.Warn("failed to create backup file", zap.String("path", f.Name()+".broken"), zap.Error(bferr))
|
||||
lg.Warn("failed to create backup file", zap.String("path", brokenName), zap.Error(bferr))
|
||||
return false
|
||||
}
|
||||
defer bf.Close()
|
||||
@@ -77,7 +78,7 @@ func Repair(lg *zap.Logger, dirpath string) bool {
|
||||
}
|
||||
|
||||
if _, err = io.Copy(bf, f); err != nil {
|
||||
lg.Warn("failed to copy", zap.String("from", f.Name()+".broken"), zap.String("to", f.Name()), zap.Error(err))
|
||||
lg.Warn("failed to copy", zap.String("from", f.Name()), zap.String("to", brokenName), zap.Error(err))
|
||||
return false
|
||||
}
|
||||
|
||||
|
||||
@@ -89,7 +89,7 @@ func newEmbedConfig(t *testing.T) *embed.Config {
|
||||
// creates a snapshot file and returns the file path.
|
||||
func createSnapshotFile(t *testing.T, cfg *embed.Config, kvs []kv) (version string, dbPath string) {
|
||||
testutil.SkipTestIfShortMode(t,
|
||||
"Snapshot creation tests are depending on embedded etcServer so are integration-level tests.")
|
||||
"Snapshot creation tests are depending on embedded etcd server so are integration-level tests.")
|
||||
|
||||
srv, err := embed.StartEtcd(cfg)
|
||||
if err != nil {
|
||||
|
||||
@@ -170,7 +170,7 @@ type kv struct {
|
||||
// creates a snapshot file and returns the file path.
|
||||
func createSnapshotFile(t *testing.T, kvs []kv) string {
|
||||
testutil.SkipTestIfShortMode(t,
|
||||
"Snapshot creation tests are depending on embedded etcServer so are integration-level tests.")
|
||||
"Snapshot creation tests are depending on embedded etcd server so are integration-level tests.")
|
||||
clusterN := 1
|
||||
urls := newEmbedURLs(clusterN * 2)
|
||||
cURLs, pURLs := urls[:clusterN], urls[clusterN:]
|
||||
|
||||
@@ -15,14 +15,14 @@
|
||||
package integration_test
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"go.etcd.io/etcd/tests/v3/integration"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestBeforeTestWithoutLeakDetection(t *testing.T) {
|
||||
integration.BeforeTest(t, integration.WithoutGoLeakDetection(), integration.WithoutSkipInShort())
|
||||
// Intentional leak that should get ignored
|
||||
go time.Sleep(2 * time.Second)
|
||||
go func() {
|
||||
|
||||
}()
|
||||
}
|
||||
|
||||
@@ -33,7 +33,7 @@ import (
|
||||
|
||||
func TestEtcdVersionFromWAL(t *testing.T) {
|
||||
testutil.SkipTestIfShortMode(t,
|
||||
"Wal creation tests are depending on embedded etcServer so are integration-level tests.")
|
||||
"Wal creation tests are depending on embedded etcd server so are integration-level tests.")
|
||||
cfg := NewEmbedConfig(t, "default")
|
||||
srv, err := embed.StartEtcd(cfg)
|
||||
if err != nil {
|
||||
|
||||
@@ -182,6 +182,7 @@ for RATIO_STR in ${RATIO_LIST}; do
|
||||
--total=${RUN_COUNT} \
|
||||
--endpoints "http://127.0.0.1:${CLIENT_PORT}" \
|
||||
--rw-ratio ${RATIO} --limit ${RANGE_RESULT_LIMIT} \
|
||||
--val-size ${VALUE_SIZE} \
|
||||
2>/dev/null | grep "Requests/sec" | awk "{print \$2}")
|
||||
if [ $? -ne 0 ]; then
|
||||
echo "benchmark command failed: $?"
|
||||
|
||||
Reference in New Issue
Block a user