Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
16 commits
Select commit Hold shift + click to select a range
d2d23a8
feat(cosmos): add RequestedRegion and RequestedRegionReason public types
NaluTripician May 14, 2026
5b216f3
feat(cosmos): add hedging-detection state with shared regionLock
NaluTripician May 14, 2026
26f293c
feat(cosmos): expose isHedgingStarted/getRequestedRegions/getResponde…
NaluTripician May 14, 2026
6217e06
feat(cosmos): wire orchestrator and retry sites for hedging detection
NaluTripician May 14, 2026
1b11790
test(cosmos): add hedging-detection AC matrix tests
NaluTripician May 14, 2026
de298c3
docs(cosmos): CHANGELOG entry for hedging detection API
NaluTripician May 14, 2026
880748d
ci: retrigger build to clear flaky Spark bridge + Service Bus emulato…
NaluTripician May 14, 2026
8b8c762
test(cosmos): fix racy reader in compoundAtomicityIsPreservedUnderCon…
NaluTripician May 14, 2026
9137872
ci: retrigger to clear flaky SparkE2EBulkWriteITest restart test
NaluTripician May 15, 2026
493a071
ci: iteration 4 retrigger - Spark integration flake rotates per run
NaluTripician May 15, 2026
99cfbd3
Merge branch 'main' into feature/cosmos-hedging-detection-api
NaluTripician May 15, 2026
3893be9
Merge branch 'main' into feature/cosmos-hedging-detection-api
NaluTripician May 18, 2026
501274c
fix(cosmos): address hedging detection PR review feedback
NaluTripician May 18, 2026
6294d9e
Merge branch 'main' into feature/cosmos-hedging-detection-api
NaluTripician May 18, 2026
7bf4eb7
ci: retrigger to clear cosmos-ci Scala xsbt bridge + libzip SIGBUS fl…
NaluTripician May 19, 2026
579e1bd
ci: retrigger to clear cosmos-ci timing+Spark encoding flakes
NaluTripician May 19, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import com.azure.cosmos.models.CosmosReadManyRequestOptions;
import com.azure.cosmos.models.FeedResponse;
import com.azure.cosmos.models.PartitionKey;
import com.azure.cosmos.models.RequestedRegion;
import com.azure.cosmos.models.RequestedRegionReason;
import com.azure.cosmos.rx.TestSuiteBase;
import com.azure.cosmos.test.faultinjection.FaultInjectionCondition;
import com.azure.cosmos.test.faultinjection.FaultInjectionConditionBuilder;
Expand Down Expand Up @@ -141,6 +143,76 @@ public void testThresholdAvailabilityStrategy(
}
}

/**
 * End-to-end smoke test of the hedging-detection API against a live account (issue
 * Azure/azure-sdk-for-java#49182, AC14). It only runs against the team's multi-region test
 * account and is skipped when fewer than two preferred regions are configured.
 *
 * <p>With a fault rule injected, a point read is issued and the public
 * {@code isHedgingStarted} / {@code getRequestedRegions} / {@code getRespondedRegions}
 * surface is checked for the signal expected when the primary region is slow and a hedge to
 * the secondary wins. This is a fidelity check only; it does not replace the AC-matrix
 * coverage in {@code com.azure.cosmos.diagnostics.HedgingDetectionUnitTests}.
 */
