Migrated database layer from mysql client to async aiomysql #249

Open
Vivekgupta008 wants to merge 4 commits into openml:main from Vivekgupta008:vivek/async

Conversation

@Vivekgupta008

Replace synchronous SQLAlchemy engine and connections with async equivalents
across the entire stack, improving resource utilization under concurrent load.

Changes:

  • pyproject.toml: replace mysqlclient with aiomysql, add pytest-asyncio
  • src/config.toml: update drivername to mysql+aiomysql
  • src/database/setup.py: create_engine -> create_async_engine / AsyncEngine
  • src/database/*.py (6 modules): all functions -> async def, await execute()
  • src/database/users.py: User.groups property -> User.get_groups() async method
  • src/core/access.py: _user_has_access -> async to support await user.get_groups()
  • src/routers/dependencies.py: sync generators -> async generators (AsyncConnection)
  • src/routers/**/*.py (9 routers): all endpoints -> async def, all DB calls awaited
  • tests/conftest.py: fixtures -> pytest_asyncio.fixture with async context managers
  • tests/**/*_test.py (7 files): functions that call DB directly -> async def + await
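The sync-to-async conversion the bullets above describe follows one recurring shape: `def` becomes `async def`, `conn.execute(...)` becomes `await conn.execute(...)`, and rows are materialised before the result is discarded. A minimal, illustrative sketch of that pattern, using `unittest.mock.AsyncMock` as a stand-in for SQLAlchemy's `AsyncConnection` (`get_rows` and the query are hypothetical, not functions from this PR):

```python
import asyncio
from unittest.mock import AsyncMock, MagicMock

# Hypothetical "after" shape of a migrated database helper: the sync
# `def get_rows(...)` with `conn.execute(...)` becomes `async def`
# with `await conn.execute(...)`, and rows are materialised via .all().
async def get_rows(query: str, conn) -> list:
    result = await conn.execute(query)  # awaited instead of called directly
    return result.all()                 # materialise before the result is closed

async def main() -> list:
    conn = AsyncMock()                             # stand-in for AsyncConnection
    result = MagicMock()
    result.all.return_value = [(1, "a"), (2, "b")]
    conn.execute.return_value = result             # awaiting conn.execute resolves to this
    return await get_rows("SELECT ...", conn)

print(asyncio.run(main()))  # [(1, 'a'), (2, 'b')]
```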

Closes #229
@PGijsbers please review it

@coderabbitai
Contributor

coderabbitai bot commented Feb 21, 2026

Walkthrough

The codebase was migrated from synchronous to asynchronous database operations. The MySQL client driver was replaced with aiomysql and SQLAlchemy async APIs (AsyncEngine/AsyncConnection/create_async_engine) were adopted. Connection types and signatures changed from Connection to AsyncConnection across database modules, routers, and tests; many functions were converted from def to async def and now await database execution. Test fixtures and pytest configuration were updated to use pytest-asyncio. Configuration and dependency lists (pyproject.toml, src/config.toml) were updated to reference the async driver and testing tooling.

Possibly related PRs

  • Update pre-commit hooks #219: Modifies the same functions in src/routers/openml/tasks.py (e.g., fill_template and _fill_json_template); this PR converts them to async while that PR tightens JSON/type annotations.
🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

  • Docstring Coverage: ⚠️ Warning. Docstring coverage is 9.90%, which is below the required threshold of 80.00%. Resolution: write docstrings for the functions missing them.

✅ Passed checks (4 passed)

  • Title check: ✅ Passed. The title accurately describes the main change: migrating from the synchronous mysql client to the async aiomysql driver.
  • Description check: ✅ Passed. The description provides a comprehensive overview of all major changes across the codebase, covering dependencies, configuration, the database layer, routers, and tests.
  • Linked Issues check: ✅ Passed. The PR fully implements the requirements from issue #229: it replaces the synchronous database driver with asyncio-compatible aiomysql via the SQLAlchemy dialect and migrates all database interactions to async APIs.
  • Out of Scope Changes check: ✅ Passed. All changes are directly related to the async migration objective; changes to dependencies, configuration, database functions, routers, and tests are all in scope.


Contributor

@sourcery-ai sourcery-ai bot left a comment
Hey - I've found 2 issues and left some high-level feedback:

  • Tests that patch async database functions with AsyncMock (e.g. in tests/routers/openml/flows_test.py) should import AsyncMock from unittest.mock, otherwise those tests will fail at runtime.
  • In routers/openml/flows.get_flow, the recursive await get_flow(...) inside the list comprehension for subflows results in one query per subflow, executed sequentially; if flows commonly have many subflows, consider batching or using asyncio.gather to fetch subflows concurrently and avoid N+1-style latency.

## Individual Comments

### Comment 1
<location> `src/database/flows.py:8-11` </location>
<code_context>


-def get_subflows(for_flow: int, expdb: Connection) -> Sequence[Row]:
+async def get_subflows(for_flow: int, expdb: AsyncConnection) -> Sequence[Row]:
     return cast(
         "Sequence[Row]",
-        expdb.execute(
+        await expdb.execute(
             text(
                 """
</code_context>

<issue_to_address>
**issue (bug_risk):** Returning a `Result` instead of a list changes the effective return type and may break callers expecting a `Sequence[Row]`.

The function used to return `expdb.execute(...).all()` (a concrete list), but now returns the `Result` from `await expdb.execute(...)` while still being annotated as `Sequence[Row]`.

`Result` is iterable but not a `Sequence` (no `len`, indexing, etc.) and is single-use. Existing callers like `get_flow` that only iterate will work, but any list-like usage will now break.

To keep the original semantics and match the type hint, you can restore `.all()`:

```python
result = await expdb.execute(text(""" ... """), parameters={"parent": for_flow})
return cast("Sequence[Row]", result.all())
```

The same issue exists in `get_parameters`, which also previously returned `.all()`.
</issue_to_address>
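The single-use behaviour at issue can be illustrated with a plain iterator standing in for the result object (a toy analogy, not SQLAlchemy's actual class): iterating twice silently yields nothing the second time, whereas the list returned by `.all()` is a real `Sequence` that supports `len()` and indexing.

```python
# Toy forward-only "result": iterable exactly once, like a cursor result,
# whereas .all() would hand callers a reusable list.
rows = iter([("flow", 1), ("flow", 2)])

first_pass = list(rows)    # consumes the iterator
second_pass = list(rows)   # silently empty — the cursor is exhausted

print(first_pass)   # [('flow', 1), ('flow', 2)]
print(second_pass)  # []

materialised = [("flow", 1), ("flow", 2)]  # what .all() returns instead
print(len(materialised), materialised[0])  # len() and indexing work on a Sequence
```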

### Comment 2
<location> `tests/routers/openml/flows_test.py:29` </location>
<code_context>
 ) -> None:
-    mocked_db = mocker.patch("database.flows.get_by_name")
-    flow_exists(name, external_version, expdb_test)
+    mocked_db = mocker.patch("database.flows.get_by_name", new_callable=AsyncMock)
+    await flow_exists(name, external_version, expdb_test)
     mocked_db.assert_called_once_with(
</code_context>

<issue_to_address>
**issue (testing):** AsyncMock is used but not imported, which will cause the tests in this module to fail.

`AsyncMock` is referenced in this test file (e.g. for `database.flows.get_by_name`) but never imported. Add `from unittest.mock import AsyncMock` (or equivalent) at the top of `tests/routers/openml/flows_test.py` to avoid `NameError` at test runtime.
</issue_to_address>
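A minimal, self-contained version of the fix this comment asks for: `AsyncMock` imported from `unittest.mock` and used with `patch` to replace an async function. The module-local `get_by_name` and `flow_exists` here are stand-ins for illustration; in the real tests the patch target string would be `"database.flows.get_by_name"`.

```python
import asyncio
from unittest.mock import AsyncMock, patch

async def get_by_name(name: str):
    raise RuntimeError("real database call — should be patched out")

async def flow_exists(name: str) -> bool:  # stand-in for the function under test
    return await get_by_name(name) is not None

def run_patched() -> bool:
    # Patch in this module's namespace; without the AsyncMock import this
    # pattern fails with NameError, which is the bug being reported.
    with patch(f"{__name__}.get_by_name", new_callable=AsyncMock) as mocked:
        mocked.return_value = {"id": 1}
        exists = asyncio.run(flow_exists("weka.J48"))
        mocked.assert_called_once_with("weka.J48")
        return exists

print(run_patched())  # True
```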


Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (4)
src/database/datasets.py (1)

105-117: ⚠️ Potential issue | 🟠 Major

one_or_none() will raise if multiple processing records exist.

get_latest_processing_update orders by processing_date DESC but uses .one_or_none(), which raises MultipleResultsFound if more than one row matches. The analogous functions get_description (Line 86) and get_status (Line 102) correctly use .first() for the same "get latest" pattern.

🐛 Suggested fix
 async def get_latest_processing_update(dataset_id: int, connection: AsyncConnection) -> Row | None:
     row = await connection.execute(
         text(
             """
     SELECT *
     FROM data_processed
     WHERE did = :dataset_id
     ORDER BY processing_date DESC
     """,
         ),
         parameters={"dataset_id": dataset_id},
     )
-    return row.one_or_none()
+    return row.first()
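The contract difference behind this fix can be shown with a toy stand-in (not SQLAlchemy's `Result`; SQLAlchemy raises `MultipleResultsFound`, a plain `ValueError` is used here for brevity): `first()` tolerates extra rows and returns the top one, while `one_or_none()` refuses more than one match.

```python
# Toy illustration of the two Result-method contracts at issue.
class ToyResult:
    def __init__(self, rows):
        self._rows = rows

    def first(self):
        # First row of the (already ordered) result, or None if empty.
        return self._rows[0] if self._rows else None

    def one_or_none(self):
        # Raises when more than one row matches; None when empty.
        if len(self._rows) > 1:
            raise ValueError("MultipleResultsFound")
        return self._rows[0] if self._rows else None

two_updates = ToyResult([("2026-02-01",), ("2026-01-01",)])  # ordered DESC
print(two_updates.first())  # ('2026-02-01',) — the latest row
try:
    two_updates.one_or_none()
except ValueError as e:
    print(e)                # MultipleResultsFound
```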
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/database/datasets.py` around lines 105 - 117, The query in
get_latest_processing_update currently uses row.one_or_none(), which will raise
MultipleResultsFound if more than one processing record exists; change it to use
row.first() (or add "LIMIT 1" to the SQL and keep one_or_none()) so the function
returns the latest row without raising—update the call site in
get_latest_processing_update to call .first() on the result (or add LIMIT 1 in
the SELECT) to match the behavior of get_description/get_status.
src/database/flows.py (2)

38-51: ⚠️ Potential issue | 🟠 Major

Same issue: get_parameters is missing .all().

Same problem as get_subflows — the CursorResult is cast to Sequence[Row] without materialising the rows.

Proposed fix
 async def get_parameters(flow_id: int, expdb: AsyncConnection) -> Sequence[Row]:
     return cast(
         "Sequence[Row]",
-        await expdb.execute(
+        (await expdb.execute(
             text(
                 """
             SELECT *, defaultValue as default_value, dataType as data_type
             FROM input
             WHERE implementation_id = :flow_id
             """,
             ),
             parameters={"flow_id": flow_id},
-        ),
+        )).all(),
     )
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/database/flows.py` around lines 38 - 51, The get_parameters function
currently casts the AsyncConnection.execute return (a CursorResult) directly to
Sequence[Row] without materializing rows; call .all() on the result of
expdb.execute before casting/returning so the function returns an actual
Sequence[Row] (similar to the fix in get_subflows), i.e., update get_parameters
to await expdb.execute(...).all() and then cast that value to "Sequence[Row]".

8-21: ⚠️ Potential issue | 🟠 Major

Missing .all() on get_subflows — inconsistent with other async DB functions and potentially buggy.

await expdb.execute(...) returns a CursorResult, not a Sequence[Row]. You cast it directly, but CursorResult is a forward-only cursor — if it's iterated more than once by the caller, the second iteration silently yields nothing. The sibling functions in src/database/studies.py (get_study_data) and src/database/tasks.py (get_input_for_task) call .all() to materialise the rows before returning.

Proposed fix
 async def get_subflows(for_flow: int, expdb: AsyncConnection) -> Sequence[Row]:
     return cast(
         "Sequence[Row]",
-        await expdb.execute(
+        (await expdb.execute(
             text(
                 """
             SELECT child as child_id, identifier
             FROM implementation_component
             WHERE parent = :flow_id
             """,
             ),
             parameters={"flow_id": for_flow},
-        ),
+        )).all(),
     )
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/database/flows.py` around lines 8 - 21, The function get_subflows
currently returns the CursorResult from await expdb.execute(...) which is a
forward-only cursor and can be exhausted; change it to materialize and return a
list of rows by calling .all() on the result before casting/returning (mirror
how get_study_data and get_input_for_task do it) so callers receive a
Sequence[Row] (reference: get_subflows, the expdb.execute(...) call).
src/database/studies.py (1)

160-171: ⚠️ Potential issue | 🟡 Minor

Improve exception message extraction for robustness with aiomysql.

The unpacking (msg,) = e.args assumes exactly one element and will fail if the exception structure changes. While tests currently pass with aiomysql, this is fragile. Use a defensive approach instead:

Suggested fix
     except Exception as e:
-        (msg,) = e.args
+        msg = str(e.args[0]) if e.args else str(e)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/database/studies.py` around lines 160 - 171, The current except block in
studies.py unpacks the exception args with (msg,) = e.args which will crash if
e.args has zero or multiple elements; change it to robustly extract the message
(e.g. msg = str(e) or msg = e.args[0] if e.args else str(e)) and then run the
same pattern matching logic against that msg (the existing regex for "Duplicate
entry '(\d+)-(\d+)'" and string checks for foreign key / out-of-range). Keep
raising ValueError(msg) from e for the original exception; update only the
message-extraction portion (the variable e and subsequent matches remain the
same).
🧹 Nitpick comments (10)
src/routers/mldcat_ap/dataset.py (1)

141-160: Unhandled StopIteration if quality name doesn't match.

Line 148: next(q for q in qualities if q.name == quality_name) raises StopIteration (500 error) if no quality matches. This is pre-existing, but since you're touching this function, consider providing a default or a proper HTTP 404:

🛡️ Suggested improvement
-    quality = next(q for q in qualities if q.name == quality_name)
+    quality = next((q for q in qualities if q.name == quality_name), None)
+    if quality is None:
+        raise HTTPException(status_code=404, detail="Quality not found for this distribution.")
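The safe-lookup pattern from the suggestion, isolated as a runnable stdlib snippet (the quality tuples are toy data, not the real qualities model): `next()` with a default returns `None` instead of letting `StopIteration` escape, so the caller can map a miss to a 404.

```python
# next() with a default instead of an unguarded generator expression.
qualities = [("NumberOfInstances", 150), ("NumberOfFeatures", 4)]

def find_quality(name: str):
    return next((q for q in qualities if q[0] == name), None)

print(find_quality("NumberOfFeatures"))  # ('NumberOfFeatures', 4)
print(find_quality("MissingValues"))     # None — caller can turn this into a 404
```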
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/routers/mldcat_ap/dataset.py` around lines 141 - 160, The current
get_distribution_quality uses next(q for q in qualities if q.name ==
quality_name) which raises StopIteration if no match; change this to a safe
lookup (e.g., found = next((q for q in qualities if q.name == quality_name),
None)) and if found is None raise an HTTP 404 (raise
HTTPException(status_code=404, detail=f"Quality '{quality_name}' not found for
distribution {distribution_id}")) before constructing the Quality and
JsonLDGraph; update imports if needed to include HTTPException from fastapi.
src/database/users.py (2)

72-76: Redundant UserGroup wrapping in get_groups.

get_user_groups_for already returns list[UserGroup], so re-wrapping each element with UserGroup(group_id) on Line 75 is a no-op (IntEnum constructor is idempotent on its own members). You could simplify:

🧹 Suggested simplification
     async def get_groups(self) -> list[UserGroup]:
         if self._groups is None:
-            groups = await get_user_groups_for(user_id=self.user_id, connection=self._database)
-            self._groups = [UserGroup(group_id) for group_id in groups]
+            self._groups = await get_user_groups_for(user_id=self.user_id, connection=self._database)
         return self._groups
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/database/users.py` around lines 72 - 76, The get_groups method is
re-wrapping values that are already UserGroup instances: replace the list
comprehension that does [UserGroup(group_id) for group_id in groups] with
directly assigning groups (from get_user_groups_for) to self._groups; update
get_groups to set self._groups = groups (keeping the call to
get_user_groups_for(user_id=self.user_id, connection=self._database)) so you
avoid the redundant UserGroup(...) construction and preserve the cached
self._groups.

46-57: Misleading variable name row for a result set.

Line 47 assigns the full CursorResult to row, but it's iterated as a collection of rows on Line 57. Consider renaming to rows for clarity.

🧹 Suggested rename
 async def get_user_groups_for(*, user_id: int, connection: AsyncConnection) -> list[UserGroup]:
-    row = await connection.execute(
+    rows = await connection.execute(
         text(
             """
     SELECT group_id
     FROM users_groups
     WHERE user_id = :user_id
     """,
         ),
         parameters={"user_id": user_id},
     )
-    return [UserGroup(group) for (group,) in row]
+    return [UserGroup(group) for (group,) in rows]
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/database/users.py` around lines 46 - 57, In get_user_groups_for, the
variable named row is actually a CursorResult/collection and is iterated later;
rename row to rows (or results) to avoid confusion and update the list
comprehension accordingly (e.g., iterate over rows and construct UserGroup from
each row tuple) so the function get_user_groups_for clearly reflects that it
processes multiple result rows.
src/database/setup.py (1)

21-25: Unused noqa directive on Line 22.

Ruff reports that the # noqa: PLW0603 directive on Line 22 is unused (rule RUF100). The same likely applies to Line 29. If the PLW0603 rule isn't enabled in your Ruff config, these directives are dead annotations.

🧹 Suggested cleanup
 def user_database() -> AsyncEngine:
-    global _user_engine  # noqa: PLW0603
+    global _user_engine
     if _user_engine is None:
         _user_engine = _create_engine("openml")
     return _user_engine
 def expdb_database() -> AsyncEngine:
-    global _expdb_engine  # noqa: PLW0603
+    global _expdb_engine
     if _expdb_engine is None:
         _expdb_engine = _create_engine("expdb")
     return _expdb_engine
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/database/setup.py` around lines 21 - 25, The file contains unused noqa
directives suppressing PLW0603; remove the dead "# noqa: PLW0603" comment from
the user_database() function (and the analogous unused noqa in the other engine
factory function that assigns to _user_engine/_api_engine) so Ruff no longer
reports RUF100; leave the global assignment and engine creation logic (e.g.,
_user_engine and _create_engine("openml")) unchanged.
pyproject.toml (1)

81-82: @pytest.mark.asyncio decorators across test files are redundant and can be removed.

In auto mode, pytest-asyncio automatically adds the asyncio marker to all async test functions — @pytest.mark.asyncio "can be omitted entirely in auto mode."

The decorators appear on every converted async test in users_test.py, dataset_tag_test.py, study_test.py, flows_test.py, and qualities_test.py. They are harmless but noisy; removing them would make the intent of asyncio_mode = "auto" self-evident.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pyproject.toml` around lines 81 - 82, The pytest configuration sets
asyncio_mode = "auto", so remove the now-redundant `@pytest.mark.asyncio`
decorators from async test functions across the test files (users_test.py,
dataset_tag_test.py, study_test.py, flows_test.py, qualities_test.py); locate
occurrences of the `@pytest.mark.asyncio` decorator lines in those files and
delete them (leaving the async def test_* functions and any fixture usage
intact) so the tests rely on pytest-asyncio's automatic marking.
src/routers/openml/flows.py (1)

57-63: Sequential await in list comprehension — consider asyncio.gather for concurrent subflow fetching.

Each subflow is fetched sequentially. If a flow has many subflows, this could be noticeably slower than necessary. Since you're now on async, you could leverage concurrency:

♻️ Optional: concurrent subflow fetching
+    import asyncio
+    subflow_data = await asyncio.gather(
+        *(get_flow(flow_id=subflow.child_id, expdb=expdb) for subflow in subflow_rows)
+    )
     subflows = [
         Subflow(
             identifier=subflow.identifier,
-            flow=await get_flow(flow_id=subflow.child_id, expdb=expdb),
+            flow=flow,
         )
-        for subflow in subflow_rows
+        for subflow, flow in zip(subflow_rows, subflow_data)
     ]

Note: This assumes the AsyncConnection is safe to use concurrently for read-only queries. If the connection doesn't support concurrent use (SQLAlchemy AsyncConnection typically doesn't), the sequential approach is correct and this should be disregarded.
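A small sketch of the gather idea, subject to the caveat above (`fetch_flow` is a stand-in for `get_flow`; real code must first confirm the connection tolerates concurrent use). The useful property is that `asyncio.gather` preserves input order, so results line up with `subflow_rows` even though the fetches overlap in time.

```python
import asyncio

async def fetch_flow(flow_id: int) -> dict:
    await asyncio.sleep(0.01)  # simulate query latency
    return {"id": flow_id}

async def main() -> list[dict]:
    child_ids = [3, 1, 2]
    # gather returns results in the order the coroutines were passed in.
    return list(await asyncio.gather(*(fetch_flow(i) for i in child_ids)))

print(asyncio.run(main()))  # [{'id': 3}, {'id': 1}, {'id': 2}]
```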

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/routers/openml/flows.py` around lines 57 - 63, The current list
comprehension builds subflows by awaiting get_flow sequentially, causing slow
serial fetches; change to run get_flow concurrently (e.g., create tasks for each
subflow.child_id from subflow_rows and await asyncio.gather on them) and then
map results into Subflow(identifier=subflow.identifier, flow=flow) to produce
the subflows list; ensure you reference the existing get_flow function, Subflow
class, subflow_rows iterable, and the subflows variable when implementing the
concurrent gather approach, but only do this if your AsyncConnection supports
concurrent reads.
src/routers/openml/study.py (1)

120-128: Sequential single-row inserts in a loop — consider using the bulk attach_tasks instead.

create_study uses attach_task (single) in a loop, while attach_to_study uses attach_tasks (bulk). For consistency and fewer round-trips, consider using the bulk variant here too. This is a pre-existing pattern, so fine to defer.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/routers/openml/study.py` around lines 120 - 128, Replace the per-item
loop that calls database.studies.attach_task for StudyType.TASK with the bulk
API database.studies.attach_tasks by passing the list study.tasks and the
created study_id (keep user and expdb), i.e., after calling
database.studies.create(store the returned study_id) call
database.studies.attach_tasks(task_ids=study.tasks, study_id=study_id,
user=user, expdb=expdb); also verify whether a bulk attach_runs exists and apply
the same change for the StudyType.RUN branch (replace the attach_run loop with
attach_runs if available) to reduce round-trips.
src/routers/dependencies.py (1)

11-22: Async connection dependencies correctly migrated.

The async with engine.connect() pattern and await connection.commit() after yield are correct for SQLAlchemy async usage with FastAPI dependency injection.

One note: the unconditional commit() after yield runs even if the endpoint raises an exception, which could commit partial writes for mutating endpoints. This is pre-existing behavior, but worth considering a try/except guard in a follow-up:

💡 Optional: guard commit on exception
 async def expdb_connection() -> AsyncConnection:
     engine = expdb_database()
     async with engine.connect() as connection:
-        yield connection
-        await connection.commit()
+        try:
+            yield connection
+        except Exception:
+            await connection.rollback()
+            raise
+        else:
+            await connection.commit()
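The guarded-commit shape suggested above, exercised against `AsyncMock` connections so it runs without a database (`use_connection` is a hypothetical helper, not the actual dependency): commit only on success, rollback and re-raise on failure.

```python
import asyncio
from unittest.mock import AsyncMock

async def use_connection(connection, work) -> None:
    try:
        await work(connection)
    except Exception:
        await connection.rollback()  # undo partial writes
        raise                        # surface the endpoint's error
    else:
        await connection.commit()    # only reached on success

async def main() -> tuple[bool, bool]:
    ok_conn = AsyncMock()
    await use_connection(ok_conn, lambda c: asyncio.sleep(0))

    failing_conn = AsyncMock()
    async def boom(c):
        raise RuntimeError("endpoint failed")
    try:
        await use_connection(failing_conn, boom)
    except RuntimeError:
        pass
    return ok_conn.commit.called, failing_conn.rollback.called

print(asyncio.run(main()))  # (True, True)
```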
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/routers/dependencies.py` around lines 11 - 22, The current dependency
helpers expdb_connection and userdb_connection unconditionally call await
connection.commit() after yield which may commit even when the request raised an
exception; change each to wrap the yield in try/except/finally so that you
commit only on successful exit and call await connection.rollback() in the
except block (or re-raise after rollback) to avoid committing partial writes,
ensuring the same AsyncConnection is yielded and cleaned up correctly.
src/routers/openml/datasets.py (1)

79-79: Remove unused noqa directive.

Ruff reports PLR0913 is non-enabled, making the # noqa: PLR0913 directive on this line unnecessary.

-async def list_datasets(  # noqa: PLR0913
+async def list_datasets(
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/routers/openml/datasets.py` at line 79, The function declaration for
list_datasets contains an unnecessary "# noqa: PLR0913" directive; remove that
trailing noqa comment from the async def list_datasets(...) signature so the
linter no longer sees an unused suppression — simply delete " # noqa: PLR0913"
from the line defining list_datasets.
tests/conftest.py (1)

33-36: Return type annotations should be AsyncIterator[AsyncConnection] for yielding fixtures.

These fixtures use yield, making them async generators, but the return type is annotated as AsyncConnection. While pytest/pytest-asyncio handles this fine at runtime, the type annotations are technically incorrect. AsyncIterator[AsyncConnection] (already imported on line 3) would be accurate.

Proposed fix
 @pytest_asyncio.fixture
-async def expdb_test() -> AsyncConnection:
+async def expdb_test() -> AsyncIterator[AsyncConnection]:
     async with automatic_rollback(expdb_database()) as connection:
         yield connection


 @pytest_asyncio.fixture
-async def user_test() -> AsyncConnection:
+async def user_test() -> AsyncIterator[AsyncConnection]:
     async with automatic_rollback(user_database()) as connection:
         yield connection

Also applies to: 39-42
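The typing point generalises: any `async def` containing `yield` is an async generator, so `AsyncIterator[T]` is the accurate annotation. A runnable sketch with a string standing in for the `AsyncConnection`:

```python
import asyncio
from collections.abc import AsyncIterator

# Mirrors the fixture fix: the function yields, so its accurate return
# annotation is AsyncIterator[...] rather than the yielded type itself.
async def expdb_test() -> AsyncIterator[str]:
    connection = "stub-connection"  # stands in for the AsyncConnection
    yield connection                # pytest-asyncio injects this value

async def main() -> list[str]:
    return [conn async for conn in expdb_test()]

print(asyncio.run(main()))  # ['stub-connection']
```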

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/conftest.py` around lines 33 - 36, Change the async fixture return-type
annotation from AsyncConnection to AsyncIterator[AsyncConnection] for the
yielding fixtures (e.g., expdb_test) so the annotation matches the async
generator behavior; update expdb_test's signature to return
AsyncIterator[AsyncConnection] and apply the same fix to the other yielding
fixture(s) in this file (AsyncIterator is already imported).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@tests/routers/openml/study_test.py`:
- Around line 507-520: The helper _attach_tasks_to_study currently hardcodes the
study id in the SQL update ("UPDATE study SET status = 'in_preparation' WHERE id
= 1"), which doesn't use the study_id parameter; change that UPDATE to use the
study_id parameter so the function sets the correct study to in_preparation
before calling TestClient.post, and then remove redundant caller-side UPDATE
statements (e.g., in tests that manually set study 7) since the helper now
handles the correct study. Ensure you update the SQL execution call in
_attach_tasks_to_study and adjust any callers that were performing the same
update.

---

Outside diff comments:
In `@src/database/datasets.py`:
- Around line 105-117: The query in get_latest_processing_update currently uses
row.one_or_none(), which will raise MultipleResultsFound if more than one
processing record exists; change it to use row.first() (or add "LIMIT 1" to the
SQL and keep one_or_none()) so the function returns the latest row without
raising—update the call site in get_latest_processing_update to call .first() on
the result (or add LIMIT 1 in the SELECT) to match the behavior of
get_description/get_status.

In `@src/database/flows.py`:
- Around line 38-51: The get_parameters function currently casts the
AsyncConnection.execute return (a CursorResult) directly to Sequence[Row]
without materializing rows; call .all() on the result of expdb.execute before
casting/returning so the function returns an actual Sequence[Row] (similar to
the fix in get_subflows), i.e., update get_parameters to await
expdb.execute(...).all() and then cast that value to "Sequence[Row]".
- Around line 8-21: The function get_subflows currently returns the CursorResult
from await expdb.execute(...) which is a forward-only cursor and can be
exhausted; change it to materialize and return a list of rows by calling .all()
on the result before casting/returning (mirror how get_study_data and
get_input_for_task do it) so callers receive a Sequence[Row] (reference:
get_subflows, the expdb.execute(...) call).

In `@src/database/studies.py`:
- Around line 160-171: The current except block in studies.py unpacks the
exception args with (msg,) = e.args which will crash if e.args has zero or
multiple elements; change it to robustly extract the message (e.g. msg = str(e)
or msg = e.args[0] if e.args else str(e)) and then run the same pattern matching
logic against that msg (the existing regex for "Duplicate entry '(\d+)-(\d+)'"
and string checks for foreign key / out-of-range). Keep raising ValueError(msg)
from e for the original exception; update only the message-extraction portion
(the variable e and subsequent matches remain the same).

---

Nitpick comments:
In `@pyproject.toml`:
- Around line 81-82: The pytest configuration sets asyncio_mode = "auto", so
remove the now-redundant `@pytest.mark.asyncio` decorators from async test
functions across the test files (users_test.py, dataset_tag_test.py,
study_test.py, flows_test.py, qualities_test.py); locate occurrences of the
`@pytest.mark.asyncio` decorator lines in those files and delete them (leaving the
async def test_* functions and any fixture usage intact) so the tests rely on
pytest-asyncio's automatic marking.

In `@src/database/setup.py`:
- Around line 21-25: The file contains unused noqa directives suppressing
PLW0603; remove the dead "# noqa: PLW0603" comment from the user_database()
function (and the analogous unused noqa in the other engine factory function
that assigns to _user_engine/_api_engine) so Ruff no longer reports RUF100;
leave the global assignment and engine creation logic (e.g., _user_engine and
_create_engine("openml")) unchanged.

In `@src/database/users.py`:
- Around line 72-76: The get_groups method is re-wrapping values that are
already UserGroup instances: replace the list comprehension that does
[UserGroup(group_id) for group_id in groups] with directly assigning groups
(from get_user_groups_for) to self._groups; update get_groups to set
self._groups = groups (keeping the call to
get_user_groups_for(user_id=self.user_id, connection=self._database)) so you
avoid the redundant UserGroup(...) construction and preserve the cached
self._groups.
- Around line 46-57: In get_user_groups_for, the variable named row is actually
a CursorResult/collection and is iterated later; rename row to rows (or results)
to avoid confusion and update the list comprehension accordingly (e.g., iterate
over rows and construct UserGroup from each row tuple) so the function
get_user_groups_for clearly reflects that it processes multiple result rows.

In `@src/routers/dependencies.py`:
- Around line 11-22: The current dependency helpers expdb_connection and
userdb_connection unconditionally call await connection.commit() after yield
which may commit even when the request raised an exception; change each to wrap
the yield in try/except/finally so that you commit only on successful exit and
call await connection.rollback() in the except block (or re-raise after
rollback) to avoid committing partial writes, ensuring the same AsyncConnection
is yielded and cleaned up correctly.

In `@src/routers/mldcat_ap/dataset.py`:
- Around line 141-160: The current get_distribution_quality uses next(q for q in
qualities if q.name == quality_name) which raises StopIteration if no match;
change this to a safe lookup (e.g., found = next((q for q in qualities if q.name
== quality_name), None)) and if found is None raise an HTTP 404 (raise
HTTPException(status_code=404, detail=f"Quality '{quality_name}' not found for
distribution {distribution_id}")) before constructing the Quality and
JsonLDGraph; update imports if needed to include HTTPException from fastapi.
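The safe lookup can be sketched as below; HTTPException is a minimal stand-in for fastapi.HTTPException, and the quality objects are illustrative:

```python
from types import SimpleNamespace


class HTTPException(Exception):
    """Minimal stand-in for fastapi.HTTPException."""

    def __init__(self, status_code: int, detail: str) -> None:
        super().__init__(detail)
        self.status_code = status_code
        self.detail = detail


def find_quality(qualities: list, quality_name: str, distribution_id: int):
    # next() with a default of None never raises StopIteration.
    found = next((q for q in qualities if q.name == quality_name), None)
    if found is None:
        raise HTTPException(
            status_code=404,
            detail=f"Quality '{quality_name}' not found for distribution {distribution_id}",
        )
    return found
```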

In `@src/routers/openml/datasets.py`:
- Line 79: The function declaration for list_datasets contains an unnecessary "#
noqa: PLR0913" directive; remove that trailing noqa comment from the async def
list_datasets(...) signature so the linter no longer sees an unused suppression
— simply delete " # noqa: PLR0913" from the line defining list_datasets.

In `@src/routers/openml/flows.py`:
- Around line 57-63: The current list comprehension builds subflows by awaiting
get_flow sequentially, causing slow serial fetches; change to run get_flow
concurrently (e.g., create tasks for each subflow.child_id from subflow_rows and
await asyncio.gather on them) and then map results into
Subflow(identifier=subflow.identifier, flow=flow) to produce the subflows list;
ensure you reference the existing get_flow function, Subflow class, subflow_rows
iterable, and the subflows variable when implementing the concurrent gather
approach, but only do this if your AsyncConnection supports concurrent reads.
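A sketch of the concurrent variant, with a fake get_flow in place of the real coroutine (which also takes the expdb connection); per the caveat above, a single AsyncConnection cannot multiplex statements, so this only pays off when each fetch can obtain its own connection:

```python
import asyncio
from dataclasses import dataclass
from types import SimpleNamespace


@dataclass
class Subflow:
    identifier: str
    flow: dict


# Stand-in for the real get_flow(flow_id, expdb) coroutine.
async def get_flow(flow_id: int) -> dict:
    await asyncio.sleep(0)
    return {"id": flow_id}


async def build_subflows(subflow_rows: list) -> list[Subflow]:
    # Launch every child fetch at once instead of awaiting them serially.
    flows = await asyncio.gather(*(get_flow(row.child_id) for row in subflow_rows))
    return [
        Subflow(identifier=row.identifier, flow=flow)
        for row, flow in zip(subflow_rows, flows)
    ]
```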

In `@src/routers/openml/study.py`:
- Around line 120-128: Replace the per-item loop that calls
database.studies.attach_task for StudyType.TASK with the bulk API
database.studies.attach_tasks by passing the list study.tasks and the created
study_id (keep user and expdb), i.e., after calling
database.studies.create(store the returned study_id) call
database.studies.attach_tasks(task_ids=study.tasks, study_id=study_id,
user=user, expdb=expdb); also verify whether a bulk attach_runs exists and apply
the same change for the StudyType.RUN branch (replace the attach_run loop with
attach_runs if available) to reduce round-trips.
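The bulk call can be sketched with stubbed database helpers; the create and attach_tasks signatures here are assumptions modeled on the prompt, with a call log standing in for actual round-trips:

```python
import asyncio
from types import SimpleNamespace

calls: list[tuple] = []


# Hypothetical stand-ins for database.studies.create and attach_tasks.
async def create(study, user, expdb) -> int:
    calls.append(("create",))
    return 42  # the new study_id


async def attach_tasks(*, task_ids: list[int], study_id: int, user, expdb) -> None:
    # One bulk statement for the whole list instead of one round-trip per task.
    calls.append(("attach_tasks", tuple(task_ids), study_id))


async def create_task_study(study, user, expdb) -> int:
    study_id = await create(study, user, expdb)
    await attach_tasks(task_ids=study.tasks, study_id=study_id, user=user, expdb=expdb)
    return study_id
```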

In `@tests/conftest.py`:
- Around line 33-36: Change the async fixture return-type annotation from
AsyncConnection to AsyncIterator[AsyncConnection] for the yielding fixtures
(e.g., expdb_test) so the annotation matches the async generator behavior;
update expdb_test's signature to return AsyncIterator[AsyncConnection] and apply
the same fix to the other yielding fixture(s) in this file (AsyncIterator is
already imported).
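A sketch of the corrected annotation; in the real conftest the function carries `@pytest_asyncio.fixture` and yields a SQLAlchemy AsyncConnection, both stubbed here:

```python
import asyncio
import inspect
from collections.abc import AsyncIterator


class FakeAsyncConnection:
    """Stand-in for sqlalchemy.ext.asyncio.AsyncConnection."""


# In conftest.py this would be decorated with @pytest_asyncio.fixture.
async def expdb_test() -> AsyncIterator[FakeAsyncConnection]:
    # AsyncIterator[...] matches the async-generator shape of a yielding
    # fixture; annotating with the yielded type alone does not.
    connection = FakeAsyncConnection()
    yield connection
```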

Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (3)
src/database/datasets.py (1)

105-117: ⚠️ Potential issue | 🟠 Major

Change .one_or_none() to .first() for consistency and correctness.

The query selects all processing records for a dataset ordered by processing_date DESC without a LIMIT. If a dataset has multiple processing records, one_or_none() will raise MultipleResultsFound, crashing the endpoint. Both sibling functions get_description (line 70) and get_status (line 89) correctly use .first() for the identical "get latest" pattern. The call sites at lines 258 and 307 in src/routers/openml/datasets.py check for None but don't handle the exception, confirming that .first() semantics are intended.

Proposed fix
-    return row.one_or_none()
+    return row.first()
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/database/datasets.py` around lines 105 - 117, The
get_latest_processing_update function currently calls row.one_or_none() which
can raise MultipleResultsFound when the query returns multiple rows; change this
to row.first() so it returns the first result or None without raising. Locate
the async function get_latest_processing_update (using symbols
get_latest_processing_update, data_processed, did) and replace the final return
row.one_or_none() with return row.first(), keeping the existing query and
parameters unchanged to match the sibling functions get_description and
get_status.
src/routers/openml/study.py (2)

83-92: ⚠️ Potential issue | 🟠 Major

attach_runs raises NotImplementedError, which is silently unhandled.

database.studies.attach_runs is an unimplemented stub (raise NotImplementedError). The surrounding try/except ValueError does not catch NotImplementedError, so any POST /studies/attach request against a RUN-type study propagates as an unhandled exception → HTTP 500. Either guard the branch with a 501 Not Implemented response, or widen the except clause.

🛡️ Option A — block the unimplemented path explicitly
     if study.type_ == StudyType.TASK:
         await database.studies.attach_tasks(task_ids=entity_ids, **attach_kwargs)
     else:
-        await database.studies.attach_runs(run_ids=entity_ids, **attach_kwargs)
+        raise HTTPException(
+            status_code=HTTPStatus.NOT_IMPLEMENTED,
+            detail="Attaching runs to studies is not yet supported.",
+        )
🛡️ Option B — catch NotImplementedError alongside ValueError
-    except ValueError as e:
+    except (ValueError, NotImplementedError) as e:
         raise HTTPException(
-            status_code=HTTPStatus.CONFLICT,
+            status_code=HTTPStatus.CONFLICT if isinstance(e, ValueError) else HTTPStatus.NOT_IMPLEMENTED,
             detail=str(e),
         ) from None
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/routers/openml/study.py` around lines 83 - 92, The POST /studies/attach
branch currently calls database.studies.attach_runs (an unimplemented stub that
raises NotImplementedError) and only catches ValueError, causing a 500; fix by
either (A) explicitly guarding the run-path: when study.type_ != StudyType.TASK
raise HTTPException(status_code=HTTPStatus.NOT_IMPLEMENTED, detail="attach_runs
not implemented") before calling attach_runs, or (B) expand the except to catch
NotImplementedError alongside ValueError and map it to
HTTPStatus.NOT_IMPLEMENTED (raise
HTTPException(status_code=HTTPStatus.NOT_IMPLEMENTED, detail=str(e))). Update
the code around the existing try/except that references attach_tasks,
attach_runs, StudyType.TASK, HTTPException and HTTPStatus.CONFLICT to implement
one of these two fixes.

83-92: ⚠️ Potential issue | 🟠 Major

attach_runs raises NotImplementedError, which is not caught by except ValueError.

database.studies.attach_runs is stub-only (raise NotImplementedError). Calling attach_to_study for a run-type study will always propagate an unhandled NotImplementedError → HTTP 500, bypassing the user-facing HTTPException(CONFLICT, ...) error path. Either catch NotImplementedError and return a 501 Not Implemented response, or block the code path at the route level until the function is implemented.

🛡️ Suggested guard
     if study.type_ == StudyType.TASK:
         await database.studies.attach_tasks(task_ids=entity_ids, **attach_kwargs)
     else:
-        await database.studies.attach_runs(run_ids=entity_ids, **attach_kwargs)
+        raise HTTPException(
+            status_code=HTTPStatus.NOT_IMPLEMENTED,
+            detail="Attaching runs to studies is not yet supported.",
+        )
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/routers/openml/study.py` around lines 83 - 92, attach_runs (called from
attach_to_study) can raise NotImplementedError which is not caught, causing HTTP
500; update attach_to_study to handle this by catching NotImplementedError from
database.studies.attach_runs and raising
HTTPException(status_code=HTTPStatus.NOT_IMPLEMENTED, detail="attach_runs not
implemented") or alternatively short-circuit the route when study.type_ !=
StudyType.TASK (e.g., validate and return 501 before calling
database.studies.attach_runs). Locate the try/except around
database.studies.attach_tasks / attach_runs and add the NotImplementedError
branch (or a pre-check on study.type_) to ensure unimplemented stubs produce a
501 Not Implemented response instead of an internal server error.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/routers/openml/datasets.py`:
- Line 79: The function definition for list_datasets contains an unnecessary
suppression comment "# noqa: PLR0913"; remove that trailing noqa directive from
the async def line (async def list_datasets(...)) so the linter warning is not
being suppressed unnecessarily and re-run the linter to confirm the directive is
gone.

In `@src/routers/openml/study.py`:
- Around line 6-7: The import of Row is using the wrong module; change the
import so Row comes from sqlalchemy.engine instead of the top-level package.
Update the import statement that currently references Row (near the existing
AsyncConnection import) to import Row from sqlalchemy.engine (i.e., adjust the
import that mentions Row so the symbol Row is imported from sqlalchemy.engine
rather than sqlalchemy).

---

Outside diff comments:
In `@src/database/datasets.py`:
- Around line 105-117: The get_latest_processing_update function currently calls
row.one_or_none() which can raise MultipleResultsFound when the query returns
multiple rows; change this to row.first() so it returns the first result or None
without raising. Locate the async function get_latest_processing_update (using
symbols get_latest_processing_update, data_processed, did) and replace the final
return row.one_or_none() with return row.first(), keeping the existing query and
parameters unchanged to match the sibling functions get_description and
get_status.

In `@src/routers/openml/study.py`:
- Around line 83-92: The POST /studies/attach branch currently calls
database.studies.attach_runs (an unimplemented stub that raises
NotImplementedError) and only catches ValueError, causing a 500; fix by either
(A) explicitly guarding the run-path: when study.type_ != StudyType.TASK raise
HTTPException(status_code=HTTPStatus.NOT_IMPLEMENTED, detail="attach_runs not
implemented") before calling attach_runs, or (B) expand the except to catch
NotImplementedError alongside ValueError and map it to
HTTPStatus.NOT_IMPLEMENTED (raise
HTTPException(status_code=HTTPStatus.NOT_IMPLEMENTED, detail=str(e))). Update
the code around the existing try/except that references attach_tasks,
attach_runs, StudyType.TASK, HTTPException and HTTPStatus.CONFLICT to implement
one of these two fixes.
- Around line 83-92: attach_runs (called from attach_to_study) can raise
NotImplementedError which is not caught, causing HTTP 500; update
attach_to_study to handle this by catching NotImplementedError from
database.studies.attach_runs and raising
HTTPException(status_code=HTTPStatus.NOT_IMPLEMENTED, detail="attach_runs not
implemented") or alternatively short-circuit the route when study.type_ !=
StudyType.TASK (e.g., validate and return 501 before calling
database.studies.attach_runs). Locate the try/except around
database.studies.attach_tasks / attach_runs and add the NotImplementedError
branch (or a pre-check on study.type_) to ensure unimplemented stubs produce a
501 Not Implemented response instead of an internal server error.

Development

Successfully merging this pull request may close these issues.

Use asyncio

1 participant