diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/Aggregator.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/Aggregator.java index dd87406ce7f..8a69dbc6e56 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/Aggregator.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/Aggregator.java @@ -3,6 +3,7 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS; import datadog.trace.common.metrics.SignalItem.StopSignal; +import datadog.trace.core.monitor.HealthMetrics; import datadog.trace.core.util.LRUCache; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.util.Iterator; @@ -46,7 +47,8 @@ final class Aggregator implements Runnable { final Set commonKeys, int maxAggregates, long reportingInterval, - TimeUnit reportingIntervalTimeUnit) { + TimeUnit reportingIntervalTimeUnit, + HealthMetrics healthMetrics) { this( writer, batchPool, @@ -56,7 +58,8 @@ final class Aggregator implements Runnable { maxAggregates, reportingInterval, reportingIntervalTimeUnit, - DEFAULT_SLEEP_MILLIS); + DEFAULT_SLEEP_MILLIS, + healthMetrics); } Aggregator( @@ -68,14 +71,18 @@ final class Aggregator implements Runnable { int maxAggregates, long reportingInterval, TimeUnit reportingIntervalTimeUnit, - long sleepMillis) { + long sleepMillis, + HealthMetrics healthMetrics) { this.writer = writer; this.batchPool = batchPool; this.inbox = inbox; this.commonKeys = commonKeys; this.aggregates = new LRUCache<>( - new CommonKeyCleaner(commonKeys), maxAggregates * 4 / 3, 0.75f, maxAggregates); + new CommonKeyCleaner(commonKeys, healthMetrics), + maxAggregates * 4 / 3, + 0.75f, + maxAggregates); this.pending = pending; this.reportingIntervalNanos = reportingIntervalTimeUnit.toNanos(reportingInterval); this.sleepMillis = sleepMillis; @@ -183,14 +190,19 @@ private static final class CommonKeyCleaner implements LRUCache.ExpiryListener { private final Set commonKeys; + private final HealthMetrics healthMetrics; - private CommonKeyCleaner(Set commonKeys) { + private CommonKeyCleaner(Set commonKeys, HealthMetrics healthMetrics) { this.commonKeys = commonKeys; + this.healthMetrics = healthMetrics; } @Override public void accept(Map.Entry expired) { commonKeys.remove(expired.getKey()); + if (expired.getValue().getHitCount() > 0) { + healthMetrics.onStatsAggregateDropped(); + } } } } diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java index 010993efe50..f60edf1d700 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java @@ -204,7 +204,8 @@ public ConflatingMetricsAggregator( keys.keySet(), maxAggregates, reportingInterval, - timeUnit); + timeUnit, + healthMetric); this.thread = newAgentThread(METRICS_AGGREGATOR, aggregator); this.reportingInterval = reportingInterval; this.reportingIntervalTimeUnit = timeUnit; diff --git a/dd-trace-core/src/main/java/datadog/trace/core/monitor/HealthMetrics.java b/dd-trace-core/src/main/java/datadog/trace/core/monitor/HealthMetrics.java index d0531a330cf..257d887029b 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/monitor/HealthMetrics.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/monitor/HealthMetrics.java @@ -91,6 +91,8 @@ public void onClientStatErrorReceived() {} public void onClientStatDowngraded() {} + public void onStatsAggregateDropped() {} + /** * @return Human-readable summary of the current health metrics. */ diff --git a/dd-trace-core/src/main/java/datadog/trace/core/monitor/TracerHealthMetrics.java b/dd-trace-core/src/main/java/datadog/trace/core/monitor/TracerHealthMetrics.java index bdb713e4c14..2df54241e56 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/monitor/TracerHealthMetrics.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/monitor/TracerHealthMetrics.java @@ -97,6 +97,8 @@ public class TracerHealthMetrics extends HealthMetrics implements AutoCloseable private final LongAdder clientStatsErrors = new LongAdder(); private final LongAdder clientStatsDowngrades = new LongAdder(); + private final LongAdder statsAggregateDropped = new LongAdder(); + private final StatsDClient statsd; private final long interval; private final TimeUnit units; @@ -350,6 +352,11 @@ public void onClientStatErrorReceived() { clientStatsErrors.increment(); } + @Override + public void onStatsAggregateDropped() { + statsAggregateDropped.increment(); + } + @Override public void close() { if (null != cancellation) { @@ -366,8 +373,9 @@ private static class Flush implements AgentTaskScheduler.Task> true + features.peerTags() >> [] + HealthMetrics healthMetrics = Mock(HealthMetrics) + ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, + features, healthMetrics, sink, writer, maxAggregates, queueSize, reportingInterval, SECONDS, false) + long duration = 100 + aggregator.start() + + when: + CountDownLatch latch = new CountDownLatch(1) + for (int i = 0; i < maxAggregates + 1; ++i) { + aggregator.publish([ + new SimpleSpan("service" + i, "operation", "resource", "type", false, true, false, 0, duration, HTTP_OK) + .setTag(SPAN_KIND, "baz") + ]) + } + aggregator.report() + def latchTriggered = latch.await(2, SECONDS) + + then: + latchTriggered + 1 * writer.finishBucket() >> { latch.countDown() } + 1 * healthMetrics.onStatsAggregateDropped() + + cleanup: + aggregator.close() + } + + def "should not report dropped aggregate when evicted entry was already flushed"() { + setup: + int maxAggregates = 5 + MetricWriter writer = Mock(MetricWriter) + Sink sink = Stub(Sink) + DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery) + features.supportsMetrics() >> true + features.peerTags() >> [] + HealthMetrics healthMetrics = Mock(HealthMetrics) + ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, + features, healthMetrics, sink, writer, maxAggregates, queueSize, reportingInterval, SECONDS, false) + aggregator.start() + + when: "fill cache and flush — entries are cleared (hitCount=0) but stay in the LRU" + CountDownLatch latch1 = new CountDownLatch(1) + for (int i = 0; i < maxAggregates; ++i) { + aggregator.publish([ + new SimpleSpan("service" + i, "operation", "resource", "type", false, true, false, 0, 100, HTTP_OK) + .setTag(SPAN_KIND, "baz") + ]) + } + aggregator.report() + latch1.await(2, SECONDS) + + then: + 1 * writer.finishBucket() >> { latch1.countDown() } + + when: "publish new distinct spans — LRU evicts the cleared entries before the next report" + CountDownLatch latch2 = new CountDownLatch(1) + for (int i = maxAggregates; i < maxAggregates * 2; ++i) { + aggregator.publish([ + new SimpleSpan("service" + i, "operation", "resource", "type", false, true, false, 0, 100, HTTP_OK) + .setTag(SPAN_KIND, "baz") + ]) + } + aggregator.report() + latch2.await(2, SECONDS) + + then: "no drop metric because all evicted entries had hitCount=0 (already reported)" + 1 * writer.finishBucket() >> { latch2.countDown() } + 0 * healthMetrics.onStatsAggregateDropped() + + cleanup: + aggregator.close() + } + def "aggregate not updated in reporting interval not reported"() { setup: int maxAggregates = 10 diff --git a/dd-trace-core/src/test/java/datadog/trace/core/monitor/HealthMetricsTest.java b/dd-trace-core/src/test/java/datadog/trace/core/monitor/HealthMetricsTest.java index d04c674570d..670c4cda113 100644 --- a/dd-trace-core/src/test/java/datadog/trace/core/monitor/HealthMetricsTest.java +++ b/dd-trace-core/src/test/java/datadog/trace/core/monitor/HealthMetricsTest.java @@ -385,6 +385,19 @@ void testOnLongRunningUpdate() throws InterruptedException { verifyNoMoreInteractions(statsD); } + @Test + void testOnStatsAggregateDropped() throws InterruptedException { + CountDownLatch latch = new CountDownLatch(1); + try (TracerHealthMetrics healthMetrics = + new TracerHealthMetrics(new Latched(statsD, latch), 100, TimeUnit.MILLISECONDS)) { + healthMetrics.start(); + healthMetrics.onStatsAggregateDropped(); + assertTrue(latch.await(5, TimeUnit.SECONDS)); + } + verify(statsD).count("stats.dropped_aggregates", 1L, "reason:lru_eviction"); + verifyNoMoreInteractions(statsD); + } + private static class Latched implements StatsDClient { private final StatsDClient delegate; private final CountDownLatch latch;