@Test(groups = {"multi-master"}, timeOut = TIMEOUT * 10)
public void hedgingDetectionApi_primarySlow_returnsCrossRegionDiagnostics_liveAccount() {
    if (this.preferredRegionList.size() < 2) {
        throw new SkipException("hedgingDetectionApi smoke test requires >= 2 preferred regions");
    }

    // The fault rule has to target the same transport the client under test is using.
    FaultInjectionConnectionType faultInjectionConnectionType;
    if (getClientBuilder().buildConnectionPolicy().getConnectionMode() == ConnectionMode.DIRECT) {
        faultInjectionConnectionType = FaultInjectionConnectionType.DIRECT;
    } else {
        faultInjectionConnectionType = FaultInjectionConnectionType.GATEWAY;
    }

    TestObject seededItem = TestObject.create();
    this.cosmosAsyncContainer.createItem(seededItem).block();

    FaultInjectionRule faultRule = injectFailure(this.cosmosAsyncContainer, faultInjectionConnectionType);
    try {
        // Let cross-region replication catch up so the hedge arm in region[1] can succeed.
        Thread.sleep(2000);

        CosmosItemRequestOptions requestOptions = new CosmosItemRequestOptions();
        CosmosDiagnostics diagnostics = performDocumentOperation(
            this.cosmosAsyncContainer, OperationType.Read, seededItem, requestOptions, false, null);

        assertThat(diagnostics).isNotNull();

        // AC14(b): hedging actually fired.
        boolean hedgingStarted = diagnostics.isHedgingStarted();
        assertThat(hedgingStarted)
            .as("isHedgingStarted() should be true when the primary is slow and a hedge wins")
            .isTrue();

        // AC14(a): both regions appear, secondary tagged HEDGING.
        List<RequestedRegion> requestedRegions = diagnostics.getRequestedRegions();
        assertThat(requestedRegions)
            .as("getRequestedRegions() should contain INITIAL on primary and HEDGING on secondary")
            .anyMatch(requested -> requested.getReason() == RequestedRegionReason.INITIAL
                && requested.getRegionName().equalsIgnoreCase(this.regions.get(0)))
            .anyMatch(requested -> requested.getReason() == RequestedRegionReason.HEDGING
                && requested.getRegionName().equalsIgnoreCase(this.regions.get(1)));

        // AC14(c): secondary actually responded.
        assertThat(diagnostics.getRespondedRegions())
            .as("getRespondedRegions() should include the hedge winner")
            .anyMatch(responded -> responded.equalsIgnoreCase(this.regions.get(1)));

        // Context-level mirror should agree with the per-operation diagnostics.
        CosmosDiagnosticsContext diagnosticsContext = diagnostics.getDiagnosticsContext();
        if (diagnosticsContext != null) {
            assertThat(diagnosticsContext.isHedgingStarted()).isTrue();
            assertThat(diagnosticsContext.getRequestedRegions())
                .anyMatch(requested -> requested.getReason() == RequestedRegionReason.HEDGING);
            assertThat(diagnosticsContext.getRespondedRegions())
                .anyMatch(responded -> responded.equalsIgnoreCase(this.regions.get(1)));
        }
    } catch (InterruptedException interrupted) {
        // Restore the interrupt flag before failing so the test runner can still observe it.
        Thread.currentThread().interrupt();
        fail("hedging-detection smoke test interrupted", interrupted);
    } finally {
        // Always switch the fault rule off, whatever the outcome of the assertions.
        faultRule.disable();
    }
}

