Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,8 @@ public final class ConfigDefaults {
// Default OTel metrics timeout; the value is in milliseconds (7.5 seconds).
static final int DEFAULT_METRICS_OTEL_TIMEOUT = 7_500; // ms
// Default OTel metrics cardinality limit.
static final int DEFAULT_METRICS_OTEL_CARDINALITY_LIMIT = 2_000;

// Default trace stats interval; the value is in milliseconds (10 seconds).
// NOTE(review): presumably the reporting interval for client-side trace stats — confirm at the
// place this default is read.
static final int DEFAULT_TRACE_STATS_INTERVAL = 10_000; // ms

// Experimental OTel metrics support defaults to enabled.
public static final boolean DEFAULT_METRICS_OTEL_EXPERIMENTAL_ENABLED = true;

// Default OTLP traces timeout; the value is in milliseconds (10 seconds).
public static final int DEFAULT_OTLP_TRACES_TIMEOUT = 10_000; // ms
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ public final class GeneralConfig {
// Config keys for client-side trace stats computation.
public static final String TRACE_STATS_COMPUTATION_ENABLED = "trace.stats.computation.enabled";
public static final String TRACE_STATS_COMPUTATION_IGNORE_AGENT_VERSION =
"trace.stats.computation.ignore.agent.version";

// Config key for the "OTel semantics" toggle.
// NOTE(review): presumably selects between Datadog and OTel attribute conventions on exported
// trace metrics — confirm against the code that reads this key.
public static final String TRACE_OTEL_SEMANTICS_ENABLED = "trace.otel.semantics.enabled";

// Config keys for the tracer's own metrics.
public static final String TRACER_METRICS_ENABLED = "trace.tracer.metrics.enabled";
public static final String TRACER_METRICS_BUFFERING_ENABLED =
"trace.tracer.metrics.buffering.enabled";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ public final class OtlpConfig {
// Config key for the preferred OTLP metrics temporality.
public static final String OTLP_METRICS_TEMPORALITY_PREFERENCE =
"otlp.metrics.temporality.preference";

// Config key enabling span metrics.
// NOTE(review): this key uses a plural "traces." prefix while the neighbouring keys below use
// "trace." — confirm the spelling is intentional before it becomes a public setting.
public static final String TRACES_SPAN_METRICS_ENABLED = "traces.span.metrics.enabled";

// Config keys for OTel trace export: the on/off switch and the exporter selection.
public static final String TRACE_OTEL_ENABLED = "trace.otel.enabled";
public static final String TRACE_OTEL_EXPORTER = "trace.otel.exporter";

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package datadog.trace.common.metrics;

import datadog.metrics.api.Histogram;
import datadog.metrics.api.HistogramWithSum;
import datadog.trace.bootstrap.otlp.metrics.OtlpHistogramPoint;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * Static helpers that map a client-side-stats {@link Histogram} (a DDSketch over span durations
 * recorded in <em>nanoseconds</em>) onto the fixed explicit-bounds histogram layout mandated by the
 * OTLP Trace Metrics Export RFC, with every value expressed in seconds.
 */
final class OtlpHistogramBuckets {
  /** Divisor that turns the sketch's nanosecond values into seconds. */
  private static final double NANOS_PER_SECOND = 1_000_000_000d;

  /** The finite bucket bounds, in seconds, in ascending order. */
  static final double[] BOUNDS_SECONDS = {
    0.002, 0.004, 0.006, 0.008, 0.01, 0.05, 0.1, 0.2, 0.4, 0.8, 1, 1.4, 2, 5, 10, 15
  };

  /**
   * Unmodifiable boxed copy of {@link #BOUNDS_SECONDS} with {@link Double#POSITIVE_INFINITY}
   * appended as the final entry.
   */
  static final List<Double> EXPLICIT_BOUNDS = explicitBounds();

  private OtlpHistogramBuckets() {}

  /** Builds the list published as {@link #EXPLICIT_BOUNDS}. */
  private static List<Double> explicitBounds() {
    List<Double> boxed = new ArrayList<>(BOUNDS_SECONDS.length + 1);
    for (int i = 0; i < BOUNDS_SECONDS.length; i++) {
      boxed.add(BOUNDS_SECONDS[i]);
    }
    boxed.add(Double.POSITIVE_INFINITY);
    return Collections.unmodifiableList(boxed);
  }

  /**
   * Finds the bucket a value belongs to.
   *
   * @param seconds the value to classify, in seconds
   * @return the index of the first bound that is greater than or equal to {@code seconds}, or
   *     {@code BOUNDS_SECONDS.length} (the overflow bucket) when no bound qualifies
   */
  static int bucketIndex(double seconds) {
    int index = 0;
    // Negated "<=" rather than ">": a NaN input fails every comparison and must keep advancing
    // until it lands in the overflow bucket.
    while (index < BOUNDS_SECONDS.length && !(seconds <= BOUNDS_SECONDS[index])) {
      index++;
    }
    return index;
  }

  /**
   * Converts a nanosecond-valued {@code histogram} into an {@link OtlpHistogramPoint} in seconds
   * that uses the fixed bucket layout of this class.
   *
   * <p>Every source bin is assigned in full to the bucket selected by its boundary value. {@code
   * count}, {@code min} and {@code max} come straight from the sketch ({@code min}/{@code max} are
   * reported as zero for an empty sketch). {@code sum} is the sketch's own sum when it is a {@link
   * HistogramWithSum}; otherwise it is a best-effort estimate built from bin upper bounds.
   *
   * @param histogram the source sketch, holding durations in nanoseconds
   * @return the re-binned data point, in seconds
   */
  static OtlpHistogramPoint toHistogramPoint(Histogram histogram) {
    List<Double> sourceBoundaries = histogram.getBinBoundaries();
    List<Double> sourceCounts = histogram.getBinCounts();

    long[] rebinned = new long[BOUNDS_SECONDS.length + 1];
    double upperBoundSumSeconds = 0d;
    for (int bin = 0; bin < sourceBoundaries.size(); bin++) {
      double boundarySeconds = sourceBoundaries.get(bin) / NANOS_PER_SECOND;
      // same narrowing conversion as a (long) cast on the double value
      long binCount = sourceCounts.get(bin).longValue();
      int target = bucketIndex(boundarySeconds);
      rebinned[target] += binCount;
      upperBoundSumSeconds += boundarySeconds * binCount;
    }

    List<Double> counts = new ArrayList<>(rebinned.length);
    for (int bucket = 0; bucket < rebinned.length; bucket++) {
      counts.add((double) rebinned[bucket]);
    }

    double sumSeconds;
    if (histogram instanceof HistogramWithSum) {
      sumSeconds = ((HistogramWithSum) histogram).getSum() / NANOS_PER_SECOND;
    } else {
      sumSeconds = upperBoundSumSeconds;
    }

    double minSeconds = 0d;
    if (!histogram.isEmpty()) {
      minSeconds = histogram.getMinValue() / NANOS_PER_SECOND;
    }
    double maxSeconds = 0d;
    if (!histogram.isEmpty()) {
      maxSeconds = histogram.getMaxValue() / NANOS_PER_SECOND;
    }

    return new OtlpHistogramPoint(
        histogram.getCount(), EXPLICIT_BOUNDS, counts, sumSeconds, minSeconds, maxSeconds);
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,218 @@
package datadog.trace.common.metrics;

import static datadog.trace.bootstrap.otel.metrics.OtelInstrumentType.HISTOGRAM;
import static datadog.trace.bootstrap.otlp.common.OtlpAttributeVisitor.LONG_ATTRIBUTE;
import static datadog.trace.bootstrap.otlp.common.OtlpAttributeVisitor.STRING_ATTRIBUTE;
import static datadog.trace.core.otlp.common.OtlpCommonProto.I64_WIRE_TYPE;
import static datadog.trace.core.otlp.common.OtlpCommonProto.LEN_WIRE_TYPE;
import static datadog.trace.core.otlp.common.OtlpCommonProto.writeAttribute;
import static datadog.trace.core.otlp.common.OtlpCommonProto.writeI64;
import static datadog.trace.core.otlp.common.OtlpCommonProto.writeTag;
import static datadog.trace.core.otlp.common.OtlpResourceProto.RESOURCE_MESSAGE;
import static datadog.trace.core.otlp.metrics.OtlpMetricsProto.recordDataPointMessage;
import static datadog.trace.core.otlp.metrics.OtlpMetricsProto.recordMetricMessage;
import static datadog.trace.core.otlp.metrics.OtlpMetricsProto.recordScopedMetricsMessage;

import datadog.communication.serialization.GrowableBuffer;
import datadog.metrics.api.Histogram;
import datadog.trace.api.Config;
import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString;
import datadog.trace.bootstrap.otel.common.OtelInstrumentationScope;
import datadog.trace.bootstrap.otel.metrics.OtelInstrumentDescriptor;
import datadog.trace.bootstrap.otlp.metrics.OtlpHistogramPoint;
import datadog.trace.core.otlp.common.OtlpGrpcSender;
import datadog.trace.core.otlp.common.OtlpHttpSender;
import datadog.trace.core.otlp.common.OtlpProtoBuffer;
import datadog.trace.core.otlp.common.OtlpSender;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link MetricWriter} that exports the existing client-side trace (RED) stats as a single
 * vendor-neutral OTLP delta-temporality histogram named {@code traces.span.sdk.metrics.duration}
 * (unit {@code s}).
 *
 * <p>This is the parallel-to-{@link SerializingMetricWriter} OTLP export path. It hangs off the
 * same in-memory aggregation ({@link ConflatingMetricsAggregator} / {@link Aggregator}) and
 * consumes the same {@link AggregateEntry} stream; only the wire encoding and transport differ.
 * Native msgpack stats and OTLP export are mutually exclusive (selected at the factory).
 *
 * <p>Assembly mirrors {@code OtlpMetricsProtoCollector}.
 *
 * <p>NOTE(review): the writer keeps mutable per-bucket state in plain fields with no
 * synchronization; presumably it is only driven from one aggregator thread — confirm.
 */
public final class OtlpStatsMetricWriter implements MetricWriter {
  private static final Logger log = LoggerFactory.getLogger(OtlpStatsMetricWriter.class);

  static final String METRIC_NAME = "traces.span.sdk.metrics.duration";
  static final String METRIC_UNIT = "s";

  private static final OtelInstrumentDescriptor METRIC_DESCRIPTOR =
      new OtelInstrumentDescriptor(METRIC_NAME, HISTOGRAM, false, null, METRIC_UNIT);
  private static final OtelInstrumentationScope SCOPE =
      new OtelInstrumentationScope("datadog.trace.metrics", null, null);

  // Protobuf field numbers written by hand for each data point; they match
  // start_time_unix_nano (2), time_unix_nano (3) and attributes (9) of the OTLP
  // HistogramDataPoint message.
  private static final int DP_START_TIME_FIELD = 2;
  private static final int DP_TIME_FIELD = 3;
  private static final int DP_ATTRIBUTES_FIELD = 9;

  // Attribute keys attached to each data point.
  private static final String SPAN_NAME = "span.name";
  private static final String SPAN_KIND = "span.kind";
  private static final String HTTP_REQUEST_METHOD = "http.request.method";
  private static final String HTTP_RESPONSE_STATUS_CODE = "http.response.status_code";
  private static final String HTTP_ROUTE = "http.route";
  private static final String RPC_RESPONSE_STATUS_CODE = "rpc.response.status_code";
  private static final String STATUS_CODE = "status.code";
  private static final String STATUS_CODE_ERROR = "ERROR";

  /** Transport for finished payloads; {@code null} when the configured protocol is unsupported. */
  @Nullable private final OtlpSender sender;

  // Scratch buffer for the fields currently being written, and the accumulating protobuf output.
  private final GrowableBuffer buf = new GrowableBuffer(512);
  private final OtlpProtoBuffer protobuf = new OtlpProtoBuffer(8192);

  // Time window of the bucket being written, as set by startBucket.
  private long startNanos;
  private long endNanos;

  // Running byte counts of the messages recorded so far at each nesting level of the payload.
  private int payloadBytes;
  private int scopedBytes;
  private int metricBytes;

  /**
   * Creates a writer whose sender is selected from the OTLP metrics protocol in {@code config}.
   *
   * @param config tracer configuration supplying the OTLP metrics protocol, endpoint, headers,
   *     timeout and compression
   */
  public OtlpStatsMetricWriter(Config config) {
    this(createSender(config));
  }

  // visible for testing: lets tests inject a capturing sender to decode the emitted protobuf
  OtlpStatsMetricWriter(@Nullable OtlpSender sender) {
    this.sender = sender;
  }

  /**
   * Builds the sender matching the configured OTLP metrics protocol.
   *
   * @return a gRPC or HTTP/protobuf sender, or {@code null} (after a debug log) for any other
   *     protocol
   */
  @Nullable
  private static OtlpSender createSender(Config config) {
    // mirrors OtlpMetricsService's protocol-based sender selection
    switch (config.getOtlpMetricsProtocol()) {
      case GRPC:
        return new OtlpGrpcSender(
            config.getOtlpMetricsEndpoint(),
            "/opentelemetry.proto.collector.metrics.v1.MetricsService/Export",
            config.getOtlpMetricsHeaders(),
            config.getOtlpMetricsTimeout(),
            config.getOtlpMetricsCompression());
      case HTTP_PROTOBUF:
        return new OtlpHttpSender(
            config.getOtlpMetricsEndpoint(),
            "/v1/metrics",
            config.getOtlpMetricsHeaders(),
            config.getOtlpMetricsTimeout(),
            config.getOtlpMetricsCompression());
      default:
        // HTTP_JSON has no protobuf-free encoder yet; JSON transport is deferred per the plan.
        log.debug(
            "Unsupported OTLP metrics protocol for trace metrics export: {}",
            config.getOtlpMetricsProtocol());
        return null;
    }
  }

  /**
   * Begins a new bucket: remembers its time window and zeroes the running byte counts.
   *
   * @param metricCount number of entries that will follow (unused here)
   * @param start bucket start time
   * @param duration bucket length, added to {@code start} to obtain the end time
   */
  @Override
  public void startBucket(int metricCount, long start, long duration) {
    // start/duration arrive as epoch nanos / interval nanos (see Aggregator#report)
    this.startNanos = start;
    this.endNanos = start + duration;
    this.payloadBytes = 0;
    this.scopedBytes = 0;
    this.metricBytes = 0;
  }

  /**
   * Emits up to two data points for {@code entry}: one for its OK latencies and one, tagged with
   * {@code status.code=ERROR}, for its error latencies.
   *
   * <p>A histogram that is absent or empty produces no data point. Both histograms are held to the
   * same rule so that an allocated-but-empty error histogram does not yield a zero-count ERROR
   * point.
   *
   * @param entry the aggregated stats for one key
   */
  @Override
  public void add(AggregateEntry entry) {
    Histogram okLatencies = entry.getOkLatencies();
    if (okLatencies != null && !okLatencies.isEmpty()) {
      addDataPoint(entry, okLatencies, false);
    }

    Histogram errorLatencies = entry.getErrorLatencies();
    if (errorLatencies != null && !errorLatencies.isEmpty()) {
      addDataPoint(entry, errorLatencies, true);
    }
  }

  /**
   * Writes one histogram data point: attributes first, then the bucket's start and end
   * timestamps, then the re-binned histogram, adding the recorded size to {@code metricBytes}.
   */
  private void addDataPoint(AggregateEntry entry, Histogram latencies, boolean error) {
    writeDataPointAttributes(entry, error);
    writeTag(buf, DP_START_TIME_FIELD, I64_WIRE_TYPE);
    writeI64(buf, startNanos);
    writeTag(buf, DP_TIME_FIELD, I64_WIRE_TYPE);
    writeI64(buf, endNanos);
    OtlpHistogramPoint point = OtlpHistogramBuckets.toHistogramPoint(latencies);
    metricBytes += recordDataPointMessage(buf, point, protobuf);
  }

  /**
   * Writes the attribute set for one data point. {@code span.name} and {@code span.kind} are taken
   * from the entry's resource and span kind; the HTTP and RPC attributes are written only when the
   * entry carries a value for them (a status code of {@code 0} counts as absent).
   */
  private void writeDataPointAttributes(AggregateEntry entry, boolean error) {
    // TODO(step 4): branch on isTraceOtelSemanticsEnabled() to add the datadog.* attribute set in
    // default mode and to omit it in OTel-semantics mode. The OTel-semconv attributes below are
    // emitted in both modes.
    if (error) {
      writeStringAttribute(STATUS_CODE, STATUS_CODE_ERROR);
    }
    writeStringAttribute(SPAN_NAME, entry.getResource());
    writeStringAttribute(SPAN_KIND, entry.getSpanKind());
    if (entry.getHttpMethod() != null) {
      writeStringAttribute(HTTP_REQUEST_METHOD, entry.getHttpMethod());
    }
    if (entry.getHttpStatusCode() != 0) {
      writeLongAttribute(HTTP_RESPONSE_STATUS_CODE, entry.getHttpStatusCode());
    }
    if (entry.getHttpEndpoint() != null) {
      writeStringAttribute(HTTP_ROUTE, entry.getHttpEndpoint());
    }
    if (entry.getGrpcStatusCode() != null) {
      writeStringAttribute(RPC_RESPONSE_STATUS_CODE, entry.getGrpcStatusCode());
    }
  }

  /** Writes a string attribute from a {@link UTF8BytesString}; a {@code null} value is skipped. */
  private void writeStringAttribute(String key, @Nullable UTF8BytesString value) {
    if (value != null) {
      writeStringAttribute(key, value.toString());
    }
  }

  /** Writes one string-valued attribute into the data point's attributes field. */
  private void writeStringAttribute(String key, String value) {
    writeTag(buf, DP_ATTRIBUTES_FIELD, LEN_WIRE_TYPE);
    writeAttribute(buf, STRING_ATTRIBUTE, key, value);
  }

  /** Writes one long-valued attribute into the data point's attributes field. */
  private void writeLongAttribute(String key, long value) {
    writeTag(buf, DP_ATTRIBUTES_FIELD, LEN_WIRE_TYPE);
    writeAttribute(buf, LONG_ATTRIBUTE, key, value);
  }

  /**
   * Completes the bucket: wraps the recorded data points in the metric, scope and resource
   * messages, hands the payload to the sender if there is one, and always resets the writer
   * afterwards. Nothing is sent when no data point was recorded.
   */
  @Override
  public void finishBucket() {
    try {
      if (metricBytes > 0) {
        scopedBytes += recordMetricMessage(buf, METRIC_DESCRIPTOR, metricBytes, protobuf);
      }
      if (scopedBytes > 0) {
        payloadBytes += recordScopedMetricsMessage(buf, SCOPE, scopedBytes, protobuf);
      }
      if (payloadBytes == 0) {
        return;
      }
      payloadBytes += protobuf.recordMessage(RESOURCE_MESSAGE);
      // field 1 of the enclosing message carries everything recorded above
      protobuf.recordMessage(buf, 1, payloadBytes);

      if (sender != null) {
        sender.send(protobuf.toPayload());
      }
    } finally {
      // clear buffers even when encoding or sending throws, so the next bucket starts clean
      reset();
    }
  }

  /** Clears both buffers and zeroes the running byte counts. */
  @Override
  public void reset() {
    buf.reset();
    protobuf.reset();
    payloadBytes = 0;
    scopedBytes = 0;
    metricBytes = 0;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,12 @@ static byte[] buildResourceMessage(Config config) {
if (!version.isEmpty()) {
writeResourceAttribute(buf, "service.version", version);
}
if (config.isReportHostName()) {
String hostName = config.getHostName();
if (hostName != null && !hostName.isEmpty()) {
writeResourceAttribute(buf, "host.name", hostName);
}
}
writeResourceAttribute(buf, "telemetry.sdk.name", "datadog");
writeResourceAttribute(buf, "telemetry.sdk.version", TRACER_VERSION);
writeResourceAttribute(buf, "telemetry.sdk.language", "java");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import static datadog.trace.api.config.GeneralConfig.SERVICE_NAME;
import static datadog.trace.api.config.GeneralConfig.TAGS;
import static datadog.trace.api.config.GeneralConfig.VERSION;
import static datadog.trace.api.config.TracerConfig.TRACE_REPORT_HOSTNAME;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

Expand Down Expand Up @@ -99,6 +100,16 @@ static Stream<Arguments> resourceMessageCases() {
"service.name", "my-service",
"region", "us-east",
"team", "platform")),
// report-hostname disabled (default): no host.name written
Arguments.of(
"report-hostname disabled",
props(SERVICE_NAME, "my-service"),
attrs("service.name", "my-service")),
// report-hostname enabled: host.name written with the detected hostname
Arguments.of(
"report-hostname enabled",
props(SERVICE_NAME, "my-service", TRACE_REPORT_HOSTNAME, "true"),
attrs("service.name", "my-service", "host.name", Config.get().getHostName())),
// all config values set together; telemetry.sdk.* keys in tags must be ignored
Arguments.of(
"service, env, version, and tags all set",
Expand Down
Loading