diff --git a/packages/core/src/v3/realtime-streams-api.ts b/packages/core/src/v3/realtime-streams-api.ts index d9cd9ecfb4..728399bea6 100644 --- a/packages/core/src/v3/realtime-streams-api.ts +++ b/packages/core/src/v3/realtime-streams-api.ts @@ -6,7 +6,10 @@ export const realtimeStreams = RealtimeStreamsAPI.getInstance(); export * from "./realtimeStreams/types.js"; export { SessionStreamInstance } from "./realtimeStreams/sessionStreamInstance.js"; -export type { SessionStreamInstanceOptions } from "./realtimeStreams/sessionStreamInstance.js"; +export type { + SessionStreamInstanceOptions, + InitializeSessionStreamResponseLike, +} from "./realtimeStreams/sessionStreamInstance.js"; export { trimSessionStream, writeSessionControlRecord, diff --git a/packages/core/src/v3/realtimeStreams/index.ts b/packages/core/src/v3/realtimeStreams/index.ts index b1c2073580..71854888ee 100644 --- a/packages/core/src/v3/realtimeStreams/index.ts +++ b/packages/core/src/v3/realtimeStreams/index.ts @@ -10,7 +10,10 @@ import { // `SessionOutputChannel.pipe` / `.writer` can construct it without reaching // into the core package's internals. 
export { SessionStreamInstance } from "./sessionStreamInstance.js"; -export type { SessionStreamInstanceOptions } from "./sessionStreamInstance.js"; +export type { + SessionStreamInstanceOptions, + InitializeSessionStreamResponseLike, +} from "./sessionStreamInstance.js"; export { trimSessionStream, writeSessionControlRecord, diff --git a/packages/core/src/v3/realtimeStreams/manager.test.ts b/packages/core/src/v3/realtimeStreams/manager.test.ts new file mode 100644 index 0000000000..179754bc75 --- /dev/null +++ b/packages/core/src/v3/realtimeStreams/manager.test.ts @@ -0,0 +1,147 @@ +import { describe, expect, it, vi } from "vitest"; +import type { ApiClient } from "../apiClient/index.js"; +import { StandardRealtimeStreamsManager } from "./manager.js"; + +// The cache lives on a private method to keep `pipe()` callers from having +// to thread cache concerns. Tests exercise it via bracket-notation to keep +// the assertions tight on cache contracts and avoid spinning up real +// `StreamsWriterV1`/`StreamsWriterV2` infrastructure (HTTP requests, S2 +// connections) for what is purely an in-memory dedup check. 
+type GetCached = ( + runId: string, + key: string, + requestOptions?: undefined +) => Promise<{ version: string; headers?: Record }>; + +function getCached(manager: StandardRealtimeStreamsManager, runId: string, key: string) { + return (manager as unknown as { getCachedCreateStream: GetCached }).getCachedCreateStream( + runId, + key + ); +} + +function makeApiClient(impl: () => Promise<{ version: string; headers?: Record }>) { + const spy = vi.fn(impl); + const client = { createStream: spy } as unknown as ApiClient; + return { client, spy }; +} + +describe("StandardRealtimeStreamsManager createStream cache", () => { + it("dedupes repeated calls for the same (runId, key)", async () => { + const { client, spy } = makeApiClient(async () => ({ version: "v1", headers: {} })); + const manager = new StandardRealtimeStreamsManager(client, "http://localhost"); + + const p1 = getCached(manager, "run-1", "chat"); + const p2 = getCached(manager, "run-1", "chat"); + + expect(p1).toBe(p2); + expect(spy).toHaveBeenCalledTimes(1); + await Promise.all([p1, p2]); + expect(spy).toHaveBeenCalledTimes(1); + }); + + it("issues a separate PUT for each distinct stream key on the same run", async () => { + const { client, spy } = makeApiClient(async () => ({ version: "v1", headers: {} })); + const manager = new StandardRealtimeStreamsManager(client, "http://localhost"); + + await Promise.all([ + getCached(manager, "run-1", "chat"), + getCached(manager, "run-1", "tool-output"), + ]); + + expect(spy).toHaveBeenCalledTimes(2); + expect(spy).toHaveBeenNthCalledWith(1, "run-1", "self", "chat", undefined); + expect(spy).toHaveBeenNthCalledWith(2, "run-1", "self", "tool-output", undefined); + }); + + it("issues a separate PUT for each distinct run, even with the same key", async () => { + const { client, spy } = makeApiClient(async () => ({ version: "v1", headers: {} })); + const manager = new StandardRealtimeStreamsManager(client, "http://localhost"); + + await Promise.all([ + 
getCached(manager, "run-1", "chat"), + getCached(manager, "run-2", "chat"), + ]); + + expect(spy).toHaveBeenCalledTimes(2); + }); + + it("evicts on failure so the next call retries instead of returning a poisoned entry", async () => { + const spy = vi + .fn() + .mockRejectedValueOnce(new Error("boom")) + .mockResolvedValueOnce({ version: "v1", headers: {} }); + const client = { createStream: spy } as unknown as ApiClient; + const manager = new StandardRealtimeStreamsManager(client, "http://localhost"); + + await expect(getCached(manager, "run-1", "chat")).rejects.toThrow("boom"); + + const retried = await getCached(manager, "run-1", "chat"); + + expect(retried).toEqual({ version: "v1", headers: {} }); + expect(spy).toHaveBeenCalledTimes(2); + }); + + it("reset() clears cached entries so the next call re-PUTs", async () => { + const { client, spy } = makeApiClient(async () => ({ version: "v1", headers: {} })); + const manager = new StandardRealtimeStreamsManager(client, "http://localhost"); + + await getCached(manager, "run-1", "chat"); + expect(spy).toHaveBeenCalledTimes(1); + + manager.reset(); + + await getCached(manager, "run-1", "chat"); + expect(spy).toHaveBeenCalledTimes(2); + }); + + it("evictCreateStreamIfStale clears the matching entry so the next call re-PUTs", async () => { + const { client, spy } = makeApiClient(async () => ({ version: "v1", headers: {} })); + const manager = new StandardRealtimeStreamsManager(client, "http://localhost"); + + // Prime the cache and capture which promise was stored. + const cachedPromise = getCached(manager, "run-1", "chat"); + await cachedPromise; + expect(spy).toHaveBeenCalledTimes(1); + + // Simulate the reactive invalidation path that `pipe()` runs when a + // writer's `wait()` rejects. 
+ ( + manager as unknown as { + evictCreateStreamIfStale: ( + runId: string, + key: string, + expected: Promise + ) => void; + } + ).evictCreateStreamIfStale("run-1", "chat", cachedPromise); + + await getCached(manager, "run-1", "chat"); + expect(spy).toHaveBeenCalledTimes(2); + }); + + it("evictCreateStreamIfStale is a no-op when the cache holds a different promise", async () => { + const { client, spy } = makeApiClient(async () => ({ version: "v1", headers: {} })); + const manager = new StandardRealtimeStreamsManager(client, "http://localhost"); + + const original = getCached(manager, "run-1", "chat"); + await original; + + // A different promise (e.g. from a concurrent caller that already + // refreshed) shouldn't trigger eviction. + const stalePromise = Promise.resolve({ version: "v1", headers: {} }); + ( + manager as unknown as { + evictCreateStreamIfStale: ( + runId: string, + key: string, + expected: Promise + ) => void; + } + ).evictCreateStreamIfStale("run-1", "chat", stalePromise); + + // Cache should still hold the original entry; next call is a hit. 
+ await getCached(manager, "run-1", "chat"); + expect(spy).toHaveBeenCalledTimes(1); + }); +}); diff --git a/packages/core/src/v3/realtimeStreams/manager.ts b/packages/core/src/v3/realtimeStreams/manager.ts index beda3535fb..f4d915acc3 100644 --- a/packages/core/src/v3/realtimeStreams/manager.ts +++ b/packages/core/src/v3/realtimeStreams/manager.ts @@ -1,7 +1,8 @@ import { ApiClient } from "../apiClient/index.js"; import { ensureAsyncIterable, ensureReadableStream } from "../streams/asyncIterableStream.js"; +import { AnyZodFetchOptions } from "../zodfetch.js"; import { taskContext } from "../task-context-api.js"; -import { StreamInstance } from "./streamInstance.js"; +import { CreateStreamResponseLike, StreamInstance } from "./streamInstance.js"; import { RealtimeStreamInstance, RealtimeStreamOperationOptions, @@ -21,8 +22,60 @@ export class StandardRealtimeStreamsManager implements RealtimeStreamsManager { abortController: AbortController; }>(); + // Cache of in-flight / resolved `createStream` responses, keyed by + // `${runId}:${key}`. S2 v2 access tokens are scoped to the org basin + // (default 1-day TTL server-side) so reusing them across repeated + // `pipe()` calls for the same `(runId, key)` is safe, and avoids the + // per-call PUT that pushes `streamId` onto `TaskRun.realtimeStreams`, + // which under chat-agent-style hot-loop writers caused row-lock + // contention on the writer DB. 
+ private createStreamCache = new Map>(); + reset(): void { this.activeStreams.clear(); + this.createStreamCache.clear(); + } + + private getCachedCreateStream( + runId: string, + key: string, + requestOptions: AnyZodFetchOptions | undefined + ): Promise { + const cacheKey = `${runId}:${key}`; + const cached = this.createStreamCache.get(cacheKey); + if (cached) { + return cached; + } + + const promise = this.apiClient.createStream(runId, "self", key, requestOptions); + this.createStreamCache.set(cacheKey, promise); + // Evict on failure so the next call retries instead of returning a + // poisoned cache entry forever. + promise.catch((err) => { + if (this.createStreamCache.get(cacheKey) === promise) { + this.createStreamCache.delete(cacheKey); + } + }); + return promise; + } + + /** + * Reactive invalidation: a writer's `wait()` rejecting can mean the + * cached S2 credentials have gone stale (expired token, revoked + * access, basin retired), so evict the cached `createStream` response + * for `(runId, key)` and let the next `pipe()` re-PUT to mint fresh + * credentials. Compare by identity so a fresh promise installed by a + * concurrent caller isn't accidentally cleared. + */ + private evictCreateStreamIfStale( + runId: string, + key: string, + expected: Promise + ): void { + const cacheKey = `${runId}:${key}`; + if (this.createStreamCache.get(cacheKey) === expected) { + this.createStreamCache.delete(cacheKey); + } } public pipe( @@ -48,6 +101,15 @@ export class StandardRealtimeStreamsManager implements RealtimeStreamsManager { ? AbortSignal.any?.([options.signal, abortController.signal]) ?? abortController.signal : abortController.signal; + // Capture which cached promise this writer uses so reactive + // invalidation below evicts only if the cache still holds it (a + // concurrent caller may have already refreshed it). 
+ const activeCreatePromise = this.getCachedCreateStream( + runId, + key, + options?.requestOptions + ); + const streamInstance = new StreamInstance({ apiClient: this.apiClient, baseUrl: this.baseUrl, @@ -58,14 +120,29 @@ export class StandardRealtimeStreamsManager implements RealtimeStreamsManager { requestOptions: options?.requestOptions, target: options?.target, debug: this.debug, + createStream: () => activeCreatePromise, }); // Register this stream const streamInfo = { wait: () => streamInstance.wait(), abortController }; this.activeStreams.add(streamInfo); - // Clean up when stream completes - streamInstance.wait().finally(() => this.activeStreams.delete(streamInfo)); + // Single internal chain that handles activeStreams cleanup AND + // reactive invalidation. On rejection we evict the cached + // `createStream` entry so the next pipe() for the same `(runId, key)` + // re-PUTs and recovers (e.g. when a cached S2 access token expired + // mid-process). Customer awaiters still observe the rejection via + // the returned `wait()`; this chain just keeps the cleanup path + // from surfacing as unhandled. 
+ streamInstance.wait().then( + () => { + this.activeStreams.delete(streamInfo); + }, + (err) => { + this.evictCreateStreamIfStale(runId, key, activeCreatePromise); + this.activeStreams.delete(streamInfo); + } + ); return { wait: () => streamInstance.wait(), diff --git a/packages/core/src/v3/realtimeStreams/sessionStreamInstance.ts b/packages/core/src/v3/realtimeStreams/sessionStreamInstance.ts index 11eb7290ed..73bec591d9 100644 --- a/packages/core/src/v3/realtimeStreams/sessionStreamInstance.ts +++ b/packages/core/src/v3/realtimeStreams/sessionStreamInstance.ts @@ -4,6 +4,10 @@ import { AnyZodFetchOptions } from "../zodfetch.js"; import { StreamsWriterV2 } from "./streamsWriterV2.js"; import { StreamsWriter, StreamWriteResult } from "./types.js"; +export type InitializeSessionStreamResponseLike = { + headers?: Record; +}; + export type SessionStreamInstanceOptions = { apiClient: ApiClient; baseUrl: string; @@ -13,6 +17,14 @@ export type SessionStreamInstanceOptions = { signal?: AbortSignal; requestOptions?: AnyZodFetchOptions; debug?: boolean; + /** + * Optional override for the initialize-session-stream call. Defaults to + * `apiClient.initializeSessionStream(sessionId, io, requestOptions)`. The + * channel passes a cached version so repeated `pipe()` / `writer()` + * calls for the same `(sessionId, io)` share a single PUT instead of + * hammering the server on every chunk. + */ + initializeSession?: () => Promise; }; /** @@ -31,11 +43,16 @@ export class SessionStreamInstance implements StreamsWriter { } private async initializeWriter(): Promise> { - const response = await this.options.apiClient.initializeSessionStream( - this.options.sessionId, - this.options.io, - this.options?.requestOptions - ); + const initializeFn = + this.options.initializeSession ?? 
+ (() => + this.options.apiClient.initializeSessionStream( + this.options.sessionId, + this.options.io, + this.options?.requestOptions + )); + + const response = await initializeFn(); const headers = response.headers ?? {}; const accessToken = headers["x-s2-access-token"]; diff --git a/packages/core/src/v3/realtimeStreams/streamInstance.ts b/packages/core/src/v3/realtimeStreams/streamInstance.ts index 07ee0158bf..e5cd3f84ae 100644 --- a/packages/core/src/v3/realtimeStreams/streamInstance.ts +++ b/packages/core/src/v3/realtimeStreams/streamInstance.ts @@ -5,6 +5,11 @@ import { StreamsWriterV1 } from "./streamsWriterV1.js"; import { StreamsWriterV2 } from "./streamsWriterV2.js"; import { StreamsWriter, StreamWriteResult } from "./types.js"; +export type CreateStreamResponseLike = { + version: string; + headers?: Record; +}; + export type StreamInstanceOptions = { apiClient: ApiClient; baseUrl: string; @@ -15,6 +20,14 @@ export type StreamInstanceOptions = { requestOptions?: AnyZodFetchOptions; target?: "self" | "parent" | "root" | string; debug?: boolean; + /** + * Optional override for the create-stream call. Defaults to + * `apiClient.createStream(runId, "self", key, requestOptions)`. The + * manager passes a cached version so repeated `pipe()` calls for the + * same `(runId, key)` share a single PUT instead of hammering the + * server on every chunk. + */ + createStream?: () => Promise; }; type StreamsWriterInstance = StreamsWriterV1 | StreamsWriterV2; @@ -27,12 +40,17 @@ export class StreamInstance implements StreamsWriter { } private async initializeWriter(): Promise> { - const { version, headers } = await this.options.apiClient.createStream( - this.options.runId, - "self", - this.options.key, - this.options?.requestOptions - ); + const createStreamFn = + this.options.createStream ?? 
+ (() => + this.options.apiClient.createStream( + this.options.runId, + "self", + this.options.key, + this.options?.requestOptions + )); + + const { version, headers } = await createStreamFn(); const parsedResponse = parseCreateStreamResponse(version, headers); diff --git a/packages/trigger-sdk/src/v3/ai.ts b/packages/trigger-sdk/src/v3/ai.ts index 81994a0368..a7ac052b3f 100644 --- a/packages/trigger-sdk/src/v3/ai.ts +++ b/packages/trigger-sdk/src/v3/ai.ts @@ -77,7 +77,7 @@ import type { ResolvedSkill } from "./skill.js"; // never touches `ai.ts`'s module graph, so the `node:*` builtins // pulled in transitively here never reach a client chunk. import { runBashInSkill, readFileInSkill } from "./agentSkillsRuntime.js"; -import { streams } from "./streams.js"; +import { streams, markChatAgentRunForStreamsWarning } from "./streams.js"; import { sessions, type SessionHandle, @@ -4495,6 +4495,7 @@ function chatCustomAgent< // No client-side upsert needed. locals.set(chatSessionHandleKey, sessions.open(payload.chatId)); locals.set(chatAgentRunContextKey, runOptions.ctx); + markChatAgentRunForStreamsWarning(); taskContext.setConversationId(payload.chatId); stampConversationIdOnActiveSpan(payload.chatId); return userRun(payload, runOptions); @@ -4591,6 +4592,7 @@ function chatAgent< // Mutable holder; advances in `writeTurnCompleteChunk` after each turn // and is the trim target for the NEXT turn's trim record. locals.set(lastTurnCompleteSeqNumKey, { value: undefined }); + markChatAgentRunForStreamsWarning(); taskContext.setConversationId(payload.chatId); // Stamp `gen_ai.conversation.id` on the run-level span. 
Every diff --git a/packages/trigger-sdk/src/v3/sessions.test.ts b/packages/trigger-sdk/src/v3/sessions.test.ts new file mode 100644 index 0000000000..abeccb0c12 --- /dev/null +++ b/packages/trigger-sdk/src/v3/sessions.test.ts @@ -0,0 +1,186 @@ +import { describe, expect, it, vi } from "vitest"; + +// Per-test override for the stubbed SessionStreamInstance's wait() so a +// test can simulate downstream writer failures (e.g. S2 auth error after +// initializeSessionStream returned a stale token). Reset at the top of +// each test that touches it. +let stubWaitImpl: (() => Promise<{ lastEventId?: string }>) | undefined; + +// Stub `SessionStreamInstance` so constructing a channel writer doesn't try +// to reach S2. The stub still invokes the `initializeSession` callback the +// channel passes in, which is the whole point: that's how the cache gets +// exercised. wait() resolves immediately by default; tests can override it +// via `stubWaitImpl` to verify reactive invalidation on writer failure. +vi.mock("@trigger.dev/core/v3", async (importActual) => { + const actual = (await importActual()) as Record; + class StubSessionStreamInstance { + private waitPromise: Promise<{ lastEventId?: string }>; + constructor(opts: { + source: ReadableStream; + initializeSession?: () => Promise<{ headers?: Record }>; + }) { + // Drain the source so the upstream tee doesn't backpressure-stall. + void (async () => { + const reader = opts.source.getReader(); + try { + while (true) { + const { done } = await reader.read(); + if (done) break; + } + } finally { + reader.releaseLock(); + } + })(); + // Trigger the initializeSession callback so the cache path runs. + opts.initializeSession?.().catch(() => { + // Failures are observed via the spy; swallow here so unhandled + // rejection warnings don't leak through the stub. + }); + // Capture the wait outcome once at construction (mirrors real + // SessionStreamInstance which kicks off initializeWriter from the + // ctor). 
All subsequent wait() calls return the same promise so + // a single failure is observable by every consumer in the channel + // (the internal `.then` cleanup/eviction chain and customer `waitUntilComplete`). + this.waitPromise = stubWaitImpl + ? stubWaitImpl() + : Promise.resolve({ lastEventId: undefined }); + // Claim any rejection so test runs don't surface as unhandled. + // Real awaiters still observe the rejection when they `await` it. + this.waitPromise.catch(() => {}); + } + async wait() { + return this.waitPromise; + } + get stream() { + return new ReadableStream({ start: (c) => c.close() }); + } + } + return { ...actual, SessionStreamInstance: StubSessionStreamInstance }; +}); + +import { SessionOutputChannel } from "./sessions.js"; +import { apiClientManager } from "@trigger.dev/core/v3"; + +type ApiClientStub = { + initializeSessionStream: ReturnType; +}; + +function installStubApiClient(impl: ApiClientStub["initializeSessionStream"]): ApiClientStub { + const stub: ApiClientStub = { initializeSessionStream: impl }; + // `apiClientManager.clientOrThrow()` is what `#pipeInternal` reaches for. 
+ vi.spyOn(apiClientManager, "clientOrThrow").mockReturnValue( + stub as unknown as ReturnType + ); + return stub; +} + +function emptyStream(): ReadableStream { + return new ReadableStream({ start: (c) => c.close() }); +} + +describe("SessionOutputChannel initializeSessionStream cache", () => { + it("dedupes repeated pipe()/writer() calls for the same channel", async () => { + stubWaitImpl = undefined; + const spy = vi.fn(async () => ({ version: "v2", headers: {} })); + installStubApiClient(spy); + + const channel = new SessionOutputChannel("session-1"); + const p1 = channel.pipe(emptyStream()); + const p2 = channel.pipe(emptyStream()); + const p3 = channel.writer({ + execute: ({ write }) => { + write({ chunk: 1 }); + }, + }); + + await Promise.all([p1.waitUntilComplete(), p2.waitUntilComplete(), p3.waitUntilComplete()]); + + expect(spy).toHaveBeenCalledTimes(1); + expect(spy).toHaveBeenCalledWith("session-1", "out", undefined); + }); + + it("evicts on initialize failure so the next call retries instead of returning a poisoned entry", async () => { + stubWaitImpl = undefined; + const spy = vi + .fn() + .mockRejectedValueOnce(new Error("boom")) + .mockResolvedValueOnce({ version: "v2", headers: {} }); + installStubApiClient(spy); + + const channel = new SessionOutputChannel("session-1"); + const firstAttempt = channel.pipe(emptyStream()); + // First call fails — the stub swallows the rejection on the + // initializeSession callback, but the cache eviction handler still runs. + await firstAttempt.waitUntilComplete(); + // Settle pending microtasks so the .catch() eviction fires. 
+ await new Promise((resolve) => setTimeout(resolve, 0)); + + const retried = channel.pipe(emptyStream()); + await retried.waitUntilComplete(); + + expect(spy).toHaveBeenCalledTimes(2); + }); + + it("reset() clears cached entries so the next call re-PUTs", async () => { + stubWaitImpl = undefined; + const spy = vi.fn(async () => ({ version: "v2", headers: {} })); + installStubApiClient(spy); + + const channel = new SessionOutputChannel("session-1"); + await channel.pipe(emptyStream()).waitUntilComplete(); + expect(spy).toHaveBeenCalledTimes(1); + + channel.reset(); + + await channel.pipe(emptyStream()).waitUntilComplete(); + expect(spy).toHaveBeenCalledTimes(2); + }); + + it("scopes the cache per channel instance", async () => { + stubWaitImpl = undefined; + const spy = vi.fn(async () => ({ version: "v2", headers: {} })); + installStubApiClient(spy); + + const channelA = new SessionOutputChannel("session-a"); + const channelB = new SessionOutputChannel("session-b"); + + await Promise.all([ + channelA.pipe(emptyStream()).waitUntilComplete(), + channelB.pipe(emptyStream()).waitUntilComplete(), + ]); + + expect(spy).toHaveBeenCalledTimes(2); + expect(spy).toHaveBeenCalledWith("session-a", "out", undefined); + expect(spy).toHaveBeenCalledWith("session-b", "out", undefined); + }); + + it("evicts the cache when a writer's wait() rejects (simulated stale-token failure)", async () => { + const spy = vi.fn(async () => ({ version: "v2", headers: {} })); + installStubApiClient(spy); + + // First writer's wait() rejects (e.g. S2 returned 401 after the cached + // token expired mid-process); subsequent writers' wait() resolve cleanly. 
+ let waitCallCount = 0; + stubWaitImpl = async () => { + waitCallCount++; + if (waitCallCount === 1) throw new Error("S2 auth failed: token expired"); + return { lastEventId: undefined }; + }; + + const channel = new SessionOutputChannel("session-1"); + + const failed = channel.pipe(emptyStream()); + await expect(failed.waitUntilComplete()).rejects.toThrow(/token expired/); + + // Settle microtasks so the reactive .catch eviction handler fires. + await new Promise((resolve) => setTimeout(resolve, 0)); + + const recovered = channel.pipe(emptyStream()); + await recovered.waitUntilComplete(); + + // Cache evicted ⇒ second pipe() re-PUT ⇒ two distinct initialize calls. + expect(spy).toHaveBeenCalledTimes(2); + + stubWaitImpl = undefined; + }); +}); diff --git a/packages/trigger-sdk/src/v3/sessions.ts b/packages/trigger-sdk/src/v3/sessions.ts index 663dbbebc3..18023535f5 100644 --- a/packages/trigger-sdk/src/v3/sessions.ts +++ b/packages/trigger-sdk/src/v3/sessions.ts @@ -34,7 +34,11 @@ import { trimSessionStream, writeSessionControlRecord, } from "@trigger.dev/core/v3"; -import type { ControlEvent, StreamWriteResult } from "@trigger.dev/core/v3"; +import type { + ControlEvent, + InitializeSessionStreamResponseLike, + StreamWriteResult, +} from "@trigger.dev/core/v3"; import { conditionallyImportAndParsePacket } from "@trigger.dev/core/v3/utils/ioSerialization"; import { SpanStatusCode } from "@opentelemetry/api"; import { tracer } from "./tracer.js"; @@ -266,8 +270,30 @@ export type SessionPipeStreamOptions = Omit; * internally by `pipe`/`writer` — there's no public `initialize()`. */ export class SessionOutputChannel { + // Cache of the in-flight / resolved `initializeSessionStream` PUT for + // this channel. Every `pipe()` / `writer()` call needs the same S2 + // credentials, so we share a single promise instead of re-PUTing on + // every chunk. 
Hot-loop writers (per-chunk `chat.response.write` / + // direct `session.out.writer` calls) drop from N PUTs to 1 PUT for + // the lifetime of the channel. The S2 access token has a 1-day TTL + // server-side so reusing it across calls within a single run is safe. + // Evicts on failure (so the next call retries) and on `reset()`. + #initPromise?: Promise; + constructor(public readonly sessionId: string) {} + /** + * Drop the cached `initializeSessionStream` response. Surfaces for + * tests and lifecycle hooks that need the next write to re-mint S2 + * credentials. The cache also self-evicts on `initializeSession` + * rejection, so callers don't need to invoke this on failures. + * + * @internal + */ + reset(): void { + this.#initPromise = undefined; + } + /** * Append a single record. Routes through {@link writer} internally so * subscribers receive the same parsed-object shape as multi-record @@ -429,6 +455,28 @@ export class SessionOutputChannel { ? AbortSignal.any?.([options.signal, abortController.signal]) ?? abortController.signal : abortController.signal; + // Resolve the init promise eagerly so we can capture which one this + // writer uses for reactive invalidation below. + const writerInitPromise = ((): Promise => { + if (this.#initPromise) { + return this.#initPromise; + } + const fresh = apiClient.initializeSessionStream( + this.sessionId, + "out", + options?.requestOptions + ); + this.#initPromise = fresh; + // Evict on failure so the next call retries instead of returning a + // poisoned cache entry forever. 
+ fresh.catch((err) => { + if (this.#initPromise === fresh) { + this.#initPromise = undefined; + } + }); + return fresh; + })(); + try { const instance = new SessionStreamInstance({ apiClient, @@ -438,11 +486,28 @@ export class SessionOutputChannel { source: readableStreamSource, signal: combinedSignal, requestOptions: options?.requestOptions, + initializeSession: () => writerInitPromise, }); - instance.wait().finally(() => { - span.end(); - }); + // Single internal chain that handles span lifecycle AND reactive + // invalidation. On rejection we evict the cached init promise so + // the next pipe()/writer() re-PUTs and recovers (e.g. when a + // cached S2 access token expired mid-process). Compare by identity + // so a concurrent caller's fresh promise isn't accidentally cleared. + // Customer awaiters still observe the rejection via the returned + // `waitUntilComplete()`; this chain just keeps the cleanup path + // from surfacing as unhandled. + instance.wait().then( + () => { + span.end(); + }, + () => { + if (this.#initPromise === writerInitPromise) { + this.#initPromise = undefined; + } + span.end(); + } + ); return { stream: instance.stream, diff --git a/packages/trigger-sdk/src/v3/streams.ts b/packages/trigger-sdk/src/v3/streams.ts index 6ccaea8891..f987872d80 100644 --- a/packages/trigger-sdk/src/v3/streams.ts +++ b/packages/trigger-sdk/src/v3/streams.ts @@ -19,6 +19,7 @@ import { ManualWaitpointPromise, WaitpointTimeoutError, runtime, + logger, type RealtimeDefinedInputStream, type InputStreamSubscription, type InputStreamOnceOptions, @@ -32,10 +33,43 @@ import { } from "@trigger.dev/core/v3"; import { conditionallyImportAndParsePacket } from "@trigger.dev/core/v3/utils/ioSerialization"; import { tracer } from "./tracer.js"; +import { locals } from "./locals.js"; import { SpanStatusCode } from "@opentelemetry/api"; const DEFAULT_STREAM_KEY = "default"; +// `chat.agent` sets this once at the top of every run via +// `markChatAgentRunForStreamsWarning`. 
The flag lives on the run's +// AsyncLocalStorage frame, so it naturally resets between runs and stays +// invisible to subtasks (where `streams.*` is a normal API). +const inChatAgentRunKey = locals.create("streams.inChatAgentRun"); +// Once-per-run dedup shared by every `streams.*` method: the first call +// inside a chat.agent run gets the nudge and all later calls stay silent, +// so a single tight loop won't spam the logs. +const chatAgentStreamsWarnedKey = locals.create("streams.chatAgentWarned"); + +/** + * Marks the current run as a `chat.agent` run so subsequent `streams.pipe` / + * `streams.writer` / `streams.append` / `streams.read` calls can warn the user + * that they're using a run-scoped stream rather than the chat's `session.out`. + * + * Called from inside the `chat.agent` task wrapper at the top of every run. + * + * @internal + */ +export function markChatAgentRunForStreamsWarning(): void { + locals.set(inChatAgentRunKey, true); +} + +function warnIfChatAgentStreamsMisuse(method: "pipe" | "append" | "read" | "writer"): void { + if (!locals.get(inChatAgentRunKey)) return; + if (locals.get(chatAgentStreamsWarnedKey)) return; + locals.set(chatAgentStreamsWarnedKey, true); + logger.warn( + `streams.${method}() was called inside a chat.agent run. It operates on a run-scoped realtime stream, NOT the chat session: chunks written to it are not visible to the chat UI, and reading from it does not return chat session output. For chat output use chat.response.write() or chat.stream.* instead. See https://trigger.dev/docs/ai-chat/patterns/large-payloads. (Logged once per run; subsequent streams.* calls in this run are silent.)` + ); +} + /** * Pipes data to a realtime stream using the default stream key (`"default"`). * @@ -154,6 +188,7 @@ function pipeInternal( opts: PipeStreamOptions | undefined, spanName: string ): PipeStreamResult { + warnIfChatAgentStreamsMisuse(spanName === "streams.writer()" ? 
"writer" : "pipe"); const runId = getRunIdForOptions(opts); if (!runId) { @@ -325,6 +360,7 @@ async function readStreamImpl( key: string, options?: ReadStreamOptions ): Promise> { + warnIfChatAgentStreamsMisuse("read"); const apiClient = apiClientManager.clientOrThrow(); const span = tracer.startSpan("streams.read()", { @@ -403,6 +439,7 @@ async function appendInternal( part: TPart, options?: AppendStreamOptions ): Promise { + warnIfChatAgentStreamsMisuse("append"); const runId = getRunIdForOptions(options); if (!runId) {