diff --git a/.changeset/chained-approvals-fix.md b/.changeset/chained-approvals-fix.md new file mode 100644 index 000000000..e56d7387e --- /dev/null +++ b/.changeset/chained-approvals-fix.md @@ -0,0 +1,5 @@ +--- +'@tanstack/ai-client': patch +--- + +Fix chained tool approval flows where a second approval arriving during an active continuation stream was silently dropped diff --git a/docs/architecture/approval-flow-processing.md b/docs/architecture/approval-flow-processing.md new file mode 100644 index 000000000..5f033a406 --- /dev/null +++ b/docs/architecture/approval-flow-processing.md @@ -0,0 +1,424 @@ +# Approval Flow Processing Architecture + +> Internal architecture reference for the tool approval system in TanStack AI. +> Covers the full lifecycle from stream event to continuation, with emphasis on +> concurrency control and the chained approval mechanism. + +--- + +## Table of Contents + +1. [Overview](#overview) +2. [Component Responsibilities](#component-responsibilities) +3. [Type System](#type-system) +4. [State Machine](#state-machine) +5. [Single Approval Lifecycle](#single-approval-lifecycle) +6. [Chained Approvals and Continuation Control](#chained-approvals-and-continuation-control) +7. [Stream Event Protocol](#stream-event-protocol) +8. [Key Source Files](#key-source-files) + +--- + +## Overview + +The approval flow allows tools marked with `needsApproval: true` to pause +execution until the user explicitly approves or denies the action. This +creates a human-in-the-loop checkpoint for sensitive operations (sending +emails, making purchases, deleting data). + +The flow spans three layers: + +```mermaid +flowchart TD + A[Server - TextEngine] + B[StreamProcessor] + C[ChatClient] + + A -- "AG-UI stream · SSE / HTTP" --> B + B -- "events" --> C +``` + +| Layer | Responsibility | +|-------|----------------| +| **Server (TextEngine)** | `chat()` detects `needsApproval` → emits CUSTOM `approval-requested` instead of executing the tool | +| **StreamProcessor** | Receives CUSTOM chunk → `updateToolCallApproval()` → fires `onApprovalRequest` callback | +| **ChatClient** | Exposes `addToolApprovalResponse()` → updates message state → triggers `checkForContinuation()` → sends new stream | + +Framework hooks (`useChat` in React, Solid, Vue, Svelte) delegate to +`ChatClient`, which owns all concurrency and continuation logic. + +--- + +## Component Responsibilities + +### TextEngine (server) + +- Runs the agent loop: calls the LLM adapter, accumulates tool calls, executes + tools, and re-invokes the adapter with results. +- When a tool has `needsApproval: true`, the engine emits a + `CUSTOM { name: "approval-requested" }` event instead of executing the tool. +- The stream ends with `RUN_FINISHED { finishReason: "tool_calls" }` so the + client knows tools are pending. + +### StreamProcessor (`packages/typescript/ai/src/activities/chat/stream/processor.ts`) + +- Single source of truth for `UIMessage[]` state. +- On `approval-requested` custom event: + 1. Calls `updateToolCallApproval()` to set the tool-call part's state to + `approval-requested` and attach approval metadata. + 2. Fires `onApprovalRequest` so the ChatClient can emit devtools events. +- On `addToolApprovalResponse(approvalId, approved)`: + 1. Calls `updateToolCallApprovalResponse()` to set state to + `approval-responded` and record `approval.approved`. +- Provides `areAllToolsComplete()` which the ChatClient uses to decide whether + to auto-continue the conversation. + +### ChatClient (`packages/typescript/ai-client/src/chat-client.ts`) + +- Owns the streaming lifecycle (`streamResponse`), post-stream action queue, + and continuation control flags. +- Exposes `addToolApprovalResponse()` as the public API for responding to + approval requests. +- Manages two critical flags for continuation: + - `continuationPending` — prevents concurrent `streamResponse` calls. + - `continuationSkipped` — detects when a queued continuation check was + suppressed by `continuationPending` and needs re-evaluation. + +### Framework Hooks (React `useChat`, Solid `useChat`, etc.) + +- Wrap `ChatClient` methods in framework-specific reactive primitives. +- Expose `addToolApprovalResponse` directly to the component. +- No approval-specific logic beyond delegation. + +--- + +## Type System + +### ToolCallState + +```typescript +type ToolCallState = + | 'awaiting-input' // TOOL_CALL_START received, no arguments yet + | 'input-streaming' // Partial arguments being received + | 'input-complete' // All arguments received (TOOL_CALL_END) + | 'approval-requested' // Waiting for user approval + | 'approval-responded' // User has approved or denied +``` + +### ToolCallPart (approval-relevant fields) + +```typescript +interface ToolCallPart { + type: 'tool-call' + id: string // Unique tool call ID + name: string // Tool name + arguments: string // JSON string of arguments + state: ToolCallState + + approval?: { + id: string // Unique approval ID (NOT the toolCallId) + needsApproval: boolean // Always true when present + approved?: boolean // undefined until user responds + } + + output?: any // Set after execution (client tools) +} +``` + +### Key distinction: approval ID vs tool call ID + +The `approval.id` is a separate identifier generated per approval request. +All user-facing APIs (`addToolApprovalResponse`) use the **approval ID**, +not the tool call ID. This allows the system to correlate approval responses +even when multiple tools share similar call IDs across different messages. + +--- + +## State Machine + +```mermaid +flowchart TD + S1[awaiting-input] + S2[input-streaming] + S3[input-complete] + S4[approval-requested] + S5([execute directly]) + S6[approval-responded] + S7([execute tool]) + S8([cancelled]) + + S1 -- "TOOL_CALL_START" --> S2 + S2 -- "TOOL_CALL_ARGS" --> S3 + S3 -- "TOOL_CALL_END · needsApproval: true" --> S4 + S3 -- "TOOL_CALL_END · needsApproval: false" --> S5 + S4 -- "addToolApprovalResponse()" --> S6 + S6 -- "approved: true" --> S7 + S6 -- "approved: false" --> S8 +``` + +### Terminal states for `areAllToolsComplete()` + +A tool call is considered complete (and eligible for auto-continuation) when +any of the following is true: + +1. `state === 'approval-responded'` — user approved or denied +2. `output !== undefined && !approval` — client tool finished (no approval flow) +3. A corresponding `tool-result` part exists — server tool finished + +--- + +## Single Approval Lifecycle + +Step-by-step flow for a single tool requiring approval: + +### 1. Server emits approval request during stream + +``` +TOOL_CALL_START { toolCallId: "tc-1", toolName: "send_email" } +TOOL_CALL_ARGS { toolCallId: "tc-1", delta: '{"to":"..."}' } +TOOL_CALL_END { toolCallId: "tc-1" } +CUSTOM { name: "approval-requested", value: { + toolCallId: "tc-1", + toolName: "send_email", + input: { to: "..." }, + approval: { id: "appr-1", needsApproval: true } + }} +RUN_FINISHED { finishReason: "tool_calls" } +``` + +### 2. StreamProcessor processes the CUSTOM chunk + +``` +handleCustomEvent(): + 1. updateToolCallApproval(messages, messageId, "tc-1", "appr-1") + → Sets part.state = "approval-requested" + → Sets part.approval = { id: "appr-1", needsApproval: true } + 2. emitMessagesChange() + 3. fires onApprovalRequest({ toolCallId, toolName, input, approvalId }) +``` + +### 3. Stream ends, ChatClient processes + +``` +streamResponse() finally block: + 1. setIsLoading(false) + 2. drainPostStreamActions() → (nothing queued) + 3. streamCompletedSuccessfully check: + lastPart is tool-call (not tool-result) → no auto-continue + → Returns to caller (sendMessage resolves) +``` + +The conversation is now paused. The UI renders the approval prompt. + +### 4. User approves + +``` +addToolApprovalResponse({ id: "appr-1", approved: true }): + 1. processor.addToolApprovalResponse("appr-1", true) + → updateToolCallApprovalResponse(): + part.approval.approved = true + part.state = "approval-responded" + 2. isLoading is false → call checkForContinuation() directly +``` + +### 5. Continuation + +``` +checkForContinuation(): + 1. continuationPending = false, isLoading = false → proceed + 2. shouldAutoSend() → areAllToolsComplete(): + part.state === "approval-responded" → true + 3. continuationPending = true + 4. streamResponse() → new stream to server with approval in messages + 5. Server sees approval, executes tool, returns result + LLM response + 6. continuationPending = false +``` + +--- + +## Chained Approvals and Continuation Control + +The most complex scenario: a continuation stream produces **another** tool call +that also needs approval, and the user responds to it while the stream is still +active. + +### The Problem + +``` +Timeline: +───────────────────────────────────────────────────────────── + +1. User approves tool A + └─ checkForContinuation() sets continuationPending = true + └─ streamResponse() starts (stream 2) + +2. Stream 2 produces tool B needing approval + └─ approval-requested chunk processed + └─ UI shows approval prompt for tool B + +3. User approves tool B WHILE stream 2 is still active + └─ addToolApprovalResponse(): + └─ processor state updated (approval-responded) + └─ isLoading is true → queues checkForContinuation + +4. Stream 2 ends + └─ streamResponse() finally block: + └─ setIsLoading(false) + └─ drainPostStreamActions(): + └─ Runs queued checkForContinuation() + └─ BUT continuationPending is STILL TRUE (from step 1) + └─ *** EARLY RETURN — approval swallowed *** + └─ Returns to step 1's checkForContinuation() + └─ continuationPending = false + +5. Nobody re-checks → tool B's approval is lost +``` + +### The Solution: `continuationSkipped` Flag + +Two flags work together to handle this: + +- **`continuationPending`** — prevents concurrent `streamResponse()` calls. + Set to `true` when entering `checkForContinuation`'s streaming path, cleared + in the `finally` block. + +- **`continuationSkipped`** — set to `true` whenever `checkForContinuation()` + returns early due to `continuationPending` or `isLoading` being true. + Checked after `continuationPending` is cleared to trigger a re-evaluation. + +```typescript +private async checkForContinuation(): Promise { + if (this.continuationPending || this.isLoading) { + this.continuationSkipped = true // ← Mark that a check was suppressed + return + } + + if (this.shouldAutoSend()) { + this.continuationPending = true + this.continuationSkipped = false // ← Reset before entering stream + try { + await this.streamResponse() + } finally { + this.continuationPending = false + } + // If a check was skipped during the stream, re-evaluate now + if (this.continuationSkipped) { + this.continuationSkipped = false + await this.checkForContinuation() // ← Recurse safely + } + } +} +``` + +### Why the recursion is safe + +The recursion terminates because: + +1. **`continuationSkipped` is only set when a real check was suppressed.** After + the final stream (e.g., a text-only response), no new approvals arrive, so + `continuationSkipped` stays `false` and the recursion stops. + +2. **`shouldAutoSend()` returns `false` when tools are still pending approval.** + If a new approval arrives that hasn't been responded to yet, `areAllToolsComplete()` + returns `false` and the method exits without streaming. + +3. **Each recursion level sets `continuationPending = true`**, preventing any + concurrent checks from entering the streaming path. + +### Corrected Timeline + +``` +Timeline (with fix): +───────────────────────────────────────────────────────────── + +1. User approves tool A + └─ checkForContinuation() [OUTER] + └─ continuationPending = true, continuationSkipped = false + └─ streamResponse() starts (stream 2) + +2. Stream 2 produces tool B, user approves during stream + └─ Queues checkForContinuation as post-stream action + +3. Stream 2 ends + └─ drainPostStreamActions(): + └─ checkForContinuation(): continuationPending is true + └─ continuationSkipped = true ← MARKED + └─ returns early + └─ Back in OUTER: continuationPending = false + +4. OUTER checks continuationSkipped → true + └─ continuationSkipped = false + └─ Recurses into checkForContinuation() [INNER] + └─ shouldAutoSend() → true (tool B is approval-responded) + └─ continuationPending = true + └─ streamResponse() → stream 3 (final text response) + └─ continuationPending = false + └─ continuationSkipped is false → no further recursion + +5. Done. All three streams completed correctly. +``` + +--- + +## Stream Event Protocol + +### Approval-related AG-UI events + +These are `CUSTOM` events emitted by the TextEngine, not by adapters directly. + +#### `approval-requested` + +Emitted when a tool with `needsApproval: true` has its arguments finalized. + +```typescript +{ + type: 'CUSTOM', + name: 'approval-requested', + value: { + toolCallId: string, // ID of the tool call + toolName: string, // Name of the tool + input: any, // Parsed arguments + approval: { + id: string, // Unique approval ID + needsApproval: true + } + } +} +``` + +**Processor handling:** `handleCustomEvent()` → `updateToolCallApproval()` → +`onApprovalRequest` callback. + +#### Relation to other tool events + +A complete approval tool call in the stream looks like: + +``` +TOOL_CALL_START → creates tool-call part (state: awaiting-input) +TOOL_CALL_ARGS* → accumulates arguments (state: input-streaming) +TOOL_CALL_END → finalizes arguments (state: input-complete) +CUSTOM → approval-requested (state: approval-requested) +RUN_FINISHED → finishReason: "tool_calls" +``` + +After the stream ends and the user responds, the ChatClient: +1. Updates the tool-call part (state: `approval-responded`) +2. Sends a new stream request with the full conversation (including approval) +3. The server sees the approval and either executes or cancels the tool + +--- + +## Key Source Files + +| File | Role | +|------|------| +| `packages/typescript/ai/src/types.ts` | `ToolCallState`, `ToolCallPart`, tool approval types | +| `packages/typescript/ai/src/activities/chat/stream/processor.ts` | `handleCustomEvent()` (approval-requested), `areAllToolsComplete()`, `addToolApprovalResponse()` | +| `packages/typescript/ai/src/activities/chat/stream/message-updaters.ts` | `updateToolCallApproval()`, `updateToolCallApprovalResponse()` | +| `packages/typescript/ai-client/src/chat-client.ts` | `addToolApprovalResponse()`, `checkForContinuation()`, continuation flags | +| `packages/typescript/ai-react/src/use-chat.ts` | React hook: exposes `addToolApprovalResponse` | +| `packages/typescript/ai-solid/src/use-chat.ts` | Solid hook: exposes `addToolApprovalResponse` | +| `packages/typescript/ai-vue/src/use-chat.ts` | Vue composable: exposes `addToolApprovalResponse` | +| `packages/typescript/ai-svelte/src/create-chat.svelte.ts` | Svelte: exposes `addToolApprovalResponse` | +| `packages/typescript/ai-client/tests/chat-client.test.ts` | Chained approval test (`describe('chained tool approvals')`) | +| `packages/typescript/ai/docs/chat-architecture.md` | Internal stream processing architecture | diff --git a/packages/typescript/ai-client/src/chat-client.ts b/packages/typescript/ai-client/src/chat-client.ts index 27078d5e8..3cb0f4f7d 100644 --- a/packages/typescript/ai-client/src/chat-client.ts +++ b/packages/typescript/ai-client/src/chat-client.ts @@ -40,6 +40,9 @@ export class ChatClient { private pendingToolExecutions: Map> = new Map() // Flag to deduplicate continuation checks during action draining private continuationPending = false + // Tracks whether a queued checkForContinuation was skipped because + // continuationPending was true (chained approval scenario) + private continuationSkipped = false private callbacksRef: { current: { @@ -404,7 +407,9 @@ export class ChatClient { // If stream is in progress, queue the response for after it ends if (this.isLoading) { - this.queuePostStreamAction(() => this.streamResponse()) + this.queuePostStreamAction(async () => { + await this.streamResponse() + }) return } @@ -412,12 +417,13 @@ export class ChatClient { } /** - * Stream a response from the LLM + * Stream a response from the LLM. + * Returns true if the stream completed successfully, false on abort or error. */ - private async streamResponse(): Promise { + private async streamResponse(): Promise { // Guard against concurrent streams - if already loading, skip if (this.isLoading) { - return + return false } this.setIsLoading(true) @@ -458,7 +464,7 @@ export class ChatClient { } catch (err) { if (err instanceof Error) { if (err.name === 'AbortError') { - return + return false } this.setError(err) this.setStatus('error') @@ -486,6 +492,8 @@ export class ChatClient { } } } + + return streamCompletedSuccessfully } /** @@ -631,16 +639,28 @@ export class ChatClient { private async checkForContinuation(): Promise { // Prevent duplicate continuation attempts if (this.continuationPending || this.isLoading) { + this.continuationSkipped = true return } if (this.shouldAutoSend()) { this.continuationPending = true + this.continuationSkipped = false + let succeeded = false try { - await this.streamResponse() + succeeded = await this.streamResponse() } finally { this.continuationPending = false } + // If a queued check was skipped while continuationPending was true + // (e.g. a chained approval responded to during the stream), re-evaluate + // now that the flag is cleared. Only replay after a successful stream — + // aborted or errored streams should not trigger further continuation. + // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition -- mutated asynchronously during await + if (this.continuationSkipped && succeeded) { + this.continuationSkipped = false + await this.checkForContinuation() + } } } diff --git a/packages/typescript/ai-client/tests/chat-client.test.ts b/packages/typescript/ai-client/tests/chat-client.test.ts index 5b9bf956d..add1d730c 100644 --- a/packages/typescript/ai-client/tests/chat-client.test.ts +++ b/packages/typescript/ai-client/tests/chat-client.test.ts @@ -5,8 +5,11 @@ import { createTextChunks, createThinkingChunks, createToolCallChunks, + createApprovalToolCallChunks, createCustomEventChunks, } from './test-utils' +import type { ConnectionAdapter } from '../src/connection-adapters' +import type { StreamChunk } from '@tanstack/ai' import type { UIMessage } from '../src/types' describe('ChatClient', () => { @@ -1096,4 +1099,129 @@ describe('ChatClient', () => { expect(actualData.null_value).toBeNull() }) }) + + describe('chained tool approvals', () => { + it('should continue after second approval arrives during active continuation stream', async () => { + let streamCount = 0 + let resolveStreamPause: (() => void) | null = null + + const adapter: ConnectionAdapter = { + async *connect() { + streamCount++ + + if (streamCount === 1) { + // First stream: tool call A needing approval + const chunks = createApprovalToolCallChunks([ + { + id: 'tc-1', + name: 'dangerous_tool_1', + arguments: '{}', + approvalId: 'approval-1', + }, + ]) + for (const chunk of chunks) yield chunk + } else if (streamCount === 2) { + // Second stream (after first approval): tool call B needing approval + // Yield the tool call and approval request + const preChunks: Array = [ + { + type: 'TOOL_CALL_START', + toolCallId: 'tc-2', + toolName: 'dangerous_tool_2', + model: 'test', + timestamp: Date.now(), + index: 0, + }, + { + type: 'TOOL_CALL_ARGS', + toolCallId: 'tc-2', + model: 'test', + timestamp: Date.now(), + delta: '{}', + }, + { + type: 'TOOL_CALL_END', + toolCallId: 'tc-2', + toolName: 'dangerous_tool_2', + model: 'test', + timestamp: Date.now(), + }, + { + type: 'CUSTOM', + model: 'test', + timestamp: Date.now(), + name: 'approval-requested', + value: { + toolCallId: 'tc-2', + toolName: 'dangerous_tool_2', + input: {}, + approval: { id: 'approval-2', needsApproval: true }, + }, + }, + ] + for (const chunk of preChunks) yield chunk + + // Pause stream so the test can approve tool B while stream is active + await new Promise((resolve) => { + resolveStreamPause = resolve + }) + + yield { + type: 'RUN_FINISHED' as const, + runId: 'run-2', + model: 'test', + timestamp: Date.now(), + finishReason: 'tool_calls' as const, + } + } else if (streamCount === 3) { + // Third stream (after second approval): final text response + const chunks = createTextChunks('All done!') + for (const chunk of chunks) yield chunk + } + }, + } + + const client = new ChatClient({ connection: adapter }) + + // Step 1: Send message. First stream produces tool A with approval. + await client.sendMessage('Do something dangerous') + expect(streamCount).toBe(1) + + // Step 2: Approve tool A. This triggers checkForContinuation → streamResponse (stream 2). + // Don't await - we need to interact during the stream. + const approvalPromise = client.addToolApprovalResponse({ + id: 'approval-1', + approved: true, + }) + + // Wait for second stream to pause (approval-requested chunk already processed) + await vi.waitFor(() => { + expect(resolveStreamPause).not.toBeNull() + }) + + // Step 3: Approve tool B while second stream is still active (isLoading=true) + expect(client.getIsLoading()).toBe(true) + await client.addToolApprovalResponse({ + id: 'approval-2', + approved: true, + }) + + // Resume the second stream + resolveStreamPause!() + + // Wait for the full chain to complete + await approvalPromise + + // Step 4: Verify all 3 streams fired (the second approval triggered stream 3) + expect(streamCount).toBe(3) + + // Verify final text response is present + const messages = client.getMessages() + const lastAssistant = [...messages] + .reverse() + .find((m) => m.role === 'assistant') + const textPart = lastAssistant?.parts.find((p) => p.type === 'text') + expect(textPart?.content).toBe('All done!') + }) + }) }) diff --git a/packages/typescript/ai-client/tests/test-utils.ts b/packages/typescript/ai-client/tests/test-utils.ts index fa736d04b..dea8d4643 100644 --- a/packages/typescript/ai-client/tests/test-utils.ts +++ b/packages/typescript/ai-client/tests/test-utils.ts @@ -241,6 +241,76 @@ export function createToolCallChunks( return chunks } +/** + * Helper to create tool call chunks with approval requests (AG-UI format) + * Tools will be in 'input-complete' state with pending approval + */ +export function createApprovalToolCallChunks( + toolCalls: Array<{ + id: string + name: string + arguments: string + approvalId: string + }>, + messageId: string = 'msg-1', + model: string = 'test', +): Array { + const chunks: Array = [] + const runId = `run-${messageId}` + + for (let i = 0; i < toolCalls.length; i++) { + const toolCall = toolCalls[i]! + + chunks.push({ + type: 'TOOL_CALL_START', + toolCallId: toolCall.id, + toolName: toolCall.name, + model, + timestamp: Date.now(), + index: i, + }) + + chunks.push({ + type: 'TOOL_CALL_ARGS', + toolCallId: toolCall.id, + model, + timestamp: Date.now(), + delta: toolCall.arguments, + }) + + chunks.push({ + type: 'TOOL_CALL_END', + toolCallId: toolCall.id, + toolName: toolCall.name, + model, + timestamp: Date.now(), + }) + + chunks.push({ + type: 'CUSTOM', + model, + timestamp: Date.now(), + name: 'approval-requested', + value: { + toolCallId: toolCall.id, + toolName: toolCall.name, + input: JSON.parse(toolCall.arguments), + approval: { id: toolCall.approvalId, needsApproval: true }, + }, + }) + } + + chunks.push({ + type: 'RUN_FINISHED', + runId, + model, + timestamp: Date.now(), + finishReason: 'tool_calls', + }) + + return chunks +} + /** * Helper to create thinking chunks (AG-UI format using STEP_FINISHED for thinking) */