fix(sdk,core): cache realtime-stream credentials per slot with refresh on writer failure#3658
Conversation
…h on writer failure Hot-loop writers (`streams.writer` / `streams.pipe` on the run-scoped side, `chat.response.write` / `chat.stream.*` on the session side) were issuing a fresh PUT to mint S2 credentials for every chunk. On run streams, each PUT also pushed the streamId onto `TaskRun.realtimeStreams`, so a chat-agent turn writing N chunks produced N PUTs and N duplicate array pushes against the same row. The SDK now caches the initialize response per cache slot — `(runId, key)` for run streams, the session id for session streams. First call PUTs as before; subsequent calls reuse the cached promise. Hot-loop writers do one PUT per slot for the lifetime of the cache. S2 access tokens have a 1-day TTL. If a writer's `wait()` rejects (auth error, expired token, network blip), the cache evicts the matching slot so the next call re-PUTs and mints fresh credentials, identity-checked so a concurrent caller's fresh promise isn't accidentally cleared. `streams.pipe / writer / append / read` called inside a `chat.agent` run now logs a one-time warning pointing at `chat.response.write` / `chat.stream.*` — `streams.*` is run-scoped and isn't visible on the chat session. Drops the run-stream guidance from the ai-chat docs.
|
WalkthroughThis PR implements intelligent caching for stream creation and initialization operations to prevent duplicate API calls during repeated Estimated code review effort🎯 3 (Moderate) | ⏱️ ~25 minutes 🚥 Pre-merge checks | ✅ 3 | ❌ 2❌ Failed checks (2 warnings)
✅ Passed checks (3 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
Summary
Hot-loop writers —
streams.writer/streams.pipeon the run-scopedside,
chat.response.write/chat.stream.*on the session side — wereissuing a fresh
PUTto mint S2 credentials for every chunk. On runstreams, each PUT also pushed the streamId onto
TaskRun.realtimeStreams,so a chat-agent turn writing N chunks produced N PUTs and N duplicate
array pushes against the same row.
The SDK now caches the initialize response per cache slot:
(runId, key)for run streams, the session id for session streams. First call PUTs as
before; subsequent calls reuse the cached promise. Hot-loop writers do
one PUT per slot for the lifetime of the cache.
S2 access tokens have a 1-day TTL. If a writer's
wait()rejects (autherror, expired token, network blip), the cache evicts the matching slot
so the next call re-PUTs and mints fresh credentials, identity-checked
so a concurrent caller's fresh promise isn't accidentally cleared.
chat.agent guardrail
streams.pipe / writer / append / readcalled inside achat.agentrunnow logs a one-time warning pointing at
chat.response.write/chat.stream.*—streams.*is run-scoped and isn't visible on thechat session. The ai-chat docs are updated to drop the old guidance
toward run-scoped streams.
Test plan
reset()clears, reactive invalidation onwait()rejection).chat.response.writetripsChatChunkTooLargeError, the sessioncache evicts, and the next write mints fresh credentials.
@trigger.dev/core, 5 new + the existing suite in@trigger.dev/sdk).