Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -348,8 +348,21 @@ public static void runTests(PerfTestBase<?>[] tests, boolean sync, boolean compl
.block();
}
} catch (Exception e) {
// Previously this catch swallowed the exception, printed a stack trace, and let the
// method fall through to "=== Results ===" / globalCleanupAsync, which caused the
// process to exit 0. That made real test failures (e.g. an SDK NullPointerException
// surfaced by a retry path during a stress run) invisible to CI and dashboards: the
// Kubernetes Job reported Succeeded and the workbook showed a healthy success ratio
// even though the test loop had aborted partway through its configured duration.
//
// Rethrow as an unchecked exception so the surrounding lifecycle still runs the
// `finally` block below (progressStatus.cancel()) and the outer `try { ... } finally`
// in run(...) still drives globalCleanupAsync, but the JVM ultimately exits non-zero
// and the failure is visible. See sdk/storage/BUG-payload-size-gate-npe.md for the
// specific case that motivated this change.
System.err.println("Error occurred running tests: " + System.lineSeparator() + e);
e.printStackTrace(System.err);
throw (e instanceof RuntimeException) ? (RuntimeException) e : new RuntimeException(e);
} finally {
progressStatus.cancel();
}
Expand Down
12 changes: 12 additions & 0 deletions sdk/storage/azure-storage-blob-stress/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,18 @@
<doclintMissingInclusion>-</doclintMissingInclusion>
</properties>

<dependencyManagement>
<dependencies>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-bom</artifactId>
<version>1.58.0</version> <!-- {x-version-update;io.opentelemetry:opentelemetry-bom;external_dependency} -->
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>

<dependencies>
<dependency>
<groupId>ch.qos.logback</groupId>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#!/bin/sh
set -ex;
dotnet dev-certs https --export-path /mnt/outputs/dev-cert.pfx;
/root/.dotnet/tools/http-fault-injector;
dotnet dev-certs https --export-path /mnt/outputs/dev-cert.crt --format PEM --no-password;
/root/.dotnet/tools/http-fault-injector;
12 changes: 11 additions & 1 deletion sdk/storage/azure-storage-blob-stress/scripts/stress-run.sh
Original file line number Diff line number Diff line change
@@ -1,4 +1,14 @@
#!/bin/sh
set -ex;
set -exa;
keytool -import -alias test -file /mnt/outputs/dev-cert.pfx -keystore ${JAVA_HOME}/lib/security/cacerts -noprompt -keypass changeit -storepass changeit;
attempts=0;
while [ ! -s /mnt/outputs/dev-cert.crt ]; do
attempts=$((attempts + 1));
if [ "$attempts" -gt 60 ]; then
echo "Timed out waiting for fault injector certificate" >&2;
exit 1;
fi;
sleep 1;
done;
keytool -delete -alias HttpFaultInject -keystore "${JAVA_HOME}/lib/security/cacerts" -storepass changeit >/dev/null 2>&1 || true;
keytool -importcert -trustcacerts -alias HttpFaultInject -file /mnt/outputs/dev-cert.crt -keystore "${JAVA_HOME}/lib/security/cacerts" -noprompt -storepass changeit;
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,10 @@
import com.azure.storage.blob.BlobServiceAsyncClient;
import com.azure.storage.blob.BlobServiceClient;
import com.azure.storage.blob.BlobServiceClientBuilder;
import com.azure.storage.stress.ContentMismatchException;
import com.azure.storage.stress.TelemetryHelper;
import com.azure.storage.stress.FaultInjectionProbabilities;
import com.azure.storage.stress.FaultInjectingHttpPolicy;
import com.azure.storage.stress.StorageStressOptions;
import reactor.core.Exceptions;
import reactor.core.publisher.Mono;

import java.time.Instant;
Expand Down Expand Up @@ -156,20 +154,89 @@ public void run() {
@SuppressWarnings("try")
@Override
public Mono<Void> runAsync() {
// We previously wrapped runInternalAsync with an unconditional `.retryWhen(Retry.max(3))`.
// That mask hid a real liveness bug in the SDK upload pipeline: when an HTTP fault
// (especially the request-side `pq*` indefinite variants from FaultInjectingHttpPolicy)
// causes one parallel rail's Mono to stop making progress, the unconditional retry burns
// 3 attempts of 60s Netty response timeouts on top of an already-stuck pipeline and the
// hang propagates outward. With three parallel rails, a single such stall freezes the
// whole `flatMap(runTestAsync, 1)` for the remainder of the configured --duration, which
// is exactly what the six "Failed" large-payload pods exhibited on 2026-05-26 (see
// sdk/storage/BUG-blob-upload-hang-on-fault.md).
//
// Replace the retry with an outer per-operation timeout. Any single iteration that
// doesn't complete within `OPERATION_TIMEOUT` fails fast as a TimeoutException, which:
// 1. lets the rail recycle and start the next iteration,
// 2. surfaces a real failure on the dashboard instead of a frozen progress counter,
// 3. produces evidence (the TimeoutException) for SDK bug triage, and
// 4. still tolerates the natural per-op latency under fault injection (~6.3% of HTTP
// calls are configured-indefinite; effective avg op latency is a few seconds for
// small payloads and tens of seconds for large multi-request operations, which the
// size-scaled `getOperationTimeout()` default (30s baseline, capped at 5 minutes) allows for).
// If a scenario legitimately needs longer than the default per op, override
// `getOperationTimeout()` in the scenario class rather than blanket-raising it here.
//
// Tuning note (2026-05-26): the previous default was 2 minutes, which was too coarse.
// Under fault injection the dominant wedge is the `pq*` (request-side, indefinite)
// variant. Netty's `responseTimeout` already fires after 60s, so any operation still
// unresponsive ~30s after that is a real liveness bug and we want to fail fast and
// recycle the rail. Empirically this 4-6x's small-scenario throughput (small-blob ops
// typically complete in <2s; even fault-injected ops are bounded by the 60s Netty
// response timeout) while still leaving plenty of headroom for multi-block large
// operations via per-scenario overrides.
//
// CRITICAL: the outer `.onErrorResume(e -> Mono.empty())` converts a failed iteration
// into a successful "this iteration is done" signal *after* `instrumentRunAsync` has
// already fired its `doOnError` side-effect (which calls `trackFailure` and increments
// the `failed_runs` metric). Without this, a TimeoutException -- or any other error --
// propagates out of `runTestAsync()` into `flatMap(runTestAsync, 1)` in
// ApiPerfTestBase.runAllAsync and terminates the entire Flux, cancelling all parallel
// rails and aborting the test loop. The previous `.retryWhen(Retry.max(3))` happened to
// mask this because most ops succeeded within retries, so the propagation path was
// rarely exercised; with the retry gone, every long-tail op would otherwise kill the
// whole job after the first operation timeout. The error has already been logged and
// counted as a failure by the time `onErrorResume` runs, so no information is lost.
return telemetryHelper.instrumentRunAsync(ctx ->
runInternalAsync(ctx)
.retryWhen(reactor.util.retry.Retry.max(3)
.filter(e -> !(Exceptions.unwrap(e)
instanceof ContentMismatchException)))
.timeout(getOperationTimeout())
.doOnError(e -> {
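// Log the error for debugging. doOnError does not consume it: the error still reaches
// instrumentRunAsync, and only the outer onErrorResume turns the failed iteration into completion.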
LOGGER.atError()
.addKeyValue("error", e.getMessage())
.addKeyValue("errorType", e.getClass().getSimpleName())
.log("Test operation failed after retries");
}));
.log("Test operation failed");
}))
.onErrorResume(e -> Mono.empty());
}

/**
* Default per-operation timeout for stress scenarios. Small-payload scenarios complete
* well under 2s in steady state and are bounded by Netty's 60s `responseTimeout` under
* fault injection, so 30s catches genuine liveness wedges without throwing away healthy
* long-tail ops. Multi-block large-payload variants need more time (a single op can be
* many sequential block uploads), so we scale the default up based on `options.getSize()`:
* one extra minute per 16 MiB above 1 MiB, capped at 5 minutes. Scenarios that need an
* even larger envelope can override this method:
*
* <pre>{@code
* @Override
* protected Duration getOperationTimeout() { return Duration.ofMinutes(10); }
* }</pre>
*/
protected Duration getOperationTimeout() {
    // Bytes beyond the small-payload threshold; negative when the payload is under it.
    long bytesOverThreshold = options.getSize() - SMALL_PAYLOAD_THRESHOLD_BYTES;

    // One extra minute per whole BYTES_PER_EXTRA_MINUTE chunk. Integer division truncates
    // toward zero, so partial chunks add nothing and sub-threshold payloads are clamped to 0.
    long extraMinutes = bytesOverThreshold / BYTES_PER_EXTRA_MINUTE;
    if (extraMinutes < 0) {
        extraMinutes = 0;
    }

    // Baseline plus the size-based allowance, never exceeding the hard cap so that a
    // stalled operation is still abandoned within MAX_TIMEOUT_SECONDS.
    long uncappedSeconds = BASE_TIMEOUT_SECONDS + extraMinutes * 60;
    long cappedSeconds = uncappedSeconds > MAX_TIMEOUT_SECONDS ? MAX_TIMEOUT_SECONDS : uncappedSeconds;
    return Duration.ofSeconds(cappedSeconds);
}

private static final long SMALL_PAYLOAD_THRESHOLD_BYTES = (long) 1024 * 1024; // 1 MiB
private static final long BYTES_PER_EXTRA_MINUTE = 16L * 1024 * 1024; // 16 MiB
private static final long BASE_TIMEOUT_SECONDS = 30;
private static final long MAX_TIMEOUT_SECONDS = 5 * 60;

protected abstract void runInternal(Context context) throws Exception;
protected abstract Mono<Void> runInternalAsync(Context context);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
param baseName string
param endpointSuffix string = 'core.windows.net'
param endpointSuffix string = environment().suffixes.storage
param location string = resourceGroup().location
param storageApiVersion string = '2022-09-01'

var primaryAccountName = '${baseName}'
var pageBlobStorageAccountName = '${baseName}pageblob'
var uniqueSuffix = uniqueString(resourceGroup().id)
var primaryAccountName = '${take(baseName, 11)}${uniqueSuffix}'
var pageBlobStorageAccountName = '${take(baseName, 7)}${uniqueSuffix}page'

resource primaryAccount 'Microsoft.Storage/storageAccounts@2022-09-01' = {
name: primaryAccountName
Expand All @@ -26,5 +26,5 @@ resource pageBlobStorageAccount 'Microsoft.Storage/storageAccounts@2022-09-01' =
properties: {}
}

output STORAGE_ENDPOINT_STRING string = '"https://${primaryAccountName}.blob.core.windows.net"'
output PAGE_BLOB_STORAGE_ENDPOINT_STRING string = '"https://${pageBlobStorageAccountName}.blob.core.windows.net"'
output STORAGE_ENDPOINT_STRING string = '"https://${primaryAccountName}.blob.${endpointSuffix}"'
output PAGE_BLOB_STORAGE_ENDPOINT_STRING string = '"https://${pageBlobStorageAccountName}.blob.${endpointSuffix}"'
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ spec:
args:
- |
set -ex;
dotnet dev-certs https --export-path /mnt/outputs/dev-cert.pfx;
dotnet dev-certs https --export-path /mnt/outputs/dev-cert.crt --format PEM --no-password;
/root/.dotnet/tools/http-fault-injector;
resources:
limits:
Expand All @@ -30,7 +30,17 @@ spec:
- |
set -xa;
set -o pipefail;
keytool -import -alias test -file /mnt/outputs/dev-cert.pfx -keystore ${JAVA_HOME}/lib/security/cacerts -noprompt -keypass changeit -storepass changeit;
attempts=0;
while [ ! -s /mnt/outputs/dev-cert.crt ]; do
attempts=$((attempts + 1));
if [ "$attempts" -gt 60 ]; then
echo "Timed out waiting for fault injector certificate" >&2;
exit 1;
fi;
sleep 1;
done;
keytool -delete -alias HttpFaultInject -keystore "${JAVA_HOME}/lib/security/cacerts" -storepass changeit >/dev/null 2>&1 || true;
keytool -importcert -trustcacerts -alias HttpFaultInject -file /mnt/outputs/dev-cert.crt -keystore "${JAVA_HOME}/lib/security/cacerts" -noprompt -storepass changeit || exit 1;
mkdir -p "$DEBUG_SHARE";
. /mnt/outputs/.env;
export AZURE_HTTP_CLIENT_IMPLEMENTATION=com.azure.core.http.netty.NettyAsyncHttpClientProvider;
Expand Down Expand Up @@ -67,4 +77,4 @@ spec:
cpu: "0.7"
{{- include "stress-test-addons.container-env" . | nindent 6 }}

{{- end -}}
{{- end -}}
12 changes: 12 additions & 0 deletions sdk/storage/azure-storage-stress/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,18 @@
<doclintMissingInclusion>-</doclintMissingInclusion>
</properties>

<dependencyManagement>
<dependencies>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-bom</artifactId>
<version>1.58.0</version> <!-- {x-version-update;io.opentelemetry:opentelemetry-bom;external_dependency} -->
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>

<dependencies>
<dependency>
<groupId>ch.qos.logback</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import com.azure.perf.test.core.RepeatingInputStream;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;

import java.io.IOException;
import java.io.InputStream;
Expand All @@ -20,7 +19,6 @@

public class CrcInputStream extends InputStream {
private final static ClientLogger LOGGER = new ClientLogger(CrcInputStream.class);
private final Sinks.One<ContentInfo> sink = Sinks.one();
private final InputStream inputStream;
private final CRC32 crc = new CRC32();
private final ByteBuffer head = ByteBuffer.allocate(1024);
Expand All @@ -44,7 +42,6 @@ public CrcInputStream(InputStream source) {
public synchronized int read() throws IOException {
int b = inputStream.read();
if (b < 0) {
emitContentInfo();
return b;
}

Expand All @@ -60,7 +57,6 @@ public synchronized int read() throws IOException {
public synchronized int read(byte buf[], int off, int len) throws IOException {
int read = inputStream.read(buf, off, len);
if (read < 0) {
emitContentInfo();
return read;
}

Expand All @@ -72,33 +68,6 @@ public synchronized int read(byte buf[], int off, int len) throws IOException {
return read;
}

// Uses tryEmitValue instead of emitValue(FAIL_FAST) so that resubscriptions
// (SDK retries, verification passes) don't throw on the second EOF.
private void emitContentInfo() {
String baseErrorMessage = "Failed to emit content because ";
Sinks.EmitResult emitResult = sink.tryEmitValue(new ContentInfo(crc.getValue(), length, head));
switch (emitResult) {
case OK:
case FAIL_TERMINATED:
// No action needed for successful or already-terminated emissions.
break;
case FAIL_CANCELLED:
throw LOGGER.logExceptionAsError(new RuntimeException(baseErrorMessage +
" the sink was previously interrupted by its consumer: " + emitResult));
case FAIL_OVERFLOW:
throw LOGGER.logExceptionAsError(new RuntimeException(baseErrorMessage + "the buffer is full: " + emitResult));
case FAIL_NON_SERIALIZED:
throw LOGGER.logExceptionAsError(new RuntimeException(baseErrorMessage + "two threads called emit at " +
"once: " + emitResult));
case FAIL_ZERO_SUBSCRIBER:
throw LOGGER.logExceptionAsError(new RuntimeException(baseErrorMessage + "the sink requires a " +
"subscriber:" + emitResult));
default:
throw LOGGER.logExceptionAsError(new RuntimeException(baseErrorMessage + "unexpected emit result: "
+ emitResult));
}
}

@Override
public synchronized void mark(int readLimit) {
if (markSupported) {
Expand All @@ -124,8 +93,32 @@ public boolean markSupported() {
return markSupported;
}

/**
* Returns a {@link Mono} that, on subscription, captures a snapshot of the stream's
* current CRC, byte count and head buffer.
*
* <p>The returned Mono is intentionally lazy: it does <strong>not</strong> wait for EOF or
* for any sink to be signaled. Callers are therefore responsible for subscribing only
* <em>after</em> the stream has been fully consumed (for example, after the SDK upload
* call has returned for synchronous flows, or via {@code .then(data.getContentInfo())}
* for reactive flows). Subscribing before the stream is done will produce a snapshot of
* whatever has been read so far.</p>
*
* <p>This contract avoids the previous design's dependence on the SDK reading past EOF
* (which never happened on known-length uploads and could leave the legacy sink waiting
* indefinitely) and naturally tolerates SDK retries: the snapshot reflects the bytes
* that were actually delivered on the final, successful pass.</p>
*
* @return a cold Mono that emits a {@link ContentInfo} snapshot on each subscription.
*/
public Mono<ContentInfo> getContentInfo() {
return sink.asMono();
return Mono.fromCallable(() -> {
synchronized (this) {
// duplicate() gives the caller an independent position/limit, but it still shares the
// underlying byte[]: any later write into `head` would be visible through this snapshot.
return new ContentInfo(crc.getValue(), length, head.duplicate());
Comment on lines +117 to +119
}
});
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ public String getDescription() {
Cpu.registerObservers(otel);
MemoryPools.registerObservers(otel);
Threads.registerObservers(otel);
GarbageCollector.registerObservers(otel);
GarbageCollector.registerObservers(otel, true);
OpenTelemetryAppender.install(otel);
return otel;
}
Expand Down