diff --git a/dd-smoke-tests/log-injection/src/test/groovy/datadog/smoketest/LogInjectionSmokeTest.groovy b/dd-smoke-tests/log-injection/src/test/groovy/datadog/smoketest/LogInjectionSmokeTest.groovy index 85e4168605a..1ca16850639 100644 --- a/dd-smoke-tests/log-injection/src/test/groovy/datadog/smoketest/LogInjectionSmokeTest.groovy +++ b/dd-smoke-tests/log-injection/src/test/groovy/datadog/smoketest/LogInjectionSmokeTest.groovy @@ -6,6 +6,9 @@ import datadog.environment.JavaVirtualMachine import datadog.trace.agent.test.server.http.TestHttpServer.HandlerApi.RequestApi import datadog.trace.api.config.GeneralConfig import datadog.trace.test.util.Flaky +import java.io.ByteArrayOutputStream +import java.io.InputStream +import java.io.RandomAccessFile import java.nio.charset.StandardCharsets import java.util.zip.GZIPInputStream import spock.lang.AutoCleanup @@ -53,6 +56,10 @@ abstract class LogInjectionSmokeTest extends AbstractSmokeTest { @AutoCleanup MockBackend mockBackend = new MockBackend() + // Captured for inclusion in failure diagnostics so we don't have to dig through Gradle reports. + @Shared + String launchCommand + def setup() { mockBackend.reset() } @@ -114,7 +121,8 @@ abstract class LogInjectionSmokeTest extends AbstractSmokeTest { command.addAll(additionalArguments()) command.addAll((String[]) ["-jar", loggingJar]) - println "COMMANDS: " + command.join(" ") + launchCommand = command.join(" ") + println "COMMANDS: " + launchCommand ProcessBuilder processBuilder = new ProcessBuilder(command) processBuilder.directory(new File(buildDirectory)) @@ -349,8 +357,12 @@ abstract class LogInjectionSmokeTest extends AbstractSmokeTest { /** * Like {@link AbstractSmokeTest#waitForTraceCount} but checks process liveness on every poll - * iteration and dumps diagnostic state on failure, so CI failures produce actionable output - * instead of a bare "Condition not satisfied" after a 30s timeout. + * iteration and dumps comprehensive diagnostic state on failure. The previous iteration of + * this method narrowed the flake to "process alive, RC polling, captured stdout empty" but + * could not pinpoint where the JVM was wedged — see {@link #captureFullDiagnostic} for the + * extended state captured here, designed to discriminate between a class-load deadlock, + * stuck @Trace advice, broken trace writer, no-op tracer, dead OutputThreads writer, and a + * stalled stdout pipe in a single failure. */ int waitForTraceCountAlive(int count) { try { @@ -364,41 +376,247 @@ abstract class LogInjectionSmokeTest extends AbstractSmokeTest { return } if (testedProcess != null && !testedProcess.isAlive()) { - def lastLines = tailProcessLog(20) // RuntimeException (not AssertionError) so PollingConditions propagates // immediately instead of retrying for the full timeout. throw new RuntimeException( - "Process exited with code ${testedProcess.exitValue()} while waiting for ${count} traces " + - "(received ${traceCount.get()}, RC polls: ${rcClientMessages.size()}).\n" + - "Last process output:\n${lastLines}") + "Process exited while waiting for ${count} traces.\n" + + captureFullDiagnostic(count)) } assert traceCount.get() >= count } } catch (AssertionError e) { - // The default error ("Condition not satisfied after 30s") is useless — enrich with diagnostic state - def alive = testedProcess?.isAlive() - def lastLines = tailProcessLog(30) + // The default error ("Condition not satisfied after 30s") is useless — enrich with diagnostic state. throw new AssertionError( - "Timed out waiting for ${count} traces after ${defaultPoll.timeout}s. " + - "traceCount=${traceCount.get()}, process.alive=${alive}, " + - "RC polls received: ${rcClientMessages.size()}.\n" + - "Last process output:\n${lastLines}", e) + "Timed out waiting for ${count} traces after ${defaultPoll.timeout}s.\n" + + captureFullDiagnostic(count), e) } traceCount.get() } - private String tailProcessLog(int lines) { + /** + * Comprehensive diagnostic capture for the failure path. Triggers a SIGQUIT thread dump on + * the live process, waits briefly for it to flow through the captured stdout pipe, then + * collects everything needed to discriminate the remaining hypotheses for this flake: + * captured stdout file size + tail (with thread dump if the OutputThreads writer is alive), + * the application logger file (logback/log4j2/JBoss target via {@code dd.test.logfile}), + * OutputThreads writer-thread health, and a {@code jcmd Thread.print} fallback that works + * even if the writer thread is dead. + */ + private String captureFullDiagnostic(int targetCount) { + boolean alive = testedProcess != null && testedProcess.isAlive() + long pid = -1L + if (testedProcess != null) { + // Process.pid() is Java 9+; the test runner JVM may be Java 8 (zulu8). Fall back gracefully. + try { + pid = testedProcess.pid() + } catch (Throwable ignored) { + // Java 8 — SIGQUIT/jcmd paths get skipped; we still capture file state + thread groups. + } + } + + // Trigger SIGQUIT for an in-process thread dump. The JVM writes the dump to its stderr, + // which redirectErrorStream(true) merges into the captured stdout pipe. Sleep briefly to + // let the writer thread drain it before we re-read the file. No-op (and harmless) on + // Windows; in CI this runs on Linux containers. + if (alive && pid > 0) { + try { + ["kill", "-3", "${pid}".toString()].execute().waitFor(2, SECONDS) + } catch (Throwable ignored) { + // best-effort + } + try { + Thread.sleep(1500) + } catch (InterruptedException ignored) { + Thread.currentThread().interrupt() + } + } + + def sb = new StringBuilder() + sb << "pid=${pid} alive=${alive}" + if (testedProcess != null && !alive) { + try { + sb << " exitValue=${testedProcess.exitValue()}" + } catch (Throwable ignored) { + // exitValue may not be available + } + } + sb << "\n" + sb << "rcPolls=${rcClientMessages.size()} traceCount=${traceCount.get()} target=${targetCount}\n" + if (rcClientDecodingFailure != null) { + sb << "rcDecodingFailure=${rcClientDecodingFailure}\n" + } + if (traceDecodingFailure != null) { + sb << "traceDecodingFailure=${traceDecodingFailure}\n" + } + sb << "launchCommand=${launchCommand}\n" + sb << "outputThreads=${describeOutputThreadGroup()}\n" + + def stdoutFile = new File(logFilePath) + if (stdoutFile.exists()) { + sb << "capturedStdout=${stdoutFile.absolutePath} size=${stdoutFile.length()} mtime=${new Date(stdoutFile.lastModified())}\n" + sb << "--- captured stdout (last 60 lines, post-SIGQUIT) ---\n" + sb << tailFile(stdoutFile, 60) + sb << "\n" + } else { + sb << "capturedStdout=${logFilePath} (does not exist)\n" + } + + if (outputLogFile != null && outputLogFile.exists()) { + sb << "appLogFile=${outputLogFile.absolutePath} size=${outputLogFile.length()}\n" + sb << "--- app log (last 30 lines) ---\n" + sb << tailFile(outputLogFile, 30) + sb << "\n" + } else if (outputLogFile != null) { + sb << "appLogFile=${outputLogFile.absolutePath} (does not exist)\n" + } + + if (pid > 0) { + sb << "--- jcmd Thread.print (fallback, works even if OutputThreads writer is dead) ---\n" + sb << jcmdThreadPrint(pid) + sb << "\n" + } + return sb.toString() + } + + private String describeOutputThreadGroup() { + // OutputThreads creates threads in a group named "smoke-output" — see OutputThreads.java. + ThreadGroup root = Thread.currentThread().threadGroup + while (root.parent != null) { + root = root.parent + } + Thread[] threads = new Thread[root.activeCount() + 16] + int n = root.enumerate(threads, true) + def details = [] + for (int i = 0; i < n; i++) { + def t = threads[i] + if (t == null) { + continue + } + ThreadGroup tg = t.threadGroup + if (tg != null && tg.name == "smoke-output") { + details << "${t.name}/${t.state}/alive=${t.alive}" + } + } + return "smoke-output threads=${details.size()} [${details.join(", ")}]" + } + + // Cap on diagnostic chunks so a wedged JVM with a giant thread dump can't OOM the Gradle + // worker or produce an unreadable test report. ~32KB is enough for typical thread dumps. + private static final int DIAG_TAIL_BYTES = 32 * 1024 + + /** + * Returns the last {@code lines} lines (or up to {@link #DIAG_TAIL_BYTES} from the file end, + * whichever is smaller) without loading the whole file into memory. Important on the failure + * path because we have just appended a full JVM thread dump to the captured stdout file via + * SIGQUIT — {@code readLines()} on that file could OOM the Gradle worker on a wedged JVM with + * a large dump. + */ + private String tailFile(File f, int lines) { + try (RandomAccessFile raf = new RandomAccessFile(f, "r")) { + long len = raf.length() + long start = Math.max(0L, len - DIAG_TAIL_BYTES) + raf.seek(start) + byte[] buf = new byte[(int) (len - start)] + raf.readFully(buf) + String chunk = new String(buf, StandardCharsets.UTF_8) + // If we started mid-line, drop the partial first line so the tail reads cleanly. + if (start > 0) { + int nl = chunk.indexOf('\n') + if (nl >= 0) { + chunk = chunk.substring(nl + 1) + } + } + String[] all = chunk.split("\\R", -1) + int from = Math.max(0, all.length - lines) + def out = new StringBuilder() + if (start > 0) { + out << "...(truncated to last ${len - start} bytes)...\n" + } + for (int i = from; i < all.length; i++) { + if (i > from) { + out << "\n" + } + out << all[i] + } + return out.toString() + } catch (Throwable e) { + return "(failed to read ${f.name}: ${e.message})" + } + } + + private String jcmdThreadPrint(long pid) { + String jcmdPath = resolveJcmdPath() + if (jcmdPath == null) { + return "(jcmd not found on java.home; skipping)" + } try { - def logFile = new File(logFilePath) - if (!logFile.exists()) { - return "(log file does not exist: ${logFilePath})" + // Merge stderr into stdout and drain incrementally — for a wedged JVM the dump can be + // larger than the OS pipe buffer (~64KB on Linux), and waiting for exit before reading + // would deadlock both jcmd and us. See codex review. + ProcessBuilder pb = new ProcessBuilder(jcmdPath, Long.toString(pid), "Thread.print") + pb.redirectErrorStream(true) + Process proc = pb.start() + ByteArrayOutputStream baos = new ByteArrayOutputStream() + byte[] buf = new byte[8192] + long deadline = System.nanoTime() + SECONDS.toNanos(5) + InputStream is = proc.getInputStream() + while (true) { + if (is.available() > 0) { + int n = is.read(buf) + if (n < 0) { + break + } + // Bound the in-memory buffer so a runaway dump can't OOM us. + if (baos.size() < DIAG_TAIL_BYTES * 2) { + baos.write(buf, 0, n) + } + } else if (!proc.isAlive()) { + // Drain any remaining bytes after exit. + int n + while ((n = is.read(buf)) >= 0) { + if (baos.size() < DIAG_TAIL_BYTES * 2) { + baos.write(buf, 0, n) + } + } + break + } else if (System.nanoTime() > deadline) { + proc.destroyForcibly() + proc.waitFor(1, SECONDS) + if (baos.size() == 0) { + return "(jcmd timed out with no output)" + } + break + } else { + Thread.sleep(50) + } + } + String out = new String(baos.toByteArray(), StandardCharsets.UTF_8) + if (out.size() > DIAG_TAIL_BYTES) { + // Keep the head — application/agent threads tend to be earlier in the dump than + // generic VM/GC/JIT threads, and the head is what reveals where main is wedged. + return out.substring(0, DIAG_TAIL_BYTES) + "\n...(truncated; full dump was ${out.size()} bytes)..." + } + return out + } catch (Throwable e) { + return "(jcmd unavailable: ${e.message})" + } + } + + private String resolveJcmdPath() { + // On Java 8, java.home points to the JRE subdirectory, so jcmd is at ../bin/jcmd; on + // Java 9+ it's at bin/jcmd. Try both, fall back to PATH. + String javaHome = System.getProperty("java.home") + if (javaHome != null) { + File direct = new File(javaHome, "bin/jcmd") + if (direct.canExecute()) { + return direct.absolutePath + } + File jdkSibling = new File(new File(javaHome).parentFile, "bin/jcmd") + if (jdkSibling.canExecute()) { + return jdkSibling.absolutePath } - def allLines = logFile.readLines() - def tail = allLines.size() > lines ? allLines[-lines..-1] : allLines - return tail.join("\n") - } catch (Exception e) { - return "(failed to read log: ${e.message})" } + return "jcmd" } def parseTraceFromStdOut( String line ) {