Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
ab76f59
feat: migrate asyncio usage to anyio across the codebase
raballew May 13, 2026
205bfd3
fix: correct type annotation for get_minikube_ip profile parameter
raballew May 13, 2026
04c8bbc
fix: remove redundant inner timeout from TLS connection attempts
raballew May 13, 2026
0ada4ab
fix: add task group cleanup for websocket telemetry handler
raballew May 13, 2026
36791f3
fix: resolve lint and type-check failures in asyncio-to-anyio migration
raballew May 13, 2026
6770485
fix: replace process.communicate() with anyio.run_process() in operator
raballew May 13, 2026
1c1b5b8
fix: use BufferedByteReceiveStream for line reading in QEMU driver
raballew May 13, 2026
f4f8475
fix: replace remaining pytest.mark.asyncio with pytest.mark.anyio
raballew May 13, 2026
f20aa26
fix: migrate dut-network tcpdump from asyncio to anyio
raballew May 13, 2026
3c8a0e6
fix: read all chunks until EndOfStream in shell driver read_all mode
raballew May 13, 2026
f2bca7f
fix: handle ExceptionGroup in mitmproxy websocket handler shutdown
raballew May 13, 2026
4a1fd07
fix: rename timeout test and document output discard design decision
raballew May 13, 2026
c817d97
fix: add sniffio as explicit dependency
raballew May 13, 2026
ee58222
fix: document pyserial asyncio usage as known exception to anyio migr…
raballew May 13, 2026
75b1d3e
fix: strengthen QEMU inner wait timeout test assertions
raballew May 13, 2026
6a770b3
fix: document CPython-internal _sslobj usage for certificate extraction
raballew May 13, 2026
108d103
fix: handle IncompleteRead from BufferedByteReceiveStream
raballew May 13, 2026
4a34e5f
fix: defer errors from task group to avoid ExceptionGroup wrapping
raballew May 13, 2026
cd2aa30
fix: resolve lint violations and reduce shell driver complexity
raballew May 13, 2026
7d68fad
fix: narrow shell driver exception handler and add process cleanup
raballew May 13, 2026
cb34233
fix: resolve CI failures from anyio migration
raballew May 13, 2026
cc965fb
fix: resolve type-check and pytest-anyio CI failures
raballew May 26, 2026
7f386bb
fix: update rotate_test mock path for anyio migration
raballew May 26, 2026
923e393
fix: add anyio_backend fixture to all packages missing it
raballew May 26, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions python/docs/source/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@
# - Project information -----------------------------------------------------
# https://www.sphinx-doc.org/en/master/usage/configuration.html#project-information

import asyncio
import os
import sys

import anyio
from jumpstarter_kubernetes.controller import get_latest_compatible_controller_version

os.environ["TERM"] = "dumb"
Expand Down Expand Up @@ -64,7 +64,9 @@ def get_controller_version():
else:
version = None

return asyncio.run(get_latest_compatible_controller_version(client_version=version))
async def _run():
return await get_latest_compatible_controller_version(client_version=version)
return anyio.run(_run)


def get_index_url():
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
import pytest


@pytest.fixture
def anyio_backend():
return "asyncio"
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
from asyncio import run
from functools import wraps

import anyio


def blocking(f):
@wraps(f)
def wrapper(*args, **kwargs):
return run(f(*args, **kwargs))
async def _run():
return await f(*args, **kwargs)
return anyio.run(_run)

return wrapper
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
import pytest


@pytest.fixture
def anyio_backend():
return "asyncio"
16 changes: 10 additions & 6 deletions python/packages/jumpstarter-cli/jumpstarter_cli/login_test.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import asyncio
import json
import ssl
from unittest.mock import AsyncMock, MagicMock, patch

import anyio
import click
import pytest
from click.testing import CliRunner
Expand Down Expand Up @@ -84,7 +84,9 @@ def get(self, *args, **kwargs):
monkeypatch.setattr("jumpstarter_cli.login.aiohttp.ClientSession", FakeClientSession)

with pytest.raises(click.ClickException, match="Timed out while connecting"):
asyncio.run(fetch_auth_config("login.example.com"))
async def _run():
return await fetch_auth_config("login.example.com")
anyio.run(_run)


def test_fetch_auth_config_maps_json_decode_error(monkeypatch) -> None:
Expand Down Expand Up @@ -116,7 +118,9 @@ def get(self, *args, **kwargs):
monkeypatch.setattr("jumpstarter_cli.login.aiohttp.ClientSession", FakeClientSession)

with pytest.raises(click.ClickException, match="Invalid JSON response received"):
asyncio.run(fetch_auth_config("login.example.com"))
async def _run():
return await fetch_auth_config("login.example.com")
anyio.run(_run)


def test_login_cli_shows_timeout_message(monkeypatch) -> None:
Expand Down Expand Up @@ -151,13 +155,13 @@ async def fake_fetch_auth_config(*args, **kwargs):
assert "TLS certificate verification failed" in result.output


