-
Notifications
You must be signed in to change notification settings - Fork 4.2k
fix: wake E2B PTY collection on process exit #3629
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -686,6 +686,8 @@ class _E2BPtyProcessEntry: | |
| output_lock: asyncio.Lock = field(default_factory=asyncio.Lock) | ||
| output_notify: asyncio.Event = field(default_factory=asyncio.Event) | ||
| last_used: float = field(default_factory=time.monotonic) | ||
| done: bool = False | ||
| exit_code: int | None = None | ||
|
|
||
|
|
||
| @dataclass(frozen=True) | ||
|
|
@@ -982,6 +984,7 @@ async def _append_output(payload: bytes | bytearray | str | object) -> None: | |
| on_data=_append_output, | ||
| ) | ||
| entry.handle = handle | ||
| asyncio.create_task(self._run_pty_waiter(entry)) | ||
| await self._sandbox.pty.send_stdin( | ||
| cast(Any, handle).pid, | ||
| f"{command_text}\n".encode(), | ||
|
|
@@ -999,6 +1002,7 @@ async def _append_output(payload: bytes | bytearray | str | object) -> None: | |
| on_stderr=_append_output, | ||
| ) | ||
| entry.handle = handle | ||
| asyncio.create_task(self._run_pty_waiter(entry)) | ||
| async with self._pty_lock: | ||
| process_id = allocate_pty_process_id(self._reserved_pty_process_ids) | ||
| self._reserved_pty_process_ids.add(process_id) | ||
|
|
@@ -1044,6 +1048,24 @@ async def _append_output(payload: bytes | bytearray | str | object) -> None: | |
| original_token_count=original_token_count, | ||
| ) | ||
|
|
||
| async def _run_pty_waiter(self, entry: _E2BPtyProcessEntry) -> None: | ||
| wait = getattr(entry.handle, "wait", None) | ||
| if not callable(wait): | ||
| return | ||
|
|
||
| try: | ||
| result = wait() | ||
| if inspect.isawaitable(result): | ||
| await result | ||
| exit_code = getattr(entry.handle, "exit_code", None) | ||
| if exit_code is not None: | ||
| entry.exit_code = int(exit_code) | ||
| except Exception: | ||
| pass | ||
| finally: | ||
| entry.done = True | ||
| entry.output_notify.set() | ||
|
Comment on lines
+1063
to
+1067
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
When Useful? React with 👍 / 👎. |
||
|
|
||
| async def pty_write_stdin( | ||
| self, | ||
| *, | ||
|
|
@@ -1195,7 +1217,7 @@ async def _collect_pty_output( | |
| if time.monotonic() >= deadline: | ||
| break | ||
|
|
||
| if self._entry_exit_code(entry) is not None: | ||
| if self._entry_done(entry): | ||
| async with entry.output_lock: | ||
| while entry.output_chunks: | ||
| output.extend(entry.output_chunks.popleft()) | ||
|
|
@@ -1226,7 +1248,7 @@ async def _finalize_pty_update( | |
| exit_code = self._entry_exit_code(entry) | ||
| live_process_id: int | None = process_id | ||
|
|
||
| if exit_code is not None: | ||
| if self._entry_done(entry): | ||
| async with self._pty_lock: | ||
| removed = self._pty_processes.pop(process_id, None) | ||
| self._reserved_pty_process_ids.discard(process_id) | ||
|
|
@@ -1246,7 +1268,7 @@ def _prune_pty_processes_if_needed(self) -> _E2BPtyProcessEntry | None: | |
| return None | ||
|
|
||
| meta: list[tuple[int, float, bool]] = [ | ||
| (process_id, entry.last_used, self._entry_exit_code(entry) is not None) | ||
| (process_id, entry.last_used, self._entry_done(entry)) | ||
| for process_id, entry in self._pty_processes.items() | ||
| ] | ||
| process_id = process_id_to_prune_from_meta(meta) | ||
|
|
@@ -1257,6 +1279,8 @@ def _prune_pty_processes_if_needed(self) -> _E2BPtyProcessEntry | None: | |
| return self._pty_processes.pop(process_id, None) | ||
|
|
||
| def _entry_exit_code(self, entry: _E2BPtyProcessEntry) -> int | None: | ||
| if entry.exit_code is not None: | ||
| return entry.exit_code | ||
| value = getattr(entry.handle, "exit_code", None) | ||
| if value is None: | ||
| return None | ||
|
|
@@ -1265,6 +1289,9 @@ def _entry_exit_code(self, entry: _E2BPtyProcessEntry) -> int | None: | |
| except (TypeError, ValueError): | ||
| return None | ||
|
|
||
| def _entry_done(self, entry: _E2BPtyProcessEntry) -> bool: | ||
| return entry.done or self._entry_exit_code(entry) is not None | ||
|
|
||
| async def _terminate_pty_entry(self, entry: _E2BPtyProcessEntry) -> None: | ||
| kill = getattr(entry.handle, "kill", None) | ||
| if callable(kill): | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For
tty=Truesessions, this waiter is attached to the handle returned by E2Bpty.create, which starts a login shell (/bin/bash -i -l) rather than the command sent on the next line (E2B SDK). When a child command exits from inside that PTY, such aspython3exiting after stdin, the shell remains alive sowait()never completes andpty_write_stdin(..., yield_time_s=...)still waits for the full yield window unless the shell itself exits; the new test fakes the PTY handle finishing on child exit, but real E2B does not do that.Useful? React with 👍 / 👎.