diff --git a/AGENTS.md b/AGENTS.md new file mode 100644 index 0000000..4132a30 --- /dev/null +++ b/AGENTS.md @@ -0,0 +1,11 @@ +# agent-runtime Agent Bootloader + +Read `CLAUDE.md` first. This repo keeps provider-specific entry files short: + +- `CLAUDE.md`: repo orientation, code map, layering, commands, and local deltas. +- `docs/BUILDING.md`: stable building discipline. +- `docs/ANTI_PATTERNS.md`: named failure modes and stop signs. +- `.evolve/current.json` and `memory/`: live state and evidence ledger. + +Do not duplicate long-lived process rules here. Add durable rules to the docs +above and keep this file as the provider-neutral pointer. diff --git a/CLAUDE.md b/CLAUDE.md index 7a55254..4d7a287 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -56,12 +56,7 @@ This repo is the empirical home of the RSI/learning-flywheel thesis, but **mecha **The live science state — every number, what's proven/disproven, the current goal — lives in `.evolve/current.json` + the `memory/` evidence ledger. Read them; do not mirror them here.** `docs/eval-substrate.md` holds the north star (the RSI runtime + its eval substrate) and the measurement non-negotiables. -**Process discipline (the anti-patterns that have bitten this repo):** -- **Don't build mechanism ahead of the gate.** Per-branch adaptive sub-agents, learned planners, the outer flywheel — all wait for a *positive* gate result. Expressiveness was the closed gap; the open one is evidentiary. -- **Don't re-run a settled measurement.** The instrument already returned 0 coding-headroom (3 runs) and steering-loses on FinSearchComp. Read the dated controlled-result memory note before proposing to "test if steering helps" again. -- **Estimate cost before launch.** cells × per-cell-time / concurrency. A cell is a multi-min rollout; GEPA multiplies it (POP×GENS×cells). FinSearchComp-over-sandbox ≈ 3hr/run with ~14% stream-drop loss — budget it or use the offline corpus / local gate (conc≤2). -- **Confounds before causal claims.** Never claim a win where treatment got more compute than control. Isolate via refine@k vs random@k at EQUAL k; exclude infra-errored cells; report the discordant count; apply BH across arms; prefer deterministic-judge domains. Run the cheapest decisive check first. -- **No overclaim.** "Validates the concept" ≠ "validates the product." Route through the real kernel (`runLoop` + `createDriver` + judge-as-`Validator`) to claim the product. Underpowered splits (n≈20) are not wins. A confounded "steering proven" (treatment got more compute than control) is a cautionary precedent — see the memory ledger. +**Process discipline:** stable build rules live in `docs/BUILDING.md`; named failure modes live in `docs/ANTI_PATTERNS.md`. `CLAUDE.md` is the bootloader, not the whole policy manual. ## Memory discipline diff --git a/bench/src/fleet.mts b/bench/src/fleet.mts new file mode 100644 index 0000000..0e8b1ee --- /dev/null +++ b/bench/src/fleet.mts @@ -0,0 +1,122 @@ +/** + * The whole vision, end to end, runnable from a laptop: a thin local driver + * fans out N workers to CLOUD sandboxes, observes each worker's trace, reports + * what to fix, and writes durable learnings to a corpus the NEXT run reads back. + * + * Local process = the driver (orchestrate + observe). All agent work runs in the + * cloud (Tangle sandbox SDK). Scale is the API key, not the laptop. + * + * dotenvx run -f ~/company/devops/secrets/.env.keys -f ~/company/devops/secrets/agent-state.env -- \ + * env BACKEND=opencode MODEL=gpt-4.1 N=2 CORPUS=/tmp/fleet-corpus.jsonl \ + * pnpm exec tsx src/fleet.mts + * + * Run it twice: the second run injects the first run's learnings into the workers. + */ +import { createChatClient } from '@tangle-network/agent-eval' +import { FileCorpus, observe, openSandboxRun, renderReport } from '@tangle-network/agent-runtime/loops' +import { Sandbox } from '@tangle-network/sandbox' +import { answerOutput, sandboxAgentRun, type WorkerBackendType } from './experiment' + +function env(name: string, fallback?: string): string { + const v = process.env[name] ?? fallback + if (v === undefined) throw new Error(`missing env ${name}`) + return v +} + +// The fleet's work: each worker gets one subtask. Swap for any real decomposition. +const subtasks = [ + 'Write a Python function `is_prime(n)` and three asserts proving it. Run them and report PASS/FAIL.', + 'Write a Python function `fib(n)` (iterative) and three asserts proving it. Run them and report PASS/FAIL.', + 'Write a Python function `rev_words(s)` that reverses word order, with asserts. Run them and report PASS/FAIL.', +] + +interface WorkerResult { + id: string + task: string + output: string + events: unknown[] + wallMs: number + error?: string +} + +async function runWorker( + client: Sandbox, + cfg: { backendType: WorkerBackendType; model: string; routerBaseUrl: string; routerKey: string }, + id: string, + task: string, + priorLearnings: string, +): Promise { + const startedAt = Date.now() + const prompt = priorLearnings ? `${priorLearnings}\n\n---\n\n${task}` : task + const controller = new AbortController() + const timer = setTimeout(() => controller.abort(), Number(process.env.TIMEOUT_MS ?? 240_000)) + try { + const agentRun = sandboxAgentRun({ ...cfg, name: id }) + const run = await openSandboxRun( + client, + { agentRun, signal: controller.signal }, + { kind: 'events', fromEvents: (events) => answerOutput.parse(events as never) }, + ) + try { + const turn = await run.start(prompt) + return { id, task, output: (turn.out ?? '').trim(), events: turn.events, wallMs: Date.now() - startedAt } + } finally { + await run.close().catch(() => {}) + } + } catch (err) { + return { id, task, output: '', events: [], wallMs: Date.now() - startedAt, error: err instanceof Error ? err.message : String(err) } + } finally { + clearTimeout(timer) + } +} + +async function main(): Promise { + const routerKey = env('TANGLE_API_KEY') + const cfg = { + backendType: env('BACKEND', 'opencode') as WorkerBackendType, + model: env('MODEL', 'gpt-4.1'), + routerBaseUrl: env('ROUTER_BASE_URL', 'https://router.tangle.tools/v1'), + routerKey, + } + const n = Math.min(Number(env('N', '2')), subtasks.length) + const corpus = new FileCorpus(env('CORPUS', '/tmp/fleet-corpus.jsonl')) + const observerModel = env('OBSERVER_MODEL', 'gpt-4.1') + const chat = createChatClient({ transport: 'router', apiKey: routerKey, baseUrl: cfg.routerBaseUrl, defaultModel: observerModel }) + const client = new Sandbox({ baseUrl: env('SANDBOX_BASE_URL', 'https://sandbox.tangle.tools'), apiKey: routerKey }) + + // ── continuous: read what prior runs LEARNED, inject it into this run's workers + const prior = await corpus.query({ tags: ['audience:agent'], limit: 8 }) + const priorLearnings = prior.length + ? `PRIOR LEARNINGS (from earlier runs — apply them):\n${prior.map((r) => `- ${r.claim}`).join('\n')}` + : '' + console.error(`\n=== FLEET · ${n} workers · ${cfg.backendType}/${cfg.model} · cloud ===`) + console.error(prior.length ? `carrying ${prior.length} prior learning(s) into the workers\n` : 'first run — no prior learnings yet\n') + + // ── fan out N workers to cloud sandboxes, in parallel + const tasks = subtasks.slice(0, n) + const workers = await Promise.all( + tasks.map((task, i) => runWorker(client, cfg, `worker-${i + 1}`, task, priorLearnings)), + ) + + // ── observe each worker's trace → findings → operator report + durable learnings + let totalLearned = 0 + for (const w of workers) { + console.error(`\n── ${w.id} (${Math.round(w.wallMs / 1000)}s)${w.error ? ` — ERROR: ${w.error}` : ''}`) + if (w.error) continue + const ob = await observe( + { task: w.task, output: w.output, trace: w.events, outcome: w.output ? 'passed' : 'unknown', runId: w.id }, + { chat, model: observerModel, corpus, tags: [cfg.backendType, 'fleet'] }, + ) + totalLearned += ob.learned.length + console.error(` answer: ${w.output.slice(0, 120).replace(/\n/g, ' ')}`) + console.error(renderReport(ob.findings).split('\n').map((l) => ` ${l}`).join('\n')) + console.error(` → ${ob.learned.length} new learning(s) saved to the corpus`) + } + + console.error(`\n=== fleet done: ${workers.filter((w) => !w.error).length}/${n} workers ok · ${totalLearned} learnings banked → run again to apply them ===`) +} + +main().catch((e) => { + console.error(e) + process.exit(1) +}) diff --git a/bench/src/observe-steer-workspace-loop.mts b/bench/src/observe-steer-workspace-loop.mts new file mode 100644 index 0000000..333d18b --- /dev/null +++ b/bench/src/observe-steer-workspace-loop.mts @@ -0,0 +1,408 @@ +/** + * Bare closed-loop proof: + * Scope.spawn -> git workspace worker -> observe(trace) -> steer_worker/Scope.send -> fix worker + * + * This intentionally avoids a `defineLoop` facade. The driver uses the same + * coordination MCP verbs a sandbox driver would see, but runs locally so the + * join is reproducible without cloud credentials. + * + * pnpm exec tsx bench/src/observe-steer-workspace-loop.mts + */ +import { execFileSync } from 'node:child_process' +import { mkdtempSync, rmSync, writeFileSync } from 'node:fs' +import { tmpdir } from 'node:os' +import { join } from 'node:path' +import { createChatClient, type AnalystFinding } from '@tangle-network/agent-eval' +import type { AgentProfile } from '@tangle-network/sandbox' +import { createCoordinationTools } from '../../src/mcp/tools/coordination' +import { + InMemoryResultBlobStore, + InMemorySpawnJournal, + contentAddress, + createExecutorRegistry, + createSupervisor, + gitWorkspace, + observe, + type Agent, + type AgentSpec, + type Executor, + type ExecutorResult, + type ResultBlobStore, + type Scope, + type Spend, + type Workspace, +} from '../../src/runtime/index' + +const bareRepo = '/tmp/observe-steer-workspace-loop.git' + +type WorkerPhase = 'initial' | 'fix' + +interface WorkerOutput { + readonly kind: 'worker' + readonly phase: WorkerPhase + readonly ok: boolean + readonly output: string + readonly trace: ReadonlyArray + readonly rev?: string + readonly steer?: string +} + +interface DriverOutput { + readonly kind: 'driver' + readonly ok: boolean + readonly initialWorker: string + readonly fixWorker: string + readonly finding: string + readonly finalRev: string + readonly integration: string +} + +type TreeOutput = WorkerOutput | DriverOutput + +const zeroSpend = (ms: number): Spend => ({ + iterations: 1, + tokens: { input: 0, output: 0 }, + usd: 0, + ms, +}) + +const sh = (cmd: string, args: string[], cwd?: string): string => + execFileSync(cmd, args, { cwd, encoding: 'utf-8', stdio: 'pipe' }) + +const git = (args: string[], cwd?: string): string => + sh( + 'git', + ['-c', 'core.hooksPath=/dev/null', '-c', 'user.email=loop@tangle', '-c', 'user.name=loop', ...args], + cwd, + ) + +function initWorkspace(): void { + rmSync(bareRepo, { recursive: true, force: true }) + git(['init', '--bare', '-b', 'main', bareRepo]) + const seed = mkdtempSync(join(tmpdir(), 'osw-seed-')) + try { + git(['clone', bareRepo, seed]) + writeFileSync( + join(seed, 'test_answer.py'), + 'from answer import answer\n\nassert answer() == 42, answer()\nprint("INTEGRATION OK")\n', + ) + git(['add', '-A'], seed) + git(['commit', '-m', 'seed integration test'], seed) + git(['push', 'origin', 'main'], seed) + } finally { + rmSync(seed, { recursive: true, force: true }) + } +} + +function integrationCheck(): string { + const check = mkdtempSync(join(tmpdir(), 'osw-check-')) + try { + git(['clone', bareRepo, check]) + return sh('python3', ['test_answer.py'], check).trim() + } finally { + rmSync(check, { recursive: true, force: true }) + } +} + +class WorkspaceWorkerExecutor implements Executor { + readonly runtime = 'workspace-proof' + private artifact?: ExecutorResult + private inbox: unknown[] = [] + private waiter?: (msg: unknown) => void + + constructor( + private readonly phase: WorkerPhase, + private readonly workspace: Workspace, + ) {} + + deliver(msg: unknown): void { + if (this.waiter) { + const w = this.waiter + this.waiter = undefined + w(msg) + return + } + this.inbox.push(msg) + } + + async execute(task: unknown, signal: AbortSignal): Promise> { + const started = Date.now() + const out = + this.phase === 'initial' + ? await this.writeBadAttempt() + : await this.writeCorrectedAttempt(await this.waitForSteer(signal), task) + this.artifact = { + outRef: contentAddress(out), + out, + spent: zeroSpend(Date.now() - started), + } + return this.artifact + } + + async teardown(): Promise<{ destroyed: boolean }> { + return { destroyed: true } + } + + resultArtifact(): ExecutorResult { + if (!this.artifact) throw new Error('workspace worker read before execute') + return this.artifact + } + + private async waitForSteer(signal: AbortSignal): Promise { + const existing = this.inbox.shift() + if (existing) return steerText(existing) + if (signal.aborted) throw new Error('fix worker aborted before steer') + return new Promise((resolve, reject) => { + const onAbort = () => { + this.waiter = undefined + reject(new Error('fix worker aborted before steer')) + } + signal.addEventListener('abort', onAbort, { once: true }) + this.waiter = (msg) => { + signal.removeEventListener('abort', onAbort) + resolve(steerText(msg)) + } + }) + } + + private async writeBadAttempt(): Promise { + const dir = mkdtempSync(join(tmpdir(), 'osw-worker-')) + try { + await this.workspace.materialize(dir) + writeFileSync(join(dir, 'answer.py'), 'def answer():\n return 0\n') + const test = runTest(dir) + const committed = await this.workspace.commit(dir, 'initial bad answer') + if (!committed.ok) throw new Error(`workspace conflict: ${committed.conflict}`) + return { + kind: 'worker', + phase: 'initial', + ok: false, + output: test.output, + rev: committed.rev, + trace: [ + { type: 'status', data: { status: 'wrote answer.py returning 0' } }, + { type: 'error', data: { message: test.output } }, + ], + } + } finally { + rmSync(dir, { recursive: true, force: true }) + } + } + + private async writeCorrectedAttempt(steer: string, task: unknown): Promise { + const dir = mkdtempSync(join(tmpdir(), 'osw-worker-')) + try { + await this.workspace.materialize(dir) + writeFileSync(join(dir, 'answer.py'), 'def answer():\n return 42\n') + const test = runTest(dir) + const committed = await this.workspace.commit(dir, 'apply observer steer') + if (!committed.ok) throw new Error(`workspace conflict: ${committed.conflict}`) + return { + kind: 'worker', + phase: 'fix', + ok: test.ok, + output: `${taskSummary(task)}\n${test.output}`, + rev: committed.rev, + steer, + trace: [ + { type: 'status', data: { status: 'received observer steer' } }, + { type: 'status', data: { status: 'ran python3 test_answer.py' } }, + ], + } + } finally { + rmSync(dir, { recursive: true, force: true }) + } + } +} + +function runTest(cwd: string): { ok: boolean; output: string } { + try { + return { ok: true, output: sh('python3', ['test_answer.py'], cwd).trim() } + } catch (err) { + const e = err as { stdout?: Buffer | string; stderr?: Buffer | string; message?: string } + return { ok: false, output: `${e.stdout ?? ''}${e.stderr ?? ''}${e.message ?? ''}`.trim() } + } +} + +function steerText(msg: unknown): string { + const steer = (msg as { steer?: unknown }).steer + if (typeof steer !== 'string' || steer.length === 0) throw new Error('missing steer text') + return steer +} + +function taskSummary(task: unknown): string { + return typeof task === 'string' ? task : JSON.stringify(task) +} + +function workerOutput(v: unknown): WorkerOutput { + if (!v || typeof v !== 'object' || (v as { kind?: unknown }).kind !== 'worker') { + throw new Error('expected worker output') + } + return v as WorkerOutput +} + +function makeWorkerAgent(workspace: Workspace, profile: unknown): Agent { + const phase = (profile as { phase?: unknown }).phase + if (phase !== 'initial' && phase !== 'fix') throw new Error('profile.phase must be initial or fix') + const executor = new WorkspaceWorkerExecutor(phase, workspace) + const spec: AgentSpec = { + profile: { name: `workspace-${phase}` } as AgentProfile, + harness: null, + executor, + } + return { + name: `workspace-${phase}`, + act: async () => { + throw new Error('workspace worker is executed through Executor') + }, + executorSpec: spec, + } as Agent & { executorSpec: AgentSpec } +} + +function mockObserverChat() { + return createChatClient({ + transport: 'mock', + defaultModel: 'mock-observer', + handler: async () => ({ + content: JSON.stringify({ + findings: [ + { + area: 'verification', + severity: 'high', + claim: 'Trace shows the integration test failed after answer.py returned the wrong value.', + recommended_action: + 'Open answer.py, make answer() return 42, then run python3 test_answer.py before committing.', + audience: 'agent', + confidence: 0.99, + }, + ], + }), + usage: { promptTokens: 1, completionTokens: 1, totalTokens: 2 }, + costUsd: null, + model: 'mock-observer', + durationMs: 0, + raw: {}, + }), + }) +} + +function tool(tools: ReturnType, name: string) { + const t = tools.tools.find((x) => x.name === name) + if (!t) throw new Error(`missing coordination tool ${name}`) + return t +} + +async function call( + tools: ReturnType, + name: string, + raw: unknown, +): Promise { + return (await tool(tools, name).handler(raw)) as T +} + +async function main(): Promise { + initWorkspace() + const workspace = gitWorkspace({ ref: bareRepo }) + const blobs: ResultBlobStore = new InMemoryResultBlobStore() + const observerChat = mockObserverChat() + const driver: Agent = { + name: 'driver', + async act(task, scope) { + const tools = createCoordinationTools({ + scope: scope as Scope, + blobs, + perWorker: { maxIterations: 2, maxTokens: 10_000 }, + makeWorkerAgent: (profile) => makeWorkerAgent(workspace, profile), + analysts: { + kinds: [{ id: 'observe', description: 'trace observer', area: 'verification' }], + run: async (kind, artifact) => { + if (kind !== 'observe') throw new Error(`unknown analyst ${kind}`) + const out = workerOutput(artifact) + const observation = await observe( + { + task, + output: out.output, + trace: out.trace, + outcome: out.ok ? 'passed' : 'failed', + runId: out.rev, + }, + { chat: observerChat, maxTraceLines: 20 }, + ) + return observation.findings + }, + }, + }) + + const initial = await call<{ workerId: string }>(tools, 'spawn_worker', { + profile: { phase: 'initial' }, + task: 'write answer.py', + label: 'initial-worker', + }) + const first = await call<{ settled: string; outRef: string }>(tools, 'await_next', {}) + const analyzed = await call<{ findings: AnalystFinding[] }>(tools, 'run_analyst', { + kind: 'observe', + workerId: first.settled, + }) + const finding = analyzed.findings[0] + if (!finding?.recommended_action) throw new Error('observer produced no steer') + + const fix = await call<{ workerId: string }>(tools, 'spawn_worker', { + profile: { phase: 'fix' }, + task: 'apply observer finding and verify', + label: 'fix-worker', + }) + const steer = await call<{ delivered: boolean }>(tools, 'steer_worker', { + workerId: fix.workerId, + instruction: finding.recommended_action, + }) + if (!steer.delivered) throw new Error('observer steer was not delivered') + + const second = await call<{ settled: string; outRef: string }>(tools, 'await_next', {}) + const fixed = workerOutput(await blobs.get(second.outRef)) + if (!fixed.ok || !fixed.rev) throw new Error(`fix worker failed: ${fixed.output}`) + await call<{ stopped: true }>(tools, 'stop', { reason: 'observer steer applied and verified' }) + + return { + kind: 'driver', + ok: true, + initialWorker: initial.workerId, + fixWorker: fix.workerId, + finding: finding.recommended_action, + finalRev: fixed.rev, + integration: integrationCheck(), + } + }, + } + + const result = await createSupervisor().run(driver, 'make answer() return 42', { + runId: 'observe-steer-workspace', + budget: { maxIterations: 10, maxTokens: 100_000 }, + journal: new InMemorySpawnJournal(), + blobs, + executors: createExecutorRegistry(), + }) + + if (result.kind !== 'winner' || result.out.kind !== 'driver' || !result.out.ok) { + throw new Error(`loop failed: ${JSON.stringify(result)}`) + } + + console.log( + JSON.stringify( + { + ok: true, + initialWorker: result.out.initialWorker, + fixWorker: result.out.fixWorker, + finalRev: result.out.finalRev, + integration: result.out.integration, + nodes: result.tree.nodes.length, + }, + null, + 2, + ), + ) +} + +main().catch((err) => { + console.error(err instanceof Error ? err.stack : err) + process.exit(1) +}) diff --git a/bench/src/workspace-loop.mts b/bench/src/workspace-loop.mts new file mode 100644 index 0000000..2724734 --- /dev/null +++ b/bench/src/workspace-loop.mts @@ -0,0 +1,133 @@ +/** + * The decisive de-risk: does a git-backed shared workspace make accumulating, + * resumable, multi-worker loops REAL on a share-nothing substrate? + * + * The red-team's fatal finding: each cloud worker is a fresh box with no shared + * filesystem, so "worker N+1 builds on worker N's code" is impossible and resume + * restores decisions, not the mutated workspace. The proposed fix: a git-backed + * contract. This proves or kills it on 3 dependency-ordered modules (a←b←c). + * + * - The DURABLE WORKSPACE = a bare git repo (models a GitHub branch in cloud). + * - Each WORKER = a FRESH temp clone (models a fresh box's empty FS), torn down + * after it commits+pushes. State survives ONLY because git persisted it. + * - THE PROOF: a worker asserts its dependency's file is on disk in its fresh + * clone. Fails on share-nothing (names-in-a-string); passes on the git seam. + * + * Test (a) — does it link? full run → integration test imports a←b←c, must pass. + * Test (b) — resume? KILL_AFTER=b, then RESUME=1: c clones a repo that + * HAS a+b committed (durable), re-runs ONLY c, links. + * + * pnpm exec tsx src/workspace-loop.mts # (a) happy path + * KILL_AFTER=b pnpm exec tsx src/workspace-loop.mts # die after b + * RESUME=1 pnpm exec tsx src/workspace-loop.mts # (b) resume → finish c + */ +import { execFileSync } from 'node:child_process' +import { existsSync, mkdtempSync, readFileSync, rmSync, writeFileSync } from 'node:fs' +import { appendFileSync } from 'node:fs' +import { tmpdir } from 'node:os' +import { join } from 'node:path' + +const BARE = '/tmp/workspace-loop.git' // the durable shared workspace (≈ a remote branch) +const JOURNAL = '/tmp/workspace-loop.journal' // the decision record (which modules are committed) + +const sh = (cmd: string, args: string[], cwd?: string): string => + execFileSync(cmd, args, { cwd, stdio: 'pipe', encoding: 'utf-8' }) +const git = (args: string[], cwd?: string): string => + // hooksPath=/dev/null: these are throwaway test clones, not project repos — the + // operator's global ai-agent-hooks must not run on them. + sh('git', ['-c', 'core.hooksPath=/dev/null', '-c', 'user.email=loop@tangle', '-c', 'user.name=loop', ...args], cwd) + +/** a←b←c: c.top(1) = ((1+1)*2)+3 = 7. The integration test imports the whole chain, + * so it passes ONLY if all three files accumulated in the workspace. */ +const modules = [ + { id: 'a', file: 'a.py', dep: null as string | null, code: 'def base(x):\n return x + 1\n' }, + { id: 'b', file: 'b.py', dep: 'a', code: 'from a import base\n\ndef mid(x):\n return base(x) * 2\n' }, + { id: 'c', file: 'c.py', dep: 'b', code: 'from b import mid\n\ndef top(x):\n return mid(x) + 3\n' }, +] +const depFile = (id: string): string => modules.find((m) => m.id === id)!.file + +function initWorkspace(): void { + rmSync(BARE, { recursive: true, force: true }) + rmSync(JOURNAL, { force: true }) + git(['init', '--bare', '-b', 'main', BARE]) + // seed: the integration test, on the branch from commit 0. + const seed = mkdtempSync(join(tmpdir(), 'wl-seed-')) + git(['clone', BARE, seed]) + writeFileSync(join(seed, 'test_all.py'), 'from c import top\n\nassert top(1) == 7, top(1)\nprint("INTEGRATION OK")\n') + git(['add', '-A'], seed) + git(['commit', '-m', 'seed: integration test'], seed) + git(['push', 'origin', 'main'], seed) + rmSync(seed, { recursive: true, force: true }) +} + +function loadJournal(): Set { + if (!existsSync(JOURNAL)) return new Set() + return new Set(readFileSync(JOURNAL, 'utf-8').split('\n').filter(Boolean)) +} + +/** One worker: fresh clone (empty box FS) → verify dep is ON DISK → port → push → torn down. */ +function runWorker(m: (typeof modules)[number]): void { + const work = mkdtempSync(join(tmpdir(), `wl-${m.id}-`)) + try { + git(['clone', BARE, work]) // a brand-new, otherwise-empty box + // THE PROOF: did the durable workspace carry my dependency's code to this fresh box? + if (m.dep) { + const present = existsSync(join(work, depFile(m.dep))) + console.error(` [${m.id}] fresh clone — dependency ${depFile(m.dep)} on disk? ${present ? 'YES ✓ (git carried it)' : 'NO ✗ (share-nothing)'}`) + if (!present) throw new Error(`SHARE-NOTHING: ${m.id} cannot see ${m.dep} — the seam is broken`) + } else { + console.error(` [${m.id}] fresh clone — root module, no dependency`) + } + writeFileSync(join(work, m.file), m.code) // port the module + git(['add', '-A'], work) + git(['commit', '-m', `port ${m.id}`], work) + git(['push', 'origin', 'main'], work) // accumulate into the durable workspace + appendFileSync(JOURNAL, `${m.id}\n`) // record the decision durably + console.error(` [${m.id}] ported + committed + pushed → torn down`) + } finally { + rmSync(work, { recursive: true, force: true }) // teardown — the box dies, git survives + } +} + +/** Test (a): a fresh clone of the durable workspace must pass the integration test. */ +function integrationLinks(): boolean { + const check = mkdtempSync(join(tmpdir(), 'wl-check-')) + try { + git(['clone', BARE, check]) + const out = sh('python3', ['test_all.py'], check) + console.error(` integration: ${out.trim()}`) + return out.includes('INTEGRATION OK') + } catch (e) { + console.error(` integration FAILED: ${e instanceof Error ? e.message.split('\n').slice(-3).join(' ') : e}`) + return false + } finally { + rmSync(check, { recursive: true, force: true }) + } +} + +function main(): void { + const resume = process.env.RESUME === '1' + const killAfter = process.env.KILL_AFTER + if (!resume) initWorkspace() + const done = loadJournal() + console.error(`\n=== git-backed workspace loop · ${resume ? 'RESUME' : 'fresh'}${done.size ? ` (done: ${[...done].join(',')})` : ''} ===`) + + for (const m of modules) { + if (done.has(m.id)) { + console.error(` [${m.id}] skip — already committed (resumed from durable git)`) + continue + } + if (m.dep && !loadJournal().has(m.dep)) throw new Error(`${m.id} blocked: dependency ${m.dep} not done (topo order)`) + runWorker(m) + if (killAfter === m.id) { + console.error(`\n💥 KILLED after ${m.id} (orchestrator death). Durable git has: ${[...loadJournal()].join(',')}. Re-run with RESUME=1.\n`) + process.exit(1) + } + } + + const links = integrationLinks() + console.error(`\n=== ${links ? '✅ LINKS — git-backed workspace + resume is REAL' : '❌ does not link'} ===`) + process.exit(links ? 0 : 1) +} + +main() diff --git a/docs/ANTI_PATTERNS.md b/docs/ANTI_PATTERNS.md new file mode 100644 index 0000000..81e2359 --- /dev/null +++ b/docs/ANTI_PATTERNS.md @@ -0,0 +1,81 @@ +> **Track:** Reference | **Role:** process guardrail | **Status:** canonical + +# Anti-Patterns + +These are repo-level failure modes that have already cost time or produced +misleading confidence. If a proposal repeats one, stop and ask what proof would +make the work legitimate. + +## Mechanism Ahead Of The Gate + +Do not build per-branch adaptive sub-agents, learned planners, corpus promotion, +outer-flywheel machinery, or other high-ceiling mechanisms before a positive +gate result. Expressiveness was the closed gap; evidence is the open one. + +Required proof: a measured non-blind topology beats blind compute at equal k, +under a deployable selector, on a domain with a correctable middle band, with +reported discordant pairs and multiple-comparison discipline. + +## Facade Before Substrate Join + +A developer-friendly loop/protocol/API is not justified until a tiny executable +proof shows the real path it claims to simplify: + +```txt +existing substrate primitive -> real worker -> real trace/state -> verifier/observer -> corrective action +``` + +If most of the API retypes `Scope`, MCP tools, journals, validators, or git, +delete it and document the missing join instead. + +Current local proof: + +```bash +pnpm exec tsx bench/src/observe-steer-workspace-loop.mts +``` + +Remaining external proof: the same shape with `openSandboxRun` workers and a +remote branch a sandbox can clone and push. + +## Relocated Protocol Masquerading As Simplification + +Deleting a facade is not enough if the same grammar reappears one layer lower. +Question, analyst, message, packet, trace, or coordination surfaces need the +same proof burden wherever they live: an executable run over live +`Scope`/MCP/journal/workspace paths, not mocks proving the grammar can talk to +itself. + +## Re-Running Settled Measurements + +Do not re-open a settled experiment because it is emotionally attractive. Read +`.evolve/current.json`, `memory/`, and the dated controlled-result notes before +launching a new run. A new experiment must name the changed axis and why the old +result no longer answers it. + +## Confounded Causal Claims + +Never claim a topology, prompt, planner, or steering strategy helped when the +treatment received more compute or better infrastructure than control. + +Minimum report: + +- same tasks +- same budget/equal k +- infra errors excluded and counted separately +- discordant pairs reported +- deterministic or execution-grounded oracle preferred +- threats to validity stated in the artifact + +## Silent Success + +Do not fake completion by returning defaults, empty arrays, best-effort outputs, +or swallowed errors. External-boundary calls return typed outcomes; inspect +`succeeded` before `value`. A verifier, package check, deployment, or benchmark +worked only after the artifact itself was checked. + +## Overclaim + +"Validates the concept" is not "validates the product." Route through the real +kernel before claiming product proof. Underpowered directional splits are not +wins. Mocked analyst/model seams are acceptable for local plumbing tests, but +the report must say what remains unproven. diff --git a/docs/BUILDING.md b/docs/BUILDING.md new file mode 100644 index 0000000..8334569 --- /dev/null +++ b/docs/BUILDING.md @@ -0,0 +1,66 @@ +> **Track:** Reference | **Role:** building discipline | **Status:** canonical + +# Building Guidelines + +This repo is a measurement substrate as much as a runtime. The build style is: +prove the smallest real path, extend the existing primitive, and report what is +measured versus inferred. + +## Default Loop + +For every substantive step, state: + +- what is changing +- why it matters to the north star +- why the obvious alternatives are worse +- how the claim was verified + +## Rules + +1. **Goal before execution.** Name the consumer, the decision this work changes, +and the axis that must discriminate before optimizing anything. +2. **Ground truth over inference.** Claims about behavior come from running the + thing or reading the source. If a probe contradicts the model, the probe wins. +3. **Check existing before building.** Search for the primitive first. Extend it + instead of forking it unless the new module can name the boundary it owns. +4. **Cheapest decisive check first.** Run the experiment that could invalidate + the plan before the broad implementation or benchmark matrix. +5. **Verify before claiming.** Typecheck/lint/tests before "built"; live probe + before "works"; artifact opened before "shipped." +6. **Estimate cost before launch.** State cells x per-cell-time / concurrency + before any fleet, benchmark, or optimizer run. +7. **Separate roles.** Verifier accepts/rejects, judge scores held-out quality, + analyst diagnoses traces, driver chooses action. Do not let judge output steer + the current run. +8. **Keep durable state durable.** Commits, journals, result blobs, trace events, + and corpus facts are the record. Do not rely on a string summary where a + replayable artifact is available. +9. **Write down durable knowledge in the same turn.** Update the code map, + evidence ledger, or process doc when the work changes how future agents + should operate. + +## Loop API Discipline + +The blessed loop surface is the substrate: + +- fixed shapes: `fanout`, `pipeline`, `loopUntil`, `panel` +- sandbox loops: `runLoop` +- dynamic recursive trees: `Scope` + Supervisor +- sandbox driver binding: `createCoordinationTools` +- durable workspace: `gitWorkspace` over a `Shell` +- trace feedback: `observe` + +Add a facade only after a tiny executable proof shows the substrate join and the +remaining boilerplate is irreducible. + +## Documentation Placement + +- `CLAUDE.md` / `AGENTS.md`: bootloader and repo-local deltas only. +- `docs/BUILDING.md`: stable build rules. +- `docs/ANTI_PATTERNS.md`: named failure modes and stop signs. +- `docs/research/*`: evidence, postmortems, open designs, and dated decisions. +- `.evolve/current.json` and `memory/`: live state and measured results. + +If a rule is timeless and applies across agents, put it here and link to it from +the bootloader. If it is an experiment result, put it in the evidence ledger, not +the bootloader. diff --git a/docs/README.md b/docs/README.md index 4d984c9..d6e2899 100644 --- a/docs/README.md +++ b/docs/README.md @@ -42,6 +42,8 @@ The package API and subsystems. | [agent-bus-protocol.md](./agent-bus-protocol.md) | normative protocol *(needs-update)* | The multi-agent call bus — depth limits, headers, refusal contract. (Pending: 429/413 fix + subpath list.) | | [conversation-economics.md](./conversation-economics.md) | subsystem | Conversation cost accounting and auth-source model (`src/conversation/`). | | [durability-adapters.md](./durability-adapters.md) | subsystem | Journal + durability adapters for resumable conversations. | +| [BUILDING.md](./BUILDING.md) | process | Canonical building discipline: goal first, cheapest decisive proof, substrate-first loop API, verification rules. | +| [ANTI_PATTERNS.md](./ANTI_PATTERNS.md) | process | Named failure modes: mechanism ahead of gate, facade before substrate proof, relocated protocol, confounded claims, overclaim. | | [refactor-roadmap.md](./refactor-roadmap.md) | package hygiene *(needs prune)* | Package-structure cleanup items (R1–R10); closed items deleted per its own rule. | ## Conventions @@ -50,4 +52,4 @@ The package API and subsystems. - Architecture docs cross-link the spine; the spine links its deep-dives and the empirical harness. - "Built vs Designed" is stated explicitly in `architecture.md` and `architecture-interpretations.md` — never assume a documented design is shipped without the `file:line` anchor. -- Repo-wide authorship + comment-discipline + layering rules live in [../CLAUDE.md](../CLAUDE.md). +- Repo bootloader + authorship/comment/layering deltas live in [../CLAUDE.md](../CLAUDE.md); durable process rules live in [BUILDING.md](./BUILDING.md) and [ANTI_PATTERNS.md](./ANTI_PATTERNS.md). diff --git a/docs/research/README.md b/docs/research/README.md index 06c6ebc..0832aae 100644 --- a/docs/research/README.md +++ b/docs/research/README.md @@ -25,6 +25,7 @@ spine happen explicitly, with `file:line` anchors, once a design ships. | [belief-agent-research-agenda.md](./belief-agent-research-agenda.md) | Research agenda for the recursive/belief-state agent — 7 disciplinary lenses → ranked agenda, grounded against the gate result (judge-blind selection loses; the win needs a deployable checker). Top tier is **offline on committed corpora**; the learner tier is gated. | | [program-research-plan.md](./program-research-plan.md) | Formal fund-or-kill audit of the program-synthesis framing. The honest verdict: **kill the RSI frame, park orchestration, ship the instrument + abstention.** | | [codex-techniques-audit.md](./codex-techniques-audit.md) | Adoption report mining OpenAI Codex for succinct-code principles + orchestration techniques. **Advisory** — verify `file:line` before acting. | +| [loop-facade-postmortem.md](./loop-facade-postmortem.md) | Failure record for the deleted `defineLoop` facade: why retyping `Scope`/MCP/journals/validators produced code without substrate proof, and the prevention rule for future loop APIs. | ## Source artifacts (multi-agent passes) diff --git a/docs/research/loop-facade-postmortem.md b/docs/research/loop-facade-postmortem.md new file mode 100644 index 0000000..578f429 --- /dev/null +++ b/docs/research/loop-facade-postmortem.md @@ -0,0 +1,85 @@ +> **Track:** Architecture (research) | **Role:** failure record | **Status:** active guardrail + +# Failure 20260608-041142: Loop Facade Ahead Of The Substrate + +Date: 2026-06-08 +Severity: High - public API drift risk +Detection: four blind audits plus branch diff review on PR #194 + +## What Happened + +The branch added a `defineLoop` authoring surface with its own artifacts, +messages, questions, trace slices, analysts, verifier, judge, control plane, docs, +example, and tests. The code mostly retyped existing substrate concepts: + +- `Scope.spawn`, `Scope.next`, and `Scope.send` +- MCP coordination tools +- spawn journal and topology views +- `Validator` / `DefaultVerdict` +- runtime hooks and trace export +- git-backed workspace state + +The facade did not prove the hard join: real substrate worker -> trace/state -> +observer finding -> corrective steer. + +## Root Cause + +The design optimized for a clean authoring grammar before proving that the +runtime join needed a new grammar. That inverted the correct order: + +1. Prove the smallest real loop on the substrate. +2. Identify the irreducible repeated boilerplate. +3. Add only the facade that removes that boilerplate. + +The facade happened before step 1, so it grew protocol code that could be tested +against fakes while remaining weakly connected to the real execution substrate. + +## Fix + +Commit `ab2823d` deleted the facade exports, docs, example, and tests: + +- `src/runtime/define-loop.ts` +- `src/runtime/loop-types.ts` +- `src/runtime/loop-trace.ts` +- `docs/loop-authoring.md` +- `examples/define-loop/` +- `tests/loops/define-loop.test.ts` + +The remaining loop story is substrate-first: + +- fixed shapes: `fanout`, `pipeline`, `loopUntil`, `panel` +- sandbox loops: `runLoop` +- recursive dynamic trees: `Scope` + Supervisor +- sandbox driver binding: `createCoordinationTools` +- durable workspace: `gitWorkspace` over a `Shell` +- trace feedback: `observe` + +This branch now contains the smallest local proof of the missing join: + +```bash +pnpm exec tsx bench/src/observe-steer-workspace-loop.mts +``` + +That script drives a real Supervisor/Scope through the coordination MCP verbs: +first worker commits a failing artifact to a git workspace, `run_analyst` calls +`observe()` on the settled trace/output, `steer_worker` delivers the finding via +`Scope.send`, a correction worker commits the fix, and a fresh clone passes the +integration test. + +It is not the cloud proof. The remaining external proof is the same shape with +`openSandboxRun` workers and a remote branch that a sandbox can clone and push. + +## Prevention Rule + +No new loop facade lands until a tiny executable proof shows the exact substrate +join the facade claims to simplify. The local proof for this thread is: + +```txt +Scope.spawn -> coordination MCP -> gitWorkspace -> observe() finding -> Scope.send steer +``` + +The cloud proof still must add `openSandboxRun worker -> remote git branch`. + +A proposed API fails review if it primarily renames existing substrate concepts +or needs fake agents to demonstrate its value. The accepted API is the smallest +wrapper over proven joins, not the nicest grammar imagined ahead of them. diff --git a/skills/loop-writer/SKILL.md b/skills/loop-writer/SKILL.md new file mode 100644 index 0000000..322cef9 --- /dev/null +++ b/skills/loop-writer/SKILL.md @@ -0,0 +1,163 @@ +--- +name: loop-writer +description: Author clean recursive agent loops on @tangle-network/agent-runtime. Use for Scope/supervisor orchestration, runLoop, Pi/sandbox drivers, fanout, trace analysts, verifiers/judges, question escalation, live messages, and self-improving loop recipes. +--- + +# loop-writer + +Design the smallest loop that can honestly solve the objective. The blessed +surface is the substrate: `fanout`/`pipeline` for fixed shapes, `runLoop` for +round-synchronous sandbox loops, and `Scope`/Supervisor for recursive +driver/worker trees. Do not create a second loop grammar. + +## Mental Model + +```txt +user -> Pi/root driver -> supervisor -> sandbox driver -> worker -> leaf harness +``` + +Each level may spawn below, wait below, analyze below, steer below, and escalate +questions upward. The substrate owns budget, trace, abort, journal, and replay. +The driver owns strategy. + +## Pick The Primitive + +| Objective | Use | +|---|---| +| Try N attempts, pick best | `fanout` or `createFanoutVoteDriver` | +| Ordered stages | `pipeline` | +| Improve until executable check passes | `loopUntil` + verifier | +| Review from several lenses | `panel` | +| Simulated user/product eval | `defineConversation` + `runConversation` | +| Dynamic topology / drivers of drivers | `Scope` or sandbox driver + `createCoordinationTools` | +| Mutate a shared repo | git branch/clone loop with typed merge outcomes | + +If a fixed combinator solves it, do not use a dynamic driver. + +## Minimal Sandbox Loop + +```ts +const trace: unknown[] = [] +const result = await runLoop({ + driver: createDriver({ planner, maxIterations: 4 }), + agentRun: agentRunSpec, + output, + validator: executableGate, + task, + ctx: { + sandboxClient, + traceEmitter: { emit: async (event) => trace.push(event) }, + }, +}) + +const observation = await observe( + { + task: String(task), + output: JSON.stringify(result.winner?.output ?? result.decision), + trace, + outcome: result.winner ? 'passed' : 'failed', + runId, + }, + { chat, model, corpus }, +) +``` + +## Minimal Recursive Driver + +```ts +const driver: Agent = { + name: 'secure-build-driver', + async act(task, scope) { + const spawned = scope.spawn(workerAgent, task, { budget: perWorker, label: 'worker-a' }) + if (!spawned.ok) throw new Error(spawned.reason) + + const settled = await scope.next() + const observation = await observe( + { + task: String(task), + output: JSON.stringify(settled), + trace: [settled, scope.view], + outcome: settled?.kind === 'done' ? 'passed' : 'failed', + runId, + }, + { chat, model, corpus }, + ) + + const steer = observation.findings[0]?.recommended_action + if (!steer) return synthesize(settled, observation) + + const correction = scope.spawn(workerAgent, { task, prior: settled }, { + budget: perWorker, + label: 'worker-corrected', + }) + if (!correction.ok) throw new Error(correction.reason) + if (!scope.send(correction.handle.id, { steer })) throw new Error('steer delivery failed') + + const fixed = await scope.next() + return synthesize(fixed, observation) + }, +} + +const result = await createSupervisor().run(driver, task, supervisorOpts) +``` + +When the driver lives in a sandbox, expose the same verbs through +`createCoordinationTools`: `spawn_worker`, `await_next`, `observe_worker`, +`steer_worker`, `list_questions`, `answer_question`, `ask_parent`, `stop`, and +optional analyst tools. + +## Role Boundaries + +- **Verifier**: executable shippability gate; controls accept/reject. +- **Judge**: held-out score only; never steers the current run. +- **Analyst**: trace-derived diagnosis over worker, pairwise, subtree, or full + loop traces; may emit findings, questions, messages, or blockers. +- **Driver/reviewer**: consumes evidence and chooses continue, steer, spawn, + answer, escalate, or stop. + +## Questions And Steering + +Questions are blockers, not prose hidden in output. A child asks its parent; the +parent answers when it has evidence, defers when safe, or escalates to Pi/user +when answering would invent requirements. `failClosed` loops must not stop clean +with unresolved `blocks-run` questions. + +Steer sparingly: only when an analyst finds a concrete mistake, a loop is +duplicating work, a parent/Pi answers a blocker, or a verifier reveals a specific +fix a running worker can still use. Delivery is through `Scope.send` or +`steer_worker`; failed delivery means spawn a fresh corrected attempt. + +## Workspace Loops + +Git is the durable workspace seam: + +- one branch/clone per worker +- `gitWorkspace({ ref })` when host and sandbox need the same clone/commit/push contract +- explicit commit per worker +- typed merge result: `merged | conflict | stale-base | rejected | verifier-failed` +- resume derives completion from git state, not only a side journal +- conflicts become blockers/questions, not silent overwrite + +Proof command for the local substrate join: + +```bash +pnpm exec tsx bench/src/observe-steer-workspace-loop.mts +``` + +It proves `Scope.spawn -> coordination tools -> gitWorkspace -> observe -> +Scope.send -> corrective worker -> integration pass`. Until the same proof runs +with `openSandboxRun` and a remote branch, claim local substrate closure and +serial git accumulation, not full cloud migration safety. + +## Final Check + +- Does every meaningful product land in result blobs, journals, commits, + conversation journals, or trace events? +- Are verifier, judge, analyst, and driver roles separated? +- Can blocking questions move up the chain? +- Can Pi/parent steer without bypassing verification? +- Is workspace mutation transactional if workers edit shared code? +- Can existing trace/journal views isolate agents, pairs, subtrees, and the full + run? +- Is the loop small enough that an agent can author it without inventing hidden + runtime behavior? diff --git a/src/mcp/delegates.ts b/src/mcp/delegates.ts index 2d020b4..cc0e348 100644 --- a/src/mcp/delegates.ts +++ b/src/mcp/delegates.ts @@ -113,8 +113,14 @@ export interface CreateDefaultCoderDelegateOptions { * `executor: createSiblingSandboxExecutor({ client: sandboxClient })`. */ sandboxClient?: SandboxClient + /** Backend harness for the single-coder path. Default comes from `coderProfile`. */ + harness?: string + /** Model override for the single-coder path. */ + model?: string /** Default `['claude-code', 'codex', 'opencode/zai-coding-plan/glm-5.1']` when variants > 1. */ fanoutHarnesses?: string[] + /** Optional per-harness model override for `variants > 1`. */ + fanoutModels?: (string | undefined)[] /** Hard cap on the kernel's per-batch concurrency. Default 4. */ maxConcurrency?: number /** @@ -163,7 +169,11 @@ export function createDefaultCoderDelegate( const variants = Math.max(1, Math.trunc(args.variants ?? 1)) ctx.report({ iteration: 0, phase: 'starting' }) if (variants <= 1) { - const { agentRunSpec, output, validator } = coderProfile({ task }) + const { agentRunSpec, output, validator } = coderProfile({ + task, + ...(options.harness ? { harness: options.harness } : {}), + ...(options.model ? { model: options.model } : {}), + }) const result = await runLoop({ driver: singleShotDriver, agentRun: agentRunSpec, @@ -185,11 +195,12 @@ export function createDefaultCoderDelegate( ctx.report({ iteration: 1, phase: 'completed' }) return chosen } - const fanout = multiHarnessCoderFanout( - fanoutHarnesses && fanoutHarnesses.length > 0 + const fanout = multiHarnessCoderFanout({ + ...(fanoutHarnesses && fanoutHarnesses.length > 0 ? { harnesses: fanoutHarnesses.slice(0, variants) } - : { harnesses: undefined }, - ) + : {}), + ...(options.fanoutModels ? { models: options.fanoutModels.slice(0, variants) } : {}), + }) const agentRuns = fanout.agentRuns.slice(0, variants) const result = await runLoop({ driver: fanout.driver, diff --git a/src/mcp/index.ts b/src/mcp/index.ts index 6fd3b36..14e58de 100644 --- a/src/mcp/index.ts +++ b/src/mcp/index.ts @@ -86,10 +86,16 @@ export { runCheck, } from './tools/checks' export { + type AnalystRegistry, + type CoordinationEvent, type CoordinationTools, type CoordinationToolsOptions, createCoordinationTools, type MakeWorkerAgent, + type Question, + type QuestionDecision, + type QuestionPolicy, + type QuestionRecord, type SettledWorker, } from './tools/coordination' export { diff --git a/src/mcp/server.ts b/src/mcp/server.ts index 8e89338..ab12cb2 100644 --- a/src/mcp/server.ts +++ b/src/mcp/server.ts @@ -82,10 +82,9 @@ export interface McpServerOptions { /** Override the default in-memory task queue. */ queue?: DelegationTaskQueue /** - * Extra tools to serve alongside the delegation tools — e.g. the operator toolbox - * (`createCoordinationTools(...).tools`), which exposes the driver's spawn/observe/steer verbs over - * MCP so a sandbox agent can BE the driver. Registered after the built-ins; a duplicate name - * throws (fail loud — no silent shadowing of a delegation tool). + * Extra tools to serve alongside the delegation tools, for example + * `createCoordinationTools(...).tools`. Registered after the built-ins; a + * duplicate name throws so delegation tools cannot be shadowed silently. */ extraTools?: McpToolDescriptor[] /** Server display name surfaced via `initialize`. Default `'agent-runtime-mcp'`. */ diff --git a/src/mcp/tools/checks.ts b/src/mcp/tools/checks.ts index 5e89b7e..a3195d8 100644 --- a/src/mcp/tools/checks.ts +++ b/src/mcp/tools/checks.ts @@ -263,9 +263,11 @@ function defaultChat(opts: CheckRunnerOptions): (system: string, user: string) = } } -/** Build a `run_analyst` runner over a kind directory — the seam the operator toolbox is wired with. - * Returns the findings, or a typed error for an unknown kind. `producedAt` is passed in (the runtime - * forbids `Date.now` in replay-safe paths; the caller stamps it). */ +/** + * Build a `run_analyst` runner over a kind directory. + * Returns findings, or a typed error for an unknown kind. `producedAt` is + * passed in because replay-safe paths must not read `Date.now`. + */ export function makeCheckRunner( kinds: Record, opts: CheckRunnerOptions, diff --git a/src/mcp/tools/coordination.ts b/src/mcp/tools/coordination.ts index 903cf95..769558e 100644 --- a/src/mcp/tools/coordination.ts +++ b/src/mcp/tools/coordination.ts @@ -1,30 +1,10 @@ /** * @experimental * - * COORDINATION TOOLS — the verbs a parent agent uses to coordinate the child agents it spawns, - * exposed as MCP tools backed by a live keystone `Scope`. This is `Scope`-as-MCP. - * - * NOT a transport. The cross-org message bus (`docs/agent-bus-protocol.md`) and the SDK's - * `dispatchPrompt`/`SessionMessage` are the *transports* the `steer` verb rides; THIS file is the - * verb set (the API). One verb, several bindings: in-process `Scope.send` is a direct call; across - * sandboxes it rides SDK session-messaging; across orgs it rides the agent-bus protocol. - * - * spawn_worker → scope.spawn (budget-bounded, fail-closed — equal-k holds even for an LLM driver) - * await_next → scope.next (THE wake event: block until the next spawned child settles) - * observe_worker→ scope.view + the result blob (a child's status, spend, and settled output) - * steer_worker → scope.send (deliver a next-instruction / interrupt to a RUNNING child) - * list_analysts → the check menu (the trace lenses the agent can apply — see checks.ts) - * run_analyst → apply a CHECK (run a kind over a child's trace → trace-derived findings) - * stop → declare the run complete (the terminal move) - * - * The check verbs are present only when the check seam (`analystKinds` + `runAnalyst`) is wired — - * an agent that does not review traces (a pure dispatcher) omits them. A trace check is a SEPARATE - * lens (selector ≠ judge: it reads the trace, never the score); authoring a NEW check at runtime is - * the next addition. - * - * A worker the driver spawns may itself carry the driver profile — `spawn_worker` does not care what - * the profile is, so drivers-of-drivers fall out for free (each sub-driver gets its own sub-scope, - * bounded by `maxDepth` + the conserved pool). + * MCP binding for a live `Scope`. A sandbox driver gets the same small verbs + * the in-process driver has: spawn, observe, await, steer, ask/answer, analyze, + * and stop. Settled outputs remain Scope artifacts; product code can project + * them into any UI/report envelope it needs. */ import type { @@ -36,64 +16,106 @@ import type { } from '../../runtime' import type { McpToolDescriptor } from '../server' -/** A worker the driver has drained via `await_next` — the operator's running ledger of settled - * workers + their DEPLOYABLE verdict (the driver IS the selector, so it legitimately reads the - * verdict; the analyst, which reads only the trace, is the separate selector≠judge lens). The - * driver picks its deliverable from this ledger at `stop`. */ +/** A worker the driver has drained via `await_next`. */ export interface SettledWorker { readonly id: string readonly status: 'done' | 'down' - /** Deployable score in [0,1] from the worker's verdict (done only). */ readonly score?: number - /** Whether the deployable verdict passed (done only). */ readonly valid?: boolean - /** Result-blob pointer for the worker's output/trace (done only). */ readonly outRef?: string - /** Failure reason (down only). */ readonly reason?: string } -/** How a `spawn_worker` profile becomes a spawnable leaf `Agent`. The caller wires this (e.g. the - * surface registry turns a profile into a shot executor) so the toolbox stays domain-blind. */ +export type QuestionLevel = 'worker' | 'driver' | 'loop' +export type QuestionUrgency = 'continue-without' | 'blocks-step' | 'blocks-run' + +export interface QuestionOption { + readonly label: string + readonly tradeoff: string +} + +export interface Question { + readonly id: string + readonly from: string + readonly level: QuestionLevel + readonly question: string + readonly reason: string + readonly urgency: QuestionUrgency + readonly options?: ReadonlyArray +} + +export type QuestionDecision = + | { readonly kind: 'answer'; readonly answer: string; readonly by: string } + | { readonly kind: 'defer'; readonly reason: string } + | { readonly kind: 'escalate'; readonly to: 'parent' | 'user' | string; readonly reason: string } + +export interface QuestionRecord extends Question { + readonly status: 'open' | 'answered' | 'deferred' | 'escalated' + readonly decision?: QuestionDecision + readonly openedAt: number +} + +type QuestionInput = Omit & { readonly id?: string } +export type QuestionPolicy = 'auto' | 'mustDecide' | 'bubble' | 'failClosed' + +export interface AnalystRegistry { + readonly kinds: ReadonlyArray<{ id: string; description: string; area: string }> + readonly run: (kindId: string, trace: unknown) => Promise +} + +export type CoordinationEvent = { readonly type: 'question'; readonly question: QuestionRecord } + export type MakeWorkerAgent = (profile: unknown) => SuperviseAgent export interface CoordinationToolsOptions { - /** The DRIVER's live scope — spawn/observe/steer all act on this. */ readonly scope: Scope - /** Result blobs, so `observe_worker` can rehydrate a settled worker's output. */ readonly blobs: ResultBlobStore - /** Turn a spawn_worker `profile` into a leaf agent (registry-resolved on spawn). */ readonly makeWorkerAgent: MakeWorkerAgent - /** Per-worker conserved budget the driver reserves on each spawn. */ readonly perWorker: Budget - /** The analyst lens menu (for `list_analysts`) — id + one-line + area. Injected so the toolbox - * stays domain-blind; wire it from `analyst-kinds.ts`'s directory. Omit to disable analyst tools. */ - readonly analystKinds?: ReadonlyArray<{ id: string; description: string; area: string }> - /** Run a lens over a worker's trace → findings (or a typed error). Wire it from - * `makeCheckRunner(...)`. `run_analyst` fetches the worker's settled output and passes it here. */ - readonly runAnalyst?: (kindId: string, trace: unknown) => Promise + readonly analysts?: AnalystRegistry + readonly onEvent?: (event: CoordinationEvent) => void | Promise + readonly questionPolicy?: QuestionPolicy } export interface CoordinationTools { - /** MCP tools — register on an `McpServer`, or call the handlers directly in-process. */ readonly tools: McpToolDescriptor[] - /** True once the driver called `stop` — the operator loop reads this to terminate. */ isStopped(): boolean - /** The reason passed to `stop`, if any. */ stopReason(): string | undefined - /** The workers drained so far via `await_next` (the driver's selection ledger). */ settled(): ReadonlyArray + questions(): ReadonlyArray } const idArg = { type: 'string', description: 'The workerId returned by spawn_worker.' } as const -/** Build the operator toolbox over a live scope. The tools are the driver's verbs; their handlers - * are thin wrappers over the keystone (spawn/view/send), so the budget/journal/abort discipline of - * the Supervisor applies to a sandbox driver exactly as to the in-process one. */ +/** Build the driver's MCP tools over a live scope. */ export function createCoordinationTools(opts: CoordinationToolsOptions): CoordinationTools { let stopped = false let reason: string | undefined + let questionSeq = 0 const ledger: SettledWorker[] = [] + const questions: QuestionRecord[] = [] + const questionPolicy = opts.questionPolicy ?? 'auto' + + const str = (v: unknown, field: string): string => { + if (typeof v !== 'string' || v.length === 0) + throw new Error(`coordination tools: "${field}" must be a non-empty string`) + return v + } + const obj = (raw: unknown): Record => { + if (!raw || typeof raw !== 'object') + throw new Error('coordination tools: arguments must be an object') + return raw as Record + } + const level = (v: unknown): Question['level'] => { + if (v === 'worker' || v === 'driver' || v === 'loop') return v + throw new Error('coordination tools: "level" must be worker, driver, or loop') + } + const urgency = (v: unknown): Question['urgency'] => { + if (v === 'continue-without' || v === 'blocks-step' || v === 'blocks-run') return v + throw new Error( + 'coordination tools: "urgency" must be continue-without, blocks-step, or blocks-run', + ) + } const recordSettled = (s: Settled): SettledWorker => { const w: SettledWorker = @@ -110,29 +132,90 @@ export function createCoordinationTools(opts: CoordinationToolsOptions): Coordin return w } - const str = (v: unknown, field: string): string => { - if (typeof v !== 'string' || v.length === 0) - throw new Error(`operator toolbox: "${field}" must be a non-empty string`) - return v + const nextQuestionId = (from: string): string => `${from}:q${questionSeq++}` + const normalizeQuestion = (q: QuestionInput, fallbackFrom: string): Question => { + const from = str(q.from ?? fallbackFrom, 'from') + return { + id: typeof q.id === 'string' && q.id.length > 0 ? q.id : nextQuestionId(from), + from, + level: level(q.level), + question: str(q.question, 'question'), + reason: str(q.reason, 'reason'), + ...(q.options ? { options: q.options } : {}), + urgency: urgency(q.urgency), + } } - const obj = (raw: unknown): Record => { - if (!raw || typeof raw !== 'object') - throw new Error('operator toolbox: arguments must be an object') - return raw as Record + const addQuestion = ( + raw: QuestionInput, + fallbackFrom: string, + decision?: QuestionDecision, + ): { question: QuestionRecord; added: boolean } => { + const q = normalizeQuestion(raw, fallbackFrom) + const existing = questions.find((x) => x.id === q.id) + if (existing) return { question: existing, added: false } + const effectiveDecision = + decision ?? + (questionPolicy === 'bubble' + ? ({ + kind: 'escalate', + to: 'parent', + reason: 'question policy bubbled to parent', + } as const) + : undefined) + const status: QuestionRecord['status'] = + effectiveDecision?.kind === 'answer' + ? 'answered' + : effectiveDecision?.kind === 'defer' + ? 'deferred' + : effectiveDecision?.kind === 'escalate' + ? 'escalated' + : 'open' + const record: QuestionRecord = { + ...q, + status, + openedAt: Date.now(), + ...(effectiveDecision ? { decision: effectiveDecision } : {}), + } + questions.push(record) + return { question: record, added: true } + } + const emitNewQuestion = async (record: { + question: QuestionRecord + added: boolean + }): Promise => { + if (record.added) await opts.onEvent?.({ type: 'question', question: record.question }) + return record.question + } + const decideQuestion = (questionId: string, decision: QuestionDecision): QuestionRecord => { + const idx = questions.findIndex((q) => q.id === questionId) + if (idx < 0) throw new Error(`unknown questionId ${JSON.stringify(questionId)}`) + const prior = questions[idx] as QuestionRecord + const status: QuestionRecord['status'] = + decision.kind === 'answer' ? 'answered' : decision.kind === 'defer' ? 'deferred' : 'escalated' + const next: QuestionRecord = { ...prior, status, decision } + questions[idx] = next + return next + } + const blockingQuestionsForStop = (): QuestionRecord[] => { + if (questionPolicy === 'auto' || questionPolicy === 'bubble') return [] + return questions.filter((q) => { + const blocking = q.urgency === 'blocks-step' || q.urgency === 'blocks-run' + if (!blocking) return false + if (questionPolicy === 'mustDecide') return q.status === 'open' + return q.status !== 'answered' && q.status !== 'deferred' + }) } const tools: McpToolDescriptor[] = [ { name: 'spawn_worker', description: - 'Start a worker the operator will drive. `profile` is the worker (or another DRIVER — ' + - 'drivers-of-drivers are allowed); `task` is what it should do. Reserves the worker’s budget ' + - 'from the conserved pool and FAILS CLOSED when the pool is dry — so spawning "at will" is ' + - 'bounded by the budget. Returns { workerId } or { error: "budget-exhausted" | "depth-exceeded" }.', + 'Start a worker the driver will drive. `profile` is the worker or another driver; ' + + '`task` is what it should do. Reserves budget from the conserved pool and fails closed.', inputSchema: { type: 'object', properties: { - profile: { description: 'The worker/driver profile to run (passed to makeWorkerAgent).' }, + profile: { description: 'The worker/driver profile to run.' }, task: { description: 'The task the worker should perform.' }, label: { type: 'string', description: 'Optional trace label.' }, }, @@ -150,11 +233,7 @@ export function createCoordinationTools(opts: CoordinationToolsOptions): Coordin }, { name: 'observe_worker', - description: - 'Inspect a worker you are driving: its live status + conserved spend, and — once it has ' + - 'settled — its output artifact (rehydrated from the result blob). Use this to review work ' + - 'before deciding your next move. (In-flight token-level trace is surfaced via the analyst, ' + - 'not here.)', + description: 'Inspect a worker status, spend, and settled output artifact when available.', inputSchema: { type: 'object', properties: { workerId: idArg }, required: ['workerId'] }, handler: async (raw) => { const id = str(obj(raw).workerId, 'workerId') @@ -171,10 +250,7 @@ export function createCoordinationTools(opts: CoordinationToolsOptions): Coordin }, { name: 'steer_worker', - description: - 'Steer a RUNNING worker out-of-band — deliver your next instruction / a course-correction / ' + - 'an interrupt to its inbox. Returns { delivered } — false if the worker has finished or its ' + - 'harness cannot be steered mid-flight (then spawn a fresh one or wait and re-observe).', + description: 'Deliver an out-of-band instruction to a running worker inbox.', inputSchema: { type: 'object', properties: { @@ -194,30 +270,130 @@ export function createCoordinationTools(opts: CoordinationToolsOptions): Coordin { name: 'await_next', description: - 'Wait for the next worker you spawned to FINISH, then read its deployable verdict. This is ' + - 'how you advance: spawn one or more workers, then call await_next to block until the next ' + - 'one settles. Returns { settled: workerId, status: "done"|"down", score, valid } for a ' + - 'finished worker, or { idle: true } when no worker is still running (then spawn more or stop). ' + - 'Workers run concurrently — spawn a batch, then await_next repeatedly to collect them.', + 'Wait for the next spawned worker to settle. Returns { idle: true } when none are live.', inputSchema: { type: 'object', properties: {} }, handler: async () => { const s = await opts.scope.next() if (!s) return { idle: true } const w = recordSettled(s) return w.status === 'done' - ? { settled: w.id, status: 'done', score: w.score, valid: w.valid } + ? { + settled: w.id, + status: 'done', + score: w.score, + valid: w.valid, + outRef: w.outRef, + } : { settled: w.id, status: 'down', reason: w.reason } }, }, { - name: 'stop', + name: 'list_questions', description: - 'Declare the run complete — every required change is made and verified. The terminal move.', + 'List questions raised by workers, drivers, or analysts. Blocking stop behavior follows questionPolicy.', + inputSchema: { type: 'object', properties: {} }, + handler: () => Promise.resolve({ questions }), + }, + { + name: 'answer_question', + description: 'Record an answer, deferral, or escalation for a loop question.', + inputSchema: { + type: 'object', + properties: { + questionId: { type: 'string' }, + answer: { type: 'string' }, + by: { type: 'string', description: 'Node id or "user".' }, + deferReason: { type: 'string' }, + escalateTo: { type: 'string', enum: ['parent', 'user'] }, + escalateReason: { type: 'string' }, + }, + required: ['questionId'], + }, + handler: (raw) => { + const a = obj(raw) + const questionId = str(a.questionId, 'questionId') + if (typeof a.answer === 'string' && a.answer.length > 0) { + return Promise.resolve({ + question: decideQuestion(questionId, { + kind: 'answer', + answer: a.answer, + by: typeof a.by === 'string' && a.by.length > 0 ? a.by : 'user', + }), + }) + } + if (typeof a.deferReason === 'string' && a.deferReason.length > 0) { + return Promise.resolve({ + question: decideQuestion(questionId, { + kind: 'defer', + reason: a.deferReason, + }), + }) + } + if (a.escalateTo === 'parent' || a.escalateTo === 'user') { + const escalateReason = + typeof a.escalateReason === 'string' && a.escalateReason.length > 0 + ? a.escalateReason + : 'driver escalated' + return Promise.resolve({ + question: decideQuestion(questionId, { + kind: 'escalate', + to: a.escalateTo, + reason: escalateReason, + }), + }) + } + throw new Error('answer_question: provide answer, deferReason, or escalateTo') + }, + }, + { + name: 'ask_parent', + description: 'Raise a question to the parent driver/Pi/user when this driver cannot decide.', + inputSchema: { + type: 'object', + properties: { + from: { type: 'string' }, + level: { type: 'string', enum: ['worker', 'driver', 'loop'] }, + question: { type: 'string' }, + reason: { type: 'string' }, + urgency: { type: 'string', enum: ['continue-without', 'blocks-step', 'blocks-run'] }, + }, + required: ['from', 'level', 'question', 'reason', 'urgency'], + }, + handler: async (raw) => { + const a = obj(raw) + const from = str(a.from, 'from') + const q = await emitNewQuestion( + addQuestion( + { + from, + level: level(a.level), + question: str(a.question, 'question'), + reason: str(a.reason, 'reason'), + urgency: urgency(a.urgency), + }, + from, + { kind: 'escalate', to: 'parent', reason: 'asked parent' }, + ), + ) + return { question: q } + }, + }, + { + name: 'stop', + description: 'Declare the run complete.', inputSchema: { type: 'object', properties: { reason: { type: 'string', description: 'Why you are stopping.' } }, }, handler: (raw) => { + const blocking = blockingQuestionsForStop() + if (blocking.length) { + return Promise.resolve({ + stopped: false, + error: 'unresolved-blocking-questions', + questions: blocking, + }) + } stopped = true const r = obj(raw).reason reason = typeof r === 'string' ? r : undefined @@ -226,29 +402,20 @@ export function createCoordinationTools(opts: CoordinationToolsOptions): Coordin }, ] - // list_analysts / run_analyst — present only when the analyst seam is wired. The driver picks a - // kind from the menu and applies it to a worker it is driving; findings are trace-derived (the - // firewall lives in the runner). (define_analyst — authoring a NEW kind at runtime — is deferred.) - if (opts.analystKinds) { + if (opts.analysts) { tools.push({ name: 'list_analysts', - description: - 'List the trace-analyst lenses available to run over a worker — id, what each looks for, and its area.', + description: 'List trace-analyst lenses available to run over a settled worker.', inputSchema: { type: 'object', properties: {} }, - handler: () => Promise.resolve({ analysts: opts.analystKinds }), + handler: () => Promise.resolve({ analysts: opts.analysts?.kinds }), }) - } - if (opts.runAnalyst) { tools.push({ name: 'run_analyst', - description: - 'Apply an analyst LENS to a worker you are driving — run `kind` over the worker’s trace and ' + - 'return its findings (trace-derived, never score-derived). Use `list_analysts` for the menu; ' + - 'run several lenses to triangulate. The worker must have settled (its trace is read from its output).', + description: 'Apply an analyst lens to a settled worker trace.', inputSchema: { type: 'object', properties: { - kind: { type: 'string', description: 'The analyst kind id (see list_analysts).' }, + kind: { type: 'string', description: 'The analyst kind id.' }, workerId: idArg, }, required: ['kind', 'workerId'], @@ -261,10 +428,16 @@ export function createCoordinationTools(opts: CoordinationToolsOptions): Coordin if (!node.outRef) return { error: `worker ${JSON.stringify(id)} has not settled — no trace to analyze yet` } const trace = await opts.blobs.get(node.outRef) - return { findings: await opts.runAnalyst?.(str(a.kind, 'kind'), trace) } + return { findings: await opts.analysts?.run(str(a.kind, 'kind'), trace) } }, }) } - return { tools, isStopped: () => stopped, stopReason: () => reason, settled: () => ledger } + return { + tools, + isStopped: () => stopped, + stopReason: () => reason, + settled: () => ledger, + questions: () => questions, + } } diff --git a/src/runtime/index.ts b/src/runtime/index.ts index 4ec4a38..e3733d5 100644 --- a/src/runtime/index.ts +++ b/src/runtime/index.ts @@ -60,6 +60,15 @@ export { type LoopOptionsForDispatch, loopDispatch, } from './loop-dispatch' +// The third-person observer: a worker's trace → trace-grounded findings, an +// operator report, and durable corpus facts for the next run (the closed loop). +export { + type Observation, + type ObserveInput, + type ObserveOptions, + observe, + renderReport, +} from './observe' // The personify layer + the RSI wave built on the recursive keystone: the persona content seam // (`definePersona`/`runPersonified`), the open shape registry, the content-free generic // combinators, the cross-run corpus, the analyst-on-scope steer firewall, and the trajectory + @@ -247,3 +256,11 @@ export type { ValidationCtx, Validator, } from './types' +export { + type GitWorkspaceOptions, + gitWorkspace, + localShell, + type Shell, + type Workspace, + type WorkspaceCommit, +} from './workspace' diff --git a/src/runtime/observe.ts b/src/runtime/observe.ts new file mode 100644 index 0000000..44aaccd --- /dev/null +++ b/src/runtime/observe.ts @@ -0,0 +1,234 @@ +/** + * The third-person observer — the connective tissue that closes the loop. + * + * A driver spawns a worker; the worker can't see itself. `observe` reads the + * worker's TRACE (what it actually did — every tool call, cost, failure) and + * produces two streams: + * - `findings` / `report` — fed back DOWN (a steer for the next attempt) and + * OUT (the operator-facing "what I noticed + what to change"). + * - `learned` — durable facts written to the cross-run `Corpus` so the NEXT + * run starts smarter (the continuous half of "continuous self-improvement"). + * + * Findings are TRACE-derived, never JUDGE-derived (`derived_from_judge:false`): + * the observer reads behavior, never the acceptance verdict — the selector≠judge + * firewall (docs/learning-flywheel.md). The observer is harness-agnostic: it + * reads a trace + an output, so it watches opencode, codex, hermes, or a BYO + * agent identically. + */ +import { type AnalystFinding, type ChatClient, makeFinding } from '@tangle-network/agent-eval' +import type { Corpus, CorpusRecord } from './personify/wave-types' + +const observerId = 'observe/trace' + +export interface ObserveInput { + /** What the worker was asked to do. */ + task: string + /** What it produced (its final answer / artifact summary). */ + output: string + /** The worker's trace — any event array (sandbox events, tool-call records). */ + trace: ReadonlyArray + /** Terminal status only (passed/failed/unknown) — NOT a judge score; the + * observer never reads the verdict, it reads behavior. */ + outcome?: 'passed' | 'failed' | 'unknown' + /** Provenance back to the run. */ + runId?: string +} + +export interface ObserveOptions { + /** The model-call seam (agent-eval `createChatClient`: router / cli-bridge / …). */ + chat: ChatClient + model?: string + /** When set, learned facts are appended (idempotent) for the next run to read. */ + corpus?: Corpus + /** Tags written onto learned facts + used by the next run's corpus query. */ + tags?: ReadonlyArray + signal?: AbortSignal + /** Cap the trace lines fed to the observer (keeps the call cheap). Default 80. */ + maxTraceLines?: number +} + +export interface Observation { + findings: AnalystFinding[] + /** Facts persisted to the corpus (empty when no corpus was supplied). */ + learned: CorpusRecord[] + /** Operator-facing markdown: what the observer noticed + what to change. */ + report: string +} + +/** Compact the trace into the lines the observer reasons over — tool calls, + * errors, and statuses, in order. Keeps the model call bounded + grounded. */ +function summarizeTrace(trace: ReadonlyArray, maxLines: number): string { + const lines: string[] = [] + for (const ev of trace) { + const e = ev as { type?: string; data?: Record } + const t = (e.type ?? '').toLowerCase() + const d = e.data ?? {} + const part = (d.part ?? {}) as { type?: string; tool?: string; state?: { status?: string } } + if (part.type === 'tool') + lines.push(`tool:${part.tool}${part.state?.status ? `(${part.state.status})` : ''}`) + else if (t.includes('error')) + lines.push(`ERROR: ${String(d.message ?? d.detail ?? '').slice(0, 200)}`) + else if (t === 'status' && typeof d.status === 'string') lines.push(`status:${d.status}`) + else if (t.includes('tool')) lines.push(`tool-event:${t}`) + } + // Collapse runs of identical lines into "xN" so repeated thrash is visible + short. + const out: string[] = [] + for (const ln of lines) { + const prev = out[out.length - 1] + const m = prev?.match(/^(.*?)(?: x(\d+))?$/) + if (m && m[1] === ln) out[out.length - 1] = `${ln} x${(Number(m[2]) || 1) + 1}` + else out.push(ln) + } + return out.slice(0, maxLines).join('\n') || '(no tool/error events in trace)' +} + +const findingsSchema = { + name: 'observer_findings', + schema: { + type: 'object', + additionalProperties: false, + properties: { + findings: { + type: 'array', + items: { + type: 'object', + additionalProperties: false, + properties: { + area: { + type: 'string', + description: 'tool-use | cost | verification | process | failure | latency', + }, + severity: { type: 'string', enum: ['critical', 'high', 'medium', 'low', 'info'] }, + claim: { + type: 'string', + description: 'what you OBSERVED in the trace (a fact, with the evidence)', + }, + recommended_action: { + type: 'string', + description: 'the concrete change for the agent or operator', + }, + audience: { + type: 'string', + enum: ['agent', 'operator'], + description: 'who should act on this', + }, + confidence: { type: 'number' }, + }, + required: ['area', 'severity', 'claim', 'recommended_action', 'audience', 'confidence'], + }, + }, + }, + required: ['findings'], + }, +} as const + +export async function observe(input: ObserveInput, opts: ObserveOptions): Promise { + const traceSummary = summarizeTrace(input.trace, opts.maxTraceLines ?? 80) + const res = await opts.chat.chat( + { + ...(opts.model ? { model: opts.model } : {}), + jsonSchema: findingsSchema as unknown as { name: string; schema: Record }, + messages: [ + { + role: 'system', + content: + 'You are a third-person OBSERVER watching an AI agent work. You see its TRACE (what it did), not its grader. ' + + 'From the trace, name SPECIFIC, behavior-grounded findings: wasted/duplicated tool calls, thrash/retries, ' + + 'token/cost waste, missing verification, failure patterns. For each, a concrete recommended_action, and ' + + 'whether the AGENT (fix its skills/prompt/tools) or the OPERATOR (fix framing/decomposition/config) should act. ' + + 'Only claim what the trace shows. No findings if the run was clean.', + }, + { + role: 'user', + content: + `TASK: ${input.task}\n\nOUTCOME: ${input.outcome ?? 'unknown'}\n\n` + + `FINAL OUTPUT (truncated):\n${input.output.slice(0, 1200)}\n\n` + + `TRACE (in order; "xN" = repeated):\n${traceSummary}`, + }, + ], + }, + { ...(opts.signal ? { signal: opts.signal } : {}) }, + ) + + const parsed = parseFindings(res.content) + const producedAt = input.runId ? `${input.runId}` : observerId + const findings: AnalystFinding[] = parsed.map((f) => + makeFinding({ + analyst_id: observerId, + area: `${f.area}`, + severity: f.severity, + claim: f.claim, + recommended_action: f.recommended_action, + confidence: typeof f.confidence === 'number' ? f.confidence : 0.5, + evidence_refs: [], + // The observer reads BEHAVIOR, never the judge verdict — firewall provenance. + derived_from_judge: false, + metadata: { audience: f.audience }, + ...(input.runId ? { subject: input.runId } : {}), + }), + ) + + const learned: CorpusRecord[] = [] + if (opts.corpus) { + for (const f of findings) { + const record: CorpusRecord = { + schemaVersion: '1.0.0', + id: f.finding_id, + runId: input.runId ?? observerId, + producedAt: f.produced_at ?? producedAt, + area: f.area, + claim: f.recommended_action ?? f.claim, + ...(f.claim ? { rationale: f.claim } : {}), + tags: [...(opts.tags ?? []), `audience:${(f.metadata?.audience as string) ?? 'agent'}`], + confidence: f.confidence, + evidence: [{ kind: 'finding', uri: f.finding_id }], + } + const r = await opts.corpus.append(record) + if (r.succeeded) learned.push(record) + } + } + + return { findings, learned, report: renderReport(findings) } +} + +interface RawFinding { + area: string + severity: AnalystFinding['severity'] + claim: string + recommended_action: string + audience: 'agent' | 'operator' + confidence: number +} + +function parseFindings(content: string): RawFinding[] { + let obj: unknown + try { + obj = JSON.parse(content) + } catch { + const m = content.match(/\{[\s\S]*\}/) + obj = m ? JSON.parse(m[0]) : { findings: [] } + } + const arr = (obj as { findings?: unknown }).findings + return Array.isArray(arr) ? (arr as RawFinding[]) : [] +} + +/** Operator-facing report, split by who should act. The agent block is the + * steer; the operator block is the advice. */ +export function renderReport(findings: ReadonlyArray): string { + if (findings.length === 0) return '✓ clean run — the observer found nothing to change.' + const audience = (f: AnalystFinding): string => (f.metadata?.audience as string) ?? 'agent' + const forAgent = findings.filter((f) => audience(f) === 'agent') + const forOperator = findings.filter((f) => audience(f) === 'operator') + const block = (title: string, fs: ReadonlyArray): string => + fs.length === 0 + ? '' + : `**${title}**\n${fs + .map((f) => `- [${f.severity}] ${f.claim}\n → ${f.recommended_action ?? ''}`) + .join('\n')}\n` + return [ + block('For the agent (fix skills / prompt / tools)', forAgent), + block('For you (the operator)', forOperator), + ] + .filter(Boolean) + .join('\n') +} diff --git a/src/runtime/run-loop.ts b/src/runtime/run-loop.ts index f265815..83c7494 100644 --- a/src/runtime/run-loop.ts +++ b/src/runtime/run-loop.ts @@ -656,7 +656,6 @@ async function executeIteration(args: ExecuteIterationArgs( // host-agent registration) can't surface as a failure — readiness is observed // from sandbox status, and a gateway-timed-out create is recovered by lookup. if (signal.aborted) throwAbort() - return acquireSandbox(client, opts, { signal }) + const box = await acquireSandbox(client, opts, { signal }) + await spec.prepareBox?.(box, { signal }) + return box } interface FinalizeArgs { diff --git a/src/runtime/sandbox-lineage.ts b/src/runtime/sandbox-lineage.ts index a4a8f7d..18e20b4 100644 --- a/src/runtime/sandbox-lineage.ts +++ b/src/runtime/sandbox-lineage.ts @@ -212,6 +212,7 @@ export function createSandboxLineage( if (signal.aborted) throwAbort() const opts: CreateSandboxOptions = buildBackendOptions(spec.profile, spec.sandboxOverrides) const box = await acquireSandbox(client, opts, { signal }) + await spec.prepareBox?.(box, { signal }) owned.push(box) return box } @@ -219,7 +220,6 @@ export function createSandboxLineage( return { async start(spec, prompt, signal) { const box = await acquireFresh(spec, signal) - await spec.prepareBox?.(box, { signal }) const sessionId = mintSessionId() const events = promptEvents(streaming, box, prompt, sessionId, signal) return { handle: { box, sessionId }, events } @@ -266,7 +266,6 @@ export function createSandboxLineage( } } const box = await acquireFresh(spec, signal) - await spec.prepareBox?.(box, { signal }) const sessionId = mintSessionId() return { handle: { box, sessionId }, diff --git a/src/runtime/supervise/budget.ts b/src/runtime/supervise/budget.ts index 065d4fc..4293e86 100644 --- a/src/runtime/supervise/budget.ts +++ b/src/runtime/supervise/budget.ts @@ -3,15 +3,15 @@ * * The conserved budget reservation pool — the invariant the whole instrument * rests on (critique M5/B3). One root `Budget` becomes a conserved pool of three - * quantities (tokens, usd, iterations) plus an absolute deadline. Children RESERVE - * atomically at spawn and RECONCILE at settle: + * quantities (tokens, usd, iterations) plus an absolute deadline. Children reserve + * atomically at spawn and reconcile at settle: * * total ≡ free + reserved + committed (invariant, always) * - * `reserve` moves a child's whole ceiling from `free` → `reserved` and FAILS CLOSED + * `reserve` moves a child's whole ceiling from `free` → `reserved` and fails closed * when `free` can't cover it (never read-then-spawn overcommit, so `Σk(treatment) ≡ * Σk(blind)` by construction). `reconcile` releases the reservation, commits ACTUAL - * spend, and refunds the unspent remainder to `free`. Tokens and usd are SEPARATE + * spend, and refunds the unspent remainder to `free`. Tokens and usd are separate * channels (`LoopTokenUsage` has no `usd`); iterations are conserved alongside them. * * Pure and deterministic: `now()` is injected, there is no I/O, and no wall-clock or diff --git a/src/runtime/supervise/types.ts b/src/runtime/supervise/types.ts index 2affe08..84c593f 100644 --- a/src/runtime/supervise/types.ts +++ b/src/runtime/supervise/types.ts @@ -266,9 +266,9 @@ export type Settled = /** * The budget-conserving reactive scope an `Agent.act` runs inside. `spawn` reserves - * budget atomically from the shared pool and FAILS CLOSED when the pool can't cover it; - * `next()` is a ray.wait cursor (n=1) over THIS scope's IN-MEMORY live set; `view` reads - * the in-memory nursery (NOT the log), O(live). + * budget atomically from the shared pool and fails closed when the pool cannot cover it. + * `next()` waits for one settlement from this scope's live set; `view` reads live state, + * not the replay log. */ export interface Scope { /** diff --git a/src/runtime/workspace.ts b/src/runtime/workspace.ts new file mode 100644 index 0000000..22f1cb8 --- /dev/null +++ b/src/runtime/workspace.ts @@ -0,0 +1,88 @@ +/** Command runner seam. Host code can use `localShell`; sandbox code can wrap `box.exec`. */ +export type Shell = ( + args: ReadonlyArray, + cwd?: string, +) => Promise<{ stdout: string; stderr: string; code: number }> + +export type WorkspaceCommit = + | { readonly ok: true; readonly rev: string } + | { readonly ok: false; readonly conflict: string } + +export interface Workspace { + readonly ref: string + materialize(dir: string): Promise + commit(dir: string, message: string): Promise + head(): Promise +} + +export function localShell(): Shell { + return async (args, cwd) => { + const { execFile } = await import('node:child_process') + const [bin, ...rest] = args + return new Promise((resolve) => { + execFile( + bin ?? '', + rest, + { cwd, encoding: 'utf-8', maxBuffer: 64 * 1024 * 1024 }, + (err: Error | null, stdout: string, stderr: string) => { + resolve({ + stdout: stdout ?? '', + stderr: stderr ?? '', + code: err ? ((err as { code?: number }).code ?? 1) : 0, + }) + }, + ) + }) + } +} + +export interface GitWorkspaceOptions { + readonly ref: string + readonly shell?: Shell + readonly branch?: string + readonly noHooks?: boolean +} + +export function gitWorkspace(opts: GitWorkspaceOptions): Workspace { + const shell = opts.shell ?? localShell() + const branch = opts.branch ?? 'main' + const cfg = opts.noHooks === false ? [] : ['-c', 'core.hooksPath=/dev/null'] + const ident = ['-c', 'user.email=workspace@tangle.local', '-c', 'user.name=workspace'] + + const run = async (args: string[], cwd?: string): Promise => { + const res = await shell(['git', ...cfg, ...ident, ...args], cwd) + if (res.code !== 0) { + throw new Error( + `git ${args.join(' ')} failed (${res.code}): ${tail(res.stderr || res.stdout)}`, + ) + } + return res.stdout + } + + return { + ref: opts.ref, + materialize: (dir) => run(['clone', '--branch', branch, opts.ref, dir]).then(() => {}), + async commit(dir, message) { + await run(['add', '-A'], dir) + const status = await run(['status', '--porcelain'], dir) + if (!status.trim()) return { ok: true, rev: (await run(['rev-parse', 'HEAD'], dir)).trim() } + await run(['commit', '-m', message], dir) + const pull = await shell(['git', ...cfg, ...ident, 'pull', '--rebase', 'origin', branch], dir) + if (pull.code !== 0) { + await shell(['git', ...cfg, 'rebase', '--abort'], dir).catch(() => {}) + return { ok: false, conflict: tail(pull.stderr || pull.stdout) } + } + const push = await shell(['git', ...cfg, ...ident, 'push', 'origin', branch], dir) + if (push.code !== 0) return { ok: false, conflict: tail(push.stderr || push.stdout) } + return { ok: true, rev: (await run(['rev-parse', 'HEAD'], dir)).trim() } + }, + async head() { + const out = await run(['ls-remote', opts.ref, `refs/heads/${branch}`]) + return out.split(/\s+/)[0] ?? '' + }, + } +} + +function tail(s: string): string { + return s.slice(-400) +} diff --git a/tests/loops/coordination.test.ts b/tests/loops/coordination.test.ts index c63b0da..53c849a 100644 --- a/tests/loops/coordination.test.ts +++ b/tests/loops/coordination.test.ts @@ -3,8 +3,6 @@ import { createMcpServer } from '../../src/mcp/server' import { createCoordinationTools } from '../../src/mcp/tools/coordination' import type { Agent, ResultBlobStore, Scope, Spend } from '../../src/runtime' -// The toolbox is a thin wrapper over the keystone Scope (spawn/view/send are tested in -// supervise.test.ts); this verifies the MCP handlers call the right verbs and shape the results. const zeroSpend = (): Spend => ({ iterations: 0, tokens: { input: 0, output: 0 }, usd: 0, ms: 0 }) function mockScope() { @@ -61,8 +59,8 @@ const tool = (tb: ReturnType, name: string) => { return t } -describe('operator toolbox (Scope-as-MCP)', () => { - it('spawn_worker → workerId; fail-closed → { error }', async () => { +describe('coordination tools', () => { + it('spawn_worker returns workerId and fails closed when admission fails', async () => { const { scope, setAdmit } = mockScope() const tb = createCoordinationTools({ scope, @@ -79,22 +77,32 @@ describe('operator toolbox (Scope-as-MCP)', () => { }) }) - it('observe_worker returns status; unknown id → error', async () => { + it('observe_worker returns live status and settled output', async () => { const { scope } = mockScope() const tb = createCoordinationTools({ scope, - blobs, + blobs: { + get: async (ref) => (ref === 'blob:w1' ? { answer: 42 } : undefined), + put: async () => {}, + }, makeWorkerAgent, perWorker: { maxIterations: 1, maxTokens: 10 }, }) - const o = (await tool(tb, 'observe_worker').handler({ workerId: 'w0' })) as { status: string } - expect(o.status).toBe('running') + expect(await tool(tb, 'observe_worker').handler({ workerId: 'w0' })).toMatchObject({ + status: 'running', + output: null, + }) + expect(await tool(tb, 'observe_worker').handler({ workerId: 'w1' })).toMatchObject({ + status: 'done', + outRef: 'blob:w1', + output: { answer: 42 }, + }) expect(await tool(tb, 'observe_worker').handler({ workerId: 'nope' })).toEqual({ error: 'unknown workerId "nope"', }) }) - it('steer_worker delivers to a live worker via scope.send; false for unknown', async () => { + it('steer_worker delivers through Scope.send', async () => { const { scope, sent } = mockScope() const tb = createCoordinationTools({ scope, @@ -111,21 +119,7 @@ describe('operator toolbox (Scope-as-MCP)', () => { }) }) - it('stop flips isStopped + records the reason', async () => { - const { scope } = mockScope() - const tb = createCoordinationTools({ - scope, - blobs, - makeWorkerAgent, - perWorker: { maxIterations: 1, maxTokens: 10 }, - }) - expect(tb.isStopped()).toBe(false) - await tool(tb, 'stop').handler({ reason: 'all verified' }) - expect(tb.isStopped()).toBe(true) - expect(tb.stopReason()).toBe('all verified') - }) - - it('await_next drains the next settlement → verdict, and records it in the settled() ledger', async () => { + it('await_next drains settlements into the driver ledger', async () => { const { scope } = mockScope() const settlements = [ { @@ -153,6 +147,7 @@ describe('operator toolbox (Scope-as-MCP)', () => { status: 'done', score: 0.83, valid: true, + outRef: 'blob:w7', }) expect(await tool(tb, 'await_next').handler({})).toEqual({ idle: true }) expect(tb.settled()).toEqual([ @@ -160,7 +155,42 @@ describe('operator toolbox (Scope-as-MCP)', () => { ]) }) - it('list_analysts surfaces the menu; run_analyst applies a lens to a SETTLED worker', async () => { + it('blocks stop under failClosed until a parent question is answered', async () => { + const { scope } = mockScope() + const emitted: unknown[] = [] + const tb = createCoordinationTools({ + scope, + blobs, + makeWorkerAgent, + perWorker: { maxIterations: 1, maxTokens: 10 }, + questionPolicy: 'failClosed', + onEvent: (event) => emitted.push(event), + }) + + const r = (await tool(tb, 'ask_parent').handler({ + from: 'driver-1', + level: 'driver', + question: 'Which API version should this migration target?', + reason: 'worker found two supported versions', + urgency: 'blocks-run', + })) as { question: { id: string } } + expect(await tool(tb, 'stop').handler({ reason: 'done' })).toMatchObject({ + stopped: false, + error: 'unresolved-blocking-questions', + }) + await tool(tb, 'answer_question').handler({ + questionId: r.question.id, + answer: 'Target v2.', + by: 'user', + }) + expect(await tool(tb, 'stop').handler({ reason: 'answered and verified' })).toEqual({ + stopped: true, + }) + expect(tb.questions()[0]).toMatchObject({ status: 'answered' }) + expect(emitted).toEqual([{ type: 'question', question: expect.objectContaining(r.question) }]) + }) + + it('list_analysts surfaces the menu and run_analyst applies a lens to a settled worker', async () => { const { scope } = mockScope() const traceBlobs: ResultBlobStore = { get: async (ref) => (ref === 'blob:w1' ? { messages: ['trace'] } : undefined), @@ -172,27 +202,31 @@ describe('operator toolbox (Scope-as-MCP)', () => { blobs: traceBlobs, makeWorkerAgent, perWorker: { maxIterations: 1, maxTokens: 10 }, - analystKinds: [{ id: 'completeness', description: 'unfinished work', area: 'failure-mode' }], - runAnalyst: async (kind, trace) => { - seen.push({ kind, trace }) - return [{ claim: 'X missing' }] + analysts: { + kinds: [{ id: 'completeness', description: 'unfinished work', area: 'failure-mode' }], + run: async (kind, trace) => { + seen.push({ kind, trace }) + return [{ claim: 'X missing' }] + }, }, }) - expect((await tool(tb, 'list_analysts').handler({})) as { analysts: unknown[] }).toEqual({ + expect(await tool(tb, 'list_analysts').handler({})).toEqual({ analysts: [{ id: 'completeness', description: 'unfinished work', area: 'failure-mode' }], }) - // settled worker → the lens runs over its trace. - const r = (await tool(tb, 'run_analyst').handler({ kind: 'completeness', workerId: 'w1' })) as { - findings: unknown - } - expect(r).toEqual({ findings: [{ claim: 'X missing' }] }) + expect(await tool(tb, 'run_analyst').handler({ kind: 'completeness', workerId: 'w1' })).toEqual( + { + findings: [{ claim: 'X missing' }], + }, + ) expect(seen).toEqual([{ kind: 'completeness', trace: { messages: ['trace'] } }]) - // running worker has no trace yet → typed error, lens not run. - const r2 = await tool(tb, 'run_analyst').handler({ kind: 'completeness', workerId: 'w0' }) - expect(r2).toEqual({ error: expect.stringContaining('has not settled') }) + expect(await tool(tb, 'run_analyst').handler({ kind: 'completeness', workerId: 'w0' })).toEqual( + { + error: expect.stringContaining('has not settled'), + }, + ) }) - it('createMcpServer serves the operator tools alongside built-ins; a shadow throws', () => { + it('createMcpServer serves coordination tools alongside built-ins; a shadow throws', () => { const { scope } = mockScope() const tb = createCoordinationTools({ scope, @@ -203,7 +237,7 @@ describe('operator toolbox (Scope-as-MCP)', () => { const server = createMcpServer({ extraTools: tb.tools }) expect(server.tools.has('spawn_worker')).toBe(true) expect(server.tools.has('steer_worker')).toBe(true) - expect(server.tools.has('delegate_feedback')).toBe(true) // built-in still present + expect(server.tools.has('delegate_feedback')).toBe(true) expect(() => createMcpServer({ extraTools: [ diff --git a/tests/loops/workspace.test.ts b/tests/loops/workspace.test.ts new file mode 100644 index 0000000..113d9d1 --- /dev/null +++ b/tests/loops/workspace.test.ts @@ -0,0 +1,71 @@ +import { execFileSync } from 'node:child_process' +import { existsSync, mkdtempSync, readFileSync, rmSync, writeFileSync } from 'node:fs' +import { tmpdir } from 'node:os' +import { join } from 'node:path' +import { afterEach, beforeEach, describe, expect, it } from 'vitest' +import { gitWorkspace } from '../../src/runtime/workspace' + +const git = (args: string[], cwd?: string): string => + execFileSync( + 'git', + ['-c', 'core.hooksPath=/dev/null', '-c', 'user.email=t@t', '-c', 'user.name=t', ...args], + { cwd, encoding: 'utf-8', stdio: 'pipe' }, + ) + +function seedBare(): string { + const bare = `${mkdtempSync(join(tmpdir(), 'ws-bare-'))}.git` + git(['init', '--bare', '-b', 'main', bare]) + const seed = mkdtempSync(join(tmpdir(), 'ws-seed-')) + git(['clone', bare, seed]) + writeFileSync(join(seed, 'seed.txt'), 'base\n') + git(['add', '-A'], seed) + git(['commit', '-m', 'seed'], seed) + git(['push', 'origin', 'main'], seed) + rmSync(seed, { recursive: true, force: true }) + return bare +} + +describe('gitWorkspace', () => { + let bare: string + const temps: string[] = [] + const fresh = (): string => { + const dir = mkdtempSync(join(tmpdir(), 'ws-work-')) + temps.push(dir) + return dir + } + + beforeEach(() => { + bare = seedBare() + }) + + afterEach(() => { + rmSync(bare, { recursive: true, force: true }) + for (const dir of temps.splice(0)) rmSync(dir, { recursive: true, force: true }) + }) + + it('carries durable state across fresh worker filesystems', async () => { + const ws = gitWorkspace({ ref: bare }) + const w1 = fresh() + await ws.materialize(w1) + writeFileSync(join(w1, 'a.txt'), 'one\n') + expect(await ws.commit(w1, 'add a')).toMatchObject({ ok: true }) + + const w2 = fresh() + await ws.materialize(w2) + expect(existsSync(join(w2, 'a.txt'))).toBe(true) + expect(readFileSync(join(w2, 'a.txt'), 'utf-8')).toBe('one\n') + }) + + it('returns a typed conflict instead of overwriting concurrent edits', async () => { + const ws = gitWorkspace({ ref: bare }) + const w1 = fresh() + const w2 = fresh() + await ws.materialize(w1) + await ws.materialize(w2) + writeFileSync(join(w1, 'seed.txt'), 'w1\n') + writeFileSync(join(w2, 'seed.txt'), 'w2\n') + + expect(await ws.commit(w1, 'w1')).toMatchObject({ ok: true }) + expect(await ws.commit(w2, 'w2')).toMatchObject({ ok: false }) + }) +}) diff --git a/tests/mcp/coder-delegate-selection.test.ts b/tests/mcp/coder-delegate-selection.test.ts index a2e6f4f..2b3affc 100644 --- a/tests/mcp/coder-delegate-selection.test.ts +++ b/tests/mcp/coder-delegate-selection.test.ts @@ -107,6 +107,32 @@ describe('createDefaultCoderDelegate — reviewer gate + winner selection', () = // smaller diff → higher diffSize score → highest-score favors it; either way a valid winner. expect(['small', 'big']).toContain(out.branch) }) + + it('applies harness and model overrides on the single-coder path', async () => { + let createOptions: CreateSandboxOptions | undefined + const delegate = createDefaultCoderDelegate({ + sandboxClient: { + async create(opts?: CreateSandboxOptions): Promise { + createOptions = opts + return { + async *streamPrompt() { + yield { type: 'result', data: { result: CANDIDATES[0] } } satisfies SandboxEvent + }, + } as unknown as SandboxInstance + }, + }, + harness: 'opencode', + model: 'zai/glm-4.7', + }) + + await delegate({ goal: 'fix it', repoRoot: '/repo' }, ctx) + + const profile = createOptions?.backend?.profile as + | { model?: { default?: string }; metadata?: Record } + | undefined + expect(profile?.model?.default).toBe('zai/glm-4.7') + expect(profile?.metadata?.backendType).toBe('opencode') + }) }) import type { LoopTraceEmitter, LoopTraceEvent } from '../../src/runtime'