-
Notifications
You must be signed in to change notification settings - Fork 333
Add OtlpWriter for OTLP traces export #11200
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
19 commits
Select commit
Hold shift + click to select a range
145f884
Introduce isTraceOtlpExporterEnabled branching logic for Sampler.java…
mtoffl01 d183f39
Include regression test for UNSET spans in otlp mode
mtoffl01 d69e491
Create new OTLP writer type, enabled by default when OTEL_TRACES_EXPO…
mtoffl01 e7e3a95
Implement OtlpPayloadDispatcher.java
mtoffl01 af97659
Introduce OtlpPayloadDispatcherTest.java
mtoffl01 015e272
Implement OtlpWriter
mtoffl01 8194971
Introduce OtlpWriterTest.java
mtoffl01 a5cd8fe
Introduce OtlpWriterCombinedTest.java with a TODO
mtoffl01 1615566
WriterFactory: Add OTLP branch for OtlpWriter; include a TODO for a test
mtoffl01 46887fe
nits
mtoffl01 eb2b116
Implement OtlpWriter tests in WriterFactoryTest
mtoffl01 b02aab1
Document problems with OtlpWriterCombinedTest
mtoffl01 594b4b8
fill NPE on writeSpanTag
mtoffl01 491533b
Regression test for UNSET spans in otlp writer mode
mtoffl01 53d53d7
fix grpc path for OtlpWriter and OtlpSender - plus a regression test
mtoffl01 edc1081
Remove OtlpTraceProtoCollector INSTANCE singleton
mtoffl01 5acc0d9
Implement happy path test in OtlpWriterCombinedTest
mtoffl01 2456c8b
Test the OTLP http request body in OtlpHttpRequestBodyTest
mtoffl01 96954d7
Fix NPE on null span type tag, and add regression test
mtoffl01 File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
69 changes: 69 additions & 0 deletions
69
dd-trace-core/src/main/java/datadog/trace/common/writer/OtlpPayloadDispatcher.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,69 @@ | ||
| package datadog.trace.common.writer; | ||
|
|
||
| import datadog.trace.core.CoreSpan; | ||
| import datadog.trace.core.DDSpanContext; | ||
| import datadog.trace.core.otlp.common.OtlpPayload; | ||
| import datadog.trace.core.otlp.common.OtlpSender; | ||
| import datadog.trace.core.otlp.trace.OtlpTraceCollector; | ||
| import datadog.trace.core.otlp.trace.OtlpTraceProtoCollector; | ||
| import java.util.ArrayList; | ||
| import java.util.Collection; | ||
| import java.util.Collections; | ||
| import java.util.List; | ||
|
|
||
| final class OtlpPayloadDispatcher implements PayloadDispatcher { | ||
| private final OtlpTraceCollector collector; | ||
| private final OtlpSender sender; | ||
|
|
||
| OtlpPayloadDispatcher(OtlpSender sender) { | ||
| this(sender, new OtlpTraceProtoCollector()); | ||
| } | ||
|
|
||
| OtlpPayloadDispatcher(OtlpSender sender, OtlpTraceCollector collector) { | ||
| this.sender = sender; | ||
| this.collector = collector; | ||
| } | ||
|
|
||
| @Override | ||
| public void addTrace(List<? extends CoreSpan<?>> trace) { | ||
| List<CoreSpan<?>> sampled = null; | ||
| for (CoreSpan<?> span : trace) { | ||
| if (shouldExport(span)) { | ||
| if (sampled == null) { | ||
| sampled = new ArrayList<>(trace.size()); | ||
| } | ||
| sampled.add(span); | ||
| } | ||
| } | ||
| if (sampled != null) { | ||
| collector.addTrace(sampled); | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
| public void flush() { | ||
| OtlpPayload payload = collector.collectTraces(); | ||
| if (payload != OtlpPayload.EMPTY) { | ||
| sender.send(payload); | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
| public void onDroppedTrace(int spanCount) { | ||
| // TODO: surface drop counts via HealthMetrics | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I've purposefully left this out as its not in the current spec |
||
| } | ||
|
|
||
| @Override | ||
| public Collection<RemoteApi> getApis() { | ||
| return Collections.emptyList(); | ||
| } | ||
|
|
||
| private static boolean shouldExport(CoreSpan<?> span) { | ||
| // trace-level sampling priority | ||
| if (span.samplingPriority() > 0) { | ||
|
mcculls marked this conversation as resolved.
|
||
| return true; | ||
| } | ||
| // span-level sampling priority | ||
| return span.getTag(DDSpanContext.SPAN_SAMPLING_MECHANISM_TAG) != null; | ||
| } | ||
| } | ||
159 changes: 159 additions & 0 deletions
159
dd-trace-core/src/main/java/datadog/trace/common/writer/OtlpWriter.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,159 @@ | ||
| package datadog.trace.common.writer; | ||
|
|
||
| import static datadog.trace.api.ConfigDefaults.DEFAULT_OTLP_GRPC_TRACES_ENDPOINT; | ||
| import static datadog.trace.api.ConfigDefaults.DEFAULT_OTLP_HTTP_PORT; | ||
| import static datadog.trace.api.ConfigDefaults.DEFAULT_OTLP_HTTP_TRACES_ENDPOINT; | ||
| import static datadog.trace.api.ConfigDefaults.DEFAULT_OTLP_TRACES_TIMEOUT; | ||
|
|
||
| import datadog.communication.ddagent.DroppingPolicy; | ||
| import datadog.trace.api.config.OtlpConfig; | ||
| import datadog.trace.common.sampling.SingleSpanSampler; | ||
| import datadog.trace.common.writer.ddagent.Prioritization; | ||
| import datadog.trace.core.monitor.HealthMetrics; | ||
| import datadog.trace.core.otlp.common.OtlpGrpcSender; | ||
| import datadog.trace.core.otlp.common.OtlpHttpSender; | ||
| import datadog.trace.core.otlp.common.OtlpSender; | ||
| import java.util.Collections; | ||
| import java.util.Map; | ||
| import java.util.concurrent.TimeUnit; | ||
|
|
||
| public class OtlpWriter extends RemoteWriter { | ||
|
|
||
| private static final int BUFFER_SIZE = 1024; | ||
|
mtoffl01 marked this conversation as resolved.
|
||
| private static final String HTTP_TRACES_SIGNAL_PATH = "/" + DEFAULT_OTLP_HTTP_TRACES_ENDPOINT; | ||
| private static final String GRPC_TRACES_SIGNAL_PATH = "/" + DEFAULT_OTLP_GRPC_TRACES_ENDPOINT; | ||
| private static final String DEFAULT_OTLP_HTTP_ENDPOINT = | ||
| "http://localhost:" + DEFAULT_OTLP_HTTP_PORT + HTTP_TRACES_SIGNAL_PATH; | ||
|
|
||
| public static OtlpWriterBuilder builder() { | ||
| return new OtlpWriterBuilder(); | ||
| } | ||
|
|
||
| private final OtlpSender sender; | ||
|
|
||
| OtlpWriter( | ||
| TraceProcessingWorker worker, | ||
| PayloadDispatcher dispatcher, | ||
| OtlpSender sender, | ||
| HealthMetrics healthMetrics, | ||
| int flushTimeout, | ||
| TimeUnit flushTimeoutUnit, | ||
| boolean alwaysFlush) { | ||
| super(worker, dispatcher, healthMetrics, flushTimeout, flushTimeoutUnit, alwaysFlush); | ||
| this.sender = sender; | ||
| } | ||
|
|
||
| @Override | ||
| public void close() { | ||
| super.close(); | ||
| sender.shutdown(); | ||
| } | ||
|
|
||
| // only used by tests | ||
| OtlpSender getSender() { | ||
| return sender; | ||
| } | ||
|
|
||
| public static class OtlpWriterBuilder { | ||
| private String endpoint = DEFAULT_OTLP_HTTP_ENDPOINT; | ||
| private Map<String, String> headers = Collections.emptyMap(); | ||
| private int timeoutMillis = DEFAULT_OTLP_TRACES_TIMEOUT; | ||
| private OtlpConfig.Protocol protocol = OtlpConfig.Protocol.HTTP_PROTOBUF; | ||
| private OtlpConfig.Compression compression = OtlpConfig.Compression.NONE; | ||
| private int traceBufferSize = BUFFER_SIZE; | ||
| private HealthMetrics healthMetrics = HealthMetrics.NO_OP; | ||
| private int flushIntervalMilliseconds = 1000; | ||
| private int flushTimeout = 1; | ||
| private TimeUnit flushTimeoutUnit = TimeUnit.SECONDS; | ||
| private boolean alwaysFlush = false; | ||
| private SingleSpanSampler singleSpanSampler; | ||
| private OtlpSender sender; | ||
|
mtoffl01 marked this conversation as resolved.
|
||
|
|
||
| public OtlpWriterBuilder endpoint(String endpoint) { | ||
| this.endpoint = endpoint; | ||
| return this; | ||
| } | ||
|
|
||
| public OtlpWriterBuilder headers(Map<String, String> headers) { | ||
| this.headers = headers; | ||
| return this; | ||
| } | ||
|
|
||
| public OtlpWriterBuilder timeoutMillis(int timeoutMillis) { | ||
| this.timeoutMillis = timeoutMillis; | ||
| return this; | ||
| } | ||
|
|
||
| public OtlpWriterBuilder protocol(OtlpConfig.Protocol protocol) { | ||
| this.protocol = protocol; | ||
| return this; | ||
| } | ||
|
|
||
| public OtlpWriterBuilder compression(OtlpConfig.Compression compression) { | ||
| this.compression = compression; | ||
| return this; | ||
| } | ||
|
|
||
| public OtlpWriterBuilder traceBufferSize(int traceBufferSize) { | ||
| this.traceBufferSize = traceBufferSize; | ||
| return this; | ||
| } | ||
|
|
||
| public OtlpWriterBuilder healthMetrics(HealthMetrics healthMetrics) { | ||
| this.healthMetrics = healthMetrics; | ||
| return this; | ||
| } | ||
|
|
||
| public OtlpWriterBuilder flushIntervalMilliseconds(int flushIntervalMilliseconds) { | ||
| this.flushIntervalMilliseconds = flushIntervalMilliseconds; | ||
| return this; | ||
| } | ||
|
|
||
| public OtlpWriterBuilder flushTimeout(int flushTimeout, TimeUnit flushTimeoutUnit) { | ||
| this.flushTimeout = flushTimeout; | ||
| this.flushTimeoutUnit = flushTimeoutUnit; | ||
| return this; | ||
| } | ||
|
|
||
| public OtlpWriterBuilder alwaysFlush(boolean alwaysFlush) { | ||
| this.alwaysFlush = alwaysFlush; | ||
| return this; | ||
| } | ||
|
|
||
| public OtlpWriterBuilder spanSamplingRules(SingleSpanSampler singleSpanSampler) { | ||
| this.singleSpanSampler = singleSpanSampler; | ||
| return this; | ||
| } | ||
|
|
||
| OtlpWriterBuilder sender(OtlpSender sender) { | ||
| this.sender = sender; | ||
| return this; | ||
| } | ||
|
|
||
| public OtlpWriter build() { | ||
| if (sender == null) { | ||
| sender = | ||
| protocol == OtlpConfig.Protocol.GRPC | ||
| ? new OtlpGrpcSender( | ||
| endpoint, GRPC_TRACES_SIGNAL_PATH, headers, timeoutMillis, compression) | ||
| : new OtlpHttpSender( | ||
| endpoint, HTTP_TRACES_SIGNAL_PATH, headers, timeoutMillis, compression); | ||
| } | ||
|
|
||
| final OtlpPayloadDispatcher dispatcher = new OtlpPayloadDispatcher(sender); | ||
| final TraceProcessingWorker worker = | ||
| new TraceProcessingWorker( | ||
| traceBufferSize, | ||
| healthMetrics, | ||
| dispatcher, | ||
| DroppingPolicy.DISABLED, | ||
| Prioritization.ENSURE_TRACE, | ||
| flushIntervalMilliseconds, | ||
| TimeUnit.MILLISECONDS, | ||
| singleSpanSampler); | ||
|
|
||
| return new OtlpWriter( | ||
| worker, dispatcher, sender, healthMetrics, flushTimeout, flushTimeoutUnit, alwaysFlush); | ||
| } | ||
| } | ||
| } | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.