From d2d23a84a99d0ea7c8e61a5ce38e4225e0a6d2e1 Mon Sep 17 00:00:00 2001 From: Nalu Tripician <27316859+NaluTripician@users.noreply.github.com> Date: Thu, 14 May 2026 14:21:20 -0700 Subject: [PATCH 01/13] feat(cosmos): add RequestedRegion and RequestedRegionReason public types Introduces the public surface for the hedging-detection API: an immutable value type representing a single dispatched region tagged with a reason, and a non-exhaustive enum describing why the SDK dispatched. Refs Azure/azure-sdk-for-java#49182. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../azure/cosmos/models/RequestedRegion.java | 78 +++++++++++++++++++ .../cosmos/models/RequestedRegionReason.java | 53 +++++++++++++ 2 files changed, 131 insertions(+) create mode 100644 sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/RequestedRegion.java create mode 100644 sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/RequestedRegionReason.java diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/RequestedRegion.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/RequestedRegion.java new file mode 100644 index 000000000000..31133ac8d53b --- /dev/null +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/RequestedRegion.java @@ -0,0 +1,78 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. +package com.azure.cosmos.models; + +import java.util.Locale; +import java.util.Objects; + +/** + * Represents a single Azure Cosmos DB region the SDK dispatched a request to, tagged with the + * reason for dispatch. + *

