diff --git a/src/agents/sandbox/session/sinks.py b/src/agents/sandbox/session/sinks.py index 77d90cc086..d9fdfee609 100644 --- a/src/agents/sandbox/session/sinks.py +++ b/src/agents/sandbox/session/sinks.py @@ -8,7 +8,6 @@ from pathlib import Path from types import ModuleType from typing import Literal, Protocol, runtime_checkable -from urllib.error import HTTPError, URLError from urllib.request import Request, urlopen from ..errors import WorkspaceReadNotFoundError @@ -181,7 +180,6 @@ def __init__( self._seen = 0 self._lock = asyncio.Lock() self._flush_every = max(1, int(flush_every)) - self._existing_outbox_loaded = False def _resolve_relpath(self) -> Path: rel = self.workspace_relpath @@ -234,23 +232,23 @@ async def _can_flush_to_workspace(self) -> bool: return False async def _flush_buffer(self) -> None: - if self._session is None: + if self._session is None or not self._buf: return - await self._ensure_existing_outbox_loaded() relpath = self._resolved_workspace_relpath or self.workspace_relpath - await self._session.write(relpath, io.BytesIO(bytes(self._buf))) + existing = await self._read_existing_outbox(relpath) + pending = bytes(self._buf) + await self._session.write(relpath, io.BytesIO(existing + pending)) + self._buf.clear() - async def _ensure_existing_outbox_loaded(self) -> None: - if self._session is None or self._existing_outbox_loaded: - return + async def _read_existing_outbox(self, relpath: Path) -> bytes: + if self._session is None: + return b"" - relpath = self._resolved_workspace_relpath or self.workspace_relpath try: existing = await self._session.read(relpath) except (FileNotFoundError, WorkspaceReadNotFoundError): - self._existing_outbox_loaded = True - return + return b"" try: payload = existing.read() @@ -258,10 +256,8 @@ async def _ensure_existing_outbox_loaded(self) -> None: existing.close() if isinstance(payload, str): - payload = payload.encode("utf-8") - if payload: - self._buf = bytearray(payload) + self._buf - self._existing_outbox_loaded = True + return payload.encode("utf-8") + return bytes(payload) async def handle(self, event: SandboxSessionEvent) -> None: # If unbound (e.g., audit event emission used without a SandboxSession wrapper), @@ -317,7 +313,7 @@ def _post(self, body: bytes, spool_line: str | None) -> None: try: with urlopen(req, timeout=self.timeout_s) as resp: _ = resp.read(1) # ensure request completes - except (HTTPError, URLError) as e: + except OSError as e: if spool_line is not None and self.spool_path is not None: try: self.spool_path.parent.mkdir(parents=True, exist_ok=True) diff --git a/tests/sandbox/test_session_sinks.py b/tests/sandbox/test_session_sinks.py index 420f142f3e..09efbd83ca 100644 --- a/tests/sandbox/test_session_sinks.py +++ b/tests/sandbox/test_session_sinks.py @@ -6,6 +6,7 @@ import tarfile import uuid from pathlib import Path +from unittest.mock import patch import pytest from inline_snapshot import snapshot @@ -21,6 +22,7 @@ CallbackSink, ChainedSink, EventPayloadPolicy, + HttpProxySink, Instrumentation, JsonlOutboxSink, SandboxSession, @@ -277,6 +279,32 @@ async def test_workspace_jsonl_sink_does_not_duplicate_lines_across_flushes( assert [json.loads(line)["seq"] for line in lines] == [1, 2, 3] +@pytest.mark.asyncio +async def test_workspace_jsonl_sink_clears_flushed_buffer(tmp_path: Path) -> None: + inner = _build_unix_local_session(tmp_path) + relpath = Path(f"logs/events-{inner.state.session_id}.jsonl") + + async with inner: + sink = WorkspaceJsonlSink(mode="sync", on_error="raise", ephemeral=False, flush_every=1) + sink.bind(inner) + + for seq in (1, 2): + await sink.handle( + SandboxSessionStartEvent( + session_id=inner.state.session_id, + seq=seq, + op="write", + span_id=str(uuid.uuid4()), + ) + ) + assert sink._buf == bytearray() + + outbox_stream = await inner.read(relpath) + lines = outbox_stream.read().decode("utf-8").splitlines() + + assert [json.loads(line)["seq"] for line in lines] == [1, 2] + + @pytest.mark.asyncio async def test_workspace_jsonl_sink_ephemeral_excludes_runtime_outbox_with_existing_parent( tmp_path: Path, @@ -363,6 +391,31 @@ def _callback(event: SandboxSessionEvent, session: BaseSandboxSession) -> None: assert all(session is inner for _op, session in seen) +@pytest.mark.asyncio +async def test_http_proxy_sink_spools_direct_timeout(tmp_path: Path) -> None: + spool_path = tmp_path / "events.jsonl" + sink = HttpProxySink( + "http://127.0.0.1:9/events", + mode="sync", + on_error="raise", + spool_path=spool_path, + ) + event = SandboxSessionStartEvent( + session_id=uuid.uuid4(), + seq=1, + op="write", + span_id=str(uuid.uuid4()), + ) + + with patch("agents.sandbox.session.sinks.urlopen", side_effect=TimeoutError("timed out")): + with pytest.raises(RuntimeError, match="http proxy sink POST failed"): + await sink.handle(event) + + lines = spool_path.read_text(encoding="utf-8").splitlines() + assert len(lines) == 1 + assert json.loads(lines[0])["seq"] == 1 + + @pytest.mark.asyncio async def test_sandbox_session_error_events_and_traces_include_retryability( tmp_path: Path,