mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
Merge pull request #16951 from jmhbnz/fix-distributed-tracing
[3.5] Backport add sampling rate to fix distributed tracing
This commit is contained in:
commit
0e64a6d40e
@ -365,6 +365,9 @@ type Config struct {
|
|||||||
// that exist at the same time.
|
// that exist at the same time.
|
||||||
// Can only be used if ExperimentalEnableDistributedTracing is true.
|
// Can only be used if ExperimentalEnableDistributedTracing is true.
|
||||||
ExperimentalDistributedTracingServiceInstanceID string `json:"experimental-distributed-tracing-instance-id"`
|
ExperimentalDistributedTracingServiceInstanceID string `json:"experimental-distributed-tracing-instance-id"`
|
||||||
|
// ExperimentalDistributedTracingSamplingRatePerMillion is the number of samples to collect per million spans.
|
||||||
|
// Defaults to 0.
|
||||||
|
ExperimentalDistributedTracingSamplingRatePerMillion int `json:"experimental-distributed-tracing-sampling-rate"`
|
||||||
|
|
||||||
// Logger is logger options: currently only supports "zap".
|
// Logger is logger options: currently only supports "zap".
|
||||||
// "capnslog" is removed in v3.5.
|
// "capnslog" is removed in v3.5.
|
||||||
@ -758,6 +761,13 @@ func (cfg *Config) Validate() error {
|
|||||||
return fmt.Errorf("cipher suites cannot be configured when only TLS1.3 is enabled")
|
return fmt.Errorf("cipher suites cannot be configured when only TLS1.3 is enabled")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Validate distributed tracing configuration but only if enabled.
|
||||||
|
if cfg.ExperimentalEnableDistributedTracing {
|
||||||
|
if err := validateTracingConfig(cfg.ExperimentalDistributedTracingSamplingRatePerMillion); err != nil {
|
||||||
|
return fmt.Errorf("distributed tracing configurition is not valid: (%v)", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -16,6 +16,7 @@ package embed
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
|
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
|
||||||
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
|
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
|
||||||
@ -26,13 +27,32 @@ import (
|
|||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
func setupTracingExporter(ctx context.Context, cfg *Config) (exporter tracesdk.SpanExporter, options []otelgrpc.Option, err error) {
|
const maxSamplingRatePerMillion = 1000000
|
||||||
exporter, err = otlptracegrpc.New(ctx,
|
|
||||||
|
func validateTracingConfig(samplingRate int) error {
|
||||||
|
if samplingRate < 0 {
|
||||||
|
return fmt.Errorf("tracing sampling rate must be positive")
|
||||||
|
}
|
||||||
|
if samplingRate > maxSamplingRatePerMillion {
|
||||||
|
return fmt.Errorf("tracing sampling rate must be less than %d", maxSamplingRatePerMillion)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type tracingExporter struct {
|
||||||
|
exporter tracesdk.SpanExporter
|
||||||
|
opts []otelgrpc.Option
|
||||||
|
provider *tracesdk.TracerProvider
|
||||||
|
}
|
||||||
|
|
||||||
|
func newTracingExporter(ctx context.Context, cfg *Config) (*tracingExporter, error) {
|
||||||
|
exporter, err := otlptracegrpc.New(ctx,
|
||||||
otlptracegrpc.WithInsecure(),
|
otlptracegrpc.WithInsecure(),
|
||||||
otlptracegrpc.WithEndpoint(cfg.ExperimentalDistributedTracingAddress),
|
otlptracegrpc.WithEndpoint(cfg.ExperimentalDistributedTracingAddress),
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
res, err := resource.New(ctx,
|
res, err := resource.New(ctx,
|
||||||
@ -41,7 +61,7 @@ func setupTracingExporter(ctx context.Context, cfg *Config) (exporter tracesdk.S
|
|||||||
),
|
),
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if resWithIDKey := determineResourceWithIDKey(cfg.ExperimentalDistributedTracingServiceInstanceID); resWithIDKey != nil {
|
if resWithIDKey := determineResourceWithIDKey(cfg.ExperimentalDistributedTracingServiceInstanceID); resWithIDKey != nil {
|
||||||
@ -49,11 +69,19 @@ func setupTracingExporter(ctx context.Context, cfg *Config) (exporter tracesdk.S
|
|||||||
// resource in case of duplicates.
|
// resource in case of duplicates.
|
||||||
res, err = resource.Merge(res, resWithIDKey)
|
res, err = resource.Merge(res, resWithIDKey)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
options = append(options,
|
traceProvider := tracesdk.NewTracerProvider(
|
||||||
|
tracesdk.WithBatcher(exporter),
|
||||||
|
tracesdk.WithResource(res),
|
||||||
|
tracesdk.WithSampler(
|
||||||
|
tracesdk.ParentBased(determineSampler(cfg.ExperimentalDistributedTracingSamplingRatePerMillion)),
|
||||||
|
),
|
||||||
|
)
|
||||||
|
|
||||||
|
options := []otelgrpc.Option{
|
||||||
otelgrpc.WithPropagators(
|
otelgrpc.WithPropagators(
|
||||||
propagation.NewCompositeTextMapPropagator(
|
propagation.NewCompositeTextMapPropagator(
|
||||||
propagation.TraceContext{},
|
propagation.TraceContext{},
|
||||||
@ -61,13 +89,9 @@ func setupTracingExporter(ctx context.Context, cfg *Config) (exporter tracesdk.S
|
|||||||
),
|
),
|
||||||
),
|
),
|
||||||
otelgrpc.WithTracerProvider(
|
otelgrpc.WithTracerProvider(
|
||||||
tracesdk.NewTracerProvider(
|
traceProvider,
|
||||||
tracesdk.WithBatcher(exporter),
|
|
||||||
tracesdk.WithResource(res),
|
|
||||||
tracesdk.WithSampler(tracesdk.ParentBased(tracesdk.NeverSample())),
|
|
||||||
),
|
|
||||||
),
|
),
|
||||||
)
|
}
|
||||||
|
|
||||||
cfg.logger.Debug(
|
cfg.logger.Debug(
|
||||||
"distributed tracing enabled",
|
"distributed tracing enabled",
|
||||||
@ -76,7 +100,29 @@ func setupTracingExporter(ctx context.Context, cfg *Config) (exporter tracesdk.S
|
|||||||
zap.String("service-instance-id", cfg.ExperimentalDistributedTracingServiceInstanceID),
|
zap.String("service-instance-id", cfg.ExperimentalDistributedTracingServiceInstanceID),
|
||||||
)
|
)
|
||||||
|
|
||||||
return exporter, options, err
|
return &tracingExporter{
|
||||||
|
exporter: exporter,
|
||||||
|
opts: options,
|
||||||
|
provider: traceProvider,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (te *tracingExporter) Close(ctx context.Context) {
|
||||||
|
if te.provider != nil {
|
||||||
|
te.provider.Shutdown(ctx)
|
||||||
|
}
|
||||||
|
|
||||||
|
if te.exporter != nil {
|
||||||
|
te.exporter.Shutdown(ctx)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func determineSampler(samplingRate int) tracesdk.Sampler {
|
||||||
|
sampler := tracesdk.NeverSample()
|
||||||
|
if samplingRate == 0 {
|
||||||
|
return sampler
|
||||||
|
}
|
||||||
|
return tracesdk.TraceIDRatioBased(float64(samplingRate) / float64(maxSamplingRatePerMillion))
|
||||||
}
|
}
|
||||||
|
|
||||||
// As Tracing service Instance ID must be unique, it should
|
// As Tracing service Instance ID must be unique, it should
|
||||||
|
83
server/embed/config_tracing_test.go
Normal file
83
server/embed/config_tracing_test.go
Normal file
@ -0,0 +1,83 @@
|
|||||||
|
// Copyright 2021 The etcd Authors
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package embed
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
const neverSampleDescription = "AlwaysOffSampler"
|
||||||
|
|
||||||
|
func TestDetermineSampler(t *testing.T) {
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
sampleRate int
|
||||||
|
wantSamplerDescription string
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "sample rate is disabled",
|
||||||
|
sampleRate: 0,
|
||||||
|
wantSamplerDescription: neverSampleDescription,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "sample rate is 100",
|
||||||
|
sampleRate: 100,
|
||||||
|
wantSamplerDescription: "TraceIDRatioBased{0.0001}",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for _, tc := range tests {
|
||||||
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
|
sampler := determineSampler(tc.sampleRate)
|
||||||
|
if tc.wantSamplerDescription != sampler.Description() {
|
||||||
|
t.Errorf("tracing sampler was not as expected; expected sampler: %#+v, got sampler: %#+v", tc.wantSamplerDescription, sampler.Description())
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestTracingConfig(t *testing.T) {
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
sampleRate int
|
||||||
|
wantErr bool
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "invalid - sample rate is less than 0",
|
||||||
|
sampleRate: -1,
|
||||||
|
wantErr: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "invalid - sample rate is more than allowed value",
|
||||||
|
sampleRate: maxSamplingRatePerMillion + 1,
|
||||||
|
wantErr: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "valid - sample rate is 100",
|
||||||
|
sampleRate: 100,
|
||||||
|
wantErr: false,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for _, tc := range tests {
|
||||||
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
|
err := validateTracingConfig(tc.sampleRate)
|
||||||
|
if err == nil && tc.wantErr {
|
||||||
|
t.Errorf("expected error got (%v) error", err)
|
||||||
|
}
|
||||||
|
if err != nil && !tc.wantErr {
|
||||||
|
t.Errorf("expected no errors, got error: (%v)", err)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
@ -228,15 +228,14 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
|
|||||||
|
|
||||||
if srvcfg.ExperimentalEnableDistributedTracing {
|
if srvcfg.ExperimentalEnableDistributedTracing {
|
||||||
tctx := context.Background()
|
tctx := context.Background()
|
||||||
tracingExporter, opts, err := setupTracingExporter(tctx, cfg)
|
tracingExporter, err := newTracingExporter(tctx, cfg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return e, err
|
return e, err
|
||||||
}
|
}
|
||||||
if tracingExporter == nil || len(opts) == 0 {
|
e.tracingExporterShutdown = func() {
|
||||||
return e, fmt.Errorf("error setting up distributed tracing")
|
tracingExporter.Close(tctx)
|
||||||
}
|
}
|
||||||
e.tracingExporterShutdown = func() { tracingExporter.Shutdown(tctx) }
|
srvcfg.ExperimentalTracerOptions = tracingExporter.opts
|
||||||
srvcfg.ExperimentalTracerOptions = opts
|
|
||||||
|
|
||||||
e.cfg.logger.Info("distributed tracing setup enabled")
|
e.cfg.logger.Info("distributed tracing setup enabled")
|
||||||
}
|
}
|
||||||
|
@ -276,6 +276,7 @@ func newConfig() *config {
|
|||||||
fs.StringVar(&cfg.ec.ExperimentalDistributedTracingAddress, "experimental-distributed-tracing-address", embed.ExperimentalDistributedTracingAddress, "Address for distributed tracing used for OpenTelemetry Tracing (if enabled with experimental-enable-distributed-tracing flag).")
|
fs.StringVar(&cfg.ec.ExperimentalDistributedTracingAddress, "experimental-distributed-tracing-address", embed.ExperimentalDistributedTracingAddress, "Address for distributed tracing used for OpenTelemetry Tracing (if enabled with experimental-enable-distributed-tracing flag).")
|
||||||
fs.StringVar(&cfg.ec.ExperimentalDistributedTracingServiceName, "experimental-distributed-tracing-service-name", embed.ExperimentalDistributedTracingServiceName, "Configures service name for distributed tracing to be used to define service name for OpenTelemetry Tracing (if enabled with experimental-enable-distributed-tracing flag). 'etcd' is the default service name. Use the same service name for all instances of etcd.")
|
fs.StringVar(&cfg.ec.ExperimentalDistributedTracingServiceName, "experimental-distributed-tracing-service-name", embed.ExperimentalDistributedTracingServiceName, "Configures service name for distributed tracing to be used to define service name for OpenTelemetry Tracing (if enabled with experimental-enable-distributed-tracing flag). 'etcd' is the default service name. Use the same service name for all instances of etcd.")
|
||||||
fs.StringVar(&cfg.ec.ExperimentalDistributedTracingServiceInstanceID, "experimental-distributed-tracing-instance-id", "", "Configures service instance ID for distributed tracing to be used to define service instance ID key for OpenTelemetry Tracing (if enabled with experimental-enable-distributed-tracing flag). There is no default value set. This ID must be unique per etcd instance.")
|
fs.StringVar(&cfg.ec.ExperimentalDistributedTracingServiceInstanceID, "experimental-distributed-tracing-instance-id", "", "Configures service instance ID for distributed tracing to be used to define service instance ID key for OpenTelemetry Tracing (if enabled with experimental-enable-distributed-tracing flag). There is no default value set. This ID must be unique per etcd instance.")
|
||||||
|
fs.IntVar(&cfg.ec.ExperimentalDistributedTracingSamplingRatePerMillion, "experimental-distributed-tracing-sampling-rate", 0, "Number of samples to collect per million spans for OpenTelemetry Tracing (if enabled with experimental-enable-distributed-tracing flag).")
|
||||||
|
|
||||||
// auth
|
// auth
|
||||||
fs.StringVar(&cfg.ec.AuthToken, "auth-token", cfg.ec.AuthToken, "Specify auth token specific options.")
|
fs.StringVar(&cfg.ec.AuthToken, "auth-token", cfg.ec.AuthToken, "Specify auth token specific options.")
|
||||||
|
@ -219,6 +219,8 @@ Experimental distributed tracing:
|
|||||||
Distributed tracing service name, must be same across all etcd instances.
|
Distributed tracing service name, must be same across all etcd instances.
|
||||||
--experimental-distributed-tracing-instance-id ''
|
--experimental-distributed-tracing-instance-id ''
|
||||||
Distributed tracing instance ID, must be unique per each etcd instance.
|
Distributed tracing instance ID, must be unique per each etcd instance.
|
||||||
|
--experimental-distributed-tracing-sampling-rate '0'
|
||||||
|
Number of samples to collect per million spans for distributed tracing. Disabled by default.
|
||||||
|
|
||||||
v2 Proxy (to be deprecated in v3.6):
|
v2 Proxy (to be deprecated in v3.6):
|
||||||
--proxy 'off'
|
--proxy 'off'
|
||||||
|
10
tests/go.mod
10
tests/go.mod
@ -36,6 +36,11 @@ require (
|
|||||||
go.etcd.io/etcd/pkg/v3 v3.5.10
|
go.etcd.io/etcd/pkg/v3 v3.5.10
|
||||||
go.etcd.io/etcd/raft/v3 v3.5.10
|
go.etcd.io/etcd/raft/v3 v3.5.10
|
||||||
go.etcd.io/etcd/server/v3 v3.5.10
|
go.etcd.io/etcd/server/v3 v3.5.10
|
||||||
|
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.46.0
|
||||||
|
go.opentelemetry.io/otel v1.20.0
|
||||||
|
go.opentelemetry.io/otel/sdk v1.20.0
|
||||||
|
go.opentelemetry.io/otel/trace v1.20.0
|
||||||
|
go.opentelemetry.io/proto/otlp v1.0.0
|
||||||
go.uber.org/zap v1.17.0
|
go.uber.org/zap v1.17.0
|
||||||
golang.org/x/crypto v0.14.0
|
golang.org/x/crypto v0.14.0
|
||||||
golang.org/x/sync v0.3.0
|
golang.org/x/sync v0.3.0
|
||||||
@ -72,14 +77,9 @@ require (
|
|||||||
github.com/tmc/grpc-websocket-proxy v0.0.0-20201229170055-e5319fda7802 // indirect
|
github.com/tmc/grpc-websocket-proxy v0.0.0-20201229170055-e5319fda7802 // indirect
|
||||||
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 // indirect
|
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 // indirect
|
||||||
go.etcd.io/bbolt v1.3.8 // indirect
|
go.etcd.io/bbolt v1.3.8 // indirect
|
||||||
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.46.0 // indirect
|
|
||||||
go.opentelemetry.io/otel v1.20.0 // indirect
|
|
||||||
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.20.0 // indirect
|
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.20.0 // indirect
|
||||||
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.20.0 // indirect
|
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.20.0 // indirect
|
||||||
go.opentelemetry.io/otel/metric v1.20.0 // indirect
|
go.opentelemetry.io/otel/metric v1.20.0 // indirect
|
||||||
go.opentelemetry.io/otel/sdk v1.20.0 // indirect
|
|
||||||
go.opentelemetry.io/otel/trace v1.20.0 // indirect
|
|
||||||
go.opentelemetry.io/proto/otlp v1.0.0 // indirect
|
|
||||||
go.uber.org/atomic v1.7.0 // indirect
|
go.uber.org/atomic v1.7.0 // indirect
|
||||||
go.uber.org/multierr v1.6.0 // indirect
|
go.uber.org/multierr v1.6.0 // indirect
|
||||||
golang.org/x/net v0.17.0 // indirect
|
golang.org/x/net v0.17.0 // indirect
|
||||||
|
149
tests/integration/tracing_test.go
Normal file
149
tests/integration/tracing_test.go
Normal file
@ -0,0 +1,149 @@
|
|||||||
|
// Copyright 2022 The etcd Authors
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package integration
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"net"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
|
||||||
|
"go.opentelemetry.io/otel/propagation"
|
||||||
|
sdktrace "go.opentelemetry.io/otel/sdk/trace"
|
||||||
|
"go.opentelemetry.io/otel/trace"
|
||||||
|
traceservice "go.opentelemetry.io/proto/otlp/collector/trace/v1"
|
||||||
|
"google.golang.org/grpc"
|
||||||
|
|
||||||
|
"go.etcd.io/etcd/client/pkg/v3/testutil"
|
||||||
|
clientv3 "go.etcd.io/etcd/client/v3"
|
||||||
|
"go.etcd.io/etcd/server/v3/embed"
|
||||||
|
)
|
||||||
|
|
||||||
|
// TestTracing ensures that distributed tracing is setup when the feature flag is enabled.
|
||||||
|
func TestTracing(t *testing.T) {
|
||||||
|
BeforeTest(t)
|
||||||
|
testutil.SkipTestIfShortMode(t,
|
||||||
|
"Wal creation tests are depending on embedded etcd server so are integration-level tests.")
|
||||||
|
// set up trace collector
|
||||||
|
listener, err := net.Listen("tcp", "localhost:")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
traceFound := make(chan struct{})
|
||||||
|
defer close(traceFound)
|
||||||
|
|
||||||
|
srv := grpc.NewServer()
|
||||||
|
traceservice.RegisterTraceServiceServer(srv, &traceServer{
|
||||||
|
traceFound: traceFound,
|
||||||
|
filterFunc: containsNodeListSpan})
|
||||||
|
|
||||||
|
go srv.Serve(listener)
|
||||||
|
defer srv.Stop()
|
||||||
|
|
||||||
|
cfg := NewEmbedConfig(t, "default")
|
||||||
|
cfg.ExperimentalEnableDistributedTracing = true
|
||||||
|
cfg.ExperimentalDistributedTracingAddress = listener.Addr().String()
|
||||||
|
cfg.ExperimentalDistributedTracingServiceName = "integration-test-tracing"
|
||||||
|
cfg.ExperimentalDistributedTracingSamplingRatePerMillion = 100
|
||||||
|
|
||||||
|
// start an etcd instance with tracing enabled
|
||||||
|
etcdSrv, err := embed.StartEtcd(cfg)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
defer etcdSrv.Close()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-etcdSrv.Server.ReadyNotify():
|
||||||
|
case <-time.After(5 * time.Second):
|
||||||
|
// default randomized election timeout is 1 to 2s, single node will fast-forward 900ms
|
||||||
|
// change the timeout from 1 to 5 seconds to ensure de-flaking this test
|
||||||
|
t.Fatalf("failed to start embed.Etcd for test")
|
||||||
|
}
|
||||||
|
|
||||||
|
// create a client that has tracing enabled
|
||||||
|
tracer := sdktrace.NewTracerProvider(sdktrace.WithSampler(sdktrace.AlwaysSample()))
|
||||||
|
defer tracer.Shutdown(context.TODO())
|
||||||
|
tp := trace.TracerProvider(tracer)
|
||||||
|
|
||||||
|
tracingOpts := []otelgrpc.Option{
|
||||||
|
otelgrpc.WithTracerProvider(tp),
|
||||||
|
otelgrpc.WithPropagators(
|
||||||
|
propagation.NewCompositeTextMapPropagator(
|
||||||
|
propagation.TraceContext{},
|
||||||
|
propagation.Baggage{},
|
||||||
|
)),
|
||||||
|
}
|
||||||
|
|
||||||
|
dialOptions := []grpc.DialOption{
|
||||||
|
grpc.WithUnaryInterceptor(otelgrpc.UnaryClientInterceptor(tracingOpts...)),
|
||||||
|
grpc.WithStreamInterceptor(otelgrpc.StreamClientInterceptor(tracingOpts...))}
|
||||||
|
ccfg := clientv3.Config{DialOptions: dialOptions, Endpoints: []string{cfg.AdvertiseClientUrls[0].String()}}
|
||||||
|
cli, err := NewClient(t, ccfg)
|
||||||
|
if err != nil {
|
||||||
|
etcdSrv.Close()
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
defer cli.Close()
|
||||||
|
|
||||||
|
// make a request with the instrumented client
|
||||||
|
resp, err := cli.Get(context.TODO(), "key")
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Empty(t, resp.Kvs)
|
||||||
|
|
||||||
|
// Wait for a span to be recorded from our request
|
||||||
|
select {
|
||||||
|
case <-traceFound:
|
||||||
|
return
|
||||||
|
case <-time.After(30 * time.Second):
|
||||||
|
t.Fatal("Timed out waiting for trace")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func containsNodeListSpan(req *traceservice.ExportTraceServiceRequest) bool {
|
||||||
|
for _, resourceSpans := range req.GetResourceSpans() {
|
||||||
|
for _, attr := range resourceSpans.GetResource().GetAttributes() {
|
||||||
|
if attr.GetKey() != "service.name" && attr.GetValue().GetStringValue() != "integration-test-tracing" {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
for _, scoped := range resourceSpans.GetScopeSpans() {
|
||||||
|
for _, span := range scoped.GetSpans() {
|
||||||
|
if span.GetName() == "etcdserverpb.KV/Range" {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
// traceServer implements TracesServiceServer
|
||||||
|
type traceServer struct {
|
||||||
|
traceFound chan struct{}
|
||||||
|
filterFunc func(req *traceservice.ExportTraceServiceRequest) bool
|
||||||
|
traceservice.UnimplementedTraceServiceServer
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *traceServer) Export(ctx context.Context, req *traceservice.ExportTraceServiceRequest) (*traceservice.ExportTraceServiceResponse, error) {
|
||||||
|
var emptyValue = traceservice.ExportTraceServiceResponse{}
|
||||||
|
if t.filterFunc(req) {
|
||||||
|
t.traceFound <- struct{}{}
|
||||||
|
}
|
||||||
|
return &emptyValue, nil
|
||||||
|
}
|
Loading…
x
Reference in New Issue
Block a user