Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 12 additions & 16 deletions src/agents/sandbox/session/sinks.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
from pathlib import Path
from types import ModuleType
from typing import Literal, Protocol, runtime_checkable
from urllib.error import HTTPError, URLError
from urllib.request import Request, urlopen

from ..errors import WorkspaceReadNotFoundError
Expand Down Expand Up @@ -181,7 +180,6 @@ def __init__(
self._seen = 0
self._lock = asyncio.Lock()
self._flush_every = max(1, int(flush_every))
self._existing_outbox_loaded = False

def _resolve_relpath(self) -> Path:
rel = self.workspace_relpath
Expand Down Expand Up @@ -234,34 +232,32 @@ async def _can_flush_to_workspace(self) -> bool:
return False

async def _flush_buffer(self) -> None:
if self._session is None:
if self._session is None or not self._buf:
return

await self._ensure_existing_outbox_loaded()
relpath = self._resolved_workspace_relpath or self.workspace_relpath
await self._session.write(relpath, io.BytesIO(bytes(self._buf)))
existing = await self._read_existing_outbox(relpath)
pending = bytes(self._buf)
await self._session.write(relpath, io.BytesIO(existing + pending))
self._buf.clear()

async def _ensure_existing_outbox_loaded(self) -> None:
if self._session is None or self._existing_outbox_loaded:
return
async def _read_existing_outbox(self, relpath: Path) -> bytes:
if self._session is None:
return b""

relpath = self._resolved_workspace_relpath or self.workspace_relpath
try:
existing = await self._session.read(relpath)
except (FileNotFoundError, WorkspaceReadNotFoundError):
self._existing_outbox_loaded = True
return
return b""

try:
payload = existing.read()
finally:
existing.close()

if isinstance(payload, str):
payload = payload.encode("utf-8")
if payload:
self._buf = bytearray(payload) + self._buf
self._existing_outbox_loaded = True
return payload.encode("utf-8")
return bytes(payload)

async def handle(self, event: SandboxSessionEvent) -> None:
# If unbound (e.g., audit event emission used without a SandboxSession wrapper),
Expand Down Expand Up @@ -317,7 +313,7 @@ def _post(self, body: bytes, spool_line: str | None) -> None:
try:
with urlopen(req, timeout=self.timeout_s) as resp:
_ = resp.read(1) # ensure request completes
except (HTTPError, URLError) as e:
except OSError as e:
if spool_line is not None and self.spool_path is not None:
try:
self.spool_path.parent.mkdir(parents=True, exist_ok=True)
Expand Down
53 changes: 53 additions & 0 deletions tests/sandbox/test_session_sinks.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import tarfile
import uuid
from pathlib import Path
from unittest.mock import patch

import pytest
from inline_snapshot import snapshot
Expand All @@ -21,6 +22,7 @@
CallbackSink,
ChainedSink,
EventPayloadPolicy,
HttpProxySink,
Instrumentation,
JsonlOutboxSink,
SandboxSession,
Expand Down Expand Up @@ -277,6 +279,32 @@ async def test_workspace_jsonl_sink_does_not_duplicate_lines_across_flushes(
assert [json.loads(line)["seq"] for line in lines] == [1, 2, 3]


@pytest.mark.asyncio
async def test_workspace_jsonl_sink_clears_flushed_buffer(tmp_path: Path) -> None:
inner = _build_unix_local_session(tmp_path)
relpath = Path(f"logs/events-{inner.state.session_id}.jsonl")

async with inner:
sink = WorkspaceJsonlSink(mode="sync", on_error="raise", ephemeral=False, flush_every=1)
sink.bind(inner)

for seq in (1, 2):
await sink.handle(
SandboxSessionStartEvent(
session_id=inner.state.session_id,
seq=seq,
op="write",
span_id=str(uuid.uuid4()),
)
)
assert sink._buf == bytearray()

outbox_stream = await inner.read(relpath)
lines = outbox_stream.read().decode("utf-8").splitlines()

assert [json.loads(line)["seq"] for line in lines] == [1, 2]


@pytest.mark.asyncio
async def test_workspace_jsonl_sink_ephemeral_excludes_runtime_outbox_with_existing_parent(
tmp_path: Path,
Expand Down Expand Up @@ -363,6 +391,31 @@ def _callback(event: SandboxSessionEvent, session: BaseSandboxSession) -> None:
assert all(session is inner for _op, session in seen)


@pytest.mark.asyncio
async def test_http_proxy_sink_spools_direct_timeout(tmp_path: Path) -> None:
spool_path = tmp_path / "events.jsonl"
sink = HttpProxySink(
"http://127.0.0.1:9/events",
mode="sync",
on_error="raise",
spool_path=spool_path,
)
event = SandboxSessionStartEvent(
session_id=uuid.uuid4(),
seq=1,
op="write",
span_id=str(uuid.uuid4()),
)

with patch("agents.sandbox.session.sinks.urlopen", side_effect=TimeoutError("timed out")):
with pytest.raises(RuntimeError, match="http proxy sink POST failed"):
await sink.handle(event)

lines = spool_path.read_text(encoding="utf-8").splitlines()
assert len(lines) == 1
assert json.loads(lines[0])["seq"] == 1


@pytest.mark.asyncio
async def test_sandbox_session_error_events_and_traces_include_retryability(
tmp_path: Path,
Expand Down