Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import static java.util.concurrent.TimeUnit.MILLISECONDS;

import datadog.trace.common.metrics.SignalItem.StopSignal;
import datadog.trace.core.monitor.HealthMetrics;
import datadog.trace.core.util.LRUCache;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.Iterator;
Expand Down Expand Up @@ -46,7 +47,8 @@ final class Aggregator implements Runnable {
final Set<MetricKey> commonKeys,
int maxAggregates,
long reportingInterval,
TimeUnit reportingIntervalTimeUnit) {
TimeUnit reportingIntervalTimeUnit,
HealthMetrics healthMetrics) {
this(
writer,
batchPool,
Expand All @@ -56,7 +58,8 @@ final class Aggregator implements Runnable {
maxAggregates,
reportingInterval,
reportingIntervalTimeUnit,
DEFAULT_SLEEP_MILLIS);
DEFAULT_SLEEP_MILLIS,
healthMetrics);
}

Aggregator(
Expand All @@ -68,14 +71,18 @@ final class Aggregator implements Runnable {
int maxAggregates,
long reportingInterval,
TimeUnit reportingIntervalTimeUnit,
long sleepMillis) {
long sleepMillis,
HealthMetrics healthMetrics) {
this.writer = writer;
this.batchPool = batchPool;
this.inbox = inbox;
this.commonKeys = commonKeys;
this.aggregates =
new LRUCache<>(
new CommonKeyCleaner(commonKeys), maxAggregates * 4 / 3, 0.75f, maxAggregates);
new CommonKeyCleaner(commonKeys, healthMetrics),
maxAggregates * 4 / 3,
0.75f,
maxAggregates);
this.pending = pending;
this.reportingIntervalNanos = reportingIntervalTimeUnit.toNanos(reportingInterval);
this.sleepMillis = sleepMillis;
Expand Down Expand Up @@ -183,14 +190,19 @@ private static final class CommonKeyCleaner
implements LRUCache.ExpiryListener<MetricKey, AggregateMetric> {

// Keys shared with the enclosing aggregator; an evicted key must be removed here
// so it can be re-registered if the same metric key is seen again later.
private final Set<MetricKey> commonKeys;
// Sink for health telemetry; notified when an aggregate with unreported data is evicted.
private final HealthMetrics healthMetrics;

private CommonKeyCleaner(Set<MetricKey> commonKeys) {
private CommonKeyCleaner(Set<MetricKey> commonKeys, HealthMetrics healthMetrics) {
this.commonKeys = commonKeys;
this.healthMetrics = healthMetrics;
}

// Called by the LRUCache when an entry is evicted due to capacity pressure.
@Override
public void accept(Map.Entry<MetricKey, AggregateMetric> expired) {
commonKeys.remove(expired.getKey());
// Only report a drop when the aggregate still holds unflushed hits;
// hitCount == 0 means its data was already published in a previous report,
// so eviction loses nothing.
if (expired.getValue().getHitCount() > 0) {
healthMetrics.onStatsAggregateDropped();
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,8 @@ public ConflatingMetricsAggregator(
keys.keySet(),
maxAggregates,
reportingInterval,
timeUnit);
timeUnit,
healthMetric);
this.thread = newAgentThread(METRICS_AGGREGATOR, aggregator);
this.reportingInterval = reportingInterval;
this.reportingIntervalTimeUnit = timeUnit;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ public void onClientStatErrorReceived() {}

public void onClientStatDowngraded() {}

public void onStatsAggregateDropped() {}

/**
* @return Human-readable summary of the current health metrics.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ public class TracerHealthMetrics extends HealthMetrics implements AutoCloseable
private final LongAdder clientStatsErrors = new LongAdder();
private final LongAdder clientStatsDowngrades = new LongAdder();

private final LongAdder statsAggregateDropped = new LongAdder();

private final StatsDClient statsd;
private final long interval;
private final TimeUnit units;
Expand Down Expand Up @@ -350,6 +352,11 @@ public void onClientStatErrorReceived() {
clientStatsErrors.increment();
}

@Override
public void onStatsAggregateDropped() {
statsAggregateDropped.increment();
}

@Override
public void close() {
if (null != cancellation) {
Expand All @@ -366,8 +373,9 @@ private static class Flush implements AgentTaskScheduler.Task<TracerHealthMetric
private static final String[] SERIAL_FAILED_TAG = new String[] {"failure:serial"};
private static final String[] UNSET_TAG = new String[] {"priority:unset"};
private static final String[] SINGLE_SPAN_SAMPLER = new String[] {"sampler:single-span"};
private static final String[] REASON_LRU_EVICTION_TAG = new String[] {"reason:lru_eviction"};

private final long[] previousCounts = new long[50];
private final long[] previousCounts = new long[51];

@SuppressFBWarnings("AT_STALE_THREAD_WRITE_OF_PRIMITIVE")
private int countIndex;
Expand Down Expand Up @@ -491,6 +499,11 @@ public void run(TracerHealthMetrics target) {
reportIfChanged(target.statsd, "stats.flush_errors", target.clientStatsErrors, NO_TAGS);
reportIfChanged(
target.statsd, "stats.agent_downgrades", target.clientStatsDowngrades, NO_TAGS);
reportIfChanged(
target.statsd,
"stats.dropped_aggregates",
target.statsAggregateDropped,
REASON_LRU_EVICTION_TAG);

} catch (ArrayIndexOutOfBoundsException e) {
log.warn(
Expand Down Expand Up @@ -622,6 +635,8 @@ public String summary() {
+ "\nclientStatsProcessedSpans="
+ clientStatsProcessedSpans.sum()
+ "\nclientStatsProcessedTraces="
+ clientStatsProcessedTraces.sum();
+ clientStatsProcessedTraces.sum()
+ "\nstatsAggregateDropped="
+ statsAggregateDropped.sum();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -944,6 +944,86 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
aggregator.close()
}

// Publishes one more distinct metric key than the LRU capacity so exactly one
// unflushed aggregate is evicted, and verifies the eviction is surfaced to
// HealthMetrics via onStatsAggregateDropped().
def "should report dropped aggregate to health metrics on LRU eviction"() {
setup:
int maxAggregates = 10
MetricWriter writer = Mock(MetricWriter)
Sink sink = Stub(Sink)
DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery)
features.supportsMetrics() >> true
features.peerTags() >> []
HealthMetrics healthMetrics = Mock(HealthMetrics)
ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty,
features, healthMetrics, sink, writer, maxAggregates, queueSize, reportingInterval, SECONDS, false)
long duration = 100
aggregator.start()

when:
// Latch is released by the mocked finishBucket() below, so await() proves a
// full report cycle (and therefore any eviction) has completed.
CountDownLatch latch = new CountDownLatch(1)
// maxAggregates + 1 distinct service names -> one entry must be evicted.
for (int i = 0; i < maxAggregates + 1; ++i) {
aggregator.publish([
new SimpleSpan("service" + i, "operation", "resource", "type", false, true, false, 0, duration, HTTP_OK)
.setTag(SPAN_KIND, "baz")
])
}
aggregator.report()
def latchTriggered = latch.await(2, SECONDS)

then:
latchTriggered
1 * writer.finishBucket() >> { latch.countDown() }
// The single evicted entry had unreported hits, so exactly one drop is recorded.
1 * healthMetrics.onStatsAggregateDropped()

cleanup:
aggregator.close()
}

// Verifies the negative path of drop accounting: entries whose data was already
// flushed (hitCount reset to 0 by a report) may be LRU-evicted without emitting
// the onStatsAggregateDropped() health metric.
def "should not report dropped aggregate when evicted entry was already flushed"() {
setup:
int maxAggregates = 5
MetricWriter writer = Mock(MetricWriter)
Sink sink = Stub(Sink)
DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery)
features.supportsMetrics() >> true
features.peerTags() >> []
HealthMetrics healthMetrics = Mock(HealthMetrics)
ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty,
features, healthMetrics, sink, writer, maxAggregates, queueSize, reportingInterval, SECONDS, false)
aggregator.start()

when: "fill cache and flush — entries are cleared (hitCount=0) but stay in the LRU"
CountDownLatch latch1 = new CountDownLatch(1)
for (int i = 0; i < maxAggregates; ++i) {
aggregator.publish([
new SimpleSpan("service" + i, "operation", "resource", "type", false, true, false, 0, 100, HTTP_OK)
.setTag(SPAN_KIND, "baz")
])
}
aggregator.report()
latch1.await(2, SECONDS)

then:
1 * writer.finishBucket() >> { latch1.countDown() }

when: "publish new distinct spans — LRU evicts the cleared entries before the next report"
CountDownLatch latch2 = new CountDownLatch(1)
// New distinct service names force capacity pressure; the previously flushed
// entries are the least recently used and get evicted first.
for (int i = maxAggregates; i < maxAggregates * 2; ++i) {
aggregator.publish([
new SimpleSpan("service" + i, "operation", "resource", "type", false, true, false, 0, 100, HTTP_OK)
.setTag(SPAN_KIND, "baz")
])
}
aggregator.report()
latch2.await(2, SECONDS)

then: "no drop metric because all evicted entries had hitCount=0 (already reported)"
1 * writer.finishBucket() >> { latch2.countDown() }
0 * healthMetrics.onStatsAggregateDropped()

cleanup:
aggregator.close()
}

def "aggregate not updated in reporting interval not reported"() {
setup:
int maxAggregates = 10
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,19 @@ void testOnLongRunningUpdate() throws InterruptedException {
verifyNoMoreInteractions(statsD);
}

@Test
void testOnStatsAggregateDropped() throws InterruptedException {
// Latched wraps the StatsD client and releases the latch once a flush reaches it,
// so await() below guarantees at least one flush cycle has run before verifying.
CountDownLatch latch = new CountDownLatch(1);
try (TracerHealthMetrics healthMetrics =
new TracerHealthMetrics(new Latched(statsD, latch), 100, TimeUnit.MILLISECONDS)) {
healthMetrics.start();
healthMetrics.onStatsAggregateDropped();
assertTrue(latch.await(5, TimeUnit.SECONDS));
}
// The single drop must surface as stats.dropped_aggregates tagged with the LRU reason.
verify(statsD).count("stats.dropped_aggregates", 1L, "reason:lru_eviction");
verifyNoMoreInteractions(statsD);
}

private static class Latched implements StatsDClient {
private final StatsDClient delegate;
private final CountDownLatch latch;
Expand Down
Loading