@Test(groups = {"multi-region"}, dataProvider = "faultInjectionArgProvider", timeOut = TIMEOUT*100)
public void testThresholdAvailabilityStrategyForReadsDefaultEnablementWithPpaf(
OperationType operationType,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,220 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.cosmos.diagnostics;

import com.azure.cosmos.CosmosDiagnostics;
import com.azure.cosmos.implementation.DiagnosticsClientContext;
import com.azure.cosmos.implementation.ImplementationBridgeHelpers;
import com.azure.cosmos.implementation.TestUtils;
import com.azure.cosmos.models.RequestedRegion;
import com.azure.cosmos.models.RequestedRegionReason;
import org.testng.annotations.Test;

import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

/**
 * Unit-level tests for the hedging-detection state on {@link CosmosDiagnostics}.
 *
 * <p>Covers AC1, AC2, AC4, AC5, AC8 (lock-guarded compound atomicity), AC10 (unmodifiable
 * snapshot) and the spec invariants in {@code public-spec-java.md} §M5 / §M6 / §M8 (compound
 * atomicity of HEDGING dispatch + {@code hedgingStarted = true} flip).
 *
 * <p>Behavioural ACs that require the orchestrator wiring (AC3, AC6, AC7, AC9, AC11, AC14) are
 * exercised by end-to-end / fault-injection tests against the live multi-region service; this
 * class only covers the parts that are observable without a network/orchestrator.
 */
public class HedgingDetectionUnitTests {

    /** Internal accessor used to append region entries, which the public API does not expose. */
    private static final ImplementationBridgeHelpers.CosmosDiagnosticsHelper.CosmosDiagnosticsAccessor DIAG_ACCESSOR =
        ImplementationBridgeHelpers.CosmosDiagnosticsHelper.getCosmosDiagnosticsAccessor();

    /**
     * Creates a fresh {@link CosmosDiagnostics} from a mocked client context.
     *
     * @return a new diagnostics instance with no regions recorded by this test class
     */
    private static CosmosDiagnostics newDiagnostics() {
        DiagnosticsClientContext ctx = TestUtils.mockDiagnosticsClientContext();
        return ctx.createDiagnostics();
    }

    /** A brand-new diagnostics object reports no hedging and empty region lists. */
    @Test(groups = {"unit"})
    public void newDiagnosticsHasEmptyHedgingState() {
        CosmosDiagnostics diagnostics = newDiagnostics();

        assertThat(diagnostics.isHedgingStarted()).isFalse();
        assertThat(diagnostics.getRequestedRegions()).isEmpty();
        assertThat(diagnostics.getRespondedRegions()).isEmpty();
    }

    /** Appending a HEDGING entry makes {@code isHedgingStarted()} true; INITIAL alone does not. */
    @Test(groups = {"unit"})
    public void appendingHedgingFlipsHedgingStartedAtomically() {
        CosmosDiagnostics diagnostics = newDiagnostics();

        DIAG_ACCESSOR.appendRequestedRegion(diagnostics, new RequestedRegion("East US", RequestedRegionReason.INITIAL));
        assertThat(diagnostics.isHedgingStarted()).isFalse();

        DIAG_ACCESSOR.appendRequestedRegion(diagnostics, new RequestedRegion("West US", RequestedRegionReason.HEDGING));

        // Both writes occur under regionLock in a single appendRequestedRegionInternal call,
        // so any reader (also under the lock) sees the list update AND the flag flip together.
        assertThat(diagnostics.isHedgingStarted()).isTrue();
        assertThat(diagnostics.getRequestedRegions())
            .extracting(RequestedRegion::getReason)
            .contains(RequestedRegionReason.HEDGING);
    }

    /** Every reason other than HEDGING is recorded without setting the hedging flag. */
    @Test(groups = {"unit"})
    public void nonHedgingReasonsDoNotFlipHedgingStarted() {
        CosmosDiagnostics diagnostics = newDiagnostics();

        DIAG_ACCESSOR.appendRequestedRegion(diagnostics, new RequestedRegion("East US", RequestedRegionReason.INITIAL));
        DIAG_ACCESSOR.appendRequestedRegion(diagnostics, new RequestedRegion("East US", RequestedRegionReason.OPERATION_RETRY));
        DIAG_ACCESSOR.appendRequestedRegion(diagnostics, new RequestedRegion("West US", RequestedRegionReason.REGION_FAILOVER));
        DIAG_ACCESSOR.appendRequestedRegion(diagnostics, new RequestedRegion("West US", RequestedRegionReason.TRANSPORT_RETRY));
        DIAG_ACCESSOR.appendRequestedRegion(diagnostics, new RequestedRegion("West US", RequestedRegionReason.CIRCUIT_BREAKER_PROBE));

        assertThat(diagnostics.isHedgingStarted()).isFalse();
        assertThat(diagnostics.getRequestedRegions()).hasSize(5);
    }

    /** Requested regions come back in the order they were appended, duplicates included. */
    @Test(groups = {"unit"})
    public void requestedRegionsPreserveFifoOrder() {
        CosmosDiagnostics diagnostics = newDiagnostics();

        DIAG_ACCESSOR.appendRequestedRegion(diagnostics, new RequestedRegion("East US", RequestedRegionReason.INITIAL));
        DIAG_ACCESSOR.appendRequestedRegion(diagnostics, new RequestedRegion("West US", RequestedRegionReason.HEDGING));
        DIAG_ACCESSOR.appendRequestedRegion(diagnostics, new RequestedRegion("North Europe", RequestedRegionReason.HEDGING));
        DIAG_ACCESSOR.appendRequestedRegion(diagnostics, new RequestedRegion("North Europe", RequestedRegionReason.OPERATION_RETRY));

        List<RequestedRegion> regions = diagnostics.getRequestedRegions();
        assertThat(regions).containsExactly(
            new RequestedRegion("East US", RequestedRegionReason.INITIAL),
            new RequestedRegion("West US", RequestedRegionReason.HEDGING),
            new RequestedRegion("North Europe", RequestedRegionReason.HEDGING),
            new RequestedRegion("North Europe", RequestedRegionReason.OPERATION_RETRY));
    }

    /** Responded regions keep arrival order and are not de-duplicated. */
    @Test(groups = {"unit"})
    public void respondedRegionsAllowDuplicatesInArrivalOrder() {
        // Gate Q9=A: duplicates allowed in getRespondedRegions. A region that produces multiple
        // responses (late hedge response, retry) must appear once per response.
        CosmosDiagnostics diagnostics = newDiagnostics();

        DIAG_ACCESSOR.appendRespondedRegion(diagnostics, "East US");
        DIAG_ACCESSOR.appendRespondedRegion(diagnostics, "West US");
        DIAG_ACCESSOR.appendRespondedRegion(diagnostics, "East US");

        assertThat(diagnostics.getRespondedRegions()).containsExactly("East US", "West US", "East US");
    }

    /** The list returned by {@code getRequestedRegions()} rejects mutation by the caller. */
    @Test(groups = {"unit"})
    public void getRequestedRegionsReturnsUnmodifiableSnapshot() {
        CosmosDiagnostics diagnostics = newDiagnostics();
        DIAG_ACCESSOR.appendRequestedRegion(diagnostics, new RequestedRegion("East US", RequestedRegionReason.INITIAL));

        List<RequestedRegion> snapshot = diagnostics.getRequestedRegions();

        assertThatThrownBy(() -> snapshot.add(new RequestedRegion("West US", RequestedRegionReason.HEDGING)))
            .isInstanceOf(UnsupportedOperationException.class);
    }

    /** The list returned by {@code getRespondedRegions()} rejects mutation by the caller. */
    @Test(groups = {"unit"})
    public void getRespondedRegionsReturnsUnmodifiableSnapshot() {
        CosmosDiagnostics diagnostics = newDiagnostics();
        DIAG_ACCESSOR.appendRespondedRegion(diagnostics, "East US");

        List<String> snapshot = diagnostics.getRespondedRegions();

        assertThatThrownBy(() -> snapshot.add("West US"))
            .isInstanceOf(UnsupportedOperationException.class);
    }

    /** A snapshot taken before an append keeps its old size after the append. */
    @Test(groups = {"unit"})
    public void snapshotIsDecoupledFromLaterMutations() {
        // The list returned at time T must NOT reflect appends after T (defensive snapshot).
        CosmosDiagnostics diagnostics = newDiagnostics();
        DIAG_ACCESSOR.appendRequestedRegion(diagnostics, new RequestedRegion("East US", RequestedRegionReason.INITIAL));

        List<RequestedRegion> snapshotBefore = diagnostics.getRequestedRegions();
        DIAG_ACCESSOR.appendRequestedRegion(diagnostics, new RequestedRegion("West US", RequestedRegionReason.HEDGING));
        List<RequestedRegion> snapshotAfter = diagnostics.getRequestedRegions();

        assertThat(snapshotBefore).hasSize(1);
        assertThat(snapshotAfter).hasSize(2);
    }

    /**
     * Hammers one diagnostics instance with concurrent HEDGING appends while a reader thread
     * checks that a {@code true} flag is always accompanied by a HEDGING entry.
     *
     * <p>The reader's own failures are captured and asserted on. Without that, an exception in
     * the reader thread would end the checking loop silently and leave the violation counter
     * at zero, letting the test pass without having verified anything.
     *
     * @throws InterruptedException if the test thread is interrupted while waiting for workers
     */
    @Test(groups = {"unit"}, timeOut = 30_000L)
    public void compoundAtomicityIsPreservedUnderConcurrentWriters() throws InterruptedException {
        // 16 writer threads, each appending 1000 HEDGING entries to the same diagnostics.
        //
        // Writers hold `regionLock` across (list.add + maybe-flip hedgingStarted), so any reader
        // that observes hedgingStarted == true and then reads getRequestedRegions() MUST see at
        // least one HEDGING entry: hedgingStarted is monotonic (false -> true, never back), so
        // the lock-release that flipped the flag happens-before the next snapshot acquisition.
        //
        // The reverse direction (flag == false, hasHedge == true) is a BENIGN read-skew: the
        // reader's two accessor calls release the lock between them, so a writer can commit
        // (append HEDGING entry + flip flag) in that window. A retry of the flag read would
        // observe true. This is not a violation of M5/M6/M8 compound atomicity, which is a
        // write-side invariant.
        CosmosDiagnostics diagnostics = newDiagnostics();

        final int writers = 16;
        final int perWriter = 1000;
        final CountDownLatch latch = new CountDownLatch(writers);
        final AtomicInteger violations = new AtomicInteger();
        final AtomicReference<Throwable> readerFailure = new AtomicReference<>();

        Thread reader = new Thread(() -> {
            try {
                while (latch.getCount() > 0) {
                    boolean flag = diagnostics.isHedgingStarted();
                    List<RequestedRegion> regions = diagnostics.getRequestedRegions();
                    boolean hasHedge = regions.stream().anyMatch(r -> r.getReason() == RequestedRegionReason.HEDGING);
                    // Only count the provably-impossible direction: flag observed true implies a
                    // writer's compound-atomic update committed before this read; therefore the
                    // later regions snapshot must include a HEDGING entry.
                    if (flag && !hasHedge) {
                        violations.incrementAndGet();
                    }
                }
            } catch (Throwable t) {
                // Thread boundary: hand the failure to the test thread instead of letting the
                // default uncaught-exception handler swallow it.
                readerFailure.set(t);
            }
        }, "hedging-detection-reader");
        reader.setDaemon(true);
        reader.start();

        for (int w = 0; w < writers; w++) {
            final int writerId = w;
            new Thread(() -> {
                try {
                    for (int i = 0; i < perWriter; i++) {
                        DIAG_ACCESSOR.appendRequestedRegion(diagnostics,
                            new RequestedRegion("Region-" + writerId, RequestedRegionReason.HEDGING));
                    }
                } finally {
                    // Count down even on failure so the test cannot hang; a writer that died
                    // early is then caught by the final size assertion below.
                    latch.countDown();
                }
            }, "hedging-detection-writer-" + w).start();
        }

        latch.await();
        reader.join(5_000L);

        // The violation counter is only final once the reader has actually stopped.
        assertThat(reader.isAlive())
            .as("reader thread should terminate once all writers have finished")
            .isFalse();
        assertThat(readerFailure.get())
            .as("reader thread must not fail while observing concurrent appends")
            .isNull();
        assertThat(violations.get())
            .as("isHedgingStarted() == true must imply at least one HEDGING entry in "
                + "getRequestedRegions() (monotonic flag + compound-atomic write; M5/M6/M8)")
            .isZero();
        assertThat(diagnostics.isHedgingStarted()).isTrue();
        assertThat(diagnostics.getRequestedRegions()).hasSize(writers * perWriter);
    }

    /** With nothing recorded, the most recently contacted region is {@code null}. */
    @Test(groups = {"unit"})
    public void getMostRecentlyContactedRegionReturnsNullWhenNoneRecorded() {
        CosmosDiagnostics diagnostics = newDiagnostics();

        assertThat(DIAG_ACCESSOR.getMostRecentlyContactedRegion(diagnostics)).isNull();
    }

    /** Passing {@code null} diagnostics returns {@code null} instead of throwing. */
    @Test(groups = {"unit"})
    public void getMostRecentlyContactedRegionNullDiagnosticsIsNullSafe() {
        assertThat(DIAG_ACCESSOR.getMostRecentlyContactedRegion(null)).isNull();
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.cosmos.models;

import org.testng.annotations.Test;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

/**
* Unit tests for {@link RequestedRegion} value-type semantics (AC1 / spec section 3.1).
*/
public class RequestedRegionTest {

@Test(groups = {"unit"})
public void gettersReturnConstructorArguments() {
RequestedRegion region = new RequestedRegion("East US", RequestedRegionReason.INITIAL);

assertThat(region.getRegionName()).isEqualTo("East US");
assertThat(region.getReason()).isEqualTo(RequestedRegionReason.INITIAL);
}

@Test(groups = {"unit"})
public void equalsAndHashCodeAreCaseInsensitiveOnRegionName() {
RequestedRegion a = new RequestedRegion("East US", RequestedRegionReason.HEDGING);
RequestedRegion b = new RequestedRegion("east us", RequestedRegionReason.HEDGING);
RequestedRegion c = new RequestedRegion("EAST US", RequestedRegionReason.HEDGING);

assertThat(a).isEqualTo(b);
assertThat(a).isEqualTo(c);
assertThat(a.hashCode()).isEqualTo(b.hashCode());
assertThat(a.hashCode()).isEqualTo(c.hashCode());
}

@Test(groups = {"unit"})
public void equalsDistinguishesByReason() {
RequestedRegion initial = new RequestedRegion("East US", RequestedRegionReason.INITIAL);
RequestedRegion hedge = new RequestedRegion("East US", RequestedRegionReason.HEDGING);

assertThat(initial).isNotEqualTo(hedge);
}

@Test(groups = {"unit"})
public void equalsHandlesNullAndOtherTypes() {
RequestedRegion region = new RequestedRegion("East US", RequestedRegionReason.INITIAL);

assertThat(region.equals(null)).isFalse();
assertThat(region.equals("East US")).isFalse();
assertThat(region).isEqualTo(region);
}

@Test(groups = {"unit"})
public void toStringFormatIsRegionColonReason() {
RequestedRegion region = new RequestedRegion("West US 2", RequestedRegionReason.OPERATION_RETRY);

assertThat(region.toString()).isEqualTo("West US 2:OPERATION_RETRY");
}

@Test(groups = {"unit"})
public void constructorRejectsNullRegionName() {
assertThatThrownBy(() -> new RequestedRegion(null, RequestedRegionReason.INITIAL))
.isInstanceOf(NullPointerException.class)
.hasMessageContaining("regionName");
}

@Test(groups = {"unit"})
public void constructorRejectsNullReason() {
assertThatThrownBy(() -> new RequestedRegion("East US", null))
.isInstanceOf(NullPointerException.class)
.hasMessageContaining("reason");
}

@Test(groups = {"unit"})
public void enumContainsAllSixVariants() {
// Spec gate Q7=A: ship all six variants in the initial release.
RequestedRegionReason[] values = RequestedRegionReason.values();

assertThat(values).containsExactlyInAnyOrder(
RequestedRegionReason.INITIAL,
RequestedRegionReason.OPERATION_RETRY,
RequestedRegionReason.TRANSPORT_RETRY,
RequestedRegionReason.HEDGING,
RequestedRegionReason.REGION_FAILOVER,
RequestedRegionReason.CIRCUIT_BREAKER_PROBE);
}
}
Loading
Loading