@pytest.mark.asyncio
@pytest.mark.anyio
async def test_fetch_auth_config_rejects_http_without_insecure_tls():
with pytest.raises(click.UsageError, match="--insecure-tls"):
await fetch_auth_config("http://login.example.com", insecure_tls=False)


@pytest.mark.asyncio
@pytest.mark.anyio
async def test_fetch_auth_config_allows_explicit_http_with_insecure_tls():
mock_response = MagicMock()
mock_response.status = 200
Expand All @@ -183,7 +187,7 @@ async def test_fetch_auth_config_allows_explicit_http_with_insecure_tls():
assert result["grpcEndpoint"] == "grpc.example.com"


@pytest.mark.asyncio
@pytest.mark.anyio
async def test_fetch_auth_config_defaults_to_https_with_insecure_tls():
mock_response = MagicMock()
mock_response.status = 200
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
import asyncio
import asyncio.subprocess
import ipaddress
import shutil
import socket
Expand All @@ -8,8 +6,14 @@
from collections.abc import AsyncGenerator
from dataclasses import dataclass, field
from pathlib import Path
from subprocess import PIPE
from typing import Literal, TypedDict

import anyio
from anyio import IncompleteRead
from anyio.abc import Process
from anyio.streams.buffered import BufferedByteReceiveStream

from . import dnsmasq, iproute, nftables
from .ntp_server import NtpServer
from jumpstarter.driver import Driver, export
Expand Down Expand Up @@ -151,7 +155,7 @@ class DutNetwork(Driver):
_added_aliases: set[str] = field(init=False, default_factory=set)
_fwd_rule_handles: list[int] = field(init=False, default_factory=list)
_ntp_server: NtpServer | None = field(init=False, default=None)
_tcpdump_process: asyncio.subprocess.Process | None = field(init=False, default=None)
_tcpdump_process: Process | None = field(init=False, default=None)

