Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -108,12 +108,14 @@ public final class ConfigDefaults {
static final int DEFAULT_METRICS_OTEL_TIMEOUT = 7_500; // ms
static final int DEFAULT_METRICS_OTEL_CARDINALITY_LIMIT = 2_000;

static final int DEFAULT_OTLP_TRACES_TIMEOUT = 10_000; // ms
public static final int DEFAULT_OTLP_TRACES_TIMEOUT = 10_000; // ms

static final String DEFAULT_OTLP_HTTP_METRICS_ENDPOINT = "v1/metrics";
static final String DEFAULT_OTLP_HTTP_TRACES_ENDPOINT = "v1/traces";
static final String DEFAULT_OTLP_HTTP_PORT = "4318";
static final String DEFAULT_OTLP_GRPC_PORT = "4317";
public static final String DEFAULT_OTLP_HTTP_TRACES_ENDPOINT = "v1/traces";
public static final String DEFAULT_OTLP_GRPC_TRACES_ENDPOINT =
"opentelemetry.proto.collector.trace.v1.TraceService/Export";
public static final String DEFAULT_OTLP_HTTP_PORT = "4318";
public static final String DEFAULT_OTLP_GRPC_PORT = "4317";

static final int DEFAULT_DOGSTATSD_START_DELAY = 15; // seconds

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ public static Sampler forConfig(final Config config, final TraceConfig traceConf
log.error("Invalid sampler configuration. Using AllSampler", e);
sampler = new AllSampler();
}
// TODO: if OTLP trace export enabled, select ParentBasedAlwaysOnSampler here
} else if (config.isPrioritySamplingEnabled()) {
if (KEEP.equalsIgnoreCase(config.getPrioritySamplingForce())) {
log.debug("Force Sampling Priority to: SAMPLER_KEEP.");
Expand All @@ -90,9 +89,19 @@ public static Sampler forConfig(final Config config, final TraceConfig traceConf
log.debug("Force Sampling Priority to: SAMPLER_DROP.");
sampler =
new ForcePrioritySampler(PrioritySampling.SAMPLER_DROP, SamplingMechanism.DEFAULT);
} else if (config.isTraceOtlpExporterEnabled()) {
// RateByServiceTraceSampler relies on the Datadog Agent for rate updates.
log.debug(
"OTLP traces export enabled. Using ParentBasedAlwaysOnSampler instead of RateByServiceTraceSampler.");
sampler = new ParentBasedAlwaysOnSampler();
} else {
sampler = new RateByServiceTraceSampler();
}
} else if (config.isTraceOtlpExporterEnabled()) {
// AllSampler does not emit a sampling priority; OTLP export requires one.
log.debug(
"OTLP traces export enabled. Using ParentBasedAlwaysOnSampler instead of AllSampler.");
sampler = new ParentBasedAlwaysOnSampler();
} else {
sampler = new AllSampler();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package datadog.trace.common.writer;

import datadog.trace.core.CoreSpan;
import datadog.trace.core.DDSpanContext;
import datadog.trace.core.otlp.common.OtlpPayload;
import datadog.trace.core.otlp.common.OtlpSender;
import datadog.trace.core.otlp.trace.OtlpTraceCollector;
import datadog.trace.core.otlp.trace.OtlpTraceProtoCollector;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

final class OtlpPayloadDispatcher implements PayloadDispatcher {
private final OtlpTraceCollector collector;
private final OtlpSender sender;

OtlpPayloadDispatcher(OtlpSender sender) {
this(sender, new OtlpTraceProtoCollector());
}

OtlpPayloadDispatcher(OtlpSender sender, OtlpTraceCollector collector) {
this.sender = sender;
this.collector = collector;
}

@Override
public void addTrace(List<? extends CoreSpan<?>> trace) {
List<CoreSpan<?>> sampled = null;
for (CoreSpan<?> span : trace) {
if (shouldExport(span)) {
if (sampled == null) {
sampled = new ArrayList<>(trace.size());
}
sampled.add(span);
}
}
if (sampled != null) {
collector.addTrace(sampled);
Comment thread
mcculls marked this conversation as resolved.
}
}

@Override
public void flush() {
OtlpPayload payload = collector.collectTraces();
if (payload != OtlpPayload.EMPTY) {
sender.send(payload);
}
}

@Override
public void onDroppedTrace(int spanCount) {
// TODO: surface drop counts via HealthMetrics
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've purposefully left this out as its not in the current spec

}

@Override
public Collection<RemoteApi> getApis() {
return Collections.emptyList();
}

private static boolean shouldExport(CoreSpan<?> span) {
// trace-level sampling priority
if (span.samplingPriority() > 0) {
Comment thread
mcculls marked this conversation as resolved.
return true;
}
// span-level sampling priority
return span.getTag(DDSpanContext.SPAN_SAMPLING_MECHANISM_TAG) != null;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
package datadog.trace.common.writer;

import static datadog.trace.api.ConfigDefaults.DEFAULT_OTLP_GRPC_TRACES_ENDPOINT;
import static datadog.trace.api.ConfigDefaults.DEFAULT_OTLP_HTTP_PORT;
import static datadog.trace.api.ConfigDefaults.DEFAULT_OTLP_HTTP_TRACES_ENDPOINT;
import static datadog.trace.api.ConfigDefaults.DEFAULT_OTLP_TRACES_TIMEOUT;

import datadog.communication.ddagent.DroppingPolicy;
import datadog.trace.api.config.OtlpConfig;
import datadog.trace.common.sampling.SingleSpanSampler;
import datadog.trace.common.writer.ddagent.Prioritization;
import datadog.trace.core.monitor.HealthMetrics;
import datadog.trace.core.otlp.common.OtlpGrpcSender;
import datadog.trace.core.otlp.common.OtlpHttpSender;
import datadog.trace.core.otlp.common.OtlpSender;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.TimeUnit;

public class OtlpWriter extends RemoteWriter {

private static final int BUFFER_SIZE = 1024;
Comment thread
mtoffl01 marked this conversation as resolved.
private static final String HTTP_TRACES_SIGNAL_PATH = "/" + DEFAULT_OTLP_HTTP_TRACES_ENDPOINT;
private static final String GRPC_TRACES_SIGNAL_PATH = "/" + DEFAULT_OTLP_GRPC_TRACES_ENDPOINT;
private static final String DEFAULT_OTLP_HTTP_ENDPOINT =
"http://localhost:" + DEFAULT_OTLP_HTTP_PORT + HTTP_TRACES_SIGNAL_PATH;

public static OtlpWriterBuilder builder() {
return new OtlpWriterBuilder();
}

private final OtlpSender sender;

OtlpWriter(
TraceProcessingWorker worker,
PayloadDispatcher dispatcher,
OtlpSender sender,
HealthMetrics healthMetrics,
int flushTimeout,
TimeUnit flushTimeoutUnit,
boolean alwaysFlush) {
super(worker, dispatcher, healthMetrics, flushTimeout, flushTimeoutUnit, alwaysFlush);
this.sender = sender;
}

@Override
public void close() {
super.close();
sender.shutdown();
}

// only used by tests
OtlpSender getSender() {
return sender;
}

public static class OtlpWriterBuilder {
private String endpoint = DEFAULT_OTLP_HTTP_ENDPOINT;
private Map<String, String> headers = Collections.emptyMap();
private int timeoutMillis = DEFAULT_OTLP_TRACES_TIMEOUT;
private OtlpConfig.Protocol protocol = OtlpConfig.Protocol.HTTP_PROTOBUF;
private OtlpConfig.Compression compression = OtlpConfig.Compression.NONE;
private int traceBufferSize = BUFFER_SIZE;
private HealthMetrics healthMetrics = HealthMetrics.NO_OP;
private int flushIntervalMilliseconds = 1000;
private int flushTimeout = 1;
private TimeUnit flushTimeoutUnit = TimeUnit.SECONDS;
private boolean alwaysFlush = false;
private SingleSpanSampler singleSpanSampler;
private OtlpSender sender;
Comment thread
mtoffl01 marked this conversation as resolved.

public OtlpWriterBuilder endpoint(String endpoint) {
this.endpoint = endpoint;
return this;
}

public OtlpWriterBuilder headers(Map<String, String> headers) {
this.headers = headers;
return this;
}

public OtlpWriterBuilder timeoutMillis(int timeoutMillis) {
this.timeoutMillis = timeoutMillis;
return this;
}

public OtlpWriterBuilder protocol(OtlpConfig.Protocol protocol) {
this.protocol = protocol;
return this;
}

public OtlpWriterBuilder compression(OtlpConfig.Compression compression) {
this.compression = compression;
return this;
}

public OtlpWriterBuilder traceBufferSize(int traceBufferSize) {
this.traceBufferSize = traceBufferSize;
return this;
}

public OtlpWriterBuilder healthMetrics(HealthMetrics healthMetrics) {
this.healthMetrics = healthMetrics;
return this;
}

public OtlpWriterBuilder flushIntervalMilliseconds(int flushIntervalMilliseconds) {
this.flushIntervalMilliseconds = flushIntervalMilliseconds;
return this;
}

public OtlpWriterBuilder flushTimeout(int flushTimeout, TimeUnit flushTimeoutUnit) {
this.flushTimeout = flushTimeout;
this.flushTimeoutUnit = flushTimeoutUnit;
return this;
}

public OtlpWriterBuilder alwaysFlush(boolean alwaysFlush) {
this.alwaysFlush = alwaysFlush;
return this;
}

public OtlpWriterBuilder spanSamplingRules(SingleSpanSampler singleSpanSampler) {
this.singleSpanSampler = singleSpanSampler;
return this;
}

OtlpWriterBuilder sender(OtlpSender sender) {
this.sender = sender;
return this;
}

public OtlpWriter build() {
if (sender == null) {
sender =
protocol == OtlpConfig.Protocol.GRPC
? new OtlpGrpcSender(
endpoint, GRPC_TRACES_SIGNAL_PATH, headers, timeoutMillis, compression)
: new OtlpHttpSender(
endpoint, HTTP_TRACES_SIGNAL_PATH, headers, timeoutMillis, compression);
}

final OtlpPayloadDispatcher dispatcher = new OtlpPayloadDispatcher(sender);
final TraceProcessingWorker worker =
new TraceProcessingWorker(
traceBufferSize,
healthMetrics,
dispatcher,
DroppingPolicy.DISABLED,
Prioritization.ENSURE_TRACE,
flushIntervalMilliseconds,
TimeUnit.MILLISECONDS,
singleSpanSampler);

return new OtlpWriter(
worker, dispatcher, sender, healthMetrics, flushTimeout, flushTimeoutUnit, alwaysFlush);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import static datadog.trace.bootstrap.instrumentation.api.WriterConstants.DD_INTAKE_WRITER_TYPE;
import static datadog.trace.bootstrap.instrumentation.api.WriterConstants.LOGGING_WRITER_TYPE;
import static datadog.trace.bootstrap.instrumentation.api.WriterConstants.MULTI_WRITER_TYPE;
import static datadog.trace.bootstrap.instrumentation.api.WriterConstants.OTLP_WRITER_TYPE;
import static datadog.trace.bootstrap.instrumentation.api.WriterConstants.PRINTING_WRITER_TYPE;
import static datadog.trace.bootstrap.instrumentation.api.WriterConstants.TRACE_STRUCTURE_WRITER_TYPE;
import static datadog.trace.common.writer.ddagent.Prioritization.ENSURE_TRACE;
Expand Down Expand Up @@ -57,6 +58,8 @@ public static Writer createWriter(
final HealthMetrics healthMetrics,
String configuredType) {

int flushIntervalMilliseconds = Math.round(config.getTraceFlushIntervalSeconds() * 1000);

if (LOGGING_WRITER_TYPE.equals(configuredType)) {
return new LoggingWriter();
} else if (PRINTING_WRITER_TYPE.equals(configuredType)) {
Expand All @@ -67,6 +70,17 @@ public static Writer createWriter(
} else if (configuredType.startsWith(MULTI_WRITER_TYPE)) {
return new MultiWriter(
config, commObjects, sampler, singleSpanSampler, healthMetrics, configuredType);
} else if (OTLP_WRITER_TYPE.equals(configuredType)) {
return OtlpWriter.builder()
.endpoint(config.getOtlpTracesEndpoint())
.headers(config.getOtlpTracesHeaders())
.protocol(config.getOtlpTracesProtocol())
.compression(config.getOtlpTracesCompression())
.timeoutMillis(config.getOtlpTracesTimeout())
.healthMetrics(healthMetrics)
.spanSamplingRules(singleSpanSampler)
.flushIntervalMilliseconds(flushIntervalMilliseconds)
.build();
}

if (!DD_AGENT_WRITER_TYPE.equals(configuredType)
Expand All @@ -84,7 +98,6 @@ public static Writer createWriter(
"Using 'EnsureTrace' prioritization type. (Do not use this type if your application is running in production mode)");
}

int flushIntervalMilliseconds = Math.round(config.getTraceFlushIntervalSeconds() * 1000);
DDAgentFeaturesDiscovery featuresDiscovery = commObjects.featuresDiscovery(config);

// CI Visibility with bazel support wants to write traces into JSON files
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,11 @@ public void shutdown() {
client.connectionPool().evictAll();
}

// only used by tests
public HttpUrl url() {
return url;
}

private Request makeRequest(OtlpPayload payload) {
Request.Builder requestBuilder = new Request.Builder().url(url);
if (gzip) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,9 @@ public static byte[] recordSpanMessage(
}
writeSpanTag(buf, RESOURCE_NAME, span.getResourceName());
writeSpanTag(buf, OPERATION_NAME, span.getOperationName());
writeSpanTag(buf, SPAN_TYPE, span.getSpanType());
if (span.getSpanType() != null) {
writeSpanTag(buf, SPAN_TYPE, span.getSpanType());
}

span.processTagsAndBaggage(metaWriter);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,6 @@
*/
public final class OtlpTraceProtoCollector implements OtlpTraceCollector {

public static final OtlpTraceProtoCollector INSTANCE = new OtlpTraceProtoCollector();

private static final OtelInstrumentationScope DEFAULT_TRACE_SCOPE =
new OtelInstrumentationScope("", null, null);

Expand Down
Loading
Loading