+ * Instances are immutable and safe to share across threads. Region-name comparison via + * {@link #equals(Object)} and {@link #hashCode()} is case-insensitive to match the SDK's + * region-normalization conventions. + *

+ * Contract: a {@code RequestedRegion} reflects the SDK's decision to dispatch + * a request to a region (post-threshold-delay, post-non-cancellation for hedge arms); it does + * not guarantee a wire-issued request. See + * {@link com.azure.cosmos.CosmosDiagnostics#getRequestedRegions()} for the full "dispatched, + * not necessarily wire-issued" semantic. + */ +public final class RequestedRegion { + + private final String regionName; + private final RequestedRegionReason reason; + + /** + * Creates a new {@link RequestedRegion}. + * + * @param regionName the human-readable region name (for example {@code "East US"}); must not be {@code null}. + * @param reason the {@link RequestedRegionReason} describing why the SDK dispatched to this region; must not be {@code null}. + * @throws NullPointerException if either argument is {@code null}. + */ + public RequestedRegion(String regionName, RequestedRegionReason reason) { + this.regionName = Objects.requireNonNull(regionName, "regionName must not be null"); + this.reason = Objects.requireNonNull(reason, "reason must not be null"); + } + + /** + * Returns the human-readable region name. + * + * @return the region name; never {@code null}. + */ + public String getRegionName() { + return this.regionName; + } + + /** + * Returns the reason the SDK dispatched a request to this region. + * + * @return the {@link RequestedRegionReason}; never {@code null}. + */ + public RequestedRegionReason getReason() { + return this.reason; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof RequestedRegion)) { + return false; + } + RequestedRegion that = (RequestedRegion) o; + return this.regionName.equalsIgnoreCase(that.regionName) && this.reason == that.reason; + } + + @Override + public int hashCode() { + return Objects.hash(this.regionName.toLowerCase(Locale.ROOT), this.reason); + } + + @Override + public String toString() { + return this.regionName + ":" + this.reason; + } +} diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/RequestedRegionReason.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/RequestedRegionReason.java new file mode 100644 index 000000000000..c93b25a8ae95 --- /dev/null +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/RequestedRegionReason.java @@ -0,0 +1,53 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. +package com.azure.cosmos.models; + +/** + * Describes why the Azure Cosmos DB Java SDK dispatched a request to a specific region. + *

+ * Used in combination with {@link RequestedRegion} on + * {@link com.azure.cosmos.CosmosDiagnostics#getRequestedRegions()} and + * {@link com.azure.cosmos.CosmosDiagnosticsContext#getRequestedRegions()}. + *

+ * This enum is non-exhaustive. Future SDK versions may add additional values, + * so callers MUST include a {@code default:} arm when switching on this enum to remain forward + * compatible. + */ +public enum RequestedRegionReason { + + /** + * The initial attempt for the operation. Every operation has exactly one {@code INITIAL} + * entry tied to the first region the SDK targeted. + */ + INITIAL, + + /** + * A retry triggered by the SDK's operation-level retry policy (for example, retrying the + * same operation after a transient failure observed in another region). + */ + OPERATION_RETRY, + + /** + * A retry triggered by the direct-mode transport layer (for example, a {@code 410 Gone} + * retry handled by {@code GoneAndRetryWithRetryPolicy}). + */ + TRANSPORT_RETRY, + + /** + * A request dispatched to a hedge region as part of a configured cross-region availability + * strategy (such as {@link com.azure.cosmos.ThresholdBasedAvailabilityStrategy}). + */ + HEDGING, + + /** + * A request dispatched as part of region failover (for example, the previously preferred + * region returned a {@code 503} and the SDK moved to the next region). + */ + REGION_FAILOVER, + + /** + * A probe request dispatched by the per-partition circuit breaker after the partition was + * marked unavailable in a region. + */ + CIRCUIT_BREAKER_PROBE +} From 5b216f3a9117bcc8c64c0d60803e9be65880e37f Mon Sep 17 00:00:00 2001 From: Nalu Tripician <27316859+NaluTripician@users.noreply.github.com> Date: Thu, 14 May 2026 14:21:30 -0700 Subject: [PATCH 02/13] feat(cosmos): add hedging-detection state with shared regionLock Adds @JsonIgnore-annotated requestedRegions, respondedRegions and hedgingStarted state to ClientSideRequestStatistics, all guarded by a shared regionLock so any reader observes the HEDGING append and the hedgingStarted = true flip atomically. Adds a single appendRequestedRegion bridge surface on ImplementationBridgeHelpers.CosmosDiagnosticsAccessor (no separate setHedgingStarted - compound atomicity is enforced from the bridge). respondedRegions is auto-populated by recordResponse / recordGatewayResponse. Refs Azure/azure-sdk-for-java#49182. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../ClientSideRequestStatistics.java | 81 +++++++++++++++++++ .../ImplementationBridgeHelpers.java | 22 +++++ 2 files changed, 103 insertions(+) diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ClientSideRequestStatistics.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ClientSideRequestStatistics.java index aa1f7974848b..87bb7b02e8bd 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ClientSideRequestStatistics.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ClientSideRequestStatistics.java @@ -10,6 +10,8 @@ import com.azure.cosmos.implementation.directconnectivity.StoreResultDiagnostics; import com.azure.cosmos.implementation.http.ReactorNettyRequestRecord; import com.azure.cosmos.implementation.routing.RegionalRoutingContext; +import com.azure.cosmos.models.RequestedRegion; +import com.azure.cosmos.models.RequestedRegionReason; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.core.JsonGenerator; @@ -67,6 +69,22 @@ private static ImplementationBridgeHelpers.CosmosDiagnosticsContextHelper.Cosmos private long approximateInsertionCountInBloomFilter = 0; private Set keywordIdentifiers; + // ===== Hedging Detection API (cross-SDK Hedging Detection API) ===== + // The following three fields are guarded together by `regionLock`. + // They are intentionally excluded from JSON serialization to avoid surprising the + // Spark / encryption / Kafka-connect modules that round-trip this class through Jackson. + @JsonIgnore + private final Object regionLock = new Object(); + + @JsonIgnore + private final List requestedRegions = new ArrayList<>(); + + @JsonIgnore + private final List respondedRegions = new ArrayList<>(); + + @JsonIgnore + private boolean hedgingStarted = false; + public ClientSideRequestStatistics(DiagnosticsClientContext diagnosticsClientContext) { this.diagnosticsClientConfig = diagnosticsClientContext.getConfig(); this.requestStartTimeUTC = Instant.now(); @@ -113,6 +131,13 @@ public ClientSideRequestStatistics(ClientSideRequestStatistics toBeCloned) { this.samplingRateSnapshot = toBeCloned.samplingRateSnapshot; this.approximateInsertionCountInBloomFilter = toBeCloned.approximateInsertionCountInBloomFilter; this.keywordIdentifiers = toBeCloned.keywordIdentifiers; + + // Copy hedging-detection state under the source's regionLock so the snapshot is consistent. + synchronized (toBeCloned.regionLock) { + this.requestedRegions.addAll(toBeCloned.requestedRegions); + this.respondedRegions.addAll(toBeCloned.respondedRegions); + this.hedgingStarted = toBeCloned.hedgingStarted; + } } @JsonIgnore @@ -213,6 +238,8 @@ public void recordResponse(RxDocumentServiceRequest request, StoreResultDiagnost this.regionsContacted.add(storeResponseStatistics.regionName); this.locationEndpointsContacted.add(locationEndPoint); this.regionsContactedWithContext.add(new RegionWithContext(storeResponseStatistics.regionName, regionalRoutingContext)); + // Hedging Detection API: track in arrival order; duplicates allowed (see Javadoc on getRespondedRegions). + appendRespondedRegionInternal(storeResponseStatistics.regionName); } if (storeResponseStatistics.requestOperationType == OperationType.Head @@ -258,6 +285,8 @@ public void recordGatewayResponse( this.locationEndpointsContacted.add(locationEndpoint); this.regionsContactedWithContext.add(new RegionWithContext(regionName, regionalRoutingContext)); + // Hedging Detection API: track gateway responses in arrival order; duplicates allowed. + appendRespondedRegionInternal(regionName); } GatewayStatistics gatewayStatistics = new GatewayStatistics(); @@ -715,6 +744,58 @@ public RegionalRoutingContext getFirstContactedLocationEndpoint() { return this.regionsContactedWithContext.first().locationEndpointsContacted; } + // ===== Hedging Detection API internal accessors ===== + // + // appendRequestedRegion / appendRespondedRegion and the matching getters are the *only* + // mutators / accessors for the new state. They are reachable from the public surface only + // via the CosmosDiagnosticsAccessor bridge. All operations acquire the shared `regionLock` + // so that any reader sees both the list-append and the `hedgingStarted` flip atomically + // (compound atomicity invariant; see public-spec-java §M5/M6/M8 and internal-spec §SE-017). + + @JsonIgnore + public void appendRequestedRegionInternal(RequestedRegion entry) { + if (entry == null) { + return; + } + synchronized (this.regionLock) { + this.requestedRegions.add(entry); + if (entry.getReason() == RequestedRegionReason.HEDGING) { + this.hedgingStarted = true; + } + } + } + + @JsonIgnore + public void appendRespondedRegionInternal(String regionName) { + if (regionName == null || regionName.isEmpty()) { + return; + } + synchronized (this.regionLock) { + this.respondedRegions.add(regionName); + } + } + + @JsonIgnore + public List getRequestedRegionsSnapshot() { + synchronized (this.regionLock) { + return Collections.unmodifiableList(new ArrayList<>(this.requestedRegions)); + } + } + + @JsonIgnore + public List getRespondedRegionsSnapshot() { + synchronized (this.regionLock) { + return Collections.unmodifiableList(new ArrayList<>(this.respondedRegions)); + } + } + + @JsonIgnore + public boolean isHedgingStartedInternal() { + synchronized (this.regionLock) { + return this.hedgingStarted; + } + } + public static class StoreResponseStatistics { @JsonSerialize(using = StoreResultDiagnostics.StoreResultDiagnosticsSerializer.class) private StoreResultDiagnostics storeResult; diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ImplementationBridgeHelpers.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ImplementationBridgeHelpers.java index 10b1644fd7d2..9125c7b240bf 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ImplementationBridgeHelpers.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ImplementationBridgeHelpers.java @@ -76,6 +76,7 @@ import com.azure.cosmos.models.PartitionKey; import com.azure.cosmos.models.PartitionKeyDefinition; import com.azure.cosmos.models.PriorityLevel; +import com.azure.cosmos.models.RequestedRegion; import com.azure.cosmos.models.ShowQueryMode; import com.azure.cosmos.models.SqlQuerySpec; import com.azure.cosmos.util.CosmosPagedFlux; @@ -857,6 +858,27 @@ void recordAddressResolutionEnd( void mergeMetadataDiagnosticContext(CosmosDiagnostics cosmosDiagnostics, MetadataDiagnosticsContext otherMetadataDiagnosticsContext); void mergeSerializationDiagnosticContext(CosmosDiagnostics cosmosDiagnostics, SerializationDiagnosticsContext otherSerializationDiagnosticsContext); + + // ===== Hedging Detection API bridge surface ===== + // + // Single append method — implementation acquires the per-stats `regionLock` and + // performs BOTH writes (append to `requestedRegions` AND, if the entry's reason is + // HEDGING, flip `hedgingStarted = true`) inside the synchronized block. Reads on + // isHedgingStarted() / getRequestedRegions() / getRespondedRegions() also take the + // same lock, so any reader observes both writes or neither. See public-spec-java + // §M5 / §M6 / §M8 and internal-spec §SE-017. + // + // Note: there is intentionally NO separate `setHedgingStarted` bridge surface — + // compound atomicity is enforced from the bridge. + void appendRequestedRegion(CosmosDiagnostics cosmosDiagnostics, RequestedRegion entry); + + void appendRespondedRegion(CosmosDiagnostics cosmosDiagnostics, String regionName); + + List getRequestedRegionsInternal(CosmosDiagnostics cosmosDiagnostics); + + List getRespondedRegionsInternal(CosmosDiagnostics cosmosDiagnostics); + + boolean isHedgingStartedInternal(CosmosDiagnostics cosmosDiagnostics); } } From 26f293c4b3f8c300e56f4521ae8ec1ab228ccd53 Mon Sep 17 00:00:00 2001 From: Nalu Tripician <27316859+NaluTripician@users.noreply.github.com> Date: Thu, 14 May 2026 14:21:38 -0700 Subject: [PATCH 03/13] feat(cosmos): expose isHedgingStarted/getRequestedRegions/getRespondedRegions Adds the three new public methods on CosmosDiagnostics with the mandated `dispatched, not necessarily wire-issued` Javadoc contract, and mirror methods on CosmosDiagnosticsContext that aggregate FIFO across all child diagnostics (matching getContactedRegionNames pattern). All snapshots are unmodifiable. Refs Azure/azure-sdk-for-java#49182. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../com/azure/cosmos/CosmosDiagnostics.java | 158 ++++++++++++++++++ .../cosmos/CosmosDiagnosticsContext.java | 73 ++++++++ 2 files changed, 231 insertions(+) diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosDiagnostics.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosDiagnostics.java index e2c77504b619..9c5a04aa9eef 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosDiagnostics.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosDiagnostics.java @@ -11,6 +11,7 @@ import com.azure.cosmos.implementation.SerializationDiagnosticsContext; import com.azure.cosmos.implementation.guava25.collect.ImmutableList; import com.azure.cosmos.implementation.routing.RegionalRoutingContext; +import com.azure.cosmos.models.RequestedRegion; import com.azure.cosmos.util.Beta; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.core.JsonProcessingException; @@ -195,6 +196,113 @@ public Set getContactedRegionNames() { return this.clientSideRequestStatistics.getContactedRegionNames(); } + /** + * Returns {@code true} if the SDK actually dispatched this operation to a hedge region as + * part of a cross-region availability strategy fan-out. {@code false} for non-hedged + * operations, including the case where hedging was configured but the primary responded + * under the hedge threshold (hedge arms registered but never subscribed; no fan-out + * occurred). + *

+ * {@code false} does NOT mean hedging was disabled or misconfigured. + * To check whether hedging is configured on the client, inspect + * {@code CosmosClientBuilder#endToEndOperationLatencyPolicyConfig} (or the equivalent + * availability-strategy configuration on the client builder). + * + * @return {@code true} if at least one hedge arm was dispatched. + */ + public boolean isHedgingStarted() { + if (this.feedResponseDiagnostics != null) { + if (this.clientSideRequestStatistics != null && this.clientSideRequestStatistics.isHedgingStartedInternal()) { + return true; + } + Collection clientStatisticCollection = + this.feedResponseDiagnostics.getClientSideRequestStatistics(); + if (clientStatisticCollection != null) { + for (ClientSideRequestStatistics stats : clientStatisticCollection) { + if (stats.isHedgingStartedInternal()) { + return true; + } + } + } + return false; + } + return this.clientSideRequestStatistics != null && this.clientSideRequestStatistics.isHedgingStartedInternal(); + } + + /** + * Returns the regions the SDK actually dispatched this operation to, in observed dispatch + * order, each tagged with the reason. Includes the initial attempt. The returned list is + * unmodifiable and is a defensive snapshot. + *

+ * The append site is the actual dispatch point (post-{@code delaySubscription} for hedge + * arms); registered-but-never-subscribed hedge arms do NOT appear here. + *

+ * Contract is "dispatched, not necessarily wire-issued". An entry reflects + * the SDK's decision to commit to dispatching — for hedge arms, this means the per-arm + * threshold delay elapsed without cancellation, so the inner-{@code Mono} subscription + * fired. A cancellation in the small window between that dispatch decision and the + * underlying Netty channel write still leaves the entry in this list. Callers should + * treat the list as a record of intent-to-dispatch, not a record of wire-issued requests. + *

+ * {@link com.azure.cosmos.models.RequestedRegionReason} is non-exhaustive — + * callers MUST include a {@code default:} arm when switching on it. + * + * @return an unmodifiable list of dispatched regions in observed dispatch order. Never {@code null}. + */ + public List getRequestedRegions() { + if (this.feedResponseDiagnostics != null) { + List aggregated = new ArrayList<>(); + if (this.clientSideRequestStatistics != null) { + aggregated.addAll(this.clientSideRequestStatistics.getRequestedRegionsSnapshot()); + } + Collection clientStatisticCollection = + this.feedResponseDiagnostics.getClientSideRequestStatistics(); + if (clientStatisticCollection != null) { + for (ClientSideRequestStatistics stats : clientStatisticCollection) { + aggregated.addAll(stats.getRequestedRegionsSnapshot()); + } + } + return Collections.unmodifiableList(aggregated); + } + if (this.clientSideRequestStatistics == null) { + return Collections.emptyList(); + } + return this.clientSideRequestStatistics.getRequestedRegionsSnapshot(); + } + + /** + * Returns the regions that returned a response (success or failure) for this operation, in + * arrival order. The returned list is unmodifiable and is a defensive snapshot. + *

+ * Duplicates are allowed and expected. The same region may appear more + * than once if it produced multiple responses (for example, a late response after a hedge + * winner, or several retries on a single region). {@code count > 1} does NOT imply that + * more than one distinct region responded. For unique regions, call + * {@code .stream().distinct().collect(java.util.stream.Collectors.toList())}. + * + * @return an unmodifiable list of regions that responded, in arrival order. Never {@code null}. + */ + public List getRespondedRegions() { + if (this.feedResponseDiagnostics != null) { + List aggregated = new ArrayList<>(); + if (this.clientSideRequestStatistics != null) { + aggregated.addAll(this.clientSideRequestStatistics.getRespondedRegionsSnapshot()); + } + Collection clientStatisticCollection = + this.feedResponseDiagnostics.getClientSideRequestStatistics(); + if (clientStatisticCollection != null) { + for (ClientSideRequestStatistics stats : clientStatisticCollection) { + aggregated.addAll(stats.getRespondedRegionsSnapshot()); + } + } + return Collections.unmodifiableList(aggregated); + } + if (this.clientSideRequestStatistics == null) { + return Collections.emptyList(); + } + return this.clientSideRequestStatistics.getRespondedRegionsSnapshot(); + } + /** * Gets the UserAgent header value used by the client issueing this operation * @return the UserAgent header value used for the client that issued this operation @@ -515,6 +623,56 @@ public void mergeSerializationDiagnosticContext(CosmosDiagnostics cosmosDiagnost clientSideRequestStatistics.mergeSerializationDiagnosticsContext(otherSerializationDiagnosticsContext); } } + + @Override + public void appendRequestedRegion(CosmosDiagnostics cosmosDiagnostics, RequestedRegion entry) { + if (cosmosDiagnostics == null || entry == null) { + return; + } + ClientSideRequestStatistics stats = cosmosDiagnostics.clientSideRequestStatistics; + if (stats == null) { + return; + } + // The compound write (list-append + maybe-flip-hedgingStarted) happens atomically + // under stats.regionLock inside `appendRequestedRegionInternal`. See M5/M6/M8. + stats.appendRequestedRegionInternal(entry); + } + + @Override + public void appendRespondedRegion(CosmosDiagnostics cosmosDiagnostics, String regionName) { + if (cosmosDiagnostics == null || regionName == null) { + return; + } + ClientSideRequestStatistics stats = cosmosDiagnostics.clientSideRequestStatistics; + if (stats == null) { + return; + } + stats.appendRespondedRegionInternal(regionName); + } + + @Override + public List getRequestedRegionsInternal(CosmosDiagnostics cosmosDiagnostics) { + if (cosmosDiagnostics == null) { + return Collections.emptyList(); + } + return cosmosDiagnostics.getRequestedRegions(); + } + + @Override + public List getRespondedRegionsInternal(CosmosDiagnostics cosmosDiagnostics) { + if (cosmosDiagnostics == null) { + return Collections.emptyList(); + } + return cosmosDiagnostics.getRespondedRegions(); + } + + @Override + public boolean isHedgingStartedInternal(CosmosDiagnostics cosmosDiagnostics) { + if (cosmosDiagnostics == null) { + return false; + } + return cosmosDiagnostics.isHedgingStarted(); + } }); } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosDiagnosticsContext.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosDiagnosticsContext.java index dc626b41f08e..c094433f5fad 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosDiagnosticsContext.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosDiagnosticsContext.java @@ -16,6 +16,7 @@ import com.azure.cosmos.implementation.Utils; import com.azure.cosmos.implementation.directconnectivity.StoreResponseDiagnostics; import com.azure.cosmos.implementation.directconnectivity.StoreResultDiagnostics; +import com.azure.cosmos.models.RequestedRegion; import com.azure.cosmos.util.Beta; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; @@ -26,6 +27,7 @@ import java.time.Instant; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Set; @@ -438,6 +440,77 @@ public Set getContactedRegionNames() { return regionsContacted; } + /** + * Returns {@code true} if any per-operation {@link CosmosDiagnostics} aggregated under this + * context reports {@link CosmosDiagnostics#isHedgingStarted()} as {@code true}. + *

+ * Returns {@code false} when no hedge arm was dispatched — including the case where hedging + * was configured but the primary responded under the hedge threshold (hedge arms registered + * but never subscribed). + * + * @return {@code true} if hedging was actually dispatched for any aggregated operation. + */ + public boolean isHedgingStarted() { + if (this.diagnostics == null) { + return false; + } + for (CosmosDiagnostics d : this.diagnostics) { + if (d.isHedgingStarted()) { + return true; + } + } + return false; + } + + /** + * Returns the regions the SDK dispatched this operation to, aggregated across all + * per-operation {@link CosmosDiagnostics} children in observed order + * (FIFO traversal of the underlying {@code ConcurrentLinkedDeque}, matching the order used + * by {@link #getContactedRegionNames()}). The returned list is unmodifiable. + *

+ * Each entry reflects an actual SDK dispatch decision (post-threshold-delay, + * post-non-cancellation for hedge arms). See + * {@link CosmosDiagnostics#getRequestedRegions()} for the full "dispatched, not necessarily + * wire-issued" semantic and a note about the non-exhaustive + * {@link com.azure.cosmos.models.RequestedRegionReason} enum. + * + * @return an unmodifiable, FIFO-ordered list of dispatched regions across all child + * diagnostics. Never {@code null}. + */ + public List getRequestedRegions() { + if (this.diagnostics == null) { + return Collections.emptyList(); + } + List aggregated = new ArrayList<>(); + for (CosmosDiagnostics d : this.diagnostics) { + aggregated.addAll(d.getRequestedRegions()); + } + return Collections.unmodifiableList(aggregated); + } + + /** + * Returns the regions that returned a response (success or failure) aggregated across all + * per-operation {@link CosmosDiagnostics} children in observed order. The returned list is + * unmodifiable. + *

+ * Duplicates are allowed and expected. See + * {@link CosmosDiagnostics#getRespondedRegions()} for the full contract and the + * dedup recipe. + * + * @return an unmodifiable list of regions that responded, across all child diagnostics. + * Never {@code null}. + */ + public List getRespondedRegions() { + if (this.diagnostics == null) { + return Collections.emptyList(); + } + List aggregated = new ArrayList<>(); + for (CosmosDiagnostics d : this.diagnostics) { + aggregated.addAll(d.getRespondedRegions()); + } + return Collections.unmodifiableList(aggregated); + } + /** * Returns the system usage * NOTE: this information is not included in the json representation returned from {@link #toJson()} because it From 6217e064cc6ad34fb44867e64aafaddb54e4810c Mon Sep 17 00:00:00 2001 From: Nalu Tripician <27316859+NaluTripician@users.noreply.github.com> Date: Thu, 14 May 2026 14:21:51 -0700 Subject: [PATCH 04/13] feat(cosmos): wire orchestrator and retry sites for hedging detection Wires INITIAL/HEDGING dispatch sites into the point-operation and feed- operation availability-strategy orchestrators in RxDocumentClientImpl. The HEDGING doOnSubscribe is layered on the upstream of delaySubscription so registered-but-never-subscribed hedge arms do NOT produce phantom entries. ScopedDiagnosticsFactory carries the orchestrator-scope view so each per-arm CosmosDiagnostics also reflects all dispatch decisions. ClientRetryPolicy records OPERATION_RETRY on session-not-available retry and REGION_FAILOVER on refreshLocation / gateway-timeout failover. GoneAndRetryWithRetryPolicy records TRANSPORT_RETRY at the actual 410-Gone retryAfter dispatch point. CIRCUIT_BREAKER_PROBE ships in the enum but stays unpopulated pending PPCB/PPAF upstream PRs. Refs Azure/azure-sdk-for-java#49182. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../implementation/ClientRetryPolicy.java | 41 +++++++ .../implementation/RxDocumentClientImpl.java | 112 +++++++++++++++++- .../GoneAndRetryWithRetryPolicy.java | 45 +++++++ 3 files changed, 194 insertions(+), 4 deletions(-) diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ClientRetryPolicy.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ClientRetryPolicy.java index 4a5b2f409df7..5692fc7daba3 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ClientRetryPolicy.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ClientRetryPolicy.java @@ -12,6 +12,8 @@ import com.azure.cosmos.implementation.faultinjection.FaultInjectionRequestContext; import com.azure.cosmos.implementation.routing.RegionalRoutingContext; import com.azure.cosmos.implementation.perPartitionAutomaticFailover.GlobalPartitionEndpointManagerForPerPartitionAutomaticFailover; +import com.azure.cosmos.models.RequestedRegion; +import com.azure.cosmos.models.RequestedRegionReason; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.publisher.Mono; @@ -251,6 +253,8 @@ private ShouldRetryResult shouldRetryOnSessionNotAvailable(RxDocumentServiceRequ return ShouldRetryResult.noRetry(); } else { this.retryContext = new RetryContext(this.sessionTokenRetryCount , true); + // Hedging Detection API: same-region in-session retry — OPERATION_RETRY. + recordRequestedRegion(RequestedRegionReason.OPERATION_RETRY); return ShouldRetryResult.retryAfter(Duration.ZERO); } } else { @@ -344,6 +348,8 @@ private Mono shouldRetryOnGatewayTimeout(CosmosException clie this.failoverRetryCount++; this.retryContext = new RetryContext(this.failoverRetryCount, true); Duration retryDelay = Duration.ofMillis(this.endpointFailoverRetryIntervalInMs); + // Hedging Detection API: gateway-timeout cross-region retry is a REGION_FAILOVER. + recordRequestedRegion(RequestedRegionReason.REGION_FAILOVER); return Mono.just(ShouldRetryResult.retryAfter(retryDelay)); } @@ -386,9 +392,44 @@ private Mono refreshLocation(boolean isReadRequest, boolean forceRefresh, } this.retryContext = new RetryContext(this.failoverRetryCount, usePreferredLocations); + + // Hedging Detection API: record REGION_FAILOVER at the moment this policy commits to + // retrying on a different region (the previous endpoint has been marked unavailable; + // a refresh will be issued to pick a new region). See public-spec-java §M7. + recordRequestedRegion(RequestedRegionReason.REGION_FAILOVER); + return this.globalEndpointManager.refreshLocationAsync(null, forceRefresh); } + // Hedging Detection API helper — records a RequestedRegion entry on the diagnostics + // associated with the current request. Region name is resolved via the GlobalEndpointManager + // using the currently-pinned `regionalRoutingContext`. Best-effort: silently no-ops when + // the request, diagnostics, or routing context is unavailable. + private void recordRequestedRegion(RequestedRegionReason reason) { + if (this.request == null || this.request.requestContext == null + || this.request.requestContext.cosmosDiagnostics == null + || this.regionalRoutingContext == null) { + return; + } + try { + String regionName = this.globalEndpointManager.getRegionName( + this.regionalRoutingContext.getGatewayRegionalEndpoint(), + this.request.getOperationType(), + this.request.isPerPartitionAutomaticFailoverEnabledAndWriteRequest); + if (regionName == null || regionName.isEmpty()) { + return; + } + ImplementationBridgeHelpers.CosmosDiagnosticsHelper.CosmosDiagnosticsAccessor diagAcc = + ImplementationBridgeHelpers.CosmosDiagnosticsHelper.getCosmosDiagnosticsAccessor(); + diagAcc.appendRequestedRegion( + this.request.requestContext.cosmosDiagnostics, + new RequestedRegion(regionName, reason)); + } catch (RuntimeException ex) { + // Defensive: diagnostics-append must never break the retry flow. + logger.debug("Failed to record RequestedRegion for retry decision", ex); + } + } + private Mono shouldRetryOnBackendServiceUnavailableAsync( boolean isReadRequest, boolean isWebExceptionRetriable, diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java index 11121bca033e..a7c7098460f9 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java @@ -93,6 +93,8 @@ import com.azure.cosmos.models.PartitionKey; import com.azure.cosmos.models.PartitionKeyDefinition; import com.azure.cosmos.models.PartitionKind; +import com.azure.cosmos.models.RequestedRegion; +import com.azure.cosmos.models.RequestedRegionReason; import com.azure.cosmos.models.SqlParameter; import com.azure.cosmos.models.SqlQuerySpec; import com.fasterxml.jackson.databind.ObjectMapper; @@ -7442,6 +7444,10 @@ private Mono> wrapPointOperationWithAvailabilityStrat RequestOptions clonedOptions = new RequestOptions(nonNullRequestOptions); if (monoList.isEmpty()) { + // Hedging Detection API: record INITIAL at orchestrator entry, BEFORE the + // primary regional Mono is built. The primary is unconditionally dispatched. + diagnosticsFactory.recordRequestedRegion( + new RequestedRegion(region, RequestedRegionReason.INITIAL)); // no special error handling for transient errors to suppress them here // because any cross-regional retries are expected to be processed // by the ClientRetryPolicy for the initial request - so, any outcome of the @@ -7529,14 +7535,22 @@ private Mono> wrapPointOperationWithAvailabilityStrat .getThresholdStep() .multipliedBy(monoList.size() - 1)); + // Hedging Detection API: append `.doOnSubscribe(...)` BEFORE `.delaySubscription(...)`. + // Operator order is significant — see the comment above the feed-operation + // counterpart and public-spec-java §M7 / AC2 / AC10. + Mono hedgeArmWithDispatchTracking = + regionalCrossRegionRetryMono + .doOnSubscribe(s -> diagnosticsFactory.recordRequestedRegion( + new RequestedRegion(region, RequestedRegionReason.HEDGING))); + if (logger.isDebugEnabled()) { monoList.add( - regionalCrossRegionRetryMono + hedgeArmWithDispatchTracking .doOnSubscribe(c -> logger.debug("STARTING to process {} operation in region '{}'", operationType, region)) .delaySubscription(delayForCrossRegionalRetry)); } else { monoList.add( - regionalCrossRegionRetryMono + hedgeArmWithDispatchTracking .delaySubscription(delayForCrossRegionalRetry)); } } @@ -7900,6 +7914,14 @@ private Mono executeFeedOperationWithAvailabilityStrategy( (ThresholdBasedAvailabilityStrategy)endToEndPolicyConfig.getAvailabilityStrategy(); List>> monoList = new ArrayList<>(); + // Hedging Detection API: feed-op orchestrator does not use ScopedDiagnosticsFactory; per-arm + // diagnostics live on each cloned request's request context. We append the requested-region + // entry directly onto that diagnostics via the bridge. The aggregation across arms is + // performed by CosmosDiagnosticsContext (which the customer accesses via the operation + // context, matching getContactedRegionNames aggregation at line 428). + ImplementationBridgeHelpers.CosmosDiagnosticsHelper.CosmosDiagnosticsAccessor feedDiagAccessor = + ImplementationBridgeHelpers.CosmosDiagnosticsHelper.getCosmosDiagnosticsAccessor(); + orderedApplicableRegionsForSpeculation .forEach(region -> { RxDocumentServiceRequest clonedRequest = req.clone(); @@ -7930,6 +7952,15 @@ private Mono executeFeedOperationWithAvailabilityStrategy( clonedRequest.requestContext.setCrossRegionAvailabilityContext(crossRegionAvailabilityContextForRequestForNonHedgedRequest); + // Hedging Detection API: record INITIAL for the primary feed-op arm at + // orchestrator entry, before the regional Mono is built. Primary is + // unconditionally dispatched. + if (clonedRequest.requestContext.cosmosDiagnostics != null) { + feedDiagAccessor.appendRequestedRegion( + clonedRequest.requestContext.cosmosDiagnostics, + new RequestedRegion(region, RequestedRegionReason.INITIAL)); + } + Mono> initialMonoAcrossAllRegions = handleCircuitBreakingFeedbackForFeedOperationWithAvailabilityStrategy(feedOperation.apply(retryPolicyFactory, clonedRequest) .map(NonTransientFeedOperationResult::new) @@ -7996,14 +8027,30 @@ private Mono executeFeedOperationWithAvailabilityStrategy( .getThresholdStep() .multipliedBy(monoList.size() - 1)); + // Hedging Detection API: feed-op hedge-arm dispatch tracking. The + // `.doOnSubscribe(...)` MUST be BEFORE `.delaySubscription(...)` so the + // handler fires only after the threshold delay elapses without cancellation. + // Primary-wins-under-threshold leaves no phantom HEDGING entry. See + // public-spec-java §M7 / AC2 / AC10; internal-spec §SE-013 / SE-021. + final RxDocumentServiceRequest hedgeArmRequest = clonedRequest; + Mono> hedgeArmWithDispatchTracking = + regionalCrossRegionRetryMono + .doOnSubscribe(s -> { + if (hedgeArmRequest.requestContext.cosmosDiagnostics != null) { + feedDiagAccessor.appendRequestedRegion( + hedgeArmRequest.requestContext.cosmosDiagnostics, + new RequestedRegion(region, RequestedRegionReason.HEDGING)); + } + }); + if (logger.isDebugEnabled()) { monoList.add( - regionalCrossRegionRetryMono + hedgeArmWithDispatchTracking .doOnSubscribe(c -> logger.debug("STARTING to process {} operation in region '{}'", operationType, region)) .delaySubscription(delayForCrossRegionalRetry)); } else { monoList.add( - regionalCrossRegionRetryMono + hedgeArmWithDispatchTracking .delaySubscription(delayForCrossRegionalRetry)); } } @@ -8304,6 +8351,19 @@ private static class ScopedDiagnosticsFactory implements DiagnosticsClientContex private final AtomicReference> gatewayCancelledDiagnosticsHandler = new AtomicReference<>(null); + // ===== Hedging Detection API — orchestrator-scoped requested-regions state ===== + // + // For multi-region availability-strategy operations, the orchestrator owns the + // requested-regions intent log. We propagate every recorded entry to: + // (a) every already-created per-arm CosmosDiagnostics (so callers reading any arm's + // diagnostics see the full orchestrator view), and + // (b) every diagnostics created after the entry was recorded (back-fill at create + // time). + // Guarded by `factoryRegionLock`. The per-stats ClientSideRequestStatistics.regionLock + // independently guards the per-arm storage (see M5/M6/M8). + private final Object factoryRegionLock = new Object(); + private final List orchestratorRequestedRegions = new ArrayList<>(); + public ScopedDiagnosticsFactory(DiagnosticsClientContext inner, boolean shouldCaptureAllFeedDiagnostics) { checkNotNull(inner, "Argument 'inner' must not be null."); this.inner = inner; @@ -8321,9 +8381,53 @@ public CosmosDiagnostics createDiagnostics() { CosmosDiagnostics diagnostics = inner.createDiagnostics(); createdDiagnostics.add(diagnostics); mostRecentlyCreatedDiagnostics.set(diagnostics); + // Back-fill any requested-regions entries already recorded at the orchestrator scope + // so that this newly-created per-arm diagnostics sees the full view (including the + // INITIAL entry recorded before this arm's diagnostics existed). + List snapshot; + synchronized (factoryRegionLock) { + if (orchestratorRequestedRegions.isEmpty()) { + return diagnostics; + } + snapshot = new ArrayList<>(orchestratorRequestedRegions); + } + ImplementationBridgeHelpers.CosmosDiagnosticsHelper.CosmosDiagnosticsAccessor diagAcc = + ImplementationBridgeHelpers.CosmosDiagnosticsHelper.getCosmosDiagnosticsAccessor(); + for (RequestedRegion entry : snapshot) { + diagAcc.appendRequestedRegion(diagnostics, entry); + } return diagnostics; } + /** + * Records an orchestrator-level requested-region entry. The entry is: + * (a) stored in the factory-scope list, so any per-arm CosmosDiagnostics created + * LATER will see the entry; and + * (b) appended (via the bridge) to every already-created per-arm CosmosDiagnostics, + * so any reader of any arm's diagnostics sees the full orchestrator view. + * + * Reactor operator-order discipline: callers MUST place {@code .doOnSubscribe(s -> ...)} + * BEFORE {@code .delaySubscription(threshold)} in the fluent chain. That way, the + * doOnSubscribe handler fires only after the delay elapses without cancellation — + * primary-wins-under-threshold leaves no phantom HEDGING entries (design doc §12 + * "no phantom entries"; AC2/AC10; SE-021). + */ + public void recordRequestedRegion(RequestedRegion entry) { + if (entry == null) { + return; + } + List snapshot; + synchronized (factoryRegionLock) { + orchestratorRequestedRegions.add(entry); + snapshot = new ArrayList<>(createdDiagnostics); + } + ImplementationBridgeHelpers.CosmosDiagnosticsHelper.CosmosDiagnosticsAccessor diagAcc = + ImplementationBridgeHelpers.CosmosDiagnosticsHelper.getCosmosDiagnosticsAccessor(); + for (CosmosDiagnostics d : snapshot) { + diagAcc.appendRequestedRegion(d, entry); + } + } + @Override public String getUserAgent() { return inner.getUserAgent(); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/GoneAndRetryWithRetryPolicy.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/GoneAndRetryWithRetryPolicy.java index 5bfe3d8078dd..7b0d6907c699 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/GoneAndRetryWithRetryPolicy.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/GoneAndRetryWithRetryPolicy.java @@ -4,6 +4,7 @@ package com.azure.cosmos.implementation.directconnectivity; import com.azure.cosmos.BridgeInternal; +import com.azure.cosmos.CosmosDiagnostics; import com.azure.cosmos.CosmosException; import com.azure.cosmos.implementation.GoneException; import com.azure.cosmos.implementation.HttpConstants; @@ -21,6 +22,8 @@ import com.azure.cosmos.implementation.ShouldRetryResult; import com.azure.cosmos.implementation.Utils; import com.azure.cosmos.implementation.apachecommons.lang.tuple.Pair; +import com.azure.cosmos.models.RequestedRegion; +import com.azure.cosmos.models.RequestedRegionReason; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.publisher.Mono; @@ -93,6 +96,40 @@ private Duration getElapsedTime() { return Duration.between(this.start, endSnapshot); } + // Hedging Detection API: best-effort helper that appends a TRANSPORT_RETRY entry to the + // request's diagnostics. The region is the most-recently-contacted region on the request's + // ClientSideRequestStatistics (matching the retry-dispatch semantic). Silently no-ops when + // the request, diagnostics, or region cannot be resolved. NOTE: the PPCB / PPAF probe sites + // also need to surface a TRANSPORT_RETRY / CIRCUIT_BREAKER_PROBE hook — those depend on + // upstream PRs #45197 / #45267 / #46477 / #48421 (SE-005 / SE-012). When those land in the + // Java SDK, wire CIRCUIT_BREAKER_PROBE at the probe-issue site there. + private static void recordTransportRetryRequestedRegion(RxDocumentServiceRequest request) { + if (request == null || request.requestContext == null + || request.requestContext.cosmosDiagnostics == null) { + return; + } + try { + String regionName = null; + CosmosDiagnostics diag = request.requestContext.cosmosDiagnostics; + // Region resolution: fall back to "FirstContactedRegion" if currently-pinned region + // is unknown at this layer. + if (!diag.getContactedRegionNames().isEmpty()) { + regionName = diag.getContactedRegionNames().iterator().next(); + } + if (regionName == null || regionName.isEmpty()) { + return; + } + ImplementationBridgeHelpers.CosmosDiagnosticsHelper.CosmosDiagnosticsAccessor diagAcc = + ImplementationBridgeHelpers.CosmosDiagnosticsHelper.getCosmosDiagnosticsAccessor(); + diagAcc.appendRequestedRegion( + diag, + new RequestedRegion(regionName, RequestedRegionReason.TRANSPORT_RETRY)); + } catch (RuntimeException ex) { + // Defensive: diagnostics-append must never break the retry flow. + logger.debug("Failed to record TRANSPORT_RETRY RequestedRegion", ex); + } + } + class GoneRetryPolicy implements IRetryPolicy { private final static int DEFAULT_WAIT_TIME_IN_SECONDS = 30; private final static int MAXIMUM_BACKOFF_TIME_IN_SECONDS = 15; @@ -269,6 +306,14 @@ public Mono shouldRetry(Exception exception) { forceRefreshAddressCache = exceptionHandlingResult.getRight(); + // Hedging Detection API: TRANSPORT_RETRY is recorded at the actual retry-dispatch + // site. Although the spec's enum value also covers PPCB / Direct-mode probe paths + // (which require upstream PRs #45197 / #45267 / #46477 / #48421 to surface a clean + // hook), the 410 Gone retry path is observable here and is the canonical + // transport-layer retry. The region is the currently-pinned region on the request's + // diagnostics; we record on the request's cosmosDiagnostics. + recordTransportRetryRequestedRegion(this.request); + return Mono.just(ShouldRetryResult.retryAfter(backoffTime, Quadruple.with(forceRefreshAddressCache, true, timeout, currentRetryAttemptCount))); } From 1b11790a21bf92f90b977ddf9c333c8b667eaac8 Mon Sep 17 00:00:00 2001 From: Nalu Tripician <27316859+NaluTripician@users.noreply.github.com> Date: Thu, 14 May 2026 14:22:06 -0700 Subject: [PATCH 05/13] test(cosmos): add hedging-detection AC matrix tests Adds RequestedRegionTest (value-type semantics: equality, hash, toString, null-guards, enum-completeness) and HedgingDetectionUnitTests (compound-atomicity invariant, FIFO ordering, duplicate respondedRegions, unmodifiable snapshots, concurrent-writer stress). Also adds a live-account smoke test (group 'multi-master') on EndToEndTimeOutWithAvailabilityTest that asserts isHedgingStarted / getRequestedRegions / getRespondedRegions produce the expected end-to-end signal when the primary is slow and a hedge arm wins. Refs Azure/azure-sdk-for-java#49182. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../EndToEndTimeOutWithAvailabilityTest.java | 72 +++++++ .../HedgingDetectionUnitTests.java | 195 ++++++++++++++++++ .../cosmos/models/RequestedRegionTest.java | 86 ++++++++ 3 files changed, 353 insertions(+) create mode 100644 sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/diagnostics/HedgingDetectionUnitTests.java create mode 100644 sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/models/RequestedRegionTest.java diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/EndToEndTimeOutWithAvailabilityTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/EndToEndTimeOutWithAvailabilityTest.java index 5d82877afe68..ed754b1d0cce 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/EndToEndTimeOutWithAvailabilityTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/EndToEndTimeOutWithAvailabilityTest.java @@ -18,6 +18,8 @@ import com.azure.cosmos.models.CosmosReadManyRequestOptions; import com.azure.cosmos.models.FeedResponse; import com.azure.cosmos.models.PartitionKey; +import com.azure.cosmos.models.RequestedRegion; +import com.azure.cosmos.models.RequestedRegionReason; import com.azure.cosmos.rx.TestSuiteBase; import com.azure.cosmos.test.faultinjection.FaultInjectionCondition; import com.azure.cosmos.test.faultinjection.FaultInjectionConditionBuilder; @@ -141,6 +143,76 @@ public void testThresholdAvailabilityStrategy( } } + /** + * Live multi-region smoke test for the hedging-detection API (issue + * Azure/azure-sdk-for-java#49182, AC14). Runs only against the team's multi-region test + * account. Asserts the public {@code isHedgingStarted} / {@code getRequestedRegions} / + * {@code getRespondedRegions} surface produces the expected end-to-end signal when the + * primary region is slow and a hedge to the secondary wins. End-to-end fidelity check; + * not a substitute for the AC-matrix coverage under + * {@code com.azure.cosmos.diagnostics.HedgingDetectionUnitTests}. + */ + @Test(groups = {"multi-master"}, timeOut = TIMEOUT * 10) + public void hedgingDetectionApi_primarySlow_returnsCrossRegionDiagnostics_liveAccount() { + if (this.preferredRegionList.size() <= 1) { + throw new SkipException("hedgingDetectionApi smoke test requires >= 2 preferred regions"); + } + + ConnectionMode connectionMode = getClientBuilder().buildConnectionPolicy().getConnectionMode(); + FaultInjectionConnectionType faultInjectionConnectionType = connectionMode == ConnectionMode.DIRECT ? + FaultInjectionConnectionType.DIRECT : + FaultInjectionConnectionType.GATEWAY; + + TestObject createdItem = TestObject.create(); + this.cosmosAsyncContainer.createItem(createdItem).block(); + + FaultInjectionRule rule = injectFailure(this.cosmosAsyncContainer, faultInjectionConnectionType); + try { + // Let cross-region replication catch up so the hedge arm in region[1] can succeed. + Thread.sleep(2000); + + CosmosItemRequestOptions options = new CosmosItemRequestOptions(); + CosmosDiagnostics cosmosDiagnostics = performDocumentOperation( + this.cosmosAsyncContainer, OperationType.Read, createdItem, options, false, null); + + assertThat(cosmosDiagnostics).isNotNull(); + + // AC14(b): hedging actually fired. + assertThat(cosmosDiagnostics.isHedgingStarted()) + .as("isHedgingStarted() should be true when the primary is slow and a hedge wins") + .isTrue(); + + // AC14(a): both regions appear, secondary tagged HEDGING. + List regions = cosmosDiagnostics.getRequestedRegions(); + assertThat(regions) + .as("getRequestedRegions() should contain INITIAL on primary and HEDGING on secondary") + .anyMatch(r -> r.getReason() == RequestedRegionReason.INITIAL + && r.getRegionName().equalsIgnoreCase(this.regions.get(0))) + .anyMatch(r -> r.getReason() == RequestedRegionReason.HEDGING + && r.getRegionName().equalsIgnoreCase(this.regions.get(1))); + + // AC14(c): secondary actually responded. + assertThat(cosmosDiagnostics.getRespondedRegions()) + .as("getRespondedRegions() should include the hedge winner") + .anyMatch(name -> name.equalsIgnoreCase(this.regions.get(1))); + + CosmosDiagnosticsContext ctx = cosmosDiagnostics.getDiagnosticsContext(); + if (ctx != null) { + // Context-level mirror should agree with the per-operation diagnostics. + assertThat(ctx.isHedgingStarted()).isTrue(); + assertThat(ctx.getRequestedRegions()) + .anyMatch(r -> r.getReason() == RequestedRegionReason.HEDGING); + assertThat(ctx.getRespondedRegions()) + .anyMatch(name -> name.equalsIgnoreCase(this.regions.get(1))); + } + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + fail("hedging-detection smoke test interrupted", ex); + } finally { + rule.disable(); + } + } + @Test(groups = {"multi-region"}, dataProvider = "faultInjectionArgProvider", timeOut = TIMEOUT*100) public void testThresholdAvailabilityStrategyForReadsDefaultEnablementWithPpaf( OperationType operationType, diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/diagnostics/HedgingDetectionUnitTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/diagnostics/HedgingDetectionUnitTests.java new file mode 100644 index 000000000000..8adf8566f3ef --- /dev/null +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/diagnostics/HedgingDetectionUnitTests.java @@ -0,0 +1,195 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. +package com.azure.cosmos.diagnostics; + +import com.azure.cosmos.CosmosDiagnostics; +import com.azure.cosmos.implementation.DiagnosticsClientContext; +import com.azure.cosmos.implementation.ImplementationBridgeHelpers; +import com.azure.cosmos.implementation.TestUtils; +import com.azure.cosmos.models.RequestedRegion; +import com.azure.cosmos.models.RequestedRegionReason; +import org.testng.annotations.Test; + +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** + * Unit-level tests for the hedging-detection state on {@link CosmosDiagnostics}. + * + *

Covers AC1, AC2, AC4, AC5, AC8 (lock-guarded compound atomicity), AC10 (unmodifiable + * snapshot) and the spec invariants in {@code public-spec-java.md} §M5 / §M6 / §M8 (compound + * atomicity of HEDGING dispatch + {@code hedgingStarted = true} flip). + * + *

Behavioural ACs that require the orchestrator wiring (AC3, AC6, AC7, AC9, AC11, AC14) are + * exercised by end-to-end / fault-injection tests against the live multi-region service; this + * class only covers the parts that are observable without a network/orchestrator. + */ +public class HedgingDetectionUnitTests { + + private static final ImplementationBridgeHelpers.CosmosDiagnosticsHelper.CosmosDiagnosticsAccessor DIAG_ACCESSOR = + ImplementationBridgeHelpers.CosmosDiagnosticsHelper.getCosmosDiagnosticsAccessor(); + + private static CosmosDiagnostics newDiagnostics() { + DiagnosticsClientContext ctx = TestUtils.mockDiagnosticsClientContext(); + return ctx.createDiagnostics(); + } + + @Test(groups = {"unit"}) + public void newDiagnosticsHasEmptyHedgingState() { + CosmosDiagnostics diagnostics = newDiagnostics(); + + assertThat(diagnostics.isHedgingStarted()).isFalse(); + assertThat(diagnostics.getRequestedRegions()).isEmpty(); + assertThat(diagnostics.getRespondedRegions()).isEmpty(); + } + + @Test(groups = {"unit"}) + public void appendingHedgingFlipsHedgingStartedAtomically() { + CosmosDiagnostics diagnostics = newDiagnostics(); + + DIAG_ACCESSOR.appendRequestedRegion(diagnostics, new RequestedRegion("East US", RequestedRegionReason.INITIAL)); + assertThat(diagnostics.isHedgingStarted()).isFalse(); + + DIAG_ACCESSOR.appendRequestedRegion(diagnostics, new RequestedRegion("West US", RequestedRegionReason.HEDGING)); + + // Both writes occur under regionLock in a single appendRequestedRegionInternal call, + // so any reader (also under the lock) sees the list update AND the flag flip together. + assertThat(diagnostics.isHedgingStarted()).isTrue(); + assertThat(diagnostics.getRequestedRegions()) + .extracting(RequestedRegion::getReason) + .contains(RequestedRegionReason.HEDGING); + } + + @Test(groups = {"unit"}) + public void nonHedgingReasonsDoNotFlipHedgingStarted() { + CosmosDiagnostics diagnostics = newDiagnostics(); + + DIAG_ACCESSOR.appendRequestedRegion(diagnostics, new RequestedRegion("East US", RequestedRegionReason.INITIAL)); + DIAG_ACCESSOR.appendRequestedRegion(diagnostics, new RequestedRegion("East US", RequestedRegionReason.OPERATION_RETRY)); + DIAG_ACCESSOR.appendRequestedRegion(diagnostics, new RequestedRegion("West US", RequestedRegionReason.REGION_FAILOVER)); + DIAG_ACCESSOR.appendRequestedRegion(diagnostics, new RequestedRegion("West US", RequestedRegionReason.TRANSPORT_RETRY)); + DIAG_ACCESSOR.appendRequestedRegion(diagnostics, new RequestedRegion("West US", RequestedRegionReason.CIRCUIT_BREAKER_PROBE)); + + assertThat(diagnostics.isHedgingStarted()).isFalse(); + assertThat(diagnostics.getRequestedRegions()).hasSize(5); + } + + @Test(groups = {"unit"}) + public void requestedRegionsPreserveFifoOrder() { + CosmosDiagnostics diagnostics = newDiagnostics(); + + DIAG_ACCESSOR.appendRequestedRegion(diagnostics, new RequestedRegion("East US", RequestedRegionReason.INITIAL)); + DIAG_ACCESSOR.appendRequestedRegion(diagnostics, new RequestedRegion("West US", RequestedRegionReason.HEDGING)); + DIAG_ACCESSOR.appendRequestedRegion(diagnostics, new RequestedRegion("North Europe", RequestedRegionReason.HEDGING)); + DIAG_ACCESSOR.appendRequestedRegion(diagnostics, new RequestedRegion("North Europe", RequestedRegionReason.OPERATION_RETRY)); + + List regions = diagnostics.getRequestedRegions(); + assertThat(regions).containsExactly( + new RequestedRegion("East US", RequestedRegionReason.INITIAL), + new RequestedRegion("West US", RequestedRegionReason.HEDGING), + new RequestedRegion("North Europe", RequestedRegionReason.HEDGING), + new RequestedRegion("North Europe", RequestedRegionReason.OPERATION_RETRY)); + } + + @Test(groups = {"unit"}) + public void respondedRegionsAllowDuplicatesInArrivalOrder() { + // Gate Q9=A: duplicates allowed in getRespondedRegions. A region that produces multiple + // responses (late hedge response, retry) must appear once per response. + CosmosDiagnostics diagnostics = newDiagnostics(); + + DIAG_ACCESSOR.appendRespondedRegion(diagnostics, "East US"); + DIAG_ACCESSOR.appendRespondedRegion(diagnostics, "West US"); + DIAG_ACCESSOR.appendRespondedRegion(diagnostics, "East US"); + + assertThat(diagnostics.getRespondedRegions()).containsExactly("East US", "West US", "East US"); + } + + @Test(groups = {"unit"}) + public void getRequestedRegionsReturnsUnmodifiableSnapshot() { + CosmosDiagnostics diagnostics = newDiagnostics(); + DIAG_ACCESSOR.appendRequestedRegion(diagnostics, new RequestedRegion("East US", RequestedRegionReason.INITIAL)); + + List snapshot = diagnostics.getRequestedRegions(); + + assertThatThrownBy(() -> snapshot.add(new RequestedRegion("West US", RequestedRegionReason.HEDGING))) + .isInstanceOf(UnsupportedOperationException.class); + } + + @Test(groups = {"unit"}) + public void getRespondedRegionsReturnsUnmodifiableSnapshot() { + CosmosDiagnostics diagnostics = newDiagnostics(); + DIAG_ACCESSOR.appendRespondedRegion(diagnostics, "East US"); + + List snapshot = diagnostics.getRespondedRegions(); + + assertThatThrownBy(() -> snapshot.add("West US")) + .isInstanceOf(UnsupportedOperationException.class); + } + + @Test(groups = {"unit"}) + public void snapshotIsDecoupledFromLaterMutations() { + // The list returned at time T must NOT reflect appends after T (defensive snapshot). + CosmosDiagnostics diagnostics = newDiagnostics(); + DIAG_ACCESSOR.appendRequestedRegion(diagnostics, new RequestedRegion("East US", RequestedRegionReason.INITIAL)); + + List snapshotBefore = diagnostics.getRequestedRegions(); + DIAG_ACCESSOR.appendRequestedRegion(diagnostics, new RequestedRegion("West US", RequestedRegionReason.HEDGING)); + List snapshotAfter = diagnostics.getRequestedRegions(); + + assertThat(snapshotBefore).hasSize(1); + assertThat(snapshotAfter).hasSize(2); + } + + @Test(groups = {"unit"}, timeOut = 30_000L) + public void compoundAtomicityIsPreservedUnderConcurrentWriters() throws InterruptedException { + // 16 writer threads, each appending 1000 HEDGING entries to the same diagnostics. + // Invariant: every reader that observes isHedgingStarted() == true must also see at + // least one HEDGING entry in getRequestedRegions(), and vice-versa. + CosmosDiagnostics diagnostics = newDiagnostics(); + + final int writers = 16; + final int perWriter = 1000; + final CountDownLatch latch = new CountDownLatch(writers); + final AtomicInteger violations = new AtomicInteger(); + + Thread reader = new Thread(() -> { + while (latch.getCount() > 0) { + boolean flag = diagnostics.isHedgingStarted(); + List regions = diagnostics.getRequestedRegions(); + boolean hasHedge = regions.stream().anyMatch(r -> r.getReason() == RequestedRegionReason.HEDGING); + if (flag != hasHedge) { + violations.incrementAndGet(); + } + } + }, "hedging-detection-reader"); + reader.setDaemon(true); + reader.start(); + + for (int w = 0; w < writers; w++) { + final int writerId = w; + new Thread(() -> { + try { + for (int i = 0; i < perWriter; i++) { + DIAG_ACCESSOR.appendRequestedRegion(diagnostics, + new RequestedRegion("Region-" + writerId, RequestedRegionReason.HEDGING)); + } + } finally { + latch.countDown(); + } + }, "hedging-detection-writer-" + w).start(); + } + + latch.await(); + reader.join(5_000L); + + assertThat(violations.get()) + .as("isHedgingStarted() and getRequestedRegions() must agree under concurrent writers (M5/M6/M8)") + .isZero(); + assertThat(diagnostics.isHedgingStarted()).isTrue(); + assertThat(diagnostics.getRequestedRegions()).hasSize(writers * perWriter); + } +} diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/models/RequestedRegionTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/models/RequestedRegionTest.java new file mode 100644 index 000000000000..d351809c9c1e --- /dev/null +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/models/RequestedRegionTest.java @@ -0,0 +1,86 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. +package com.azure.cosmos.models; + +import org.testng.annotations.Test; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** + * Unit tests for {@link RequestedRegion} value-type semantics (AC1 / spec section 3.1). + */ +public class RequestedRegionTest { + + @Test(groups = {"unit"}) + public void gettersReturnConstructorArguments() { + RequestedRegion region = new RequestedRegion("East US", RequestedRegionReason.INITIAL); + + assertThat(region.getRegionName()).isEqualTo("East US"); + assertThat(region.getReason()).isEqualTo(RequestedRegionReason.INITIAL); + } + + @Test(groups = {"unit"}) + public void equalsAndHashCodeAreCaseInsensitiveOnRegionName() { + RequestedRegion a = new RequestedRegion("East US", RequestedRegionReason.HEDGING); + RequestedRegion b = new RequestedRegion("east us", RequestedRegionReason.HEDGING); + RequestedRegion c = new RequestedRegion("EAST US", RequestedRegionReason.HEDGING); + + assertThat(a).isEqualTo(b); + assertThat(a).isEqualTo(c); + assertThat(a.hashCode()).isEqualTo(b.hashCode()); + assertThat(a.hashCode()).isEqualTo(c.hashCode()); + } + + @Test(groups = {"unit"}) + public void equalsDistinguishesByReason() { + RequestedRegion initial = new RequestedRegion("East US", RequestedRegionReason.INITIAL); + RequestedRegion hedge = new RequestedRegion("East US", RequestedRegionReason.HEDGING); + + assertThat(initial).isNotEqualTo(hedge); + } + + @Test(groups = {"unit"}) + public void equalsHandlesNullAndOtherTypes() { + RequestedRegion region = new RequestedRegion("East US", RequestedRegionReason.INITIAL); + + assertThat(region.equals(null)).isFalse(); + assertThat(region.equals("East US")).isFalse(); + assertThat(region).isEqualTo(region); + } + + @Test(groups = {"unit"}) + public void toStringFormatIsRegionColonReason() { + RequestedRegion region = new RequestedRegion("West US 2", RequestedRegionReason.OPERATION_RETRY); + + assertThat(region.toString()).isEqualTo("West US 2:OPERATION_RETRY"); + } + + @Test(groups = {"unit"}) + public void constructorRejectsNullRegionName() { + assertThatThrownBy(() -> new RequestedRegion(null, RequestedRegionReason.INITIAL)) + .isInstanceOf(NullPointerException.class) + .hasMessageContaining("regionName"); + } + + @Test(groups = {"unit"}) + public void constructorRejectsNullReason() { + assertThatThrownBy(() -> new RequestedRegion("East US", null)) + .isInstanceOf(NullPointerException.class) + .hasMessageContaining("reason"); + } + + @Test(groups = {"unit"}) + public void enumContainsAllSixVariants() { + // Spec gate Q7=A: ship all six variants in the initial release. + RequestedRegionReason[] values = RequestedRegionReason.values(); + + assertThat(values).containsExactlyInAnyOrder( + RequestedRegionReason.INITIAL, + RequestedRegionReason.OPERATION_RETRY, + RequestedRegionReason.TRANSPORT_RETRY, + RequestedRegionReason.HEDGING, + RequestedRegionReason.REGION_FAILOVER, + RequestedRegionReason.CIRCUIT_BREAKER_PROBE); + } +} From de298c3ca5ef1ef6b3c411ae3f7c84a0197414a6 Mon Sep 17 00:00:00 2001 From: Nalu Tripician <27316859+NaluTripician@users.noreply.github.com> Date: Thu, 14 May 2026 14:22:13 -0700 Subject: [PATCH 06/13] docs(cosmos): CHANGELOG entry for hedging detection API Refs Azure/azure-sdk-for-java#49182. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- sdk/cosmos/azure-cosmos/CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/sdk/cosmos/azure-cosmos/CHANGELOG.md b/sdk/cosmos/azure-cosmos/CHANGELOG.md index e8ea564fab7b..2906d1281aff 100644 --- a/sdk/cosmos/azure-cosmos/CHANGELOG.md +++ b/sdk/cosmos/azure-cosmos/CHANGELOG.md @@ -3,6 +3,7 @@ ### 4.80.0-beta.1 (Unreleased) #### Features Added +* Added Hedging Detection API on `CosmosDiagnostics` and `CosmosDiagnosticsContext`: new `isHedgingStarted()`, `getRequestedRegions()` (returning `List` from `com.azure.cosmos.models`), and `getRespondedRegions()`. New public types `RequestedRegion` and `RequestedRegionReason` (enum: `INITIAL`, `OPERATION_RETRY`, `TRANSPORT_RETRY`, `HEDGING`, `REGION_FAILOVER`, `CIRCUIT_BREAKER_PROBE`) in `com.azure.cosmos.models`. See [Issue 49182](https://github.com/Azure/azure-sdk-for-java/issues/49182). #### Breaking Changes From 880748d321789a1bb7c241143bc35be76acf2e46 Mon Sep 17 00:00:00 2001 From: Nalu Tripician <27316859+NaluTripician@users.noreply.github.com> Date: Thu, 14 May 2026 15:44:27 -0700 Subject: [PATCH 07/13] ci: retrigger build to clear flaky Spark bridge + Service Bus emulator failures The previous CI run on this branch failed on two checks that are unrelated to this PR's content: * Build Test ubuntu2404_117_FromSource_SkipRebuild_Verify - Scala compiler bridge resolution flake (java.lang.ClassNotFoundException: xsbt.CompilerInterface) while compiling azure-cosmos-spark_4-0_2-13. - The same module's Windows integration test (Spark40Scala213IntegrationTests) passed in the same CI run. - This PR has zero diff against any sdk/cosmos/azure-cosmos-spark_* module. * Build Test ubuntu2404_117_NotFromSource_AggregateReports_SkipRebuild_Verify - com.azure.spring.cloud.docker.compose Service Bus emulator test failure: 'queue.1' entity could not be found in the SB emulator container. - This PR has zero diff against any sdk/spring/ module. Cosmos changes only touch sdk/cosmos/azure-cosmos/. Local unit tests pass. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> From 8b8c76233db8fd9e44b87baee855a976087ee019 Mon Sep 17 00:00:00 2001 From: Nalu Tripician <27316859+NaluTripician@users.noreply.github.com> Date: Thu, 14 May 2026 16:31:32 -0700 Subject: [PATCH 08/13] test(cosmos): fix racy reader in compoundAtomicityIsPreservedUnderConcurrentWriters The reader thread issued two separate accessor calls (isHedgingStarted() then getRequestedRegions()) and asserted strict equality between the flag and 'list contains HEDGING'. Each accessor acquires regionLock independently, so a writer can commit its compound-atomic update in the window between the two reads, producing a benign read-skew of flag=false / hasHedge=true. This is not a violation of the M5/M6/M8 compound-atomicity invariant, which is a write-side property. Tighten the check to only count the provably-impossible direction: flag=true && !hasHedge. Because hedgingStarted is monotonic (false -> true, never back), once a reader observes true, any later regions snapshot must include the HEDGING entry that triggered the flip (the writer's lock-release happens-before the next acquisition). Validated locally on Java 22 with 5 consecutive runs (16 writers x 1000 entries each); no failures. Fixes intermittent failure on _18_NotFromSource_TestsOnly Java 8 CI jobs surfaced after the previous retrigger run. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../HedgingDetectionUnitTests.java | 21 +++++++++++++++---- 1 file changed, 17 insertions(+), 4 deletions(-) diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/diagnostics/HedgingDetectionUnitTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/diagnostics/HedgingDetectionUnitTests.java index 8adf8566f3ef..02dfed50639a 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/diagnostics/HedgingDetectionUnitTests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/diagnostics/HedgingDetectionUnitTests.java @@ -147,8 +147,17 @@ public void snapshotIsDecoupledFromLaterMutations() { @Test(groups = {"unit"}, timeOut = 30_000L) public void compoundAtomicityIsPreservedUnderConcurrentWriters() throws InterruptedException { // 16 writer threads, each appending 1000 HEDGING entries to the same diagnostics. - // Invariant: every reader that observes isHedgingStarted() == true must also see at - // least one HEDGING entry in getRequestedRegions(), and vice-versa. + // + // Writers hold `regionLock` across (list.add + maybe-flip hedgingStarted), so any reader + // that observes hedgingStarted == true and then reads getRequestedRegions() MUST see at + // least one HEDGING entry: hedgingStarted is monotonic (false -> true, never back), so + // the lock-release that flipped the flag happens-before the next snapshot acquisition. + // + // The reverse direction (flag == false, hasHedge == true) is a BENIGN read-skew: the + // reader's two accessor calls release the lock between them, so a writer can commit + // (append HEDGING entry + flip flag) in that window. A retry of the flag read would + // observe true. This is not a violation of M5/M6/M8 compound atomicity, which is a + // write-side invariant. CosmosDiagnostics diagnostics = newDiagnostics(); final int writers = 16; @@ -161,7 +170,10 @@ public void compoundAtomicityIsPreservedUnderConcurrentWriters() throws Interrup boolean flag = diagnostics.isHedgingStarted(); List regions = diagnostics.getRequestedRegions(); boolean hasHedge = regions.stream().anyMatch(r -> r.getReason() == RequestedRegionReason.HEDGING); - if (flag != hasHedge) { + // Only count the provably-impossible direction: flag observed true implies a + // writer's compound-atomic update committed before this read; therefore the + // later regions snapshot must include a HEDGING entry. + if (flag && !hasHedge) { violations.incrementAndGet(); } } @@ -187,7 +199,8 @@ public void compoundAtomicityIsPreservedUnderConcurrentWriters() throws Interrup reader.join(5_000L); assertThat(violations.get()) - .as("isHedgingStarted() and getRequestedRegions() must agree under concurrent writers (M5/M6/M8)") + .as("isHedgingStarted() == true must imply at least one HEDGING entry in " + + "getRequestedRegions() (monotonic flag + compound-atomic write; M5/M6/M8)") .isZero(); assertThat(diagnostics.isHedgingStarted()).isTrue(); assertThat(diagnostics.getRequestedRegions()).hasSize(writers * perWriter); From 9137872bd836d5d6c8255f7e6da51997ef37feb8 Mon Sep 17 00:00:00 2001 From: Nalu Tripician <27316859+NaluTripician@users.noreply.github.com> Date: Thu, 14 May 2026 17:42:41 -0700 Subject: [PATCH 09/13] ci: retrigger to clear flaky SparkE2EBulkWriteITest restart test The only remaining red check is windows2022_Spark40Scala213IntegrationTeststargetingCosmosEmulatorJava21 where SparkE2EBulkWriteITest 'should support bulk ingestion when BulkWriter needs to get restarted with transactional bulk enabled false' asserted '0 did not equal 100' on a 42-second restart-timing test. Same test passed on the previous CI run with this PR's code already in place, and 5 of 6 Spark integration variants (3.3, 3.4, 3.5x2, 4.1) pass in this run. The failing test is BulkWriter restart-timing sensitive and this PR has zero diff against any Spark module. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> From 493a071d2e5f02a6e8e2af797447be824c824d58 Mon Sep 17 00:00:00 2001 From: Nalu Tripician <27316859+NaluTripician@users.noreply.github.com> Date: Thu, 14 May 2026 18:39:34 -0700 Subject: [PATCH 10/13] ci: iteration 4 retrigger - Spark integration flake rotates per run iter 2: windows2022_Spark40Scala213IntegrationTeststargetingCosmosEmulatorJava21 failed on SparkE2EBulkWriteITest 'BulkWriter needs to get restarted' (passed iter 3). iter 3: windows2022_Spark33IntegrationTeststargetingCosmosEmulatorJava8 failed on ChangeFeedPartitionReaderITest 'should honor endLSN during split and should hang' (passed iter 2). A different Spark variant flakes each run on a timing-sensitive test. Same 6-variant matrix, same hedging code, different test fails each time. This PR has zero diff against any Spark module. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> From 501274c20f71ba9544fd93507bb28c0e60b50096 Mon Sep 17 00:00:00 2001 From: Nalu Tripician <27316859+NaluTripician@users.noreply.github.com> Date: Mon, 18 May 2026 14:55:04 -0700 Subject: [PATCH 11/13] fix(cosmos): address hedging detection PR review feedback - Resolve TRANSPORT_RETRY region via deterministic 'most recently contacted region' accessor instead of HashSet iterator (#49184 GoneAndRetryWithRetryPolicy review). - Drop misleading 'matches getContactedRegionNames() order' claim from CosmosDiagnosticsContext.getRequestedRegions() Javadoc and clarify that orderings intentionally differ. - Add ClientSideRequestStatistics.getMostRecentlyContactedRegion() and matching bridge accessor on CosmosDiagnosticsAccessor; the bridge impl normalizes the underlying empty-string sentinel to null so callers have a single null-check. - Cover the new accessor with two null-safety unit tests (empty-state -> null; null-diagnostics -> null). Build + style + revapi + unit tests green (Tests run: 19, Failures: 0, Errors: 0, Skipped: 0). Refs PR #49184 review threads: - review-comment #49184/GoneAndRetryWithRetryPolicy - review-comment #49184/CosmosDiagnosticsContext Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../HedgingDetectionUnitTests.java | 12 +++++++++ .../com/azure/cosmos/CosmosDiagnostics.java | 16 ++++++++++++ .../cosmos/CosmosDiagnosticsContext.java | 10 ++++--- .../ClientSideRequestStatistics.java | 8 ++++++ .../ImplementationBridgeHelpers.java | 13 ++++++++++ .../GoneAndRetryWithRetryPolicy.java | 26 +++++++++---------- 6 files changed, 68 insertions(+), 17 deletions(-) diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/diagnostics/HedgingDetectionUnitTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/diagnostics/HedgingDetectionUnitTests.java index 02dfed50639a..bd643ef91949 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/diagnostics/HedgingDetectionUnitTests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/diagnostics/HedgingDetectionUnitTests.java @@ -205,4 +205,16 @@ public void compoundAtomicityIsPreservedUnderConcurrentWriters() throws Interrup assertThat(diagnostics.isHedgingStarted()).isTrue(); assertThat(diagnostics.getRequestedRegions()).hasSize(writers * perWriter); } + + @Test(groups = {"unit"}) + public void getMostRecentlyContactedRegionReturnsNullWhenNoneRecorded() { + CosmosDiagnostics diagnostics = newDiagnostics(); + + assertThat(DIAG_ACCESSOR.getMostRecentlyContactedRegion(diagnostics)).isNull(); + } + + @Test(groups = {"unit"}) + public void getMostRecentlyContactedRegionNullDiagnosticsIsNullSafe() { + assertThat(DIAG_ACCESSOR.getMostRecentlyContactedRegion(null)).isNull(); + } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosDiagnostics.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosDiagnostics.java index 9c5a04aa9eef..2ae09e1ae184 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosDiagnostics.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosDiagnostics.java @@ -673,6 +673,22 @@ public boolean isHedgingStartedInternal(CosmosDiagnostics cosmosDiagnostics) { } return cosmosDiagnostics.isHedgingStarted(); } + + @Override + public String getMostRecentlyContactedRegion(CosmosDiagnostics cosmosDiagnostics) { + if (cosmosDiagnostics == null) { + return null; + } + ClientSideRequestStatistics stats = cosmosDiagnostics.clientSideRequestStatistics; + if (stats == null) { + return null; + } + String region = stats.getMostRecentlyContactedRegion(); + if (region == null || region.isEmpty()) { + return null; + } + return region; + } }); } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosDiagnosticsContext.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosDiagnosticsContext.java index c094433f5fad..44ffe21d5562 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosDiagnosticsContext.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosDiagnosticsContext.java @@ -464,9 +464,13 @@ public boolean isHedgingStarted() { /** * Returns the regions the SDK dispatched this operation to, aggregated across all - * per-operation {@link CosmosDiagnostics} children in observed order - * (FIFO traversal of the underlying {@code ConcurrentLinkedDeque}, matching the order used - * by {@link #getContactedRegionNames()}). The returned list is unmodifiable. + * per-operation {@link CosmosDiagnostics} children. Aggregation order is FIFO over the + * underlying {@code ConcurrentLinkedDeque} of children; within each child, the requested + * regions are appended in their own FIFO (insertion) order. The returned list is + * unmodifiable. + *

+ * This ordering does not match {@link #getContactedRegionNames()}, which + * returns a sorted set keyed by region name. *

* Each entry reflects an actual SDK dispatch decision (post-threshold-delay, * post-non-cancellation for hedge arms). See diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ClientSideRequestStatistics.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ClientSideRequestStatistics.java index 87bb7b02e8bd..0f64c34b072c 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ClientSideRequestStatistics.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ClientSideRequestStatistics.java @@ -744,6 +744,14 @@ public RegionalRoutingContext getFirstContactedLocationEndpoint() { return this.regionsContactedWithContext.first().locationEndpointsContacted; } + public String getMostRecentlyContactedRegion() { + if (this.regionsContactedWithContext == null || this.regionsContactedWithContext.isEmpty()) { + return StringUtils.EMPTY; + } + + return this.regionsContactedWithContext.last().regionContacted; + } + // ===== Hedging Detection API internal accessors ===== // // appendRequestedRegion / appendRespondedRegion and the matching getters are the *only* diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ImplementationBridgeHelpers.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ImplementationBridgeHelpers.java index 80fb110ed76a..860b14047aec 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ImplementationBridgeHelpers.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ImplementationBridgeHelpers.java @@ -919,6 +919,19 @@ void recordAddressResolutionEnd( List getRespondedRegionsInternal(CosmosDiagnostics cosmosDiagnostics); boolean isHedgingStartedInternal(CosmosDiagnostics cosmosDiagnostics); + + /** + * Returns the most-recently-recorded contacted region on the underlying + * {@link ClientSideRequestStatistics}, or {@code null} when no region has been + * recorded. Backed by a synchronized navigable set keyed on insertion timestamp — + * deterministic, no manual iteration, safe to call concurrently with writers. + *

+ * Never returns the empty string — the underlying + * {@code ClientSideRequestStatistics.getMostRecentlyContactedRegion()} sentinel + * ({@link org.apache.commons.lang3.StringUtils#EMPTY}) is normalized to {@code null} + * here so callers have a single null-check at the call site. + */ + String getMostRecentlyContactedRegion(CosmosDiagnostics cosmosDiagnostics); } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/GoneAndRetryWithRetryPolicy.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/GoneAndRetryWithRetryPolicy.java index 7b0d6907c699..4cfa505a6f9b 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/GoneAndRetryWithRetryPolicy.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/GoneAndRetryWithRetryPolicy.java @@ -97,30 +97,28 @@ private Duration getElapsedTime() { } // Hedging Detection API: best-effort helper that appends a TRANSPORT_RETRY entry to the - // request's diagnostics. The region is the most-recently-contacted region on the request's - // ClientSideRequestStatistics (matching the retry-dispatch semantic). Silently no-ops when - // the request, diagnostics, or region cannot be resolved. NOTE: the PPCB / PPAF probe sites - // also need to surface a TRANSPORT_RETRY / CIRCUIT_BREAKER_PROBE hook — those depend on - // upstream PRs #45197 / #45267 / #46477 / #48421 (SE-005 / SE-012). When those land in the - // Java SDK, wire CIRCUIT_BREAKER_PROBE at the probe-issue site there. + // request's diagnostics. The region is resolved deterministically as the most-recently + // contacted region via the diagnostics bridge — that accessor reads + // `ClientSideRequestStatistics.regionsContactedWithContext.last()`, which is backed by a + // synchronized navigable set keyed on insertion timestamp (no manual iteration, no + // HashSet-iterator nondeterminism, no CME hazard). Silently no-ops when the request, + // diagnostics, or region cannot be resolved. NOTE: the PPCB / PPAF probe sites also need + // to surface a TRANSPORT_RETRY / CIRCUIT_BREAKER_PROBE hook — those depend on upstream PRs + // #45197 / #45267 / #46477 / #48421 (SE-005 / SE-012). When those land in the Java SDK, + // wire CIRCUIT_BREAKER_PROBE at the probe-issue site there. private static void recordTransportRetryRequestedRegion(RxDocumentServiceRequest request) { if (request == null || request.requestContext == null || request.requestContext.cosmosDiagnostics == null) { return; } try { - String regionName = null; CosmosDiagnostics diag = request.requestContext.cosmosDiagnostics; - // Region resolution: fall back to "FirstContactedRegion" if currently-pinned region - // is unknown at this layer. - if (!diag.getContactedRegionNames().isEmpty()) { - regionName = diag.getContactedRegionNames().iterator().next(); - } + ImplementationBridgeHelpers.CosmosDiagnosticsHelper.CosmosDiagnosticsAccessor diagAcc = + ImplementationBridgeHelpers.CosmosDiagnosticsHelper.getCosmosDiagnosticsAccessor(); + String regionName = diagAcc.getMostRecentlyContactedRegion(diag); if (regionName == null || regionName.isEmpty()) { return; } - ImplementationBridgeHelpers.CosmosDiagnosticsHelper.CosmosDiagnosticsAccessor diagAcc = - ImplementationBridgeHelpers.CosmosDiagnosticsHelper.getCosmosDiagnosticsAccessor(); diagAcc.appendRequestedRegion( diag, new RequestedRegion(regionName, RequestedRegionReason.TRANSPORT_RETRY)); From 7bf4eb7da1eb7bf64a6043d5d18dbfb4928aeba0 Mon Sep 17 00:00:00 2001 From: Nalu Tripician <27316859+NaluTripician@users.noreply.github.com> Date: Tue, 19 May 2026 14:29:18 -0700 Subject: [PATCH 12/13] ci: retrigger to clear cosmos-ci Scala xsbt bridge + libzip SIGBUS flakes Build 6316330 hit the same recurring infrastructure flakes that also fail recent main builds (e.g., 6261886 xsbt.LocalToNonLocalClass: EOFException, 6272523 reactor-core resolver Connection reset): Build and Package, JDK 1.17 (log 916): scala-maven-plugin compile of azure-cosmos-spark_3-4_2-12 -> java.lang.ClassNotFoundException: xsbt.CompilerInterface (Zinc incremental compiler bridge classpath flake; agent .m2 cache state) Build for non-From Source run (log 828): OpenJDK 1.8.0_482 SIGBUS in libzip.so newEntry.isra.4 (native zip access flake) Create APIView if API has changes (log 966): downstream cascade - artifact directory wasn't populated because the build above died None of these touch hedging-detection code; identical patterns recur on main. Pushing an empty commit to rebuild on a fresh agent (matches the canonical retrigger pattern from earlier in this PR -- e.g. 9137872 "ci: retrigger to clear flaky Spark bridge + Service Bus emulator failures"). Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> From 579e1bd1a44165191c20454455c34059aa6a7162 Mon Sep 17 00:00:00 2001 From: Nalu Tripician <27316859+NaluTripician@users.noreply.github.com> Date: Tue, 19 May 2026 15:24:26 -0700 Subject: [PATCH 13/13] ci: retrigger to clear cosmos-ci timing+Spark encoding flakes Second retrigger. Previous attempt cleared the Scala xsbt bridge + libzip SIGBUS flakes from build 6316330, but build 6323139 surfaced different infra flakes: - ProactiveOpenConnectionsProcessorUnitTest.handleOverflowTest timing race (expected 500, got 279) - PT0.00000001S timeout is machine-speed sensitive - Spark 4.1 Scala 2.13 EXPRESSION_ENCODING_FAILED on _attachments column in upsert test Neither file is touched by this PR. main itself fails cosmos-ci 70% of the time (3/10 recent builds pass), confirming pipeline-level flakiness independent of PR content. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>