Skip to content

Commit cd1f826

Browse files
committed
abort observed marker
1 parent 5d57fa4 commit cd1f826

6 files changed

Lines changed: 97 additions & 7 deletions

File tree

apps/sim/lib/copilot/request/go/stream.test.ts

Lines changed: 67 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ import {
2727
extractEditContent,
2828
runStreamLoop,
2929
} from '@/lib/copilot/request/go/stream'
30-
import { createEvent, hasAbortMarker } from '@/lib/copilot/request/session'
30+
import { AbortReason, createEvent, hasAbortMarker } from '@/lib/copilot/request/session'
3131
import { RequestTraceV1Outcome, TraceCollector } from '@/lib/copilot/request/trace'
3232
import type { ExecutionContext, StreamingContext } from '@/lib/copilot/request/types'
3333

@@ -331,6 +331,72 @@ describe('copilot go stream helpers', () => {
331331
).toBe(false)
332332
})
333333

334+
it('invokes onAbortObserved with MarkerObservedAtBodyClose when reclassifying via the abort marker', async () => {
335+
const textEvent = createEvent({
336+
streamId: 'stream-1',
337+
cursor: '1',
338+
seq: 1,
339+
requestId: 'req-1',
340+
type: MothershipStreamV1EventType.text,
341+
payload: {
342+
channel: 'assistant',
343+
text: 'partial response',
344+
},
345+
})
346+
347+
vi.mocked(fetch).mockResolvedValueOnce(createSseResponse([textEvent]))
348+
vi.mocked(hasAbortMarker).mockResolvedValueOnce(true)
349+
350+
const context = createStreamingContext()
351+
const execContext: ExecutionContext = {
352+
userId: 'user-1',
353+
workflowId: 'workflow-1',
354+
}
355+
const onAbortObserved = vi.fn()
356+
357+
await runStreamLoop('https://example.com/mothership/stream', {}, context, execContext, {
358+
timeout: 1000,
359+
onAbortObserved,
360+
})
361+
362+
expect(onAbortObserved).toHaveBeenCalledTimes(1)
363+
expect(onAbortObserved).toHaveBeenCalledWith(AbortReason.MarkerObservedAtBodyClose)
364+
expect(context.wasAborted).toBe(true)
365+
})
366+
367+
it('does not invoke onAbortObserved when no abort marker is present at body close', async () => {
368+
const textEvent = createEvent({
369+
streamId: 'stream-1',
370+
cursor: '1',
371+
seq: 1,
372+
requestId: 'req-1',
373+
type: MothershipStreamV1EventType.text,
374+
payload: {
375+
channel: 'assistant',
376+
text: 'partial response',
377+
},
378+
})
379+
380+
vi.mocked(fetch).mockResolvedValueOnce(createSseResponse([textEvent]))
381+
vi.mocked(hasAbortMarker).mockResolvedValueOnce(false)
382+
383+
const context = createStreamingContext()
384+
const execContext: ExecutionContext = {
385+
userId: 'user-1',
386+
workflowId: 'workflow-1',
387+
}
388+
const onAbortObserved = vi.fn()
389+
390+
await expect(
391+
runStreamLoop('https://example.com/mothership/stream', {}, context, execContext, {
392+
timeout: 1000,
393+
onAbortObserved,
394+
})
395+
).rejects.toThrow('Copilot backend stream ended before a terminal event')
396+
397+
expect(onAbortObserved).not.toHaveBeenCalled()
398+
})
399+
334400
it('still fails closed when the body closes without terminal and the abort marker check throws', async () => {
335401
const textEvent = createEvent({
336402
streamId: 'stream-1',

apps/sim/lib/copilot/request/go/stream.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import {
3030
} from '@/lib/copilot/request/handlers/types'
3131
import { getCopilotTracer } from '@/lib/copilot/request/otel'
3232
import {
33+
AbortReason,
3334
eventToStreamEvent,
3435
hasAbortMarker,
3536
isSubagentSpanStreamEvent,
@@ -448,6 +449,7 @@ export async function runStreamLoop(
448449
}
449450

450451
if (abortRequested) {
452+
options.onAbortObserved?.(AbortReason.MarkerObservedAtBodyClose)
451453
context.wasAborted = true
452454
endedOn = CopilotSseCloseReason.Aborted
453455
} else {

apps/sim/lib/copilot/request/lifecycle/headless.ts

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -53,12 +53,11 @@ export async function runHeadlessCopilotLifecycle(
5353
simRequestId,
5454
otelContext,
5555
})
56-
outcome =
57-
options.abortSignal?.aborted || result.cancelled
56+
outcome = result.success
57+
? RequestTraceV1Outcome.success
58+
: options.abortSignal?.aborted || result.cancelled
5859
? RequestTraceV1Outcome.cancelled
59-
: result.success
60-
? RequestTraceV1Outcome.success
61-
: RequestTraceV1Outcome.error
60+
: RequestTraceV1Outcome.error
6261
return result
6362
} catch (error) {
6463
outcome = options.abortSignal?.aborted

apps/sim/lib/copilot/request/lifecycle/start.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -249,6 +249,11 @@ export function createSSEStream(params: StreamingOrchestrationParams): ReadableS
249249
onEvent: async (event) => {
250250
await publisher.publish(event)
251251
},
252+
onAbortObserved: (reason) => {
253+
if (!abortController.signal.aborted) {
254+
abortController.abort(reason)
255+
}
256+
},
252257
})
253258

254259
lifecycleResult = result

apps/sim/lib/copilot/request/session/abort-reason.ts

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,12 @@ export const AbortReason = {
2222
* that the node that DID receive it wrote, and aborts on the poll.
2323
*/
2424
RedisPoller: 'redis_abort_marker:poller',
25+
/**
26+
* Cross-process stop: same root cause as `RedisPoller`, but observed
27+
* by `runStreamLoop` at body close (the Go body ended before the
28+
* 250ms poller's next tick) rather than by the polling timer.
29+
*/
30+
MarkerObservedAtBodyClose: 'redis_abort_marker:body_close',
2531
/** Internal timeout on the outbound explicit-abort fetch to Go. */
2632
ExplicitAbortFetchTimeout: 'timeout:go_explicit_abort_fetch',
2733
} as const
@@ -38,5 +44,9 @@ export type AbortReasonValue = (typeof AbortReason)[keyof typeof AbortReason]
3844
* stops, mirroring `requestctx.IsExplicitUserStop` on the Go side.
3945
*/
4046
export function isExplicitStopReason(reason: unknown): boolean {
41-
return reason === AbortReason.UserStop || reason === AbortReason.RedisPoller
47+
return (
48+
reason === AbortReason.UserStop ||
49+
reason === AbortReason.RedisPoller ||
50+
reason === AbortReason.MarkerObservedAtBodyClose
51+
)
4252
}

apps/sim/lib/copilot/request/types.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,14 @@ export interface OrchestratorOptions {
136136
onComplete?: (result: OrchestratorResult) => void | Promise<void>
137137
onError?: (error: Error) => void | Promise<void>
138138
abortSignal?: AbortSignal
139+
/**
140+
* Invoked when the orchestrator infers that the run was aborted via
141+
* an out-of-band signal (currently: a Redis abort marker observed
142+
* at SSE body close). Callers wire this to fire their local
143+
* `AbortController` so `signal.reason` is set and `recordCancelled`
144+
* classifies as `explicit_stop` rather than `unknown`.
145+
*/
146+
onAbortObserved?: (reason: string) => void
139147
interactive?: boolean
140148
}
141149

0 commit comments

Comments
 (0)