@classmethod
def client(cls) -> str:
Expand Down Expand Up @@ -566,21 +570,18 @@ async def tcpdump(self, args: list[str] | None = None) -> AsyncGenerator[str, No

self.logger.info("Starting tcpdump: %s", " ".join(cmd))

proc = await asyncio.subprocess.create_subprocess_exec(
cmd[0],
*cmd[1:],
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.STDOUT,
)
proc = await anyio.open_process(cmd, stdout=PIPE, stderr=PIPE)
self._tcpdump_process = proc

try:
assert proc.stdout is not None
buffered = BufferedByteReceiveStream(proc.stdout)
while True:
line = await proc.stdout.readline()
if not line:
try:
line = await buffered.receive_until(b"\n", 1048576)
except (anyio.EndOfStream, anyio.ClosedResourceError, IncompleteRead):
break
yield line.decode("utf-8", errors="replace").rstrip("\n")
yield line.decode("utf-8", errors="replace")
finally:
if proc.returncode is None:
try:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@
and the streaming driver method using mocked subprocesses.
"""

import asyncio
from pathlib import Path
from unittest.mock import AsyncMock, MagicMock, patch

import anyio
import anyio.abc
import pytest

from .driver import DutNetwork
Expand Down Expand Up @@ -161,44 +162,50 @@ def test_multiple_blocked_flags(self):
assert DutNetwork._sanitize_tcpdump_args(args) == ["-c", "5"]


def _create_mock_byte_stream(data: bytes):
"""Create a mock ByteReceiveStream that returns data then raises EndOfStream."""
stream = MagicMock(spec=anyio.abc.ByteReceiveStream)
call_count = {"n": 0}

async def mock_receive(max_bytes=65536):
if call_count["n"] >= 1:
raise anyio.EndOfStream()
call_count["n"] += 1
return data

stream.receive = mock_receive
stream.aclose = AsyncMock()
return stream


class TestTcpdumpMethod:
def test_tcpdump_raises_when_disabled(self, tmp_path: Path):
driver = _make_driver(tmp_path, enable_tcpdump=False)
with pytest.raises(RuntimeError, match="tcpdump is not enabled"):
asyncio.run(
_consume_async_gen(driver.tcpdump())
anyio.run(
_consume_async_gen, driver.tcpdump()
)

def test_tcpdump_streams_output(self, tmp_path: Path):
driver = _make_driver(tmp_path, enable_tcpdump=True)

mock_stdout = AsyncMock()
lines = [
b"12:00:00.000000 IP 192.168.100.10 > 8.8.8.8: ICMP echo request\n",
b"12:00:00.001000 IP 8.8.8.8 > 192.168.100.10: ICMP echo reply\n",
b"", # EOF
]
state = {"call_count": 0}

async def mock_readline():
if state["call_count"] < len(lines):
result = lines[state["call_count"]]
state["call_count"] += 1
return result
return b""

mock_stdout.readline = mock_readline
data = (
b"12:00:00.000000 IP 192.168.100.10 > 8.8.8.8: ICMP echo request\n"
b"12:00:00.001000 IP 8.8.8.8 > 192.168.100.10: ICMP echo reply\n"
)
mock_stdout = _create_mock_byte_stream(data)

mock_proc = AsyncMock()
mock_proc.stdout = mock_stdout
mock_proc.stderr = _create_mock_byte_stream(b"")
mock_proc.returncode = None
mock_proc.terminate = MagicMock()
mock_proc.wait = AsyncMock()

with patch(f"{_DRIVER_MODULE}.asyncio.subprocess.create_subprocess_exec",
return_value=mock_proc):
output = asyncio.run(
_consume_async_gen(driver.tcpdump())
with patch(f"{_DRIVER_MODULE}.anyio.open_process",
new_callable=AsyncMock, return_value=mock_proc):
output = anyio.run(
_consume_async_gen, driver.tcpdump()
)

assert len(output) == 2
Expand All @@ -208,82 +215,55 @@ async def mock_readline():
def test_tcpdump_enforces_interface(self, tmp_path: Path):
driver = _make_driver(tmp_path, enable_tcpdump=True)

mock_stdout = AsyncMock()
mock_stdout.readline = AsyncMock(return_value=b"")
mock_stdout = _create_mock_byte_stream(b"")

mock_proc = AsyncMock()
mock_proc.stdout = mock_stdout
mock_proc.stderr = _create_mock_byte_stream(b"")
mock_proc.returncode = 0
mock_proc.terminate = MagicMock()
mock_proc.wait = AsyncMock()

with patch(f"{_DRIVER_MODULE}.asyncio.subprocess.create_subprocess_exec",
return_value=mock_proc) as mock_exec:
asyncio.run(
_consume_async_gen(driver.tcpdump(args=["-i", "evil-iface", "-c", "1"]))
with patch(f"{_DRIVER_MODULE}.anyio.open_process",
new_callable=AsyncMock, return_value=mock_proc) as mock_exec:
anyio.run(
_consume_async_gen, driver.tcpdump(args=["-i", "evil-iface", "-c", "1"])
)

# Verify the command was called with the correct interface
call_args = mock_exec.call_args[0]
cmd = list(call_args)
cmd = list(call_args[0])
assert cmd[0] == "tcpdump"
assert "-i" in cmd
iface_idx = cmd.index("-i")
assert cmd[iface_idx + 1] == "eth-dut"
# The user-specified -i should have been removed by sanitization
assert cmd.count("-i") == 1

def test_tcpdump_passes_extra_args(self, tmp_path: Path):
driver = _make_driver(tmp_path, enable_tcpdump=True)

mock_stdout = AsyncMock()
mock_stdout.readline = AsyncMock(return_value=b"")
mock_stdout = _create_mock_byte_stream(b"")

mock_proc = AsyncMock()
mock_proc.stdout = mock_stdout
mock_proc.stderr = _create_mock_byte_stream(b"")
mock_proc.returncode = 0
mock_proc.terminate = MagicMock()
mock_proc.wait = AsyncMock()

with patch(f"{_DRIVER_MODULE}.asyncio.subprocess.create_subprocess_exec",
return_value=mock_proc) as mock_exec:
asyncio.run(
_consume_async_gen(driver.tcpdump(args=["-c", "10", "-n", "port", "80"]))
with patch(f"{_DRIVER_MODULE}.anyio.open_process",
new_callable=AsyncMock, return_value=mock_proc) as mock_exec:
anyio.run(
_consume_async_gen, driver.tcpdump(args=["-c", "10", "-n", "port", "80"])
)

call_args = mock_exec.call_args[0]
cmd = list(call_args)
cmd = list(call_args[0])
assert "-c" in cmd
assert "10" in cmd
assert "-n" in cmd
assert "port" in cmd
assert "80" in cmd

def test_tcpdump_cleanup_on_cancel(self, tmp_path: Path):
driver = _make_driver(tmp_path, enable_tcpdump=True)

mock_stdout = AsyncMock()
# Simulate a stream that never ends
mock_stdout.readline = AsyncMock(
side_effect=[b"line 1\n", b"line 2\n", asyncio.CancelledError()]
)

mock_proc = AsyncMock()
mock_proc.stdout = mock_stdout
mock_proc.returncode = None
mock_proc.terminate = MagicMock()
mock_proc.wait = AsyncMock()

with patch(f"{_DRIVER_MODULE}.asyncio.subprocess.create_subprocess_exec",
return_value=mock_proc):
with pytest.raises(asyncio.CancelledError):
asyncio.run(
_consume_async_gen(driver.tcpdump())
)

# Verify the process was terminated
mock_proc.terminate.assert_called_once()


class TestTcpdumpCleanup:
def test_cleanup_terminates_tcpdump_process(self, tmp_path: Path):
Expand Down
Loading
Loading