Skip to content

fix(sdk,core): cache realtime-stream credentials per slot with refresh on writer failure#3658

Merged
ericallam merged 1 commit into
mainfrom
fix/tri-9377-realtime-streams-creds-cache
May 18, 2026
Merged

fix(sdk,core): cache realtime-stream credentials per slot with refresh on writer failure#3658
ericallam merged 1 commit into
mainfrom
fix/tri-9377-realtime-streams-creds-cache

Conversation

@ericallam
Copy link
Copy Markdown
Member

Summary

Hot-loop writers — streams.writer / streams.pipe on the run-scoped
side, chat.response.write / chat.stream.* on the session side — were
issuing a fresh PUT to mint S2 credentials for every chunk. On run
streams, each PUT also pushed the streamId onto TaskRun.realtimeStreams,
so a chat-agent turn writing N chunks produced N PUTs and N duplicate
array pushes against the same row.

The SDK now caches the initialize response per cache slot: (runId, key)
for run streams, the session id for session streams. First call PUTs as
before; subsequent calls reuse the cached promise. Hot-loop writers do
one PUT per slot for the lifetime of the cache.

S2 access tokens have a 1-day TTL. If a writer's wait() rejects (auth
error, expired token, network blip), the cache evicts the matching slot
so the next call re-PUTs and mints fresh credentials, identity-checked
so a concurrent caller's fresh promise isn't accidentally cleared.

chat.agent guardrail

streams.pipe / writer / append / read called inside a chat.agent run
now logs a one-time warning pointing at chat.response.write /
chat.stream.*streams.* is run-scoped and isn't visible on the
chat session. The ai-chat docs are updated to drop the old guidance
toward run-scoped streams.

Test plan

  • Unit tests for both caches (dedup, scoping, failure eviction,
    reset() clears, reactive invalidation on wait() rejection).
  • End-to-end against a live S2 backend: oversized
    chat.response.write trips ChatChunkTooLargeError, the session
    cache evicts, and the next write mints fresh credentials.
  • Existing realtime + chat unit tests still pass (13 in
    @trigger.dev/core, 5 new + the existing suite in @trigger.dev/sdk).

…h on writer failure

Hot-loop writers (`streams.writer` / `streams.pipe` on the run-scoped
side, `chat.response.write` / `chat.stream.*` on the session side) were
issuing a fresh PUT to mint S2 credentials for every chunk. On run
streams, each PUT also pushed the streamId onto `TaskRun.realtimeStreams`,
so a chat-agent turn writing N chunks produced N PUTs and N duplicate
array pushes against the same row.

The SDK now caches the initialize response per cache slot — `(runId, key)`
for run streams, the session id for session streams. First call PUTs as
before; subsequent calls reuse the cached promise. Hot-loop writers do
one PUT per slot for the lifetime of the cache.

S2 access tokens have a 1-day TTL. If a writer's `wait()` rejects (auth
error, expired token, network blip), the cache evicts the matching slot
so the next call re-PUTs and mints fresh credentials, identity-checked
so a concurrent caller's fresh promise isn't accidentally cleared.

`streams.pipe / writer / append / read` called inside a `chat.agent` run
now logs a one-time warning pointing at `chat.response.write` /
`chat.stream.*` — `streams.*` is run-scoped and isn't visible on the
chat session. Drops the run-stream guidance from the ai-chat docs.
@changeset-bot
Copy link
Copy Markdown

changeset-bot Bot commented May 18, 2026

⚠️ No Changeset found

Latest commit: a434e4e

Merging this PR will not cause a version bump for any packages. If these changes should not result in a new version, you're good to go. If these changes should result in a version bump, you need to add a changeset.

Click here to learn what changesets are, and how to add one.

Click here if you're a maintainer who wants to add a changeset to this PR

@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai Bot commented May 18, 2026

Review Change Stack

Walkthrough

This PR implements intelligent caching for stream creation and initialization operations to prevent duplicate API calls during repeated pipe() invocations. Two caches are added: one in StandardRealtimeStreamsManager for createStream responses and one in SessionOutputChannel for initializeSessionStream responses. Both caches automatically evict failed promises and use promise identity matching to selectively invalidate entries when downstream writers fail, enabling retry with fresh credentials. Additionally, the PR introduces per-run warnings for unsupported stream method usage inside chat.agent runs.

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

🚥 Pre-merge checks | ✅ 3 | ❌ 2

❌ Failed checks (2 warnings)

Check name Status Explanation Resolution
Description check ⚠️ Warning The PR description is comprehensive and well-structured, covering the problem, solution, failure handling, guardrails, and test plan. However, it does not follow the provided template structure with required sections like checklist, Testing, Changelog, and Screenshots. Reformat the description to follow the repository template: add the checklist with contribution guide verification, move the summary content into the Testing and Changelog sections, and include any relevant screenshots.
Docstring Coverage ⚠️ Warning Docstring coverage is 18.18% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (3 passed)
Check name Status Explanation
Title check ✅ Passed The title accurately summarizes the main change: implementing credential caching for realtime streams with refresh on writer failure, which is the core objective of this PR.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
📝 Generate docstrings
  • Create stacked PR
  • Commit on current branch
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch fix/tri-9377-realtime-streams-creds-cache

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

coderabbitai[bot]

This comment was marked as resolved.

@ericallam ericallam marked this pull request as ready for review May 18, 2026 14:40
Copy link
Copy Markdown
Contributor

@devin-ai-integration devin-ai-integration Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

✅ Devin Review: No Issues Found

Devin Review analyzed this PR and found no potential bugs to report.

View in Devin Review to see 4 additional findings.

Open in Devin Review

@ericallam ericallam merged commit 8b98e21 into main May 18, 2026
56 checks passed
@ericallam ericallam deleted the fix/tri-9377-realtime-streams-creds-cache branch May 18, 2026 15:42
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants