mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
Merge pull request #13248 from lilic/add-sampling-rate
server: Add sampling rate to distributed tracing
This commit is contained in:
commit
a4a82cc982
@ -352,6 +352,9 @@ type Config struct {
|
||||
// that exist at the same time.
|
||||
// Can only be used if ExperimentalEnableDistributedTracing is true.
|
||||
ExperimentalDistributedTracingServiceInstanceID string `json:"experimental-distributed-tracing-instance-id"`
|
||||
// ExperimentalDistributedTracingSamplingRatePerMillion is the number of samples to collect per million spans.
|
||||
// Defaults to 0.
|
||||
ExperimentalDistributedTracingSamplingRatePerMillion int `json:"experimental-distributed-tracing-sampling-rate"`
|
||||
|
||||
// Logger is logger options: currently only supports "zap".
|
||||
// "capnslog" is removed in v3.5.
|
||||
@ -689,6 +692,13 @@ func (cfg *Config) Validate() error {
|
||||
return fmt.Errorf("unknown auto-compaction-mode %q", cfg.AutoCompactionMode)
|
||||
}
|
||||
|
||||
// Validate distributed tracing configuration but only if enabled.
|
||||
if cfg.ExperimentalEnableDistributedTracing {
|
||||
if err := validateTracingConfig(cfg.ExperimentalDistributedTracingSamplingRatePerMillion); err != nil {
|
||||
return fmt.Errorf("distributed tracing configurition is not valid: (%v)", err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
111
server/embed/config_tracing.go
Normal file
111
server/embed/config_tracing.go
Normal file
@ -0,0 +1,111 @@
|
||||
// Copyright 2021 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package embed
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
|
||||
"go.opentelemetry.io/otel/exporters/otlp"
|
||||
"go.opentelemetry.io/otel/exporters/otlp/otlpgrpc"
|
||||
"go.opentelemetry.io/otel/propagation"
|
||||
"go.opentelemetry.io/otel/sdk/resource"
|
||||
tracesdk "go.opentelemetry.io/otel/sdk/trace"
|
||||
"go.opentelemetry.io/otel/semconv"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
const maxSamplingRatePerMillion = 1000000
|
||||
|
||||
func validateTracingConfig(samplingRate int) error {
|
||||
if samplingRate < 0 {
|
||||
return fmt.Errorf("tracing sampling rate must be positive")
|
||||
}
|
||||
if samplingRate > maxSamplingRatePerMillion {
|
||||
return fmt.Errorf("tracing sampling rate must be less than %d", maxSamplingRatePerMillion)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func setupTracingExporter(ctx context.Context, cfg *Config) (exporter tracesdk.SpanExporter, options []otelgrpc.Option, err error) {
|
||||
exporter, err = otlp.NewExporter(ctx,
|
||||
otlpgrpc.NewDriver(
|
||||
otlpgrpc.WithEndpoint(cfg.ExperimentalDistributedTracingAddress),
|
||||
otlpgrpc.WithInsecure(),
|
||||
))
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
res := resource.NewWithAttributes(
|
||||
semconv.ServiceNameKey.String(cfg.ExperimentalDistributedTracingServiceName),
|
||||
)
|
||||
|
||||
if resWithIDKey := determineResourceWithIDKey(cfg.ExperimentalDistributedTracingServiceInstanceID); resWithIDKey != nil {
|
||||
// Merge resources into a new
|
||||
// resource in case of duplicates.
|
||||
res = resource.Merge(res, resWithIDKey)
|
||||
}
|
||||
|
||||
options = append(options,
|
||||
otelgrpc.WithPropagators(
|
||||
propagation.NewCompositeTextMapPropagator(
|
||||
propagation.TraceContext{},
|
||||
propagation.Baggage{},
|
||||
),
|
||||
),
|
||||
otelgrpc.WithTracerProvider(
|
||||
tracesdk.NewTracerProvider(
|
||||
tracesdk.WithBatcher(exporter),
|
||||
tracesdk.WithResource(res),
|
||||
tracesdk.WithSampler(
|
||||
tracesdk.ParentBased(determineSampler(cfg.ExperimentalDistributedTracingSamplingRatePerMillion)),
|
||||
),
|
||||
),
|
||||
),
|
||||
)
|
||||
|
||||
cfg.logger.Debug(
|
||||
"distributed tracing enabled",
|
||||
zap.String("address", cfg.ExperimentalDistributedTracingAddress),
|
||||
zap.String("service-name", cfg.ExperimentalDistributedTracingServiceName),
|
||||
zap.String("service-instance-id", cfg.ExperimentalDistributedTracingServiceInstanceID),
|
||||
zap.Int("sampling-rate", cfg.ExperimentalDistributedTracingSamplingRatePerMillion),
|
||||
)
|
||||
|
||||
return exporter, options, err
|
||||
}
|
||||
|
||||
func determineSampler(samplingRate int) tracesdk.Sampler {
|
||||
sampler := tracesdk.NeverSample()
|
||||
if samplingRate == 0 {
|
||||
return sampler
|
||||
}
|
||||
return tracesdk.TraceIDRatioBased(float64(samplingRate) / float64(maxSamplingRatePerMillion))
|
||||
}
|
||||
|
||||
// As Tracing service Instance ID must be unique, it should
|
||||
// never use the empty default string value, it's set if
|
||||
// if it's a non empty string.
|
||||
func determineResourceWithIDKey(serviceInstanceID string) *resource.Resource {
|
||||
if serviceInstanceID != "" {
|
||||
return resource.NewWithAttributes(
|
||||
(semconv.ServiceInstanceIDKey.String(serviceInstanceID)),
|
||||
)
|
||||
}
|
||||
return nil
|
||||
}
|
83
server/embed/config_tracing_test.go
Normal file
83
server/embed/config_tracing_test.go
Normal file
@ -0,0 +1,83 @@
|
||||
// Copyright 2021 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package embed
|
||||
|
||||
import (
|
||||
"testing"
|
||||
)
|
||||
|
||||
const neverSampleDescription = "AlwaysOffSampler"
|
||||
|
||||
func TestDetermineSampler(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
sampleRate int
|
||||
wantSamplerDescription string
|
||||
}{
|
||||
{
|
||||
name: "sample rate is disabled",
|
||||
sampleRate: 0,
|
||||
wantSamplerDescription: neverSampleDescription,
|
||||
},
|
||||
{
|
||||
name: "sample rate is 100",
|
||||
sampleRate: 100,
|
||||
wantSamplerDescription: "TraceIDRatioBased{0.0001}",
|
||||
},
|
||||
}
|
||||
for _, tc := range tests {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
sampler := determineSampler(tc.sampleRate)
|
||||
if tc.wantSamplerDescription != sampler.Description() {
|
||||
t.Errorf("tracing sampler was not as expected; expected sampler: %#+v, got sampler: %#+v", tc.wantSamplerDescription, sampler.Description())
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestTracingConfig(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
sampleRate int
|
||||
wantErr bool
|
||||
}{
|
||||
{
|
||||
name: "invalid - sample rate is less than 0",
|
||||
sampleRate: -1,
|
||||
wantErr: true,
|
||||
},
|
||||
{
|
||||
name: "invalid - sample rate is more than allowed value",
|
||||
sampleRate: maxSamplingRatePerMillion + 1,
|
||||
wantErr: true,
|
||||
},
|
||||
{
|
||||
name: "valid - sample rate is 100",
|
||||
sampleRate: 100,
|
||||
wantErr: false,
|
||||
},
|
||||
}
|
||||
for _, tc := range tests {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
err := validateTracingConfig(tc.sampleRate)
|
||||
if err == nil && tc.wantErr {
|
||||
t.Errorf("expected error got (%v) error", err)
|
||||
}
|
||||
if err != nil && !tc.wantErr {
|
||||
t.Errorf("expected no errors, got error: (%v)", err)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
@ -47,13 +47,6 @@ import (
|
||||
|
||||
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
|
||||
"github.com/soheilhy/cmux"
|
||||
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
|
||||
"go.opentelemetry.io/otel/exporters/otlp"
|
||||
"go.opentelemetry.io/otel/exporters/otlp/otlpgrpc"
|
||||
"go.opentelemetry.io/otel/propagation"
|
||||
"go.opentelemetry.io/otel/sdk/resource"
|
||||
tracesdk "go.opentelemetry.io/otel/sdk/trace"
|
||||
"go.opentelemetry.io/otel/semconv"
|
||||
"go.uber.org/zap"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/keepalive"
|
||||
@ -231,7 +224,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
|
||||
|
||||
if srvcfg.ExperimentalEnableDistributedTracing {
|
||||
tctx := context.Background()
|
||||
tracingExporter, opts, err := e.setupTracing(tctx)
|
||||
tracingExporter, opts, err := setupTracingExporter(tctx, cfg)
|
||||
if err != nil {
|
||||
return e, err
|
||||
}
|
||||
@ -240,6 +233,10 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
|
||||
}
|
||||
e.tracingExporterShutdown = func() { tracingExporter.Shutdown(tctx) }
|
||||
srvcfg.ExperimentalTracerOptions = opts
|
||||
|
||||
e.cfg.logger.Info(
|
||||
"distributed tracing setup enabled",
|
||||
)
|
||||
}
|
||||
|
||||
print(e.cfg.logger, *cfg, srvcfg, memberInitialized)
|
||||
@ -811,52 +808,3 @@ func parseCompactionRetention(mode, retention string) (ret time.Duration, err er
|
||||
}
|
||||
return ret, nil
|
||||
}
|
||||
|
||||
func (e *Etcd) setupTracing(ctx context.Context) (exporter tracesdk.SpanExporter, options []otelgrpc.Option, err error) {
|
||||
exporter, err = otlp.NewExporter(ctx,
|
||||
otlpgrpc.NewDriver(
|
||||
otlpgrpc.WithEndpoint(e.cfg.ExperimentalDistributedTracingAddress),
|
||||
otlpgrpc.WithInsecure(),
|
||||
))
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
res := resource.NewWithAttributes(
|
||||
semconv.ServiceNameKey.String(e.cfg.ExperimentalDistributedTracingServiceName),
|
||||
)
|
||||
// As Tracing service Instance ID must be unique, it should
|
||||
// never use the empty default string value, so we only set it
|
||||
// if it's a non empty string.
|
||||
if e.cfg.ExperimentalDistributedTracingServiceInstanceID != "" {
|
||||
resWithIDKey := resource.NewWithAttributes(
|
||||
(semconv.ServiceInstanceIDKey.String(e.cfg.ExperimentalDistributedTracingServiceInstanceID)),
|
||||
)
|
||||
// Merge resources to combine into a new
|
||||
// resource in case of duplicates.
|
||||
res = resource.Merge(res, resWithIDKey)
|
||||
}
|
||||
|
||||
options = append(options,
|
||||
otelgrpc.WithPropagators(
|
||||
propagation.NewCompositeTextMapPropagator(
|
||||
propagation.TraceContext{},
|
||||
propagation.Baggage{},
|
||||
),
|
||||
),
|
||||
otelgrpc.WithTracerProvider(
|
||||
tracesdk.NewTracerProvider(
|
||||
tracesdk.WithBatcher(exporter),
|
||||
tracesdk.WithResource(res),
|
||||
),
|
||||
),
|
||||
)
|
||||
|
||||
e.cfg.logger.Info(
|
||||
"distributed tracing enabled",
|
||||
zap.String("distributed-tracing-address", e.cfg.ExperimentalDistributedTracingAddress),
|
||||
zap.String("distributed-tracing-service-name", e.cfg.ExperimentalDistributedTracingServiceName),
|
||||
zap.String("distributed-tracing-service-instance-id", e.cfg.ExperimentalDistributedTracingServiceInstanceID),
|
||||
)
|
||||
|
||||
return exporter, options, err
|
||||
}
|
||||
|
@ -267,6 +267,7 @@ func newConfig() *config {
|
||||
fs.StringVar(&cfg.ec.ExperimentalDistributedTracingAddress, "experimental-distributed-tracing-address", embed.ExperimentalDistributedTracingAddress, "Address for distributed tracing used for OpenTelemetry Tracing (if enabled with experimental-enable-distributed-tracing flag).")
|
||||
fs.StringVar(&cfg.ec.ExperimentalDistributedTracingServiceName, "experimental-distributed-tracing-service-name", embed.ExperimentalDistributedTracingServiceName, "Configures service name for distributed tracing to be used to define service name for OpenTelemetry Tracing (if enabled with experimental-enable-distributed-tracing flag). 'etcd' is the default service name. Use the same service name for all instances of etcd.")
|
||||
fs.StringVar(&cfg.ec.ExperimentalDistributedTracingServiceInstanceID, "experimental-distributed-tracing-instance-id", "", "Configures service instance ID for distributed tracing to be used to define service instance ID key for OpenTelemetry Tracing (if enabled with experimental-enable-distributed-tracing flag). There is no default value set. This ID must be unique per etcd instance.")
|
||||
fs.IntVar(&cfg.ec.ExperimentalDistributedTracingSamplingRatePerMillion, "experimental-distributed-tracing-sampling-rate", 0, "Number of samples to collect per million spans for OpenTelemetry Tracing (if enabled with experimental-enable-distributed-tracing flag).")
|
||||
|
||||
// auth
|
||||
fs.StringVar(&cfg.ec.AuthToken, "auth-token", cfg.ec.AuthToken, "Specify auth token specific options.")
|
||||
|
@ -210,6 +210,8 @@ Experimental distributed tracing:
|
||||
Distributed tracing service name, must be same across all etcd instances.
|
||||
--experimental-distributed-tracing-instance-id ''
|
||||
Distributed tracing instance ID, must be unique per each etcd instance.
|
||||
--experimental-distributed-tracing-sampling-rate '0'
|
||||
Number of samples to collect per million spans for distributed tracing. Disabled by default.
|
||||
|
||||
v2 Proxy (to be deprecated in v3.6):
|
||||
--proxy 'off'
|
||||
|
Loading…
x
Reference in New Issue
Block a user