mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
Merge pull request #14348 from VinozzZ/add-integration-test-for-tracing
embed: add integration test for distributed tracing
This commit is contained in:
commit
b48641e5f2
@ -40,13 +40,19 @@ func validateTracingConfig(samplingRate int) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func setupTracingExporter(ctx context.Context, cfg *Config) (exporter tracesdk.SpanExporter, options []otelgrpc.Option, err error) {
|
||||
exporter, err = otlptracegrpc.New(ctx,
|
||||
type tracingExporter struct {
|
||||
exporter tracesdk.SpanExporter
|
||||
opts []otelgrpc.Option
|
||||
provider *tracesdk.TracerProvider
|
||||
}
|
||||
|
||||
func newTracingExporter(ctx context.Context, cfg *Config) (*tracingExporter, error) {
|
||||
exporter, err := otlptracegrpc.New(ctx,
|
||||
otlptracegrpc.WithInsecure(),
|
||||
otlptracegrpc.WithEndpoint(cfg.ExperimentalDistributedTracingAddress),
|
||||
)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
res, err := resource.New(ctx,
|
||||
@ -55,7 +61,7 @@ func setupTracingExporter(ctx context.Context, cfg *Config) (exporter tracesdk.S
|
||||
),
|
||||
)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if resWithIDKey := determineResourceWithIDKey(cfg.ExperimentalDistributedTracingServiceInstanceID); resWithIDKey != nil {
|
||||
@ -63,11 +69,19 @@ func setupTracingExporter(ctx context.Context, cfg *Config) (exporter tracesdk.S
|
||||
// resource in case of duplicates.
|
||||
res, err = resource.Merge(res, resWithIDKey)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
options = append(options,
|
||||
traceProvider := tracesdk.NewTracerProvider(
|
||||
tracesdk.WithBatcher(exporter),
|
||||
tracesdk.WithResource(res),
|
||||
tracesdk.WithSampler(
|
||||
tracesdk.ParentBased(determineSampler(cfg.ExperimentalDistributedTracingSamplingRatePerMillion)),
|
||||
),
|
||||
)
|
||||
|
||||
options := []otelgrpc.Option{
|
||||
otelgrpc.WithPropagators(
|
||||
propagation.NewCompositeTextMapPropagator(
|
||||
propagation.TraceContext{},
|
||||
@ -75,15 +89,9 @@ func setupTracingExporter(ctx context.Context, cfg *Config) (exporter tracesdk.S
|
||||
),
|
||||
),
|
||||
otelgrpc.WithTracerProvider(
|
||||
tracesdk.NewTracerProvider(
|
||||
tracesdk.WithBatcher(exporter),
|
||||
tracesdk.WithResource(res),
|
||||
tracesdk.WithSampler(
|
||||
tracesdk.ParentBased(determineSampler(cfg.ExperimentalDistributedTracingSamplingRatePerMillion)),
|
||||
),
|
||||
),
|
||||
traceProvider,
|
||||
),
|
||||
)
|
||||
}
|
||||
|
||||
cfg.logger.Debug(
|
||||
"distributed tracing enabled",
|
||||
@ -93,7 +101,20 @@ func setupTracingExporter(ctx context.Context, cfg *Config) (exporter tracesdk.S
|
||||
zap.Int("sampling-rate", cfg.ExperimentalDistributedTracingSamplingRatePerMillion),
|
||||
)
|
||||
|
||||
return exporter, options, err
|
||||
return &tracingExporter{
|
||||
exporter: exporter,
|
||||
opts: options,
|
||||
provider: traceProvider,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (te *tracingExporter) Close(ctx context.Context) {
|
||||
if te.provider != nil {
|
||||
te.provider.Shutdown(ctx)
|
||||
}
|
||||
if te.exporter != nil {
|
||||
te.exporter.Shutdown(ctx)
|
||||
}
|
||||
}
|
||||
|
||||
func determineSampler(samplingRate int) tracesdk.Sampler {
|
||||
|
@ -227,15 +227,14 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
|
||||
|
||||
if srvcfg.ExperimentalEnableDistributedTracing {
|
||||
tctx := context.Background()
|
||||
tracingExporter, opts, err := setupTracingExporter(tctx, cfg)
|
||||
tracingExporter, err := newTracingExporter(tctx, cfg)
|
||||
if err != nil {
|
||||
return e, err
|
||||
}
|
||||
if tracingExporter == nil || len(opts) == 0 {
|
||||
return e, fmt.Errorf("error setting up distributed tracing")
|
||||
e.tracingExporterShutdown = func() {
|
||||
tracingExporter.Close(tctx)
|
||||
}
|
||||
e.tracingExporterShutdown = func() { tracingExporter.Shutdown(tctx) }
|
||||
srvcfg.ExperimentalTracerOptions = opts
|
||||
srvcfg.ExperimentalTracerOptions = tracingExporter.opts
|
||||
|
||||
e.cfg.logger.Info(
|
||||
"distributed tracing setup enabled",
|
||||
|
10
tests/go.mod
10
tests/go.mod
@ -37,6 +37,11 @@ require (
|
||||
go.etcd.io/etcd/pkg/v3 v3.6.0-alpha.0
|
||||
go.etcd.io/etcd/raft/v3 v3.6.0-alpha.0
|
||||
go.etcd.io/etcd/server/v3 v3.6.0-alpha.0
|
||||
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.32.0
|
||||
go.opentelemetry.io/otel v1.7.0
|
||||
go.opentelemetry.io/otel/sdk v1.7.0
|
||||
go.opentelemetry.io/otel/trace v1.7.0
|
||||
go.opentelemetry.io/proto/otlp v0.16.0
|
||||
go.uber.org/zap v1.21.0
|
||||
golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e
|
||||
golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f
|
||||
@ -80,14 +85,9 @@ require (
|
||||
github.com/tmc/grpc-websocket-proxy v0.0.0-20201229170055-e5319fda7802 // indirect
|
||||
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 // indirect
|
||||
go.etcd.io/bbolt v1.3.6 // indirect
|
||||
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.32.0 // indirect
|
||||
go.opentelemetry.io/otel v1.7.0 // indirect
|
||||
go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.7.0 // indirect
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.7.0 // indirect
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.7.0 // indirect
|
||||
go.opentelemetry.io/otel/sdk v1.7.0 // indirect
|
||||
go.opentelemetry.io/otel/trace v1.7.0 // indirect
|
||||
go.opentelemetry.io/proto/otlp v0.16.0 // indirect
|
||||
go.uber.org/atomic v1.7.0 // indirect
|
||||
go.uber.org/multierr v1.8.0 // indirect
|
||||
golang.org/x/net v0.0.0-20220919171627-f8f703f97925 // indirect
|
||||
|
146
tests/integration/tracing_test.go
Normal file
146
tests/integration/tracing_test.go
Normal file
@ -0,0 +1,146 @@
|
||||
// Copyright 2022 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package integration
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.etcd.io/etcd/client/pkg/v3/testutil"
|
||||
clientv3 "go.etcd.io/etcd/client/v3"
|
||||
"go.etcd.io/etcd/server/v3/embed"
|
||||
"go.etcd.io/etcd/tests/v3/framework/integration"
|
||||
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
|
||||
"go.opentelemetry.io/otel/propagation"
|
||||
sdktrace "go.opentelemetry.io/otel/sdk/trace"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
traceservice "go.opentelemetry.io/proto/otlp/collector/trace/v1"
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
|
||||
// TestTracing ensures that distributed tracing is setup when the feature flag is enabled.
|
||||
func TestTracing(t *testing.T) {
|
||||
testutil.SkipTestIfShortMode(t,
|
||||
"Wal creation tests are depending on embedded etcd server so are integration-level tests.")
|
||||
// set up trace collector
|
||||
listener, err := net.Listen("tcp", "localhost:")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
traceFound := make(chan struct{})
|
||||
defer close(traceFound)
|
||||
|
||||
srv := grpc.NewServer()
|
||||
traceservice.RegisterTraceServiceServer(srv, &traceServer{
|
||||
traceFound: traceFound,
|
||||
filterFunc: containsNodeListSpan})
|
||||
|
||||
go srv.Serve(listener)
|
||||
defer srv.Stop()
|
||||
|
||||
cfg := integration.NewEmbedConfig(t, "default")
|
||||
cfg.ExperimentalEnableDistributedTracing = true
|
||||
cfg.ExperimentalDistributedTracingAddress = listener.Addr().String()
|
||||
cfg.ExperimentalDistributedTracingServiceName = "integration-test-tracing"
|
||||
cfg.ExperimentalDistributedTracingSamplingRatePerMillion = 100
|
||||
|
||||
// start an etcd instance with tracing enabled
|
||||
etcdSrv, err := embed.StartEtcd(cfg)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer etcdSrv.Close()
|
||||
|
||||
select {
|
||||
case <-etcdSrv.Server.ReadyNotify():
|
||||
case <-time.After(1 * time.Second):
|
||||
t.Fatalf("failed to start embed.Etcd for test")
|
||||
}
|
||||
|
||||
// create a client that has tracing enabled
|
||||
tracer := sdktrace.NewTracerProvider(sdktrace.WithSampler(sdktrace.AlwaysSample()))
|
||||
defer tracer.Shutdown(context.TODO())
|
||||
tp := trace.TracerProvider(tracer)
|
||||
|
||||
tracingOpts := []otelgrpc.Option{
|
||||
otelgrpc.WithTracerProvider(tp),
|
||||
otelgrpc.WithPropagators(
|
||||
propagation.NewCompositeTextMapPropagator(
|
||||
propagation.TraceContext{},
|
||||
propagation.Baggage{},
|
||||
)),
|
||||
}
|
||||
|
||||
dialOptions := []grpc.DialOption{
|
||||
grpc.WithUnaryInterceptor(otelgrpc.UnaryClientInterceptor(tracingOpts...)),
|
||||
grpc.WithStreamInterceptor(otelgrpc.StreamClientInterceptor(tracingOpts...))}
|
||||
ccfg := clientv3.Config{DialOptions: dialOptions, Endpoints: []string{cfg.ACUrls[0].String()}}
|
||||
cli, err := integration.NewClient(t, ccfg)
|
||||
if err != nil {
|
||||
etcdSrv.Close()
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer cli.Close()
|
||||
|
||||
// make a request with the instrumented client
|
||||
resp, err := cli.Get(context.TODO(), "key")
|
||||
require.NoError(t, err)
|
||||
require.Empty(t, resp.Kvs)
|
||||
|
||||
// Wait for a span to be recorded from our request
|
||||
select {
|
||||
case <-traceFound:
|
||||
return
|
||||
case <-time.After(30 * time.Second):
|
||||
t.Fatal("Timed out waiting for trace")
|
||||
}
|
||||
}
|
||||
|
||||
func containsNodeListSpan(req *traceservice.ExportTraceServiceRequest) bool {
|
||||
for _, resourceSpans := range req.GetResourceSpans() {
|
||||
for _, attr := range resourceSpans.GetResource().GetAttributes() {
|
||||
if attr.GetKey() != "service.name" && attr.GetValue().GetStringValue() != "integration-test-tracing" {
|
||||
continue
|
||||
}
|
||||
for _, scoped := range resourceSpans.GetScopeSpans() {
|
||||
for _, span := range scoped.GetSpans() {
|
||||
if span.GetName() == "etcdserverpb.KV/Range" {
|
||||
return true
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// traceServer implements TracesServiceServer
|
||||
type traceServer struct {
|
||||
traceFound chan struct{}
|
||||
filterFunc func(req *traceservice.ExportTraceServiceRequest) bool
|
||||
traceservice.UnimplementedTraceServiceServer
|
||||
}
|
||||
|
||||
func (t *traceServer) Export(ctx context.Context, req *traceservice.ExportTraceServiceRequest) (*traceservice.ExportTraceServiceResponse, error) {
|
||||
var emptyValue = traceservice.ExportTraceServiceResponse{}
|
||||
if t.filterFunc(req) {
|
||||
t.traceFound <- struct{}{}
|
||||
}
|
||||
return &emptyValue, nil
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user