Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion packages/core/src/v3/realtime-streams-api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@ export const realtimeStreams = RealtimeStreamsAPI.getInstance();

export * from "./realtimeStreams/types.js";
export { SessionStreamInstance } from "./realtimeStreams/sessionStreamInstance.js";
export type { SessionStreamInstanceOptions } from "./realtimeStreams/sessionStreamInstance.js";
export type {
SessionStreamInstanceOptions,
InitializeSessionStreamResponseLike,
} from "./realtimeStreams/sessionStreamInstance.js";
export {
trimSessionStream,
writeSessionControlRecord,
Expand Down
5 changes: 4 additions & 1 deletion packages/core/src/v3/realtimeStreams/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@ import {
// `SessionOutputChannel.pipe` / `.writer` can construct it without reaching
// into the core package's internals.
export { SessionStreamInstance } from "./sessionStreamInstance.js";
export type { SessionStreamInstanceOptions } from "./sessionStreamInstance.js";
export type {
SessionStreamInstanceOptions,
InitializeSessionStreamResponseLike,
} from "./sessionStreamInstance.js";
export {
trimSessionStream,
writeSessionControlRecord,
Expand Down
147 changes: 147 additions & 0 deletions packages/core/src/v3/realtimeStreams/manager.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
import { describe, expect, it, vi } from "vitest";
import type { ApiClient } from "../apiClient/index.js";
import { StandardRealtimeStreamsManager } from "./manager.js";

// The createStream cache is only reachable through a private method, so
// `pipe()` callers never have to thread cache concerns. Tests reach that
// method via a structural cast to keep the assertions tight on the cache
// contract and avoid spinning up real `StreamsWriterV1`/`StreamsWriterV2`
// infrastructure (HTTP requests, S2 connections) for what is purely an
// in-memory dedup check.
//
// `GetCached` is the call signature the tests assume for that private
// method: `(runId, key, requestOptions?)` returning a promise of the
// create-stream response (`version` plus optional `headers`).
type GetCached = (
runId: string,
key: string,
requestOptions?: undefined
) => Promise<{ version: string; headers?: Record<string, string> }>;

/**
 * Test helper: calls the manager's private `getCachedCreateStream(runId, key)`
 * through a structural cast and returns the promise it hands back, without
 * awaiting it, so callers can compare promise identity.
 */
function getCached(manager: StandardRealtimeStreamsManager, runId: string, key: string) {
  const internals = manager as unknown as { getCachedCreateStream: GetCached };
  return internals.getCachedCreateStream(runId, key);
}

/**
 * Test helper: wraps `impl` in a vitest spy and exposes it as the
 * `createStream` method of a stand-in `ApiClient`.
 *
 * @returns The fake client together with the spy, so tests can assert on calls.
 */
function makeApiClient(impl: () => Promise<{ version: string; headers?: Record<string, string> }>) {
  const spy = vi.fn(impl);
  return {
    client: { createStream: spy } as unknown as ApiClient,
    spy,
  };
}

describe("StandardRealtimeStreamsManager createStream cache", () => {
  type CreateResponse = { version: string; headers?: Record<string, string> };

  // Structural view of the private eviction method exercised below.
  type EvictInternals = {
    evictCreateStreamIfStale: (runId: string, key: string, expected: Promise<unknown>) => void;
  };

  const BASE_URL = "http://localhost";

  // Default `createStream` stub: resolves with a fresh v1 response per call.
  const respondV1 = async (): Promise<CreateResponse> => ({ version: "v1", headers: {} });

  // Builds a manager around a spied API client whose createStream always succeeds.
  function setup() {
    const { client, spy } = makeApiClient(respondV1);
    const manager = new StandardRealtimeStreamsManager(client, BASE_URL);
    return { manager, spy };
  }

  // Invokes the private `evictCreateStreamIfStale` through a structural cast.
  function evictIfStale(
    manager: StandardRealtimeStreamsManager,
    runId: string,
    key: string,
    expected: Promise<unknown>
  ): void {
    (manager as unknown as EvictInternals).evictCreateStreamIfStale(runId, key, expected);
  }

  it("dedupes repeated calls for the same (runId, key)", async () => {
    const { manager, spy } = setup();

    // Both calls happen before anything settles, so the second must hit the
    // in-flight entry and get the very same promise back.
    const first = getCached(manager, "run-1", "chat");
    const second = getCached(manager, "run-1", "chat");

    expect(first).toBe(second);
    expect(spy).toHaveBeenCalledTimes(1);
    await Promise.all([first, second]);
    expect(spy).toHaveBeenCalledTimes(1);
  });

  it("issues a separate PUT for each distinct stream key on the same run", async () => {
    const { manager, spy } = setup();

    const chat = getCached(manager, "run-1", "chat");
    const toolOutput = getCached(manager, "run-1", "tool-output");
    await Promise.all([chat, toolOutput]);

    expect(spy).toHaveBeenCalledTimes(2);
    expect(spy).toHaveBeenNthCalledWith(1, "run-1", "self", "chat", undefined);
    expect(spy).toHaveBeenNthCalledWith(2, "run-1", "self", "tool-output", undefined);
  });

  it("issues a separate PUT for each distinct run, even with the same key", async () => {
    const { manager, spy } = setup();

    const firstRun = getCached(manager, "run-1", "chat");
    const secondRun = getCached(manager, "run-2", "chat");
    await Promise.all([firstRun, secondRun]);

    expect(spy).toHaveBeenCalledTimes(2);
  });

  it("evicts on failure so the next call retries instead of returning a poisoned entry", async () => {
    // First createStream call rejects, the second resolves.
    const spy = vi
      .fn()
      .mockRejectedValueOnce(new Error("boom"))
      .mockResolvedValueOnce({ version: "v1", headers: {} });
    const client = { createStream: spy } as unknown as ApiClient;
    const manager = new StandardRealtimeStreamsManager(client, BASE_URL);

    await expect(getCached(manager, "run-1", "chat")).rejects.toThrow("boom");

    const retried = await getCached(manager, "run-1", "chat");

    expect(retried).toEqual({ version: "v1", headers: {} });
    expect(spy).toHaveBeenCalledTimes(2);
  });

  it("reset() clears cached entries so the next call re-PUTs", async () => {
    const { manager, spy } = setup();

    await getCached(manager, "run-1", "chat");
    expect(spy).toHaveBeenCalledTimes(1);

    manager.reset();

    await getCached(manager, "run-1", "chat");
    expect(spy).toHaveBeenCalledTimes(2);
  });

  it("evictCreateStreamIfStale clears the matching entry so the next call re-PUTs", async () => {
    const { manager, spy } = setup();

    // Prime the cache, keeping hold of the exact promise that was stored.
    const cachedPromise = getCached(manager, "run-1", "chat");
    await cachedPromise;
    expect(spy).toHaveBeenCalledTimes(1);

    // Simulate the reactive invalidation path that `pipe()` runs when a
    // writer's `wait()` rejects.
    evictIfStale(manager, "run-1", "chat", cachedPromise);

    await getCached(manager, "run-1", "chat");
    expect(spy).toHaveBeenCalledTimes(2);
  });

  it("evictCreateStreamIfStale is a no-op when the cache holds a different promise", async () => {
    const { manager, spy } = setup();

    const original = getCached(manager, "run-1", "chat");
    await original;

    // A promise that is not the cached one (e.g. held by a writer whose
    // entry was already refreshed by a concurrent caller) must not evict.
    const stalePromise = Promise.resolve({ version: "v1", headers: {} });
    evictIfStale(manager, "run-1", "chat", stalePromise);

    // The original entry is still cached, so this call is a hit.
    await getCached(manager, "run-1", "chat");
    expect(spy).toHaveBeenCalledTimes(1);
  });
});
Comment thread
ericallam marked this conversation as resolved.
83 changes: 80 additions & 3 deletions packages/core/src/v3/realtimeStreams/manager.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import { ApiClient } from "../apiClient/index.js";
import { ensureAsyncIterable, ensureReadableStream } from "../streams/asyncIterableStream.js";
import { AnyZodFetchOptions } from "../zodfetch.js";
import { taskContext } from "../task-context-api.js";
import { StreamInstance } from "./streamInstance.js";
import { CreateStreamResponseLike, StreamInstance } from "./streamInstance.js";
import {
RealtimeStreamInstance,
RealtimeStreamOperationOptions,
Expand All @@ -21,8 +22,60 @@ export class StandardRealtimeStreamsManager implements RealtimeStreamsManager {
abortController: AbortController;
}>();

// Cache of in-flight / resolved `createStream` responses, keyed by
// `${runId}:${key}`. S2 v2 access tokens are scoped to the org basin
// (default 1-day TTL server-side) so reusing them across repeated
// `pipe()` calls for the same `(runId, key)` is safe, and avoids the
// per-call PUT that pushes `streamId` onto `TaskRun.realtimeStreams`,
// which under chat-agent-style hot-loop writers caused row-lock
// contention on the writer DB.
private createStreamCache = new Map<string, Promise<CreateStreamResponseLike>>();

/**
 * Clears the manager's in-memory state: empties the `createStream`
 * response cache and the collection of tracked active streams. Only the
 * bookkeeping is cleared; nothing here aborts or awaits a stream.
 */
reset(): void {
  this.createStreamCache.clear();
  this.activeStreams.clear();
}

/**
 * Returns the `createStream` response promise for `(runId, key)`, calling
 * the API only when no entry is cached under `${runId}:${key}`.
 *
 * The promise itself is cached (pending or settled), so concurrent callers
 * share a single request. On a cache hit `requestOptions` is ignored; the
 * entry keeps whatever options the first caller supplied.
 *
 * @param runId - Run the stream belongs to.
 * @param key - Stream key within that run.
 * @param requestOptions - Options forwarded to `apiClient.createStream` on a miss.
 * @returns The shared promise for the create-stream response.
 */
private getCachedCreateStream(
  runId: string,
  key: string,
  requestOptions: AnyZodFetchOptions | undefined
): Promise<CreateStreamResponseLike> {
  const cacheKey = `${runId}:${key}`;

  const existing = this.createStreamCache.get(cacheKey);
  if (existing !== undefined) {
    return existing;
  }

  const pending = this.apiClient.createStream(runId, "self", key, requestOptions);
  this.createStreamCache.set(cacheKey, pending);

  // A rejected entry must not stay cached, otherwise every later call would
  // replay the same failure. Only remove it if the slot still holds this
  // exact promise, so a newer entry is never clobbered.
  pending.catch(() => {
    const current = this.createStreamCache.get(cacheKey);
    if (current === pending) {
      this.createStreamCache.delete(cacheKey);
    }
  });

  return pending;
}

/**
 * Reactive invalidation for the `createStream` cache.
 *
 * A writer's `wait()` rejecting can mean the cached S2 credentials are no
 * longer usable (expired token, revoked access, basin retired). Dropping
 * the cached response for `(runId, key)` lets the next `pipe()` re-PUT and
 * mint fresh credentials.
 *
 * The entry is removed only when the cache still holds `expected` itself
 * (identity comparison), so a fresh promise installed by a concurrent
 * caller is left untouched.
 *
 * @param runId - Run the stream belongs to.
 * @param key - Stream key within that run.
 * @param expected - The cached promise the failing writer was created with.
 */
private evictCreateStreamIfStale(
  runId: string,
  key: string,
  expected: Promise<CreateStreamResponseLike>
): void {
  const cacheKey = `${runId}:${key}`;
  const current = this.createStreamCache.get(cacheKey);
  if (current !== expected) {
    return;
  }
  this.createStreamCache.delete(cacheKey);
}

public pipe<T>(
Expand All @@ -48,6 +101,15 @@ export class StandardRealtimeStreamsManager implements RealtimeStreamsManager {
? AbortSignal.any?.([options.signal, abortController.signal]) ?? abortController.signal
: abortController.signal;

// Capture which cached promise this writer uses so reactive
// invalidation below evicts only if the cache still holds it (a
// concurrent caller may have already refreshed it).
const activeCreatePromise = this.getCachedCreateStream(
runId,
key,
options?.requestOptions
);

const streamInstance = new StreamInstance({
apiClient: this.apiClient,
baseUrl: this.baseUrl,
Expand All @@ -58,14 +120,29 @@ export class StandardRealtimeStreamsManager implements RealtimeStreamsManager {
requestOptions: options?.requestOptions,
target: options?.target,
debug: this.debug,
createStream: () => activeCreatePromise,
});

// Register this stream
const streamInfo = { wait: () => streamInstance.wait(), abortController };
this.activeStreams.add(streamInfo);

// Clean up when stream completes
streamInstance.wait().finally(() => this.activeStreams.delete(streamInfo));
// Single internal chain that handles activeStreams cleanup AND
// reactive invalidation. On rejection we evict the cached
// `createStream` entry so the next pipe() for the same `(runId, key)`
// re-PUTs and recovers (e.g. when a cached S2 access token expired
// mid-process). Customer awaiters still observe the rejection via
// the returned `wait()`; this chain just keeps the cleanup path
// from surfacing as unhandled.
streamInstance.wait().then(
() => {
this.activeStreams.delete(streamInfo);
},
(err) => {
this.evictCreateStreamIfStale(runId, key, activeCreatePromise);
this.activeStreams.delete(streamInfo);
}
);

return {
wait: () => streamInstance.wait(),
Expand Down
27 changes: 22 additions & 5 deletions packages/core/src/v3/realtimeStreams/sessionStreamInstance.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ import { AnyZodFetchOptions } from "../zodfetch.js";
import { StreamsWriterV2 } from "./streamsWriterV2.js";
import { StreamsWriter, StreamWriteResult } from "./types.js";

/**
 * Minimal structural shape of an initialize-session-stream response, used
 * as the resolved type of the optional `initializeSession` override on
 * `SessionStreamInstanceOptions`.
 */
export type InitializeSessionStreamResponseLike = {
/**
 * Response headers, if any. `SessionStreamInstance` treats a missing value
 * as an empty object and looks up `x-s2-access-token` in it.
 */
headers?: Record<string, string>;
};

export type SessionStreamInstanceOptions<T> = {
apiClient: ApiClient;
baseUrl: string;
Expand All @@ -13,6 +17,14 @@ export type SessionStreamInstanceOptions<T> = {
signal?: AbortSignal;
requestOptions?: AnyZodFetchOptions;
debug?: boolean;
/**
* Optional override for the initialize-session-stream call. Defaults to
* `apiClient.initializeSessionStream(sessionId, io, requestOptions)`. The
* channel passes a cached version so repeated `pipe()` / `writer()`
* calls for the same `(sessionId, io)` share a single PUT instead of
* hammering the server on every chunk.
*/
initializeSession?: () => Promise<InitializeSessionStreamResponseLike>;
};

/**
Expand All @@ -31,11 +43,16 @@ export class SessionStreamInstance<T> implements StreamsWriter {
}

private async initializeWriter(): Promise<StreamsWriterV2<T>> {
const response = await this.options.apiClient.initializeSessionStream(
this.options.sessionId,
this.options.io,
this.options?.requestOptions
);
const initializeFn =
this.options.initializeSession ??
(() =>
this.options.apiClient.initializeSessionStream(
this.options.sessionId,
this.options.io,
this.options?.requestOptions
));

const response = await initializeFn();

const headers = response.headers ?? {};
const accessToken = headers["x-s2-access-token"];
Expand Down
30 changes: 24 additions & 6 deletions packages/core/src/v3/realtimeStreams/streamInstance.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@ import { StreamsWriterV1 } from "./streamsWriterV1.js";
import { StreamsWriterV2 } from "./streamsWriterV2.js";
import { StreamsWriter, StreamWriteResult } from "./types.js";

/**
 * Minimal structural shape of a create-stream response, used as the
 * resolved type of the optional `createStream` override on
 * `StreamInstanceOptions`. `StreamInstance` destructures both fields and
 * passes them to `parseCreateStreamResponse`.
 */
export type CreateStreamResponseLike = {
/** Version string reported by the create-stream call. */
version: string;
/** Response headers, if any. */
headers?: Record<string, string>;
};

export type StreamInstanceOptions<T> = {
apiClient: ApiClient;
baseUrl: string;
Expand All @@ -15,6 +20,14 @@ export type StreamInstanceOptions<T> = {
requestOptions?: AnyZodFetchOptions;
target?: "self" | "parent" | "root" | string;
debug?: boolean;
/**
* Optional override for the create-stream call. Defaults to
* `apiClient.createStream(runId, "self", key, requestOptions)`. The
* manager passes a cached version so repeated `pipe()` calls for the
* same `(runId, key)` share a single PUT instead of hammering the
* server on every chunk.
*/
createStream?: () => Promise<CreateStreamResponseLike>;
};

type StreamsWriterInstance<T> = StreamsWriterV1<T> | StreamsWriterV2<T>;
Expand All @@ -27,12 +40,17 @@ export class StreamInstance<T> implements StreamsWriter {
}

private async initializeWriter(): Promise<StreamsWriterInstance<T>> {
const { version, headers } = await this.options.apiClient.createStream(
this.options.runId,
"self",
this.options.key,
this.options?.requestOptions
);
const createStreamFn =
this.options.createStream ??
(() =>
this.options.apiClient.createStream(
this.options.runId,
"self",
this.options.key,
this.options?.requestOptions
));

const { version, headers } = await createStreamFn();

const parsedResponse = parseCreateStreamResponse(version, headers);

Expand Down
Loading
Loading