Add initial Tracing with OpenTelemetry

Author: Lili Cosic
Date:   2021-04-30 16:44:10 +02:00
Parent: a8f38ebd6f
Commit: 1a718a958e
28 changed files with 449 additions and 60 deletions
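For reference, the configuration fields this diff reads (ExperimentalEnableDistributedTracing and friends) live on embed.Config, so an embedded server can opt into tracing programmatically. The sketch below is not part of the commit: the field names come straight from the diff, while the module path, the collector endpoint, and the readiness handling are assumptions based on etcd v3.5 conventions.

package main

import (
	"log"

	"go.etcd.io/etcd/server/v3/embed"
)

func main() {
	cfg := embed.NewConfig()
	cfg.Dir = "default.etcd"
	// Fields used by this commit; the address assumes an OTLP/gRPC collector
	// listening on its conventional port.
	cfg.ExperimentalEnableDistributedTracing = true
	cfg.ExperimentalDistributedTracingAddress = "localhost:4317"
	cfg.ExperimentalDistributedTracingServiceName = "etcd"

	e, err := embed.StartEtcd(cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer e.Close() // Close also runs the tracing exporter shutdown hook added by this commit.
	<-e.Server.ReadyNotify()
}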

@@ -46,6 +46,13 @@ import (
	grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
	"github.com/soheilhy/cmux"
	"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
	"go.opentelemetry.io/otel/exporters/otlp"
	"go.opentelemetry.io/otel/exporters/otlp/otlpgrpc"
	"go.opentelemetry.io/otel/propagation"
	"go.opentelemetry.io/otel/sdk/resource"
	tracesdk "go.opentelemetry.io/otel/sdk/trace"
	"go.opentelemetry.io/otel/semconv"
	"go.uber.org/zap"
	"google.golang.org/grpc"
	"google.golang.org/grpc/keepalive"
@@ -73,6 +80,8 @@ type Etcd struct {
	sctxs            map[string]*serveCtx
	metricsListeners []net.Listener

	tracingExporterShutdown func()

	Server *etcdserver.EtcdServer

	cfg Config
@@ -207,6 +216,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
		LoggerWriteSyncer:                    cfg.loggerWriteSyncer,
		ForceNewCluster:                      cfg.ForceNewCluster,
		EnableGRPCGateway:                    cfg.EnableGRPCGateway,
		ExperimentalEnableDistributedTracing: cfg.ExperimentalEnableDistributedTracing,
		UnsafeNoFsync:                        cfg.UnsafeNoFsync,
		EnableLeaseCheckpoint:                cfg.ExperimentalEnableLeaseCheckpoint,
		CompactionBatchLimit:                 cfg.ExperimentalCompactionBatchLimit,
@@ -216,7 +226,22 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
		ExperimentalMemoryMlock:                  cfg.ExperimentalMemoryMlock,
		ExperimentalTxnModeWriteWithSharedBuffer: cfg.ExperimentalTxnModeWriteWithSharedBuffer,
	}

	if srvcfg.ExperimentalEnableDistributedTracing {
		tctx := context.Background()
		tracingExporter, opts, err := e.setupTracing(tctx)
		if err != nil {
			return e, err
		}
		if tracingExporter == nil || len(opts) == 0 {
			return e, fmt.Errorf("error setting up distributed tracing")
		}
		e.tracingExporterShutdown = func() { tracingExporter.Shutdown(tctx) }
		srvcfg.ExperimentalTracerOptions = opts
	}

	print(e.cfg.logger, *cfg, srvcfg, memberInitialized)

	if e.Server, err = etcdserver.NewServer(srvcfg); err != nil {
		return e, err
	}
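The options built here are passed to the server through srvcfg.ExperimentalTracerOptions and are meant to be consumed by the otelgrpc interceptors when the gRPC server is assembled. The sketch below shows that wiring under the otelgrpc v0.20 interceptor API; the helper name is hypothetical and this is not how etcd's own serve path is structured.

import (
	"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
	"google.golang.org/grpc"
)

// newTracedGRPCServer is a hypothetical helper: each unary and streaming RPC
// gets a server-side span whose parent context is extracted with the
// propagators configured in setupTracing.
func newTracedGRPCServer(opts []otelgrpc.Option) *grpc.Server {
	return grpc.NewServer(
		grpc.UnaryInterceptor(otelgrpc.UnaryServerInterceptor(opts...)),
		grpc.StreamInterceptor(otelgrpc.StreamServerInterceptor(opts...)),
	)
}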
@@ -382,6 +407,11 @@ func (e *Etcd) Close() {
		e.metricsListeners[i].Close()
	}

	// shutdown tracing exporter
	if e.tracingExporterShutdown != nil {
		e.tracingExporterShutdown()
	}

	// close rafthttp transports
	if e.Server != nil {
		e.Server.Stop()
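The shutdown hook stored in tracingExporterShutdown reuses the background context created at setup time, so Close() waits for as long as the exporter needs. A hypothetical alternative (not part of this commit) that bounds the wait with an arbitrary 5-second timeout might look like this:

import (
	"context"
	"time"

	tracesdk "go.opentelemetry.io/otel/sdk/trace"
	"go.uber.org/zap"
)

// tracingShutdownWithTimeout is a sketch of a bounded shutdown wrapper; the
// timeout value and function name are assumptions, not etcd code.
func tracingShutdownWithTimeout(exporter tracesdk.SpanExporter, lg *zap.Logger) func() {
	return func() {
		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		defer cancel()
		if err := exporter.Shutdown(ctx); err != nil {
			lg.Warn("failed to shut down tracing exporter", zap.Error(err))
		}
	}
}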
@@ -774,3 +804,52 @@ func parseCompactionRetention(mode, retention string) (ret time.Duration, err er
	}
	return ret, nil
}

func (e *Etcd) setupTracing(ctx context.Context) (exporter tracesdk.SpanExporter, options []otelgrpc.Option, err error) {
	exporter, err = otlp.NewExporter(ctx,
		otlpgrpc.NewDriver(
			otlpgrpc.WithEndpoint(e.cfg.ExperimentalDistributedTracingAddress),
			otlpgrpc.WithInsecure(),
		))
	if err != nil {
		return nil, nil, err
	}
	res := resource.NewWithAttributes(
		semconv.ServiceNameKey.String(e.cfg.ExperimentalDistributedTracingServiceName),
	)
	// The tracing service instance ID must be unique, so it must never fall back
	// to the empty default string; only set the attribute when a non-empty value
	// is provided.
	if e.cfg.ExperimentalDistributedTracingServiceInstanceID != "" {
		resWithIDKey := resource.NewWithAttributes(
			semconv.ServiceInstanceIDKey.String(e.cfg.ExperimentalDistributedTracingServiceInstanceID),
		)
		// Merge the two resources into one, combining duplicate keys.
		res = resource.Merge(res, resWithIDKey)
	}

	options = append(options,
		otelgrpc.WithPropagators(
			propagation.NewCompositeTextMapPropagator(
				propagation.TraceContext{},
				propagation.Baggage{},
			),
		),
		otelgrpc.WithTracerProvider(
			tracesdk.NewTracerProvider(
				tracesdk.WithBatcher(exporter),
				tracesdk.WithResource(res),
			),
		),
	)

	e.cfg.logger.Info(
		"distributed tracing enabled",
		zap.String("distributed-tracing-address", e.cfg.ExperimentalDistributedTracingAddress),
		zap.String("distributed-tracing-service-name", e.cfg.ExperimentalDistributedTracingServiceName),
		zap.String("distributed-tracing-service-instance-id", e.cfg.ExperimentalDistributedTracingServiceInstanceID),
	)
	return exporter, options, err
}
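Because setupTracing registers the W3C TraceContext and Baggage propagators, a client that injects trace context into its gRPC metadata will have its spans joined to the server spans etcd creates. The sketch below is not part of the commit; it assumes the clientv3 and otelgrpc packages of the same era, and the endpoint is a placeholder.

package main

import (
	"log"

	"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/propagation"
	"google.golang.org/grpc"

	clientv3 "go.etcd.io/etcd/client/v3"
)

func main() {
	// A real TracerProvider must already be installed (otel.SetTracerProvider)
	// for the client to record spans worth propagating; only the propagator is
	// configured here.
	otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(
		propagation.TraceContext{},
		propagation.Baggage{},
	))

	cli, err := clientv3.New(clientv3.Config{
		Endpoints: []string{"localhost:2379"},
		DialOptions: []grpc.DialOption{
			// Inject traceparent/baggage metadata into every etcd RPC so the
			// server-side spans created by this commit join the caller's trace.
			grpc.WithUnaryInterceptor(otelgrpc.UnaryClientInterceptor()),
			grpc.WithStreamInterceptor(otelgrpc.StreamClientInterceptor()),
		},
	})
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()
}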