embed: add integration test for distributed tracing

To verify distributed tracing feature is correctly setup, this PR adds
an integration test for this feature.
In the process of writing the test, I discovered a goroutine leak due to
the TraceProvider not being closed. This PR fixs this issue as well.

Signed-off-by: Yingrong Zhao <yingrong.zhao@gmail.com>
This commit is contained in:
Yingrong Zhao
2022-08-15 11:18:25 -04:00
parent ae36a577d7
commit ea2f299ba0
3 changed files with 183 additions and 20 deletions

View File

@@ -40,13 +40,19 @@ func validateTracingConfig(samplingRate int) error {
return nil
}
func setupTracingExporter(ctx context.Context, cfg *Config) (exporter tracesdk.SpanExporter, options []otelgrpc.Option, err error) {
exporter, err = otlptracegrpc.New(ctx,
type tracingExporter struct {
exporter tracesdk.SpanExporter
opts []otelgrpc.Option
shutdown func(ctx context.Context)
}
func newTracingExporter(ctx context.Context, cfg *Config) (*tracingExporter, error) {
exporter, err := otlptracegrpc.New(ctx,
otlptracegrpc.WithInsecure(),
otlptracegrpc.WithEndpoint(cfg.ExperimentalDistributedTracingAddress),
)
if err != nil {
return nil, nil, err
return nil, err
}
res, err := resource.New(ctx,
@@ -55,7 +61,7 @@ func setupTracingExporter(ctx context.Context, cfg *Config) (exporter tracesdk.S
),
)
if err != nil {
return nil, nil, err
return nil, err
}
if resWithIDKey := determineResourceWithIDKey(cfg.ExperimentalDistributedTracingServiceInstanceID); resWithIDKey != nil {
@@ -63,11 +69,18 @@ func setupTracingExporter(ctx context.Context, cfg *Config) (exporter tracesdk.S
// resource in case of duplicates.
res, err = resource.Merge(res, resWithIDKey)
if err != nil {
return nil, nil, err
return nil, err
}
}
options = append(options,
traceProvider := tracesdk.NewTracerProvider(
tracesdk.WithBatcher(exporter),
tracesdk.WithResource(res),
tracesdk.WithSampler(
tracesdk.ParentBased(determineSampler(cfg.ExperimentalDistributedTracingSamplingRatePerMillion)),
),
)
options := []otelgrpc.Option{
otelgrpc.WithPropagators(
propagation.NewCompositeTextMapPropagator(
propagation.TraceContext{},
@@ -75,15 +88,9 @@ func setupTracingExporter(ctx context.Context, cfg *Config) (exporter tracesdk.S
),
),
otelgrpc.WithTracerProvider(
tracesdk.NewTracerProvider(
tracesdk.WithBatcher(exporter),
tracesdk.WithResource(res),
tracesdk.WithSampler(
tracesdk.ParentBased(determineSampler(cfg.ExperimentalDistributedTracingSamplingRatePerMillion)),
),
),
traceProvider,
),
)
}
cfg.logger.Debug(
"distributed tracing enabled",
@@ -93,7 +100,18 @@ func setupTracingExporter(ctx context.Context, cfg *Config) (exporter tracesdk.S
zap.Int("sampling-rate", cfg.ExperimentalDistributedTracingSamplingRatePerMillion),
)
return exporter, options, err
return &tracingExporter{
exporter: exporter,
opts: options,
shutdown: func(ctx context.Context) {
traceProvider.Shutdown(ctx)
},
}, nil
}
func (te *tracingExporter) Close(ctx context.Context) {
te.exporter.Shutdown(ctx)
te.shutdown(ctx)
}
func determineSampler(samplingRate int) tracesdk.Sampler {

View File

@@ -227,15 +227,14 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
if srvcfg.ExperimentalEnableDistributedTracing {
tctx := context.Background()
tracingExporter, opts, err := setupTracingExporter(tctx, cfg)
tracingExporter, err := newTracingExporter(tctx, cfg)
if err != nil {
return e, err
}
if tracingExporter == nil || len(opts) == 0 {
return e, fmt.Errorf("error setting up distributed tracing")
e.tracingExporterShutdown = func() {
tracingExporter.Close(tctx)
}
e.tracingExporterShutdown = func() { tracingExporter.Shutdown(tctx) }
srvcfg.ExperimentalTracerOptions = opts
srvcfg.ExperimentalTracerOptions = tracingExporter.opts
e.cfg.logger.Info(
"distributed tracing setup enabled",