Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions sdk/cosmos/azure-cosmos/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
#### Bugs Fixed
* Fixed bug where `CosmosClient` construction with AAD credentials would crash at startup if the semantic reranking inference endpoint environment variable was not set, even when semantic reranking was not being used. The inference service is now lazily initialized on first use. See [PR 46243](https://github.com/Azure/azure-sdk-for-python/pull/46243)
* Fixed bug where region names in `preferred_locations` and `excluded_locations` (client-level and per-request) were not matched tolerantly for differences in case, whitespace, hyphens, and underscores. See [PR 46937](https://github.com/Azure/azure-sdk-for-python/pull/46937)
* Fixed bug where `query_items(feed_range=...)` pagination could return incorrect results after a partition split caused the supplied feed range to overlap multiple physical partitions. See [PR 47105](https://github.com/Azure/azure-sdk-for-python/pull/47105)
* Fixed bug where `SELECT VALUE AVG(...)` queries spanning multiple physical partitions returned mathematically incorrect merged values from client-side aggregation. These queries now raise `ValueError`. See [PR 47105](https://github.com/Azure/azure-sdk-for-python/pull/47105)

#### Other Changes
* Reduced per-client memory overhead when partition-level circuit breaker (PPCB) is enabled by sharing the partition key range routing map cache across CosmosClient instances connected to the same endpoint, and stripping unused fields from cached partition key ranges using compact PKRange namedtuples. See [PR 46297](https://github.com/Azure/azure-sdk-for-python/pull/46297)
Expand Down
95 changes: 53 additions & 42 deletions sdk/cosmos/azure-cosmos/azure/cosmos/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@
from . import documents
from . import http_constants
from . import _runtime_constants
from ._query_aggregate_utils import (
_AggregatePartialClassification,
_classify_aggregate_partial,
_get_select_value_aggregate_function,
)
from ._constants import _Constants as Constants
from .auth import _get_authorization_header
from .offer import ThroughputProperties
Expand Down Expand Up @@ -129,6 +134,7 @@ def build_options(kwargs: dict[str, Any]) -> dict[str, Any]:
options['accessCondition'] = {'type': 'IfNoneMatch', 'condition': if_none_match}
return options


def _merge_query_results(
results: dict[str, Any],
partial_result: dict[str, Any],
Expand Down Expand Up @@ -168,22 +174,13 @@ def _merge_query_results(

results_docs = results.get("Documents")

# Check if both results are aggregate queries
is_partial_agg = (
isinstance(partial_docs, list)
and len(partial_docs) == 1
and isinstance(partial_docs[0], dict)
and partial_docs[0].get("_aggregate") is not None
)
is_results_agg = (
results_docs
and isinstance(results_docs, list)
and len(results_docs) == 1
and isinstance(results_docs[0], dict)
and results_docs[0].get("_aggregate") is not None
)
partial_aggregate_class = _classify_aggregate_partial(partial_docs, query)
results_aggregate_class = _classify_aggregate_partial(results_docs, query)

if is_partial_agg and is_results_agg:
if (
partial_aggregate_class == _AggregatePartialClassification.OBJECT
and results_aggregate_class == _AggregatePartialClassification.OBJECT
):
agg_results = results_docs[0]["_aggregate"] # type: ignore[index]
agg_partial = partial_docs[0]["_aggregate"]
for key in agg_partial:
Expand All @@ -201,33 +198,26 @@ def _merge_query_results(
agg_results[key] += agg_partial[key]
return results

# Check if both are VALUE aggregate queries
is_partial_value_agg = (
isinstance(partial_docs, list)
and len(partial_docs) == 1
and isinstance(partial_docs[0], (int, float))
)
is_results_value_agg = (
results_docs
and isinstance(results_docs, list)
and len(results_docs) == 1
and isinstance(results_docs[0], (int, float))
)

if is_partial_value_agg and is_results_value_agg:
query_text = query.get("query") if isinstance(query, dict) else query
if query_text:
query_upper = query_text.upper()
# For MIN/MAX, we find the min/max of the partial results.
# For COUNT/SUM, we sum the partial results.
# Without robust query parsing, we can't distinguish them reliably.
# Defaulting to sum for COUNT/SUM. MIN/MAX VALUE queries are not fully supported client-side.
if " SELECT VALUE MIN" in query_upper:
results_docs[0] = min(results_docs[0], partial_docs[0]) # type: ignore[index]
elif " SELECT VALUE MAX" in query_upper:
results_docs[0] = max(results_docs[0], partial_docs[0]) # type: ignore[index]
else: # For COUNT/SUM, we sum the partial results
results_docs[0] += partial_docs[0] # type: ignore[index]
if (
partial_aggregate_class == _AggregatePartialClassification.VALUE
and results_aggregate_class == _AggregatePartialClassification.VALUE
):
aggregate_fn = _get_select_value_aggregate_function(query)
if aggregate_fn is None:
raise ValueError(
"Invariant violation: VALUE aggregate classification requires a recognized aggregate function."
)
if aggregate_fn == "MIN":
results_docs[0] = min(results_docs[0], partial_docs[0]) # type: ignore[index]
elif aggregate_fn == "MAX":
results_docs[0] = max(results_docs[0], partial_docs[0]) # type: ignore[index]
elif aggregate_fn == "AVG":
raise ValueError(
"VALUE AVG aggregate merge across partitions is not supported client-side."
)
else:
# COUNT/SUM are additive.
results_docs[0] += partial_docs[0] # type: ignore[index]
return results

# Standard query, append documents
Expand All @@ -239,6 +229,27 @@ def _merge_query_results(
return results


def _raise_query_merge_value_error(merge_error: ValueError) -> None:
"""Raise a clearer user-facing error for unsupported VALUE aggregate merges.

``SELECT VALUE AVG(...)`` partials cannot be merged correctly client-side
across multiple partition/range responses. We fail loudly instead of
falling back to list concatenation (which would silently produce
mathematically incorrect results).

:param merge_error: ValueError raised while merging partial query results.
:type merge_error: ValueError
:raises ValueError: Always re-raises, potentially with a clearer message.
"""
merge_message = str(merge_error)
if "VALUE AVG aggregate merge across partitions is not supported client-side." in merge_message:
raise ValueError(
"Unsupported query shape for range-scoped pagination: "
"SELECT VALUE AVG(...) cannot be merged client-side when the query "
"scope spans multiple physical partitions."
) from merge_error
raise merge_error

def GetHeaders( # pylint: disable=too-many-statements,too-many-branches
cosmos_client_connection: Union["CosmosClientConnection", "AsyncClientConnection"],
default_headers: Mapping[str, Any],
Expand Down
Loading
Loading