diff --git a/.changeset/encryption-migrations.md b/.changeset/encryption-migrations.md new file mode 100644 index 00000000..29022a34 --- /dev/null +++ b/.changeset/encryption-migrations.md @@ -0,0 +1,19 @@ +--- +'@cipherstash/cli': minor +'@cipherstash/migrate': minor +--- + +Add `stash encrypt` command group and `@cipherstash/migrate` library for plaintext → encrypted column migrations. + +New CLI commands: + +- `stash encrypt status` — per-column migration status (phase, backfill progress, drift between intent and state, EQL registration). +- `stash encrypt plan` — diff `.cipherstash/migrations.json` (intent) vs observed state. +- `stash encrypt advance --to <phase> --table <table> --column <column>` — record a phase transition (`schema-added` / `dual-writing` / `backfilling` / `backfilled` / `cut-over` / `dropped`). +- `stash encrypt backfill --table <table> --column <column>` — resumable, idempotent, chunked encryption of plaintext into `<column>_encrypted`. Uses the user's encryption client (Protect/Stack). SIGINT-safe; re-run to resume. +- `stash encrypt cutover --table <table> --column <column>` — runs `eql_v2.rename_encrypted_columns()` inside a transaction; optionally forces Proxy config refresh via `CIPHERSTASH_PROXY_URL`. After cutover, apps reading `<column>` transparently receive the encrypted column. +- `stash encrypt drop --table <table> --column <column>` — generates a migration file that drops the old plaintext column. + +`stash db install` now also installs a `cipherstash.cs_migrations` table used to track per-column migration runtime state (current phase, backfill cursor, rows processed). The table is append-only (event-log shape) and kept separate from `eql_v2_configuration`, which remains the authoritative EQL intent store used by Proxy.
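The append-only shape means current state is derived rather than stored. A minimal TypeScript sketch of that reduction (the event type and function name here are illustrative only, not the package's actual exports):

```typescript
// Hypothetical shape of a cs_migrations row; the real DAO in
// @cipherstash/migrate may differ.
interface MigrationEvent {
  tableName: string
  columnName: string
  phase: string // phase AFTER this event
  id: number // monotonically increasing identity column
}

// Reduce an append-only event log to the latest event per column.
// Mirrors "current state per column = latest row for (table_name, column_name)".
function latestByColumn(events: MigrationEvent[]): Map<string, MigrationEvent> {
  const latest = new Map<string, MigrationEvent>()
  for (const e of events) {
    const key = `${e.tableName}.${e.columnName}`
    const prev = latest.get(key)
    if (!prev || e.id > prev.id) latest.set(key, e)
  }
  return latest
}
```

History stays queryable because nothing is ever updated in place; only the reduction changes as events append.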
+ +The new `@cipherstash/migrate` package exposes the same primitives as a library for users who want to embed backfill in their own workers or cron jobs — all commands are thin wrappers around its exports (`runBackfill`, `appendEvent`, `latestByColumn`, `progress`, `renameEncryptedColumns`, `reloadConfig`, `readManifest`, `writeManifest`). diff --git a/docs/plans/encryption-migrations.md b/docs/plans/encryption-migrations.md new file mode 100644 index 00000000..c82f2fe2 --- /dev/null +++ b/docs/plans/encryption-migrations.md @@ -0,0 +1,236 @@ +# Encryption Migrations — Implementation Plan + +## Context + +CipherStash today can encrypt a column at rest via EQL + either Stack/Protect.js (client-side) or the CipherStash Proxy (transparent). What it *doesn't* have is a first-class way to migrate an **existing plaintext column** into an encrypted one safely in production. EQL ships the schema/config primitives (`add_column`, `migrate_config`, `rename_encrypted_columns`) but no backfill orchestrator, no per-column phase tracking, and no resumable data mover. Today users have to wire this up themselves, which is both the biggest onboarding friction and the biggest correctness risk (partial backfills, reads on the wrong column, silent plaintext leaks). + +This plan adds a shared migration substrate — CLI + library — that walks each column through the full lifecycle: + +``` +schema-added → dual-writing → backfilling → backfilled → cut-over → dropped +``` + +The same mechanism serves Stack and Proxy users. Phase 1 ships the status inspector and the backfill engine (the two pieces with no good existing workaround). The other phases get lightweight commands that mostly orchestrate existing EQL functions and delegate code changes to the rulebook/agent flow. + +## Scope (Phase 1) + +1. `stash encrypt status` — per-column view of current phase, EQL registration, backfill progress, drift between intent and state. +2. 
`stash encrypt backfill` — resumable, idempotent, chunked plaintext → encrypted migration using the user's encryption client (Protect/Stack mode). Progress reporting, checkpoint on every chunk, `--resume` / `--table` / `--column` / `--chunk-size` flags. +3. A new `cs_migrations` table + small library (`@cipherstash/migrate` or co-located in `@cipherstash/stack`) that the CLI commands drive. Library is exported so users can embed backfill in their own workers/cron later without new infra. +4. `.cipherstash/migrations.json` repo manifest = intent (desired columns + index set + target phase). `stash encrypt plan` diffs intent vs. observed state. +5. Thin wrappers for the other phases so users can drive end-to-end from the CLI today, even if those phases are mostly pass-throughs: + - `stash encrypt advance --to dual-writing` — records a user-declared transition into `cs_migrations` and reminds them what code change is needed. Delegates code changes to the agent-handoff rulebook (see `init-agent-handoff.md`). + - `stash encrypt cutover` — wraps `eql_v2.rename_encrypted_columns()` + `eql_v2.reload_config()` (via Proxy if present). + - `stash encrypt drop` — emits a migration file that drops `<column>_plaintext`. + +**Out of Phase 1:** Proxy-mode backfill (Phase 2), CS-hosted backfill runner (Phase 3), upstreaming `cs_migrations` into EQL as `eql_v2_migrations` (Phase 3). + +## Architecture + +### 1. Three-layer state model + +| Layer | Home | Role | Frequency | +|---|---|---|---| +| **Intent** | `.cipherstash/migrations.json` (repo) | Desired columns, index set, target phase. Code-reviewable. | Changes with commits. | +| **EQL intent** | `eql_v2_configuration` (DB, existing) | Authoritative "is this column encrypted, with which indexes" — drives Proxy. Unchanged by this plan. | Changes per schema cycle. | +| **Runtime state** | `cs_migrations` (DB, new) | Per-column phase, backfill cursor, rows processed, timestamps. Append-only event log. | High-frequency during backfill.
| + +Why separate `cs_migrations` from `eql_v2_configuration`: the EQL config's `data` JSONB has a strict CHECK constraint (`{v, tables}` with enumerated `cast_as` + index kinds) that rejects custom metadata; its `state` enum is global (only one `{active, pending, encrypting}` at a time) so it can't represent multiple columns in different phases simultaneously; and backfill-cadence writes would collide with Proxy's 60s config refresh. Detailed reasoning in the conversation transcript linked from the commit. + +`cs_migrations` schema (append-only; one row per transition or checkpoint): + +```sql +CREATE SCHEMA IF NOT EXISTS cipherstash; + +CREATE TABLE cipherstash.cs_migrations ( + id bigint GENERATED ALWAYS AS IDENTITY PRIMARY KEY, + table_name text NOT NULL, + column_name text NOT NULL, + event text NOT NULL, -- 'schema_added' | 'dual_writing' | 'backfill_started' | 'backfill_checkpoint' | 'backfilled' | 'cut_over' | 'dropped' | 'error' + phase text NOT NULL, -- current phase AFTER this event + cursor_value text, -- keyset pagination cursor (usually the last processed PK) + rows_processed bigint, + rows_total bigint, + details jsonb, -- per-event extra data: error message, chunk size, duration, etc. + created_at timestamptz NOT NULL DEFAULT now() +); + +CREATE INDEX ON cipherstash.cs_migrations (table_name, column_name, id DESC); +``` + +Current state per column = latest row for `(table_name, column_name)`. History is preserved. + +### 2. `stash encrypt status` + +Reads from three sources in parallel and renders a unified table: + +- **Intent** — `.cipherstash/migrations.json` +- **EQL state** — `SELECT state, data FROM eql_v2_configuration WHERE state IN ('active','pending','encrypting')`; extract per-column registration + index set. +- **Runtime state** — latest `cs_migrations` row per `(table, column)`. + +Also queries `information_schema.columns` to detect physical column presence (`<column>`, `<column>_encrypted`, `<column>_plaintext`).
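Once those sources are gathered, drift detection reduces to pure comparisons. A hedged TypeScript sketch (field names and shapes are illustrative, not the actual implementation):

```typescript
// Illustrative per-column inputs, as gathered from the manifest, the
// active EQL config, cs_migrations, and information_schema.
interface ColumnStatus {
  intentPhase: string | null // from .cipherstash/migrations.json
  eqlRegistered: boolean // column present in active eql_v2_configuration
  encryptedColumnExists: boolean // <column>_encrypted found physically
  rowsRemaining: number // COUNT(*) WHERE <column>_encrypted IS NULL
  runtimePhase: string | null // latest cs_migrations row, if any
}

// Return the drift warnings the status table would render.
function driftFlags(s: ColumnStatus): string[] {
  const flags: string[] = []
  if (s.intentPhase === 'backfilling' && !s.eqlRegistered)
    flags.push('intent says backfilling but EQL has no active entry')
  if (s.runtimePhase !== null && !s.encryptedColumnExists)
    flags.push('encrypted column does not exist')
  if (s.runtimePhase === 'backfilled' && s.rowsRemaining > 0)
    flags.push('backfill marked complete but unencrypted rows remain')
  return flags
}
```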
+Output (example): + +``` +TABLE COLUMN PHASE EQL INDEXES PROGRESS +users email backfilling active unique, match 421,018 / 2,104,552 (20%) ETA 12m +users ssn dual-writing active unique — +users phone schema-added ⚠ pending match registered, awaiting migrate_config +orders notes cut-over active match, ste_vec 1,204,091 / 1,204,091 (100%) +``` + +Drift flags: + +- ⚠ intent says `backfilling` but EQL has no active entry +- ⚠ EQL has an index the intent says it shouldn't +- ⚠ `<column>_encrypted` column doesn't exist +- ⚠ backfill says complete but `COUNT(*) WHERE col_encrypted IS NULL` > 0 + +### 3. `stash encrypt backfill` — the meat + +**Flow per column:** + +1. Validate preconditions: column registered in active EQL config; `<column>_encrypted` physically exists; phase in `cs_migrations` is `dual_writing` or `backfilling` (not already complete). +2. Load the user's encryption client via the same dynamic-import pattern the wizard uses (`loadStashConfig` → dynamic import of `src/encryption/index.ts` → `.init()` with env-sourced credentials). +3. Determine `rows_total` (`SELECT count(*) FROM t WHERE col IS NOT NULL AND col_encrypted IS NULL`). +4. Determine resume cursor: last `backfill_checkpoint` event's `cursor_value`, or NULL. +5. Emit `backfill_started` event. +6. Loop: keyset-paginate on primary key (or user-specified `--order-by`): + ```sql + SELECT id, <column> FROM <table> + WHERE <column> IS NOT NULL AND <column>_encrypted IS NULL AND id > $cursor + ORDER BY id ASC LIMIT $chunk_size + ``` + - Default `chunk_size` = 1000 (configurable). + - Call `bulkEncryptModels(rows, table, client)` from `@cipherstash/stack`. + - Write back with a single `UPDATE ... FROM (VALUES ...)` per chunk. + - Wrap the UPDATE in a transaction; insert `backfill_checkpoint` event in the same txn. (Atomicity: either the chunk is persisted and checkpointed, or neither.) + - Stream progress to stdout (tty-aware — simple log lines in CI). +7. When the chunk returns zero rows, emit `backfilled` event, transition phase. +8.
Error handling: + - Per-chunk try/catch → `error` event row with `details: { message, stack, cursor }`, halt (fail-fast default) or continue (`--continue-on-error` flag for lower-value columns). + - SIGINT / SIGTERM: finish current chunk, checkpoint, exit cleanly. + +**Resumability guarantees:** + +- **Idempotent** — the `col_encrypted IS NULL` filter ensures re-running never re-encrypts a row, even without the checkpoint cursor. +- **Resumable** — checkpoint cursor skips already-processed rows for speed. +- **Multi-machine safe** — even without locking, concurrent runners converge (they'll race on the same rows but the UPDATE is idempotent; the `IS NULL` guard prevents double-writes). A `SELECT ... FOR UPDATE SKIP LOCKED` variant will be added in Phase 2 if needed. + +**Batch sizing guidance in docs:** start at 1000, lower if you see locking contention, raise for wide columns with small values. Include a `--dry-run` that samples one chunk and prints timings. + +### 4. `stash encrypt advance --to <phase>` + +Records a user-declared transition. This is the honest path for phases where the tool can't safely detect the state (dual-writing is an app-code property, not a DB property): + +- `--to dual-writing`: insert `dual_writing` event; print a reminder + the relevant rulebook snippet for editing persistence code. Offer to invoke the agent handoff if configured. +- `--to backfilling`: insert event; effectively equivalent to starting `stash encrypt backfill` (and does so unless `--no-run`). + +### 5. `stash encrypt cutover` + +For each column in `backfilled` phase, in a single transaction: + +```sql +BEGIN; +-- Renames <column> -> <column>_plaintext, <column>_encrypted -> <column> +SELECT eql_v2.rename_encrypted_columns(); +COMMIT; + +-- If Proxy URL is configured, force refresh +\c <proxy-url> +SELECT eql_v2.reload_config(); +``` + +Record `cut_over` event. App's existing `SELECT email FROM users` now returns the encrypted column (decrypted transparently by Proxy or client-side by Stack).
No app code change required for reads — this is the big payoff of the rename approach. + +### 6. `stash encrypt drop` + +For columns in `cut_over` phase: + +1. Read Drizzle / Prisma / other migration tooling from repo (we already detect this in init). +2. Emit a standard migration file (drizzle format by default): `ALTER TABLE <table> DROP COLUMN <column>_plaintext;`. +3. Print next-step instructions ("review and run `drizzle-kit generate && drizzle-kit migrate`" or equivalent). +4. Only record the `dropped` event *after* a follow-up `stash encrypt reconcile` verifies the column is gone from `information_schema.columns`. + +## Critical files to modify or create + +- `stack/packages/cli/src/commands/encrypt/` — **new command group** (parallel to `db/`) + - `index.ts` — subcommand registration + - `status.ts` — new + - `backfill.ts` — new + - `advance.ts` — new + - `cutover.ts` — new + - `drop.ts` — new + - `plan.ts` — new (diffs intent vs. observed) +- `stack/packages/cli/src/bin/stash.ts` — register `encrypt` subcommand (analogous to existing `db` registration at ~line 237) +- `stack/packages/migrate/` — **new package** (library the CLI drives) + - `src/state.ts` — `cs_migrations` DAO (append event, get latest, get progress) + - `src/backfill.ts` — the chunked loop, exported as `runBackfill({ table, column, client, db, chunkSize, signal })` + - `src/cursor.ts` — keyset pagination primitive + - `src/eql.ts` — thin wrappers over `eql_v2.*` functions (rename, reload, config read) + - `src/manifest.ts` — read/write `.cipherstash/migrations.json` + - `src/schema.sql` — `cs_migrations` DDL, installed by `db install` or an explicit `encrypt install` step +- `stack/packages/cli/src/commands/db/install.ts` — extend to install `cs_migrations` schema alongside EQL +- `stack/packages/cli/src/commands/wizard/lib/gather.ts` — reuse introspection for `status` (no changes needed, just an import) +- `stack/packages/cli/src/config/` — extend `stash.config.ts` loader so backfill subprocess can dynamically import user's encryption client +- `stack/packages/cli/package.json` — add `@cipherstash/migrate` dep +- Rulebook partials (see `init-agent-handoff.md`) — **add** per-integration sections for "how to wire dual-write in your persistence layer" so the agent handoff can apply Phase 2 code changes consistently + +## Existing
primitives to reuse (do not reinvent) + +- `@cipherstash/stack` `bulkEncryptModels`, `bulkDecryptModels`, `encryptModel`, `decryptModel` (at `packages/stack/src/encryption/operations/`). Bulk APIs do not chunk internally — our code chunks. +- `introspectDatabase` in `packages/cli/src/commands/wizard/tools/wizard-tools.ts:150-191`. +- `loadStashConfig` + dynamic encryption-client import (currently in `packages/cli/src/commands/wizard/lib/`) — lift into `@cipherstash/migrate` so both CLI and library users get it. +- `rewriteEncryptedAlterColumns` in `packages/cli/src/commands/db/rewrite-migrations.ts` — the phase-1 schema-add is already solved by drizzle-kit + this rewriter. The new commands **will not** re-solve it. +- EQL functions (Postgres): `eql_v2.add_column`, `eql_v2.add_search_config`, `eql_v2.migrate_config`, `eql_v2.activate_config`, `eql_v2.rename_encrypted_columns`, `eql_v2.reload_config`, `eql_v2.count_encrypted_with_active_config`, `eql_v2.select_pending_columns`, `eql_v2.ready_for_encryption`. +- `db push` in `packages/cli/src/commands/db/push.ts` — already handles writing to `eql_v2_configuration`; reuse the DAO. + +## Verification + +1. **Unit** + - `cs_migrations` DAO: append event, latest-by-column, progress query. + - Cursor pagination: exhausts all rows, handles gaps, stable under concurrent inserts (snapshot-based row count held at start). + - Manifest reader: schema validation, drift detection. +2. **Integration (Drizzle, local Postgres)** + - Seed 100k-row `users` table with plaintext `email`. + - `stash db install` → EQL + `cs_migrations` installed. + - `stash encrypt advance --to dual-writing --table users --column email` → records event. + - Manually wire dual-write in the test app's insert code (simulates user + agent handoff). + - `stash encrypt backfill --table users --column email` → completes; progress output sane; `COUNT(*) WHERE email_encrypted IS NULL` = 0. 
+ - Kill mid-backfill (SIGINT) → re-run with `--resume` → completes without duplicate encryption; `cs_migrations` shows continuous cursor progression. + - `stash encrypt status` → shows `backfilled`. + - `stash encrypt cutover` → rename executes; the app (still running, still reading `email`) now transparently receives decrypted values from the encrypted column. + - `stash encrypt drop` → migration file emitted; apply; `email_plaintext` gone. +3. **Idempotency** + - Run `backfill` twice with no kill — second run does 0 writes. + - Concurrent runners on two shells — both converge, no duplicate writes, no missed rows. +4. **Proxy interop** + - After cutover, connect via Proxy and `SELECT email FROM users` → returns plaintext (Proxy decrypted). + - Connect directly to Postgres and `SELECT email FROM users` → returns encrypted JSON payload. +5. **Failure paths** + - Inject a row with invalid UTF-8 → `error` event recorded with cursor; `--continue-on-error` skips; default halts. + - Kill DB mid-chunk → transaction rollback; retry succeeds. +6. **Status accuracy** + - Manually drop a `<column>_encrypted` column → `status` flags drift (EQL says registered, physical column absent). + - Manually set `eql_v2_configuration` to `pending` with an unready column → `status` surfaces `ready_for_encryption = false`. +7. **Large-data smoke** + - 10M-row table backfill on a dev DB; measure wall-clock, memory, DB load. Confirm no OOM, no unbounded RAM (chunk buffer drains each loop). + +## Phase 2+ (not in this plan) + +- **Proxy-mode backfill** — `UPDATE ... FROM (SELECT id, col FROM t WHERE ...)` routed through a Proxy connection; Proxy encrypts on the fly. Same `cs_migrations` state, same cursor model. +- **Upstream `cs_migrations` into EQL** as `eql_v2_migrations` so Proxy can read/write it directly. Requires EQL release + coordinated CLI bump. +- **`FOR UPDATE SKIP LOCKED`** variant for true multi-worker parallelism. +- **CipherStash-hosted backfill runner** (push a backfill job; we run it).
+- **`stash encrypt reverse`** — emergency rollback: rename-swap back, re-enable plaintext reads. Controversial; needs separate design. +- **Drizzle/Prisma/Generic dual-write helpers** in `@cipherstash/stack` — e.g. `dualWrite(email, email_encrypted)` wrapper that agents can drop in with minimal surface area. +- **Non-PK ordering** for tables without a sortable primary key (hash-partitioned CTIDs, etc.). + +## Open items flagged (decisions already made) + +- Phase 1 runtime mode = Protect/Stack client-side only. +- Phase 4 default cutover mechanism = `eql_v2.rename_encrypted_columns()` (transparent to app code). +- State store = repo manifest + `eql_v2_configuration` (EQL intent) + new `cs_migrations` table (runtime state). +- Phase 1 shipping scope = status + backfill first-class; other phases as thin wrappers. +- `cs_migrations` is CLI-owned for now, explicitly designed to be upstreamed into EQL as `eql_v2_migrations` in a later release so both Stack and Proxy own it jointly. diff --git a/packages/cli/package.json b/packages/cli/package.json index e722470d..7df1eba7 100644 --- a/packages/cli/package.json +++ b/packages/cli/package.json @@ -42,6 +42,7 @@ "dependencies": { "@anthropic-ai/claude-agent-sdk": "^0.2.87", "@cipherstash/auth": "catalog:repo", + "@cipherstash/migrate": "workspace:*", "@clack/prompts": "0.10.1", "dotenv": "16.4.7", "jiti": "2.6.1", diff --git a/packages/cli/scripts/e2e-encrypt.sh b/packages/cli/scripts/e2e-encrypt.sh new file mode 100755 index 00000000..99f52f68 --- /dev/null +++ b/packages/cli/scripts/e2e-encrypt.sh @@ -0,0 +1,62 @@ +#!/usr/bin/env bash +# End-to-end smoke test for `stash encrypt`. +# +# Requires a local Postgres you have superuser on. Creates & destroys +# `stash_e2e_test`. Requires CipherStash credentials in the environment +# for the actual encryption step (CS_CLIENT_ACCESS_KEY etc). 
+# +# Usage: bash packages/cli/scripts/e2e-encrypt.sh + +set -euo pipefail + +DB=${STASH_E2E_DB:-stash_e2e_test} +HOST=${STASH_E2E_HOST:-localhost} +DATABASE_URL="postgres://${USER}@${HOST}/${DB}" +STASH="$(cd "$(dirname "$0")/../dist/bin" && pwd)/stash.js" +FIXTURES="$(cd "$(dirname "$0")/fixtures" && pwd)" + +if [ ! -x "$STASH" ]; then + echo "CLI not built. Run: pnpm --filter @cipherstash/cli build" >&2 + exit 1 +fi + +psql -h "$HOST" -d postgres -c "DROP DATABASE IF EXISTS ${DB}" >/dev/null +psql -h "$HOST" -d postgres -c "CREATE DATABASE ${DB}" >/dev/null + +export DATABASE_URL + +echo "==> 1. Install EQL + cs_migrations" +"$STASH" db install --force + +echo "==> 2. Seed 5000 plaintext users" +psql "$DATABASE_URL" -f "$FIXTURES/seed-users.sql" >/dev/null +psql "$DATABASE_URL" -c "ALTER TABLE users ADD COLUMN email_encrypted eql_v2_encrypted" >/dev/null + +echo "==> 3. Record dual-writing" +"$STASH" encrypt advance --to dual-writing --table users --column email + +echo "==> 4. Backfill with interrupt/resume" +"$STASH" encrypt backfill --table users --column email --chunk-size 500 & +PID=$! +sleep 2 +kill -INT "$PID" || true +wait "$PID" || true +"$STASH" encrypt backfill --table users --column email + +REMAINING=$(psql "$DATABASE_URL" -At -c "SELECT count(*) FROM users WHERE email_encrypted IS NULL") +if [ "$REMAINING" != "0" ]; then + echo "FAIL: ${REMAINING} rows still unencrypted" >&2 + exit 1 +fi +echo "OK: all 5000 rows encrypted" + +echo "==> 5. Status" +"$STASH" encrypt status + +echo "==> 6. Cutover" +"$STASH" encrypt cutover --table users --column email + +echo "==> 7. Drop" +"$STASH" encrypt drop --table users --column email --migrations-dir "$(pwd)/drizzle" + +echo "==> Done." 
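The interrupt/resume behaviour the script exercises rests on two properties of the backfill loop: keyset pagination from a checkpointed cursor, and the `IS NULL` guard that makes re-runs idempotent. An in-memory TypeScript sketch of that invariant (illustrative only — the real `runBackfill` in `@cipherstash/migrate` works against Postgres):

```typescript
interface Row {
  id: number
  email: string | null
  email_encrypted: string | null
}

// One chunk: rows past the cursor that still need encryption, in PK order.
// Mirrors: WHERE col IS NOT NULL AND col_encrypted IS NULL AND id > $cursor
function nextChunk(rows: Row[], cursor: number, chunkSize: number): Row[] {
  return rows
    .filter((r) => r.email !== null && r.email_encrypted === null && r.id > cursor)
    .sort((a, b) => a.id - b.id)
    .slice(0, chunkSize)
}

// Drive chunks to completion; encrypt() stands in for the encryption client.
// Returns the final cursor. Re-running (even with a stale cursor) is safe:
// the IS NULL guard means already-encrypted rows are never selected again.
function backfill(
  rows: Row[],
  encrypt: (v: string) => string,
  cursor = 0,
  chunkSize = 2,
): number {
  for (;;) {
    const chunk = nextChunk(rows, cursor, chunkSize)
    if (chunk.length === 0) return cursor
    for (const r of chunk) r.email_encrypted = encrypt(r.email as string)
    cursor = chunk[chunk.length - 1].id // checkpoint after each chunk
  }
}
```

Killing the process between chunks loses at most one chunk of work; the next run's first `nextChunk` simply starts from whatever rows remain unencrypted.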
diff --git a/packages/cli/scripts/fixtures/seed-users.sql b/packages/cli/scripts/fixtures/seed-users.sql new file mode 100644 index 00000000..1b114e03 --- /dev/null +++ b/packages/cli/scripts/fixtures/seed-users.sql @@ -0,0 +1,16 @@ +-- Seed a users table with plaintext emails for e2e backfill testing. +-- +-- The encrypted target column must be created separately (drizzle-kit / +-- stash db push route), after which the backfill encrypts `email` → `email_encrypted`. + +DROP TABLE IF EXISTS users CASCADE; + +CREATE TABLE users ( + id bigint GENERATED ALWAYS AS IDENTITY PRIMARY KEY, + email text NOT NULL, + created_at timestamptz NOT NULL DEFAULT now() +); + +INSERT INTO users (email) +SELECT 'user-' || g || '@example.com' +FROM generate_series(1, 5000) AS g; diff --git a/packages/cli/src/bin/stash.ts b/packages/cli/src/bin/stash.ts index 6212d79e..c16c5dcf 100644 --- a/packages/cli/src/bin/stash.ts +++ b/packages/cli/src/bin/stash.ts @@ -73,6 +73,13 @@ Commands: schema build Build an encryption schema from your database + encrypt status Show per-column migration status (phase, progress, drift) + encrypt plan Diff intent (.cipherstash/migrations.json) vs observed state + encrypt advance Record a phase transition for a column + encrypt backfill Resumably encrypt plaintext into the encrypted column + encrypt cutover Rename swap encrypted → primary column + encrypt drop Generate a migration to drop the plaintext column + env (experimental) Print production env vars for deployment Options: @@ -198,6 +205,99 @@ async function runDbCommand( } } +async function runEncryptCommand( + sub: string | undefined, + flags: Record<string, boolean>, + values: Record<string, string | undefined>, +) { + switch (sub) { + case 'status': { + const { statusCommand } = await requireStack( + () => import('../commands/encrypt/status.js'), + ) + await statusCommand() + break + } + case 'plan': { + const { planCommand } = await requireStack( + () => import('../commands/encrypt/plan.js'), + ) + await planCommand() + break + } + case 
'advance': { + const table = requireValue(values, 'table') + const column = requireValue(values, 'column') + const to = requireValue(values, 'to') as + | 'schema-added' + | 'dual-writing' + | 'backfilling' + | 'backfilled' + | 'cut-over' + | 'dropped' + const { advanceCommand } = await requireStack( + () => import('../commands/encrypt/advance.js'), + ) + await advanceCommand({ table, column, to, note: values.note }) + break + } + case 'backfill': { + const table = requireValue(values, 'table') + const column = requireValue(values, 'column') + const { backfillCommand } = await requireStack( + () => import('../commands/encrypt/backfill.js'), + ) + await backfillCommand({ + table, + column, + pkColumn: values['pk-column'], + chunkSize: values['chunk-size'] + ? Number(values['chunk-size']) + : undefined, + encryptedColumn: values['encrypted-column'], + schemaColumnKey: values['schema-column-key'], + }) + break + } + case 'cutover': { + const table = requireValue(values, 'table') + const column = requireValue(values, 'column') + const { cutoverCommand } = await requireStack( + () => import('../commands/encrypt/cutover.js'), + ) + await cutoverCommand({ table, column, proxyUrl: values['proxy-url'] }) + break + } + case 'drop': { + const table = requireValue(values, 'table') + const column = requireValue(values, 'column') + const { dropCommand } = await requireStack( + () => import('../commands/encrypt/drop.js'), + ) + await dropCommand({ + table, + column, + migrationsDir: values['migrations-dir'], + }) + break + } + default: + p.log.error(`Unknown encrypt subcommand: ${sub ?? 
'(none)'}`) + console.log() + console.log(HELP) + process.exit(1) + } +} + +function requireValue(values: Record<string, string | undefined>, key: string): string { + const v = values[key] + if (!v) { + p.log.error(`Missing required --${key} value.`) + process.exit(1) + } + return v +} + async function runSchemaCommand( sub: string | undefined, flags: Record<string, boolean>, @@ -255,6 +355,9 @@ async function main() { case 'db': await runDbCommand(subcommand, flags, values) break + case 'encrypt': + await runEncryptCommand(subcommand, flags, values) + break case 'schema': await runSchemaCommand(subcommand, flags) break diff --git a/packages/cli/src/commands/db/install.ts b/packages/cli/src/commands/db/install.ts index a1cbd9ab..d1dff1d6 100644 --- a/packages/cli/src/commands/db/install.ts +++ b/packages/cli/src/commands/db/install.ts @@ -8,7 +8,12 @@ import { downloadEqlSql, loadBundledEqlSql, } from '@/installer/index.js' +import { + MIGRATIONS_SCHEMA_SQL, + installMigrationsSchema, +} from '@cipherstash/migrate' import * as p from '@clack/prompts' +import pg from 'pg' import { ensureStashConfig } from './config-scaffold.js' import { detectDrizzle, detectSupabase } from './detect.js' import { rewriteEncryptedAlterColumns } from './rewrite-migrations.js' @@ -137,6 +142,23 @@ export async function installCommand(options: InstallOptions) { p.log.success('Supabase role permissions granted.') } + s.start('Installing cs_migrations tracking schema...') + const migrationsDb = new pg.Client({ connectionString: config.databaseUrl }) + try { + await migrationsDb.connect() + await installMigrationsSchema(migrationsDb) + s.stop('cs_migrations schema installed.') + } catch (err) { + s.stop('Failed to install cs_migrations schema.') + p.log.warn( + err instanceof Error ? 
err.message + : 'Encryption migration tracking is unavailable; `stash encrypt` commands will fail until this is resolved.', + ) + } finally { + await migrationsDb.end() + } + printNextSteps() p.outro('Done!') } @@ -315,11 +337,16 @@ async function generateDrizzleMigration( } } - // Step 4: Write the EQL SQL into the migration file + // Step 4: Write the EQL SQL (and cs_migrations tracking schema) into + // the migration file. Bundling both means `drizzle-kit migrate` rolls + // everything needed for `stash encrypt ...` out to each environment + // in one go, rather than requiring an out-of-band `stash db install`. s.start('Writing EQL SQL into migration file...') + const migrationContents = `${eqlSql}\n\n-- CipherStash encryption-migration tracking schema.\n-- Tracks per-column phase + backfill progress for \`stash encrypt\`.\n${MIGRATIONS_SCHEMA_SQL.trim()}\n` + try { - writeFileSync(generatedMigrationPath, eqlSql, 'utf-8') + writeFileSync(generatedMigrationPath, migrationContents, 'utf-8') s.stop('EQL SQL written to migration file.') } catch (error) { s.stop('Failed to write migration file.') diff --git a/packages/cli/src/commands/encrypt/advance.ts b/packages/cli/src/commands/encrypt/advance.ts new file mode 100644 index 00000000..934fdc1c --- /dev/null +++ b/packages/cli/src/commands/encrypt/advance.ts @@ -0,0 +1,93 @@ +import { loadStashConfig } from '@/config/index.js' +import { type MigrationPhase, appendEvent } from '@cipherstash/migrate' +import * as p from '@clack/prompts' +import pg from 'pg' + +/** + * Map a user-declared target phase to the event name we write to + * `cs_migrations`. `backfilling` is recorded as `backfill_started`; the + * phase itself is set to `backfilling` regardless. 
+ */ +const PHASE_TO_EVENT: Record< + MigrationPhase, + | 'schema_added' + | 'dual_writing' + | 'backfill_started' + | 'backfilled' + | 'cut_over' + | 'dropped' +> = { + 'schema-added': 'schema_added', + 'dual-writing': 'dual_writing', + backfilling: 'backfill_started', + backfilled: 'backfilled', + 'cut-over': 'cut_over', + dropped: 'dropped', +} + +/** + * Options accepted by `stash encrypt advance`. Used to *declare* that a + * column has reached a new phase — especially useful for `dual-writing`, + * which is an app-code property that the CLI cannot detect automatically. + */ +export interface AdvanceCommandOptions { + /** Physical table name, e.g. `users`. Supports `schema.table`. */ + table: string + /** Physical plaintext column, e.g. `email`. */ + column: string + /** + * The phase the column is transitioning *to*. Records a corresponding + * event (see {@link PHASE_TO_EVENT}). Does not enforce an order — you + * can move backwards if needed, e.g. to re-run a backfill. + */ + to: MigrationPhase + /** + * Optional free-form note, stored in the event's `details.note`. Useful + * for capturing why a phase transition is happening ("deploy 1.23 + * introduced dual-write") so it shows up in audit queries later. + */ + note?: string +} + +/** + * CLI handler for `stash encrypt advance`. Appends a phase-transition event + * to `cs_migrations`. When advancing to `dual-writing`, also prints a + * reminder about the required persistence-layer code change. + */ +export async function advanceCommand(options: AdvanceCommandOptions) { + p.intro('npx @cipherstash/cli encrypt advance') + + const config = await loadStashConfig() + const client = new pg.Client({ connectionString: config.databaseUrl }) + + try { + await client.connect() + await appendEvent(client, { + tableName: options.table, + columnName: options.column, + event: PHASE_TO_EVENT[options.to], + phase: options.to, + details: options.note ? 
{ note: options.note } : null, + }) + + p.log.success( + `${options.table}.${options.column} is now recorded as '${options.to}'.`, + ) + + if (options.to === 'dual-writing') { + p.note( + `Update your persistence layer to write this value to both columns:\n - ${options.column} (plaintext, existing)\n - ${options.column}_encrypted (ciphertext, via your encryption client)\n\nThen run: stash encrypt backfill --table ${options.table} --column ${options.column}`, + 'Next', + ) + } + + p.outro('Recorded.') + } catch (error) { + p.log.error( + error instanceof Error ? error.message : 'Failed to record transition.', + ) + process.exit(1) + } finally { + await client.end() + } +} diff --git a/packages/cli/src/commands/encrypt/backfill.ts b/packages/cli/src/commands/encrypt/backfill.ts new file mode 100644 index 00000000..f4864182 --- /dev/null +++ b/packages/cli/src/commands/encrypt/backfill.ts @@ -0,0 +1,290 @@ +import { loadStashConfig } from '@/config/index.js' +import { runBackfill } from '@cipherstash/migrate' +import * as p from '@clack/prompts' +import pg from 'pg' +import { loadEncryptionContext, requireTable } from './context.js' + +/** + * Options accepted by `stash encrypt backfill`. Each field maps 1:1 to a + * CLI flag of the same name (kebab-case at the CLI boundary). + */ +export interface BackfillCommandOptions { + /** + * Physical table name, e.g. `users`. Supports schema-qualified form + * (`public.users`); identifiers are quoted before being put into SQL. + * Must also exist as an exported `EncryptedTable` in the user's + * encryption client file (`src/encryption/index.ts`) — the command + * errors early if the schema is missing. + */ + table: string + /** + * Physical plaintext column to encrypt, e.g. `email`. The command reads + * from this column, encrypts client-side, and writes the ciphertext + * into {@link encryptedColumn}. Rows where this is `NULL` are skipped. + */ + column: string + /** + * Override auto-detection of the primary-key column. 
The command + * otherwise queries `information_schema` for the table's single-column + * PK. Required when the table has a composite PK (composite support is + * deferred; pick one column that is unique and comparable). + */ + pkColumn?: string + /** + * Rows per chunk/transaction. Default `1000`. Lower for lock-sensitive + * tables or very wide rows; higher for tables with tiny encrypted + * payloads. Also bounds the most work lost to a `Ctrl-C` mid-chunk. + */ + chunkSize?: number + /** + * Physical destination column for the ciphertext, e.g. `email_encrypted`. + * Defaults to `<column>_encrypted` to match the convention produced by + * `drizzle-kit` + CipherStash's migration rewriter. Override only if + * your schema uses a non-standard column name. + */ + encryptedColumn?: string + /** + * Key in the `EncryptedTable` schema object that corresponds to this + * column. Defaults to the *encrypted* column name (see + * {@link encryptedColumn}). Override when your schema uses a + * different key than the physical column — for example: + * + * ```ts + * // src/encryption/index.ts + * export const usersTable = encryptedTable('users', { + * emailAddress: encryptedColumn('email').equality(), + * // ^^^^^^^^^^^^ schema key ^^^^^ physical column + * }) + * ``` + * + * would need `--schema-column-key emailAddress --column email`. + */ + schemaColumnKey?: string +} + +/** + * CLI handler for `stash encrypt backfill`. Loads the user's encryption + * client via jiti, opens a pg pool, wires `SIGINT`/`SIGTERM` to a clean + * shutdown, and delegates to {@link runBackfill}. Exits with code `1` on + * any unrecoverable error. + * + * Safe to re-run: backfill is idempotent (guards with `<encrypted column> IS NULL`) + * and resumes from the last committed checkpoint. 
+ */ +export async function backfillCommand(options: BackfillCommandOptions) { + p.intro('npx @cipherstash/cli encrypt backfill') + + const stashConfig = await loadStashConfig() + const ctx = await loadEncryptionContext() + const tableSchema = requireTable(ctx, options.table) + + const pool = new pg.Pool({ + connectionString: stashConfig.databaseUrl, + max: 2, + }) + + const controller = new AbortController() + const onSignal = () => { + p.log.warn('Interrupt received; finishing current chunk and exiting.') + controller.abort() + } + process.on('SIGINT', onSignal) + process.on('SIGTERM', onSignal) + + const db = await pool.connect() + try { + const pkColumn = + options.pkColumn ?? (await detectPkColumn(db, options.table)) + + const plaintextColumn = options.column + const encryptedColumn = + options.encryptedColumn ?? `${options.column}_encrypted` + // The EncryptedTable schema is keyed by the *encrypted* column name + // (because the user's ORM schema declares the encrypted column, not + // the plaintext one). Default accordingly — override with + // --schema-column-key only if your schema uses a different object key. + const schemaColumnKey = options.schemaColumnKey ?? encryptedColumn + + const { transform: transformPlaintext, castAs: detectedCastAs } = + buildPlaintextCoercer(tableSchema, schemaColumnKey) + + // protect-ffi's JsPlaintext wire enum currently has 4 variants: + // String / Number / Boolean / JsonB. Date and Timestamp columns are + // typed on the Rust side (NaiveDate / DateTime) but there is no + // JS-visible wire format for them, so any JS Date is serialised to + // an ISO string by napi-rs and the Rust side then refuses it because + // string values only bind to Utf8Str columns. Warn before wasting + // time running a backfill that will fail on the first chunk. 
+ if (detectedCastAs === 'date' || detectedCastAs === 'timestamp') { + p.log.warn( + `Column ${options.table}.${encryptedColumn} declares cast_as: '${detectedCastAs}', which protect-ffi does not currently support for encryption. The backfill will fail with "Cannot convert String to Date". Consider changing the schema to dataType: 'string' (or omitting dataType) and storing ISO date strings instead, then re-running \`stash db push\`.`, + ) + const proceed = await p.confirm({ + message: 'Continue anyway?', + initialValue: false, + }) + if (p.isCancel(proceed) || !proceed) { + p.outro('Aborted.') + return + } + } + + p.log.info( + `Backfilling ${options.table}.${plaintextColumn} → ${encryptedColumn} (pk: ${pkColumn}, chunk: ${options.chunkSize ?? 1000}, schema cast_as: ${detectedCastAs ?? '(unknown, passing through)'}).`, + ) + + let lastLogged = 0 + const result = await runBackfill({ + db, + encryptionClient: ctx.client as unknown as Parameters< + typeof runBackfill + >[0]['encryptionClient'], + tableSchema, + tableName: options.table, + schemaColumnKey, + plaintextColumn, + encryptedColumn, + pkColumn, + chunkSize: options.chunkSize, + signal: controller.signal, + transformPlaintext, + onProgress: (progress) => { + if ( + progress.rowsProcessed - lastLogged >= 5000 || + progress.rowsProcessed === progress.rowsTotal + ) { + p.log.step( + `${progress.rowsProcessed.toLocaleString()}/${progress.rowsTotal.toLocaleString()} rows`, + ) + lastLogged = progress.rowsProcessed + } + }, + }) + + if (!result.completed) { + p.log.warn( + `Stopped before completion. ${result.rowsProcessed.toLocaleString()} rows processed. Re-run to resume.`, + ) + p.outro('Paused.') + return + } + + p.outro( + `Backfill complete. ${result.rowsProcessed.toLocaleString()} rows encrypted.`, + ) + } catch (error) { + p.log.error(error instanceof Error ? 
error.message : 'Backfill failed.') + process.exit(1) + } finally { + process.off('SIGINT', onSignal) + process.off('SIGTERM', onSignal) + db.release() + await pool.end() + } +} + +/** + * Build a coercer that turns whatever the `pg` driver returns for a given + * column into the JS shape `bulkEncryptModels` expects, based on the + * schema's declared `cast_as`. Fixes the common "pg returns numeric as + * string, Protect wants a JS number" mismatch without forcing the user + * to set a global pg type parser. + * + * - `'number'` / `'double'` / `'real'` / `'int'` etc. → `Number(string)` + * - `'bigint'` / `'big_int'` → `BigInt(string)` + * - `'date'` → `new Date(string)` if pg returned a string + * - `'boolean'` → `"true"`/`"false"` strings coerced to bool + * - `'string'` / `'text'` / `'json'` / `'jsonb'` → identity (pg already fits) + * + * Null / undefined are always passed through unchanged. + */ +function buildPlaintextCoercer( + // biome-ignore lint/suspicious/noExplicitAny: EncryptedTableLike.build is generic + tableSchema: { build(): { columns: Record<string, any> } }, + schemaColumnKey: string, +): { transform: (value: unknown) => unknown; castAs: string | undefined } { + let castAs: string | undefined + try { + castAs = tableSchema.build().columns?.[schemaColumnKey]?.cast_as + } catch { + castAs = undefined + } + + const transform = (() => { + switch (castAs) { + case 'number': + case 'double': + case 'real': + case 'float': + case 'decimal': + case 'int': + case 'small_int': + return (v: unknown) => { + if (v === null || v === undefined) return v + return typeof v === 'string' ? 
Number(v) : v + } + case 'bigint': + case 'big_int': + return (v: unknown) => { + if (v === null || v === undefined) return v + if (typeof v === 'bigint') return v + if (typeof v === 'number' || typeof v === 'string') return BigInt(v) + return v + } + case 'date': + case 'timestamp': + return (v: unknown) => { + if (v === null || v === undefined) return v + if (v instanceof Date) return v + if (typeof v === 'string' || typeof v === 'number') return new Date(v) + return v + } + case 'boolean': + return (v: unknown) => { + if (v === null || v === undefined) return v + if (typeof v === 'boolean') return v + if (typeof v === 'string') return v === 'true' || v === 't' + return v + } + default: + // 'string', 'text', 'json', 'jsonb', or unknown — pg already returns + // the right JS type for these. + return (v: unknown) => v + } + })() + + return { transform, castAs } +} + +async function detectPkColumn( + db: pg.PoolClient, + tableName: string, +): Promise<string> { + const [schema, table] = tableName.includes('.') + ? tableName.split('.') + : [null, tableName] + + const result = await db.query<{ column_name: string }>( + `SELECT a.attname AS column_name + FROM pg_index i + JOIN pg_attribute a ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey) + JOIN pg_class c ON c.oid = i.indrelid + JOIN pg_namespace n ON n.oid = c.relnamespace + WHERE i.indisprimary + AND c.relname = $1 + AND ($2::text IS NULL OR n.nspname = $2) + ORDER BY a.attnum ASC`, + [table, schema], + ) + + if (result.rows.length === 0) { + throw new Error( + `Could not detect a primary key on ${tableName}. Pass --pk-column <name>.`, + ) + } + if (result.rows.length > 1) { + throw new Error( + `${tableName} has a composite primary key; composite keys are not yet supported. 
Pass --pk-column to override.`, + ) + } + // Both guards above throw, so exactly one row remains here. + return result.rows[0].column_name +} diff --git a/packages/cli/src/commands/encrypt/context.ts b/packages/cli/src/commands/encrypt/context.ts new file mode 100644 index 00000000..4c25817b --- /dev/null +++ b/packages/cli/src/commands/encrypt/context.ts @@ -0,0 +1,164 @@ +import fs from 'node:fs' +import path from 'node:path' +import { type ResolvedStashConfig, loadStashConfig } from '@/config/index.js' +import type { EncryptionClient } from '@cipherstash/stack/encryption' + +/** + * Structural shape of `@cipherstash/stack`'s `EncryptedTable` class. + * Duck-typed so we don't need to `instanceof` across module boundaries + * (which is fragile with dual CJS/ESM). + */ +export interface EncryptedTableLike { + readonly tableName: string + build(): { tableName: string; columns: Record<string, unknown> } +} + +/** + * Everything the encrypt commands need to do real work: resolved stash + * config, the user's initialised encryption client, and a table-name-keyed + * map of every `EncryptedTable` exported from the client file. + */ +export interface EncryptionContext { + stashConfig: ResolvedStashConfig + client: EncryptionClient + tables: Map<string, EncryptedTableLike> +} + +/** + * Load `stash.config.ts`, dynamic-import the user's encryption client file + * via jiti, and harvest: + * + * 1. The initialised `EncryptionClient` — detected by duck-typing any + * export that exposes a `getEncryptConfig()` method. + * 2. Every `EncryptedTable` — detected by the pair of `tableName: string` + * and `build(): …` properties. Keyed by `tableName`. + * + * Both are needed by the backfill runner: the client to call + * `bulkEncryptModels`, and the table schema to pass as the second arg. + * + * Exits the process with code `1` on any load error — the same hard-fail + * behaviour `loadStashConfig` / `loadEncryptConfig` already use elsewhere + * in the CLI. 
+ */ +export async function loadEncryptionContext(): Promise<EncryptionContext> { + const stashConfig = await loadStashConfig() + const resolvedPath = path.resolve(process.cwd(), stashConfig.client) + + if (!fs.existsSync(resolvedPath)) { + console.error( + `Error: Encrypt client file not found at ${resolvedPath}\n\nCheck the "client" path in your stash.config.ts.`, + ) + process.exit(1) + } + + const { createJiti } = await import('jiti') + const jiti = createJiti(resolvedPath, { interopDefault: true }) + + let moduleExports: Record<string, unknown> + try { + moduleExports = (await jiti.import(resolvedPath)) as Record<string, unknown> + } catch (error) { + console.error( + `Error: Failed to load encrypt client file at ${resolvedPath}\n`, + ) + console.error(error) + process.exit(1) + } + + let client: EncryptionClient | undefined + const tables = new Map<string, EncryptedTableLike>() + const drizzleCandidates: unknown[] = [] + + const DRIZZLE_NAME_SYMBOL = Symbol.for('drizzle:Name') + + for (const value of Object.values(moduleExports)) { + if (!value || typeof value !== 'object') continue + + if ( + 'getEncryptConfig' in value && + typeof (value as { getEncryptConfig?: unknown }).getEncryptConfig === + 'function' + ) { + client = value as EncryptionClient + continue + } + + if ( + 'tableName' in value && + typeof (value as { tableName?: unknown }).tableName === 'string' && + 'build' in value && + typeof (value as { build?: unknown }).build === 'function' + ) { + const table = value as EncryptedTableLike + tables.set(table.tableName, table) + continue + } + + // Drizzle pgTable — Symbol.for('drizzle:Name') is set by drizzle-orm + // on anything constructed via `pgTable()`. We'll run extractEncryptionSchema + // on these in a second pass. + if ((value as Record<symbol, unknown>)[DRIZZLE_NAME_SYMBOL] !== undefined) { + drizzleCandidates.push(value) + } + } + + // Second pass: auto-derive EncryptedTable schemas from drizzle pgTable + // exports so users don't have to manually export the result of + // extractEncryptionSchema(). 
Silently no-op if @cipherstash/stack/drizzle + // isn't installed (e.g. a Supabase-only project). + if (drizzleCandidates.length > 0) { + try { + const drizzleModule = (await import('@cipherstash/stack/drizzle')) as { + extractEncryptionSchema?: (t: unknown) => EncryptedTableLike + } + const extract = drizzleModule.extractEncryptionSchema + if (extract) { + for (const candidate of drizzleCandidates) { + try { + const derived = extract(candidate) + if (derived?.tableName && !tables.has(derived.tableName)) { + tables.set(derived.tableName, derived) + } + } catch { + // Table has no encrypted columns, or extraction failed for + // another reason. Ignore — not every drizzle table is a + // backfill target. + } + } + } + } catch { + // @cipherstash/stack/drizzle not installed; skip drizzle fallback. + } + } + + if (!client) { + console.error( + `Error: No EncryptionClient export found in ${stashConfig.client}.`, + ) + process.exit(1) + } + + return { stashConfig, client, tables } +} + +/** + * Look up the `EncryptedTable` for the given table name in the loaded + * context. Exits the process with code `1` if the table is not declared + * in the user's encryption client file — without this schema, backfill + * cannot call `bulkEncryptModels`. 
+ */ +export function requireTable( + ctx: EncryptionContext, + tableName: string, +): EncryptedTableLike { + const table = ctx.tables.get(tableName) + if (!table) { + const available = Array.from(ctx.tables.keys()).join(', ') || '(none)' + console.error( + `Error: Table "${tableName}" was not found in the encryption client exports.\n` + + `Available: ${available}`, + ) + process.exit(1) + } + return table +} diff --git a/packages/cli/src/commands/encrypt/cutover.ts b/packages/cli/src/commands/encrypt/cutover.ts new file mode 100644 index 00000000..d484d75b --- /dev/null +++ b/packages/cli/src/commands/encrypt/cutover.ts @@ -0,0 +1,106 @@ +import { loadStashConfig } from '@/config/index.js' +import { + appendEvent, + progress, + reloadConfig, + renameEncryptedColumns, +} from '@cipherstash/migrate' +import * as p from '@clack/prompts' +import pg from 'pg' + +/** + * Options accepted by `stash encrypt cutover`. Swaps the plaintext and + * encrypted columns via `eql_v2.rename_encrypted_columns()` so that apps + * reading `` transparently receive the encrypted column + * (decrypted on read by Proxy or client-side by Stack). + */ +export interface CutoverCommandOptions { + /** Physical table name, e.g. `users`. Supports `schema.table`. */ + table: string + /** + * Physical plaintext column that is being cut over, e.g. `email`. Used + * only for the state-transition check and event log; the actual rename + * affects every column in the active EQL config in a single call. + */ + column: string + /** + * Optional Postgres URL of a CipherStash Proxy. When set, the command + * connects to the Proxy after the rename and runs `eql_v2.reload_config()` + * so Proxy picks up the renamed columns immediately rather than waiting + * for its 60-second refresh. When unset, prints a warning to that effect + * and returns — the Proxy will refresh on its own. + * + * Also readable from `CIPHERSTASH_PROXY_URL` in the environment. 
+ */ + proxyUrl?: string +} + +/** + * CLI handler for `stash encrypt cutover`. Verifies the target column is + * in phase `backfilled`, runs `eql_v2.rename_encrypted_columns()` inside + * a transaction, appends a `cut_over` event, and optionally triggers a + * Proxy config reload. Exits with code `1` if preconditions are not met. + */ +export async function cutoverCommand(options: CutoverCommandOptions) { + p.intro('npx @cipherstash/cli encrypt cutover') + + const config = await loadStashConfig() + const client = new pg.Client({ connectionString: config.databaseUrl }) + + try { + await client.connect() + + const state = await progress(client, options.table, options.column) + if (state?.phase !== 'backfilled') { + p.log.error( + `Cannot cut over: ${options.table}.${options.column} is in phase '${state?.phase ?? '—'}'. Must be 'backfilled'.`, + ) + process.exit(1) + } + + await client.query('BEGIN') + try { + await renameEncryptedColumns(client) + await appendEvent(client, { + tableName: options.table, + columnName: options.column, + event: 'cut_over', + phase: 'cut-over', + details: { renamed: true }, + }) + await client.query('COMMIT') + } catch (err) { + await client.query('ROLLBACK').catch(() => {}) + throw err + } + + p.log.success( + `Renamed ${options.column} → ${options.column}_plaintext and ${options.column}_encrypted → ${options.column}.`, + ) + + const proxyUrl = options.proxyUrl ?? process.env.CIPHERSTASH_PROXY_URL + if (proxyUrl) { + const proxy = new pg.Client({ connectionString: proxyUrl }) + try { + await proxy.connect() + await reloadConfig(proxy) + p.log.success('Proxy config reloaded.') + } finally { + await proxy.end() + } + } else { + p.log.warn( + 'CIPHERSTASH_PROXY_URL not set; Proxy users must wait up to 60s for config refresh.', + ) + } + + p.outro( + 'Cut-over complete. Your app reads the encrypted column transparently.', + ) + } catch (error) { + p.log.error(error instanceof Error ? 
error.message : 'Cut-over failed.') + process.exit(1) + } finally { + await client.end() + } +} diff --git a/packages/cli/src/commands/encrypt/drop.ts b/packages/cli/src/commands/encrypt/drop.ts new file mode 100644 index 00000000..7ef50deb --- /dev/null +++ b/packages/cli/src/commands/encrypt/drop.ts @@ -0,0 +1,91 @@ +import fs from 'node:fs' +import path from 'node:path' +import { loadStashConfig } from '@/config/index.js' +import { appendEvent, progress } from '@cipherstash/migrate' +import * as p from '@clack/prompts' +import pg from 'pg' + +/** + * Options accepted by `stash encrypt drop`. Generates a migration file + * that drops the now-unused plaintext column (renamed to `<column>_plaintext` + * by cutover). Does *not* apply the migration — the user runs their usual + * migration tool (drizzle-kit, prisma, psql) to actually execute it. + */ +export interface DropCommandOptions { + /** Physical table name, e.g. `users`. */ + table: string + /** + * Physical column — the original plaintext name. The generated migration + * drops `<column>_plaintext` (the name the column has *after* cutover). + */ + column: string + /** + * Directory to write the generated `.sql` migration into, relative to + * the current working directory. Default: `./drizzle`. Use `./migrations` + * (or similar) for Prisma / manual psql workflows. + */ + migrationsDir?: string +} + +/** + * CLI handler for `stash encrypt drop`. Requires the column to be in + * phase `cut-over`; otherwise errors out. Writes a timestamped + * `ALTER TABLE … DROP COLUMN <column>_plaintext` statement, appends a + * `dropped` event, and prints instructions for applying the migration. 
+ */ +export async function dropCommand(options: DropCommandOptions) { + p.intro('npx @cipherstash/cli encrypt drop') + + const config = await loadStashConfig() + const client = new pg.Client({ connectionString: config.databaseUrl }) + + try { + await client.connect() + + const state = await progress(client, options.table, options.column) + if (state?.phase !== 'cut-over') { + p.log.error( + `Cannot generate drop migration: ${options.table}.${options.column} is in phase '${state?.phase ?? '—'}'. Must be 'cut-over'.`, + ) + process.exit(1) + } + + const migrationsDir = path.resolve( + process.cwd(), + options.migrationsDir ?? 'drizzle', + ) + fs.mkdirSync(migrationsDir, { recursive: true }) + + const ts = new Date() + .toISOString() + .replace(/[-:.TZ]/g, '') + .slice(0, 14) + const fileName = `${ts}_drop_${options.table}_${options.column}_plaintext.sql` + const filePath = path.join(migrationsDir, fileName) + + const sql = `-- Generated by @cipherstash/cli encrypt drop\n-- Drops the plaintext column now that ${options.table}.${options.column} is encrypted.\n\nALTER TABLE "${options.table}" DROP COLUMN "${options.column}_plaintext";\n` + fs.writeFileSync(filePath, sql, 'utf-8') + + await appendEvent(client, { + tableName: options.table, + columnName: options.column, + event: 'dropped', + phase: 'dropped', + details: { migrationFile: filePath }, + }) + + p.log.success(`Migration written to ${filePath}`) + p.note( + `Review the migration, then apply it with your migration tool:\n - drizzle-kit migrate\n - prisma migrate deploy\n - psql -f ${fileName}`, + 'Next', + ) + p.outro('Drop migration generated.') + } catch (error) { + p.log.error( + error instanceof Error ? 
error.message : 'Drop generation failed.', + ) + process.exit(1) + } finally { + await client.end() + } +} diff --git a/packages/cli/src/commands/encrypt/plan.ts b/packages/cli/src/commands/encrypt/plan.ts new file mode 100644 index 00000000..c4ac2938 --- /dev/null +++ b/packages/cli/src/commands/encrypt/plan.ts @@ -0,0 +1,58 @@ +import { loadStashConfig } from '@/config/index.js' +import { latestByColumn, readManifest } from '@cipherstash/migrate' +import * as p from '@clack/prompts' +import pg from 'pg' + +/** + * CLI handler for `stash encrypt plan`. Reads the repo manifest and the + * latest `cs_migrations` state for each declared column, and prints the + * transitions needed to reach each column's `targetPhase`. + * + * No state changes are made. This is the "what would happen if I + * `advance`d everything" preview. + */ +export async function planCommand() { + p.intro('npx @cipherstash/cli encrypt plan') + + const config = await loadStashConfig() + const manifest = await readManifest(process.cwd()) + + if (!manifest) { + p.log.warn('No .cipherstash/migrations.json found.') + p.outro('Nothing to plan.') + return + } + + const client = new pg.Client({ connectionString: config.databaseUrl }) + try { + await client.connect() + const state = await latestByColumn(client) + + const actions: string[] = [] + for (const [tableName, columns] of Object.entries(manifest.tables)) { + for (const column of columns) { + const key = `${tableName}.${column.column}` + const current = state.get(key) + if (!current) { + actions.push( + ` + ${key}: no migration recorded; start with \`stash encrypt advance --to schema-added\``, + ) + continue + } + if (current.phase !== column.targetPhase) { + actions.push(` → ${key}: ${current.phase} → ${column.targetPhase}`) + } else { + actions.push(` ✓ ${key}: already at ${column.targetPhase}`) + } + } + } + + p.note(actions.join('\n') || '(no changes)', 'Plan') + p.outro('Done.') + } catch (error) { + p.log.error(error instanceof Error ? 
error.message : 'Plan failed.') + process.exit(1) + } finally { + await client.end() + } +} diff --git a/packages/cli/src/commands/encrypt/status.ts b/packages/cli/src/commands/encrypt/status.ts new file mode 100644 index 00000000..0a5d50ba --- /dev/null +++ b/packages/cli/src/commands/encrypt/status.ts @@ -0,0 +1,269 @@ +import { loadStashConfig } from '@/config/index.js' +import { + type MigrationPhase, + latestByColumn, + readManifest, +} from '@cipherstash/migrate' +import * as p from '@clack/prompts' +import pg from 'pg' + +interface Row { + table: string + column: string + phase: string + eql: string + indexes: string + progress: string + flags: string +} + +/** + * CLI handler for `stash encrypt status`. Renders a table with one row per + * known (table, column), merging three sources: + * + * - The repo manifest (`.cipherstash/migrations.json`) — declared intent. + * - The active / pending `eql_v2_configuration` row — EQL state + indexes. + * - The latest `cs_migrations` event per column — runtime phase + progress. + * + * Plus `information_schema.columns` to surface structural drift (for + * example: intent says a column should be encrypted, but the + * `<column>_encrypted` column doesn't exist yet). 
+ */ +export async function statusCommand() { + p.intro('npx @cipherstash/cli encrypt status') + + const config = await loadStashConfig() + const manifest = await readManifest(process.cwd()) + const client = new pg.Client({ connectionString: config.databaseUrl }) + + try { + await client.connect() + + const [stateMap, eqlConfig, physicalCols] = await Promise.all([ + latestByColumnSafe(client), + fetchActiveEqlConfig(client), + fetchPhysicalColumns(client), + ]) + + const rows: Row[] = [] + const seen = new Set() + + if (manifest) { + for (const [tableName, columns] of Object.entries(manifest.tables)) { + for (const column of columns) { + const key = `${tableName}.${column.column}` + seen.add(key) + rows.push( + renderRow({ + tableName, + columnName: column.column, + intentIndexes: column.indexes, + state: stateMap.get(key) ?? null, + eqlColumn: eqlConfig.get(key) ?? null, + physicalColumns: physicalCols.get(tableName) ?? new Set(), + }), + ) + } + } + } + + for (const [key, state] of stateMap) { + if (seen.has(key)) continue + const [tableName, columnName] = key.split('.') as [string, string] + rows.push( + renderRow({ + tableName, + columnName, + intentIndexes: undefined, + state, + eqlColumn: eqlConfig.get(key) ?? null, + physicalColumns: physicalCols.get(tableName) ?? new Set(), + }), + ) + } + + if (rows.length === 0) { + p.log.info( + 'No encrypted columns yet. Run `stash db push` then `stash encrypt advance`.', + ) + p.outro('Nothing to show.') + return + } + + p.note(formatTable(rows), 'Column migration status') + p.outro('Done.') + } catch (error) { + p.log.error( + error instanceof Error ? 
error.message : 'Failed to read status.', + ) + process.exit(1) + } finally { + await client.end() + } +} + +async function latestByColumnSafe(client: pg.Client) { + try { + return await latestByColumn(client) + } catch (err) { + if ( + err instanceof Error && + /cs_migrations|schema "cipherstash"/i.test(err.message) + ) { + return new Map() + } + throw err + } +} + +interface EqlColumnInfo { + indexes: string[] + state: 'active' | 'pending' | 'encrypting' +} + +async function fetchActiveEqlConfig( + client: pg.Client, +): Promise<Map<string, EqlColumnInfo>> { + const out = new Map<string, EqlColumnInfo>() + try { + const result = await client.query<{ state: string; data: unknown }>( + `SELECT state, data FROM eql_v2_configuration + WHERE state IN ('active', 'pending', 'encrypting') + ORDER BY CASE state WHEN 'active' THEN 0 WHEN 'encrypting' THEN 1 ELSE 2 END`, + ) + for (const row of result.rows) { + const data = row.data as { + tables?: Record< + string, + Record<string, { indexes?: Record<string, unknown> }> + > + } | null + if (!data?.tables) continue + for (const [tableName, columns] of Object.entries(data.tables)) { + for (const [columnName, column] of Object.entries(columns)) { + const key = `${tableName}.${columnName}` + if (out.has(key)) continue + out.set(key, { + indexes: Object.keys(column.indexes ?? {}), + state: row.state as 'active' | 'pending' | 'encrypting', + }) + } + } + } + } catch (err) { + if (err instanceof Error && /eql_v2_configuration/i.test(err.message)) { + return out + } + throw err + } + return out +} + +async function fetchPhysicalColumns( + client: pg.Client, +): Promise<Map<string, Set<string>>> { + const out = new Map<string, Set<string>>() + const result = await client.query<{ + table_name: string + column_name: string + }>( + `SELECT table_name, column_name FROM information_schema.columns + WHERE table_schema = current_schema()`, + ) + for (const row of result.rows) { + const set = out.get(row.table_name) ?? 
new Set<string>() + set.add(row.column_name) + out.set(row.table_name, set) + } + return out +} + +function renderRow(input: { + tableName: string + columnName: string + intentIndexes: string[] | undefined + state: { + phase: MigrationPhase + rowsProcessed: number | null + rowsTotal: number | null + } | null + eqlColumn: EqlColumnInfo | null + physicalColumns: Set<string> +}): Row { + const { + tableName, + columnName, + intentIndexes, + state, + eqlColumn, + physicalColumns, + } = input + + const phase = state?.phase ?? (intentIndexes ? 'schema-added' : '—') + const eql = eqlColumn ? eqlColumn.state : '—' + const indexes = eqlColumn + ? eqlColumn.indexes.join(', ') || '(none)' + : intentIndexes?.join(', ') || '—' + + let progress = '—' + if (state && state.rowsTotal !== null && state.rowsTotal !== undefined) { + const pct = + state.rowsTotal > 0 + ? Math.floor(((state.rowsProcessed ?? 0) / state.rowsTotal) * 100) + : 100 + progress = `${state.rowsProcessed ?? 0}/${state.rowsTotal} (${pct}%)` + } + + const flags: string[] = [] + if (intentIndexes && !eqlColumn) flags.push('not-registered') + if (intentIndexes && !physicalColumns.has(`${columnName}_encrypted`)) { + flags.push('encrypted-col-missing') + } + if (phase === 'cut-over' && !physicalColumns.has(`${columnName}_plaintext`)) { + flags.push('plaintext-col-missing') + } + + return { + table: tableName, + column: columnName, + phase, + eql, + indexes, + progress, + flags: flags.join(', '), + } +} + +function formatTable(rows: Row[]): string { + const headers: Row = { + table: 'TABLE', + column: 'COLUMN', + phase: 'PHASE', + eql: 'EQL', + indexes: 'INDEXES', + progress: 'PROGRESS', + flags: 'FLAGS', + } + const all = [headers, ...rows] + const widths: Record<keyof Row, number> = { + table: 0, + column: 0, + phase: 0, + eql: 0, + indexes: 0, + progress: 0, + flags: 0, + } + for (const row of all) { + for (const key of Object.keys(widths) as (keyof Row)[]) { + widths[key] = Math.max(widths[key], row[key].length) + } + } + return all + .map((row) => 
(Object.keys(widths) as (keyof Row)[]) + .map((k) => row[k].padEnd(widths[k])) + .join(' '), + ) + .join('\n') +} diff --git a/packages/migrate/README.md b/packages/migrate/README.md new file mode 100644 index 00000000..2c7b38c4 --- /dev/null +++ b/packages/migrate/README.md @@ -0,0 +1,89 @@ +# @cipherstash/migrate + +Primitives for migrating existing plaintext columns to CipherStash's `eql_v2_encrypted` in production Postgres databases, safely and resumably. + +Backs the `stash encrypt` CLI command group, but is also exported for direct use — embed `runBackfill()` in your own worker or cron job when you'd rather not pipe gigabytes through a CLI process. + +## Lifecycle + +Each column walks through these phases: + +``` +schema-added → dual-writing → backfilling → backfilled → cut-over → dropped +``` + +State is tracked in an append-only `cipherstash.cs_migrations` table installed by `stash db install`. The EQL intent (which indexes, which cast_as) continues to live in `eql_v2_configuration` so Proxy continues to work against the same database. + +## API + +```ts +import { + installMigrationsSchema, + appendEvent, + latestByColumn, + progress, + runBackfill, + renameEncryptedColumns, + reloadConfig, + readManifest, + writeManifest, +} from '@cipherstash/migrate' +``` + +### `installMigrationsSchema(client)` + +Creates `cipherstash.cs_migrations` idempotently. Normally called by `stash db install`. + +### `runBackfill({ db, encryptionClient, tableSchema, tableName, plaintextColumn, encryptedColumn, pkColumn, schemaColumnKey, chunkSize?, signal?, onProgress? })` + +Chunked, resumable, idempotent backfill of plaintext → encrypted. Per chunk, in a single transaction: select next page → encrypt via `client.bulkEncryptModels` → `UPDATE … FROM (VALUES …)` → `INSERT` a `backfill_checkpoint` event. Guards with `<encrypted column> IS NULL` so re-runs never double-write. + +- `db`: a `pg.PoolClient` (the runner drives transactions on it). 
+- `encryptionClient`: your initialised `@cipherstash/stack` `EncryptionClient` (or anything that exposes `bulkEncryptModels(models, table)` returning `{ data } | { failure }`). +- `tableSchema`: the `EncryptedTable` for the target table from your encryption client file. +- `signal`: optional `AbortSignal`. If aborted between chunks, the backfill exits cleanly and leaves a resumable checkpoint. + +Returns `{ resumed, rowsProcessed, rowsTotal, completed }`. + +### `appendEvent(client, { tableName, columnName, event, phase, … })` / `progress(client, table, column)` / `latestByColumn(client)` + +Direct access to the `cs_migrations` event log. Use these if you're building your own migration UI or orchestration on top. + +### `renameEncryptedColumns(client)` / `reloadConfig(client)` + +Thin wrappers around `eql_v2.rename_encrypted_columns()` (the cut-over primitive) and `eql_v2.reload_config()` (Proxy refresh hint — no-op when connected directly to Postgres). + +### `readManifest(cwd)` / `writeManifest(manifest, cwd)` + +Read/write `.cipherstash/migrations.json` — the repo-side intent declaration. Zod-validated. The manifest is optional; commands work without it but you lose the `plan` diff. 
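+
+A minimal `.cipherstash/migrations.json` sketch — an assumption based on the fields `plan` and `status` read (a top-level `tables` object keyed by table name; each column entry carries `column`, `targetPhase`, and optional `indexes` — the index names below are illustrative):
+
+```json
+{
+  "tables": {
+    "users": [
+      {
+        "column": "email",
+        "targetPhase": "cut-over",
+        "indexes": ["unique", "match"]
+      }
+    ]
+  }
+}
+```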
+ +## Drop-in usage in a BullMQ/Inngest worker + +```ts +import pg from 'pg' +import { runBackfill } from '@cipherstash/migrate' +import { encryptionClient, usersTable } from './src/encryption/index.js' + +const pool = new pg.Pool({ connectionString: process.env.DATABASE_URL }) + +export async function handler({ signal }: { signal: AbortSignal }) { + const db = await pool.connect() + try { + return await runBackfill({ + db, + encryptionClient, + tableSchema: usersTable, + tableName: 'users', + schemaColumnKey: 'email', + plaintextColumn: 'email', + encryptedColumn: 'email_encrypted', + pkColumn: 'id', + chunkSize: 2000, + signal, + onProgress: (p) => console.log(`${p.rowsProcessed}/${p.rowsTotal}`), + }) + } finally { + db.release() + } +} +``` diff --git a/packages/migrate/package.json b/packages/migrate/package.json new file mode 100644 index 00000000..1b339a84 --- /dev/null +++ b/packages/migrate/package.json @@ -0,0 +1,65 @@ +{ + "name": "@cipherstash/migrate", + "version": "0.1.0", + "description": "Plaintext-to-encrypted column migration for CipherStash: resumable backfill, per-column state, and EQL lifecycle orchestration.", + "keywords": [ + "cipherstash", + "encryption", + "migration", + "postgres", + "eql" + ], + "license": "MIT", + "author": "CipherStash ", + "files": [ + "dist", + "README.md", + "LICENSE", + "CHANGELOG.md" + ], + "type": "module", + "main": "./dist/index.cjs", + "module": "./dist/index.js", + "sideEffects": false, + "types": "./dist/index.d.ts", + "exports": { + ".": { + "import": { + "types": "./dist/index.d.ts", + "default": "./dist/index.js" + }, + "require": { + "types": "./dist/index.d.cts", + "default": "./dist/index.cjs" + } + }, + "./package.json": "./package.json" + }, + "scripts": { + "build": "tsup", + "dev": "tsup --watch", + "test": "vitest run", + "lint": "biome check ." 
+ }, + "dependencies": { + "zod": "^3.24.2" + }, + "peerDependencies": { + "@cipherstash/stack": ">=0.6.0", + "pg": ">=8" + }, + "devDependencies": { + "@cipherstash/stack": "workspace:*", + "@types/pg": "^8.11.11", + "pg": "8.13.1", + "tsup": "catalog:repo", + "typescript": "catalog:repo", + "vitest": "catalog:repo" + }, + "publishConfig": { + "access": "public" + }, + "engines": { + "node": ">=18" + } +} diff --git a/packages/migrate/src/__tests__/backfill.integration.test.ts b/packages/migrate/src/__tests__/backfill.integration.test.ts new file mode 100644 index 00000000..21a1d805 --- /dev/null +++ b/packages/migrate/src/__tests__/backfill.integration.test.ts @@ -0,0 +1,430 @@ +/** + * Integration tests for the backfill engine against a real local Postgres. + * + * Skipped unless `PG_TEST_URL` is set. Suggested setup: + * + * ``` + * cd local && docker compose up -d + * PG_TEST_URL=postgres://cipherstash:password@localhost:5432/cipherstash \ + * pnpm -F @cipherstash/migrate test backfill.integration + * ``` + * + * These tests do NOT require CipherStash credentials — they use a stub + * encryption client that returns deterministic marker payloads. They + * exercise the full mechanics of the backfill loop (chunking, keyset + * pagination, checkpointing, resume, idempotency, error handling) + * against a real transactional Postgres, which is the part with the + * most surface area for subtle bugs. + */ + +import pg from 'pg' +import { afterAll, afterEach, beforeAll, describe, expect, it } from 'vitest' +import { type EncryptionClientLike, runBackfill } from '../backfill.js' +import { installMigrationsSchema } from '../install.js' +import { latestByColumn, progress } from '../state.js' + +const PG_URL = process.env.PG_TEST_URL + +// Skip the whole file when PG_TEST_URL is not configured. We wrap describe +// so that vitest still renders a "skipped" entry rather than silently +// omitting the file. 
+const runIntegration = Boolean(PG_URL)
+
+describe.skipIf(!runIntegration)('runBackfill (integration)', () => {
+  let pool: pg.Pool
+
+  beforeAll(async () => {
+    pool = new pg.Pool({ connectionString: PG_URL, max: 4 })
+
+    // Fresh slate: own schema so we can blow it away without touching EQL.
+    const db = await pool.connect()
+    try {
+      await db.query('DROP SCHEMA IF EXISTS cipherstash CASCADE')
+      await db.query('DROP SCHEMA IF EXISTS migrate_test CASCADE')
+      await db.query('CREATE SCHEMA migrate_test')
+      await installMigrationsSchema(db)
+    } finally {
+      db.release()
+    }
+  })
+
+  afterAll(async () => {
+    await pool.end()
+  })
+
+  afterEach(async () => {
+    // Each test creates a fresh users table; the migrations log is shared,
+    // so truncate it too so later tests start from a clean slate.
+    const db = await pool.connect()
+    try {
+      await db.query('DROP TABLE IF EXISTS migrate_test.users')
+      await db.query('TRUNCATE cipherstash.cs_migrations')
+    } finally {
+      db.release()
+    }
+  })
+
+  /** Stub that returns a deterministic EQL-shaped payload
+   * (`c: 'mock-ciphertext:…'`) for every input — lets us
+   * verify the runner's UPDATE path without running real encryption.
   */
+  const stubClient: EncryptionClientLike = {
+    bulkEncryptModels(input) {
+      return Promise.resolve({
+        data: input.map((row) => ({
+          __pk: row.__pk,
+          email: {
+            v: 2,
+            i: { t: 'users', c: 'email' },
+            c: `mock-ciphertext:${row.email}`,
+          },
+        })),
+      })
+    },
+  }
+
+  async function seed(n: number) {
+    const db = await pool.connect()
+    try {
+      await db.query(`
+        CREATE TABLE migrate_test.users (
+          id bigint GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
+          email text NOT NULL,
+          email_encrypted jsonb
+        )
+      `)
+      await db.query(
+        `INSERT INTO migrate_test.users (email)
+         SELECT 'user-' || g || '@example.com' FROM generate_series(1, $1) AS g`,
+        [n],
+      )
+    } finally {
+      db.release()
+    }
+  }
+
+  async function countEncrypted(): Promise<number> {
+    const result = await pool.query<{ n: string }>(
+      'SELECT count(*)::text AS n FROM migrate_test.users WHERE email_encrypted IS NOT NULL',
+    )
+    return Number(result.rows[0]?.n ?? 0)
+  }
+
+  it('backfills every row and records a completion event', async () => {
+    await seed(500)
+    const db = await pool.connect()
+    try {
+      const result = await runBackfill({
+        db,
+        encryptionClient: stubClient,
+        tableSchema: { tableName: 'users', build: () => ({}) },
+        tableName: 'migrate_test.users',
+        schemaColumnKey: 'email',
+        plaintextColumn: 'email',
+        encryptedColumn: 'email_encrypted',
+        pkColumn: 'id',
+        chunkSize: 100,
+      })
+      expect(result.completed).toBe(true)
+      expect(result.rowsProcessed).toBe(500)
+      expect(result.resumed).toBe(false)
+    } finally {
+      db.release()
+    }
+
+    expect(await countEncrypted()).toBe(500)
+
+    const readDb = await pool.connect()
+    try {
+      const state = await progress(readDb, 'migrate_test.users', 'email')
+      expect(state?.phase).toBe('backfilled')
+      expect(state?.event).toBe('backfilled')
+      expect(state?.rowsProcessed).toBe(500)
+    } finally {
+      readDb.release()
+    }
+  })
+
+  it('is idempotent on re-run (zero additional writes)', async () => {
+    await seed(200)
+    const run = async () => {
+      const db = await pool.connect()
+      try {
+        return await runBackfill({
+          db,
+          encryptionClient: stubClient,
+          tableSchema: { tableName: 'users', build: () => ({}) },
+          tableName: 'migrate_test.users',
+          schemaColumnKey: 'email',
+          plaintextColumn: 'email',
+          encryptedColumn: 'email_encrypted',
+          pkColumn: 'id',
+          chunkSize: 50,
+        })
+      } finally {
+        db.release()
+      }
+    }
+
+    await run()
+    expect(await countEncrypted()).toBe(200)
+
+    // Capture a per-row hash so we can prove nothing was rewritten.
+    const before = await pool.query<{ h: string }>(
+      `SELECT md5(string_agg(email_encrypted::text, ',' ORDER BY id)) AS h FROM migrate_test.users`,
+    )
+
+    const secondResult = await run()
+    expect(secondResult.completed).toBe(true)
+    expect(secondResult.rowsProcessed).toBe(0) // zero: the prior run ended with a `backfilled` event (not a checkpoint), so this run starts fresh and finds no unencrypted rows
+
+    const after = await pool.query<{ h: string }>(
+      `SELECT md5(string_agg(email_encrypted::text, ',' ORDER BY id)) AS h FROM migrate_test.users`,
+    )
+    expect(after.rows[0]?.h).toBe(before.rows[0]?.h)
+  })
+
+  it('resumes from a checkpoint after mid-run abort', async () => {
+    await seed(500)
+
+    // First pass: abort after ~200 rows.
+    const controller = new AbortController()
+    let rowsSeen = 0
+    const db1 = await pool.connect()
+    let firstResult: Awaited<ReturnType<typeof runBackfill>>
+    try {
+      firstResult = await runBackfill({
+        db: db1,
+        encryptionClient: stubClient,
+        tableSchema: { tableName: 'users', build: () => ({}) },
+        tableName: 'migrate_test.users',
+        schemaColumnKey: 'email',
+        plaintextColumn: 'email',
+        encryptedColumn: 'email_encrypted',
+        pkColumn: 'id',
+        chunkSize: 50,
+        signal: controller.signal,
+        onProgress: (p) => {
+          rowsSeen = p.rowsProcessed
+          if (rowsSeen >= 200) controller.abort()
+        },
+      })
+    } finally {
+      db1.release()
+    }
+
+    expect(firstResult.completed).toBe(false)
+    expect(firstResult.rowsProcessed).toBeGreaterThanOrEqual(200)
+    expect(firstResult.rowsProcessed).toBeLessThan(500)
+    const partialCount = await countEncrypted()
+    expect(partialCount).toBe(firstResult.rowsProcessed)
+
+    // Second pass: resume from checkpoint, should finish.
+    const db2 = await pool.connect()
+    let secondResult: Awaited<ReturnType<typeof runBackfill>>
+    try {
+      secondResult = await runBackfill({
+        db: db2,
+        encryptionClient: stubClient,
+        tableSchema: { tableName: 'users', build: () => ({}) },
+        tableName: 'migrate_test.users',
+        schemaColumnKey: 'email',
+        plaintextColumn: 'email',
+        encryptedColumn: 'email_encrypted',
+        pkColumn: 'id',
+        chunkSize: 50,
+      })
+    } finally {
+      db2.release()
+    }
+
+    expect(secondResult.resumed).toBe(true)
+    expect(secondResult.completed).toBe(true)
+    expect(await countEncrypted()).toBe(500)
+  })
+
+  it('records an error event and rethrows when encryption fails', async () => {
+    await seed(100)
+    let calls = 0
+    const failingClient: EncryptionClientLike = {
+      bulkEncryptModels(input) {
+        calls += 1
+        if (calls === 2) {
+          return Promise.resolve({
+            failure: { message: 'ZeroKMS exploded', type: 'EncryptionError' },
+          })
+        }
+        return Promise.resolve({
+          data: input.map((row) => ({
+            __pk: row.__pk,
+            email: {
+              v: 2,
+              i: { t: 'users', c: 'email' },
+              c: `mock-ciphertext:${row.email}`,
+            },
+          })),
+        })
+      },
+    }
+
const db = await pool.connect() + try { + await expect( + runBackfill({ + db, + encryptionClient: failingClient, + tableSchema: { tableName: 'users', build: () => ({}) }, + tableName: 'migrate_test.users', + schemaColumnKey: 'email', + plaintextColumn: 'email', + encryptedColumn: 'email_encrypted', + pkColumn: 'id', + chunkSize: 25, + }), + ).rejects.toThrow(/ZeroKMS exploded/) + } finally { + db.release() + } + + const readDb = await pool.connect() + try { + const state = await progress(readDb, 'migrate_test.users', 'email') + expect(state?.event).toBe('error') + expect((state?.details as { message?: string } | null)?.message).toMatch( + /ZeroKMS exploded/, + ) + } finally { + readDb.release() + } + // Chunk 1 succeeded and committed before chunk 2 failed. + expect(await countEncrypted()).toBe(25) + }) + + it('handles an empty table gracefully', async () => { + await seed(0) + const db = await pool.connect() + try { + const result = await runBackfill({ + db, + encryptionClient: stubClient, + tableSchema: { tableName: 'users', build: () => ({}) }, + tableName: 'migrate_test.users', + schemaColumnKey: 'email', + plaintextColumn: 'email', + encryptedColumn: 'email_encrypted', + pkColumn: 'id', + chunkSize: 100, + }) + expect(result.completed).toBe(true) + expect(result.rowsProcessed).toBe(0) + } finally { + db.release() + } + }) + + it('skips rows already encrypted by a previous run (idempotency guard)', async () => { + await seed(100) + // Pre-encrypt half the rows by hand — simulates a dual-write path + // that was already populating encrypted ciphertext before backfill ran. 
+ await pool.query( + `UPDATE migrate_test.users + SET email_encrypted = jsonb_build_object('v', 2, 'i', jsonb_build_object('t', 'users', 'c', 'email'), 'c', 'preexisting') + WHERE id <= 50`, + ) + + const db = await pool.connect() + try { + const result = await runBackfill({ + db, + encryptionClient: stubClient, + tableSchema: { tableName: 'users', build: () => ({}) }, + tableName: 'migrate_test.users', + schemaColumnKey: 'email', + plaintextColumn: 'email', + encryptedColumn: 'email_encrypted', + pkColumn: 'id', + chunkSize: 25, + }) + expect(result.completed).toBe(true) + // Only the 50 unencrypted rows should have been processed. + expect(result.rowsProcessed).toBe(50) + } finally { + db.release() + } + + // First half still has the pre-existing ciphertext. + const preserved = await pool.query<{ n: string }>( + `SELECT count(*)::text AS n FROM migrate_test.users + WHERE id <= 50 AND email_encrypted->>'c' = 'preexisting'`, + ) + expect(Number(preserved.rows[0]?.n)).toBe(50) + + // Second half was backfilled by the stub. 
+ const backfilled = await pool.query<{ n: string }>( + `SELECT count(*)::text AS n FROM migrate_test.users + WHERE id > 50 AND email_encrypted->>'c' LIKE 'mock-ciphertext:user-%'`, + ) + expect(Number(backfilled.rows[0]?.n)).toBe(50) + }) + + it('writes a backfill_started event on every run with resume metadata', async () => { + await seed(100) + const db = await pool.connect() + try { + await runBackfill({ + db, + encryptionClient: stubClient, + tableSchema: { tableName: 'users', build: () => ({}) }, + tableName: 'migrate_test.users', + schemaColumnKey: 'email', + plaintextColumn: 'email', + encryptedColumn: 'email_encrypted', + pkColumn: 'id', + chunkSize: 50, + }) + } finally { + db.release() + } + + const events = await pool.query<{ event: string; details: unknown }>( + `SELECT event, details FROM cipherstash.cs_migrations + WHERE table_name = 'migrate_test.users' AND column_name = 'email' + ORDER BY id ASC`, + ) + const eventNames = events.rows.map((r) => r.event) + expect(eventNames[0]).toBe('backfill_started') + expect(eventNames).toContain('backfill_checkpoint') + expect(eventNames.at(-1)).toBe('backfilled') + expect((events.rows[0]?.details as { resumed?: boolean })?.resumed).toBe( + false, + ) + }) + + it('latestByColumn returns the most recent row per column', async () => { + await seed(50) + const db = await pool.connect() + try { + await runBackfill({ + db, + encryptionClient: stubClient, + tableSchema: { tableName: 'users', build: () => ({}) }, + tableName: 'migrate_test.users', + schemaColumnKey: 'email', + plaintextColumn: 'email', + encryptedColumn: 'email_encrypted', + pkColumn: 'id', + chunkSize: 25, + }) + } finally { + db.release() + } + + const readDb = await pool.connect() + try { + const map = await latestByColumn(readDb) + const latest = map.get('migrate_test.users.email') + expect(latest?.phase).toBe('backfilled') + expect(latest?.rowsProcessed).toBe(50) + } finally { + readDb.release() + } + }) +}) diff --git 
a/packages/migrate/src/__tests__/manifest.test.ts b/packages/migrate/src/__tests__/manifest.test.ts new file mode 100644 index 00000000..a8e55706 --- /dev/null +++ b/packages/migrate/src/__tests__/manifest.test.ts @@ -0,0 +1,104 @@ +import { mkdtempSync, rmSync } from 'node:fs' +import { tmpdir } from 'node:os' +import { join } from 'node:path' +import { describe, expect, it } from 'vitest' +import { manifestPath, readManifest, writeManifest } from '../manifest.js' + +describe('manifest', () => { + it('returns null when manifest is absent', async () => { + const tmp = mkdtempSync(join(tmpdir(), 'cs-manifest-')) + try { + const result = await readManifest(tmp) + expect(result).toBeNull() + } finally { + rmSync(tmp, { recursive: true, force: true }) + } + }) + + it('round-trips a manifest through write and read', async () => { + const tmp = mkdtempSync(join(tmpdir(), 'cs-manifest-')) + try { + await writeManifest( + { + version: 1, + tables: { + users: [ + { + column: 'email', + castAs: 'text', + indexes: ['unique', 'match'], + targetPhase: 'cut-over', + pkColumn: 'id', + }, + ], + }, + }, + tmp, + ) + const read = await readManifest(tmp) + expect(read?.tables.users?.[0]?.column).toBe('email') + expect(read?.tables.users?.[0]?.indexes).toEqual(['unique', 'match']) + } finally { + rmSync(tmp, { recursive: true, force: true }) + } + }) + + it('applies defaults for optional fields', async () => { + const tmp = mkdtempSync(join(tmpdir(), 'cs-manifest-')) + try { + await writeManifest( + { + version: 1, + tables: { + users: [ + { + column: 'email', + castAs: 'text', + indexes: [], + targetPhase: 'cut-over', + }, + ], + }, + }, + tmp, + ) + const read = await readManifest(tmp) + expect(read?.tables.users?.[0]?.targetPhase).toBe('cut-over') + expect(read?.tables.users?.[0]?.indexes).toEqual([]) + } finally { + rmSync(tmp, { recursive: true, force: true }) + } + }) + + it('rejects invalid index kinds', async () => { + const tmp = mkdtempSync(join(tmpdir(), 'cs-manifest-')) + 
try { + await expect( + writeManifest( + { + version: 1, + tables: { + users: [ + { + column: 'email', + castAs: 'text', + // biome-ignore lint/suspicious/noExplicitAny: intentional bad input + indexes: ['bogus' as any], + targetPhase: 'cut-over', + }, + ], + }, + }, + tmp, + ), + ).rejects.toThrow() + } finally { + rmSync(tmp, { recursive: true, force: true }) + } + }) + + it('exposes the canonical path', () => { + const result = manifestPath('/tmp/project') + expect(result).toBe('/tmp/project/.cipherstash/migrations.json') + }) +}) diff --git a/packages/migrate/src/__tests__/sql.test.ts b/packages/migrate/src/__tests__/sql.test.ts new file mode 100644 index 00000000..2c351461 --- /dev/null +++ b/packages/migrate/src/__tests__/sql.test.ts @@ -0,0 +1,23 @@ +import { describe, expect, it } from 'vitest' +import { qualifyTable } from '../cursor.js' +import { quoteIdent } from '../sql.js' + +describe('quoteIdent', () => { + it('wraps simple identifiers in double quotes', () => { + expect(quoteIdent('email')).toBe('"email"') + }) + + it('escapes embedded double quotes', () => { + expect(quoteIdent('foo"bar')).toBe('"foo""bar"') + }) +}) + +describe('qualifyTable', () => { + it('quotes a single-part table name', () => { + expect(qualifyTable('users')).toBe('"users"') + }) + + it('quotes each part of a schema-qualified name', () => { + expect(qualifyTable('public.users')).toBe('"public"."users"') + }) +}) diff --git a/packages/migrate/src/__tests__/state.test.ts b/packages/migrate/src/__tests__/state.test.ts new file mode 100644 index 00000000..a53498e5 --- /dev/null +++ b/packages/migrate/src/__tests__/state.test.ts @@ -0,0 +1,193 @@ +import type { + ClientBase, + QueryArrayConfig, + QueryArrayResult, + QueryConfig, + QueryResult, + QueryResultRow, + Submittable, +} from 'pg' +import { describe, expect, it } from 'vitest' +import { appendEvent, latestByColumn, progress } from '../state.js' + +interface RecordedQuery { + text: string + values: unknown[] +} + +function 
createMockClient(
+  responses: Array<{ rows: Array<QueryResultRow> }>,
+): { client: ClientBase; queries: RecordedQuery[] } {
+  const queries: RecordedQuery[] = []
+  let i = 0
+  const client = {
+    query(
+      config: string | QueryConfig | QueryArrayConfig | Submittable,
+      values?: unknown[],
+    ) {
+      const text =
+        typeof config === 'string' ? config : (config as QueryConfig).text
+      queries.push({ text, values: values ?? [] })
+      const resp = responses[i++] ?? { rows: [] }
+      return Promise.resolve({
+        rows: resp.rows,
+        rowCount: resp.rows.length,
+        command: '',
+        oid: 0,
+        fields: [],
+      } as unknown as QueryResult | QueryArrayResult)
+    },
+  } as unknown as ClientBase
+  return { client, queries }
+}
+
+describe('appendEvent', () => {
+  it('inserts into cipherstash.cs_migrations with all fields', async () => {
+    const { client, queries } = createMockClient([
+      {
+        rows: [
+          {
+            id: 42,
+            table_name: 'users',
+            column_name: 'email',
+            event: 'backfill_checkpoint',
+            phase: 'backfilling',
+            cursor_value: '1234',
+            rows_processed: 500,
+            rows_total: 1000,
+            details: { chunkIndex: 2 },
+            created_at: new Date('2026-04-23T00:00:00Z'),
+          },
+        ],
+      },
+    ])
+
+    const row = await appendEvent(client, {
+      tableName: 'users',
+      columnName: 'email',
+      event: 'backfill_checkpoint',
+      phase: 'backfilling',
+      cursorValue: '1234',
+      rowsProcessed: 500,
+      rowsTotal: 1000,
+      details: { chunkIndex: 2 },
+    })
+
+    expect(queries).toHaveLength(1)
+    expect(queries[0]?.text).toMatch(/INSERT INTO cipherstash\.cs_migrations/)
+    expect(queries[0]?.values).toEqual([
+      'users',
+      'email',
+      'backfill_checkpoint',
+      'backfilling',
+      '1234',
+      500,
+      1000,
+      { chunkIndex: 2 },
+    ])
+    expect(row.id).toBe('42')
+    expect(row.rowsProcessed).toBe(500)
+  })
+
+  it('nulls optional fields when omitted', async () => {
+    const { client, queries } = createMockClient([
+      {
+        rows: [
+          {
+            id: 1,
+            table_name: 'users',
+            column_name: 'email',
+            event: 'schema_added',
+            phase: 'schema-added',
+            cursor_value: null,
rows_processed: null, + rows_total: null, + details: null, + created_at: new Date(), + }, + ], + }, + ]) + + await appendEvent(client, { + tableName: 'users', + columnName: 'email', + event: 'schema_added', + phase: 'schema-added', + }) + + expect(queries[0]?.values.slice(4)).toEqual([null, null, null, null]) + }) +}) + +describe('latestByColumn', () => { + it('returns a map keyed by `table.column`', async () => { + const { client } = createMockClient([ + { + rows: [ + { + id: 10, + table_name: 'users', + column_name: 'email', + event: 'backfilled', + phase: 'backfilled', + cursor_value: null, + rows_processed: 100, + rows_total: 100, + details: null, + created_at: new Date(), + }, + { + id: 9, + table_name: 'orders', + column_name: 'notes', + event: 'dual_writing', + phase: 'dual-writing', + cursor_value: null, + rows_processed: null, + rows_total: null, + details: null, + created_at: new Date(), + }, + ], + }, + ]) + + const map = await latestByColumn(client) + expect(map.size).toBe(2) + expect(map.get('users.email')?.phase).toBe('backfilled') + expect(map.get('orders.notes')?.phase).toBe('dual-writing') + }) +}) + +describe('progress', () => { + it('returns null when no rows exist', async () => { + const { client } = createMockClient([{ rows: [] }]) + const result = await progress(client, 'users', 'email') + expect(result).toBeNull() + }) + + it('returns the latest row', async () => { + const { client } = createMockClient([ + { + rows: [ + { + id: 5, + table_name: 'users', + column_name: 'email', + event: 'backfill_checkpoint', + phase: 'backfilling', + cursor_value: '999', + rows_processed: 3000, + rows_total: 10000, + details: null, + created_at: new Date(), + }, + ], + }, + ]) + const result = await progress(client, 'users', 'email') + expect(result?.cursorValue).toBe('999') + expect(result?.rowsProcessed).toBe(3000) + }) +}) diff --git a/packages/migrate/src/backfill.ts b/packages/migrate/src/backfill.ts new file mode 100644 index 00000000..48bdb01b --- 
/dev/null
+++ b/packages/migrate/src/backfill.ts
@@ -0,0 +1,479 @@
+import { isEncryptedPayload } from '@cipherstash/stack'
+import type { ClientBase, PoolClient } from 'pg'
+import {
+  countUnencrypted,
+  fetchUnencryptedPage,
+  qualifyTable,
+} from './cursor.js'
+import { quoteIdent } from './sql.js'
+import { type MigrationPhase, appendEvent, progress } from './state.js'
+
+// Loose structural types — keep this library decoupled from @cipherstash/stack
+// so @cipherstash/migrate can be built and tested without pulling the full
+// stack graph in. `EncryptionClientLike` matches the shape `EncryptionClient`
+// from `@cipherstash/stack/encryption` exposes via `bulkEncryptModels`.
+
+/**
+ * Shape returned by {@link EncryptionClientLike.bulkEncryptModels} on success.
+ * `data` is the array of models with the configured fields replaced by
+ * ciphertext payloads, preserving `__pk` and any other non-encrypted fields.
+ */
+export interface BulkEncryptResultSuccess<T> {
+  failure?: undefined
+  data: T[]
+}
+
+/**
+ * Shape returned by {@link EncryptionClientLike.bulkEncryptModels} on failure.
+ * Matches `@byteslice/result`'s `{ failure: { message } }` convention used
+ * by `@cipherstash/stack`. The backfill halts and writes an `error` event
+ * to `cs_migrations` when this is returned.
+ */
+export interface BulkEncryptResultFailure {
+  failure: { message: string; type?: string }
+  data?: undefined
+}
+
+/**
+ * Discriminated union returned by bulk-encrypt. Narrow with `if (r.failure)`
+ * vs `if (r.data)`. No exceptions are thrown by the underlying operation.
+ */
+export type BulkEncryptResult<T> =
+  | BulkEncryptResultSuccess<T>
+  | BulkEncryptResultFailure
+
+/**
+ * A thenable wrapper around {@link BulkEncryptResult}. `@cipherstash/stack`'s
+ * `bulkEncryptModels` returns a fluent operation builder that resolves to a
+ * `Result` when awaited; this alias accepts anything `PromiseLike` so we
+ * don't bind to the full operation class in type-land.
+ */
+export type BulkEncryptThenable<T> = PromiseLike<BulkEncryptResult<T>>
+
+/**
+ * Minimal surface of `@cipherstash/stack`'s `EncryptionClient` used by
+ * {@link runBackfill}. Only `bulkEncryptModels` is required — the backfill
+ * encrypts in batches, it does not need the single-value `encrypt` API.
+ *
+ * Supplying an object that duck-types to this interface is enough; you do
+ * not have to import `EncryptionClient` itself.
+ *
+ * @example
+ * ```ts
+ * // Typical wiring: the user's `src/encryption/index.ts` exports an
+ * // already-initialised client, and you pass it through.
+ * import { encryptionClient } from './src/encryption/index.js'
+ * await runBackfill({ encryptionClient, ... })
+ * ```
+ */
+export interface EncryptionClientLike {
+  /**
+   * Bulk-encrypt a batch of plaintext models against a table schema.
+   *
+   * @param input - Array of models. Each row is `{ [schemaColumnKey]: plaintext, ... }`.
+   *   The backfill also includes a `__pk` field per row so it can correlate
+   *   the encrypted result back to the database row on UPDATE.
+   * @param table - The `EncryptedTable` schema for the target table. Typed
+   *   as `any` here to keep this library decoupled from `@cipherstash/stack`.
+   */
+  bulkEncryptModels(
+    input: Array<Record<string, unknown>>,
+    // biome-ignore lint/suspicious/noExplicitAny: Stack's EncryptedTable is generic
+    table: any,
+  ): BulkEncryptThenable<Record<string, unknown>>
+}
+
+/**
+ * Snapshot of backfill progress, passed to {@link BackfillOptions.onProgress}
+ * after every successful chunk commit. Values represent the cumulative state
+ * *after* the most recent chunk — including any rows processed by a prior
+ * run that this invocation resumed from.
+ */
+export interface BackfillProgress {
+  /** Total rows written to the encrypted column so far (includes a resumed prior run). */
+  rowsProcessed: number
+  /** Total rows we expect to process over the life of this migration (incl. resumed). */
+  rowsTotal: number
+  /** PK of the last row processed in the most recent chunk, cast to text.
   */
+  lastPk: string | null
+  /** Chunk size used for this run (echoed from {@link BackfillOptions.chunkSize}). */
+  chunkSize: number
+  /** Zero-based index of the chunk that just completed. */
+  chunkIndex: number
+}
+
+/**
+ * Options for {@link runBackfill}.
+ *
+ * Distinguishes two separate namespaces that a reader has to keep straight:
+ * - **Physical names** ({@link tableName}, {@link plaintextColumn}, {@link encryptedColumn}, {@link pkColumn})
+ *   are Postgres identifiers used verbatim in SQL.
+ * - **Schema name** ({@link schemaColumnKey}) is the key on the `EncryptedTable`
+ *   schema object that corresponds to the column. In the common drizzle
+ *   convention — where the schema declares the encrypted column (not the
+ *   plaintext one) — this equals `encryptedColumn`. Pass explicitly only
+ *   when your schema's object keys diverge from the physical column names.
+ */
+export interface BackfillOptions {
+  /**
+   * A pg pool client the runner owns for the duration of the call. The
+   * runner issues `BEGIN`/`COMMIT` on this connection for each chunk, so it
+   * must not be shared across concurrent work during the backfill.
+   *
+   * Acquire with `const db = await pool.connect()`, release with
+   * `db.release()` in your `finally`.
+   */
+  db: PoolClient
+  /**
+   * Initialised encryption client. See {@link EncryptionClientLike} for the
+   * required surface (just `bulkEncryptModels`).
+   */
+  encryptionClient: EncryptionClientLike
+  /**
+   * The `EncryptedTable` schema object for the target table, as exported
+   * from the user's `src/encryption/index.ts`. Passed through to
+   * `encryptionClient.bulkEncryptModels(models, tableSchema)`. Typed as
+   * `any` to avoid coupling this library to `@cipherstash/stack`.
+   */
+  // biome-ignore lint/suspicious/noExplicitAny: Stack's EncryptedTable is generic
+  tableSchema: any
+  /**
+   * Physical Postgres table name. Supports `"schema.table"` for non-default
+   * schemas (identifiers are quoted automatically).
+ */ + tableName: string + /** + * The key in {@link tableSchema} that corresponds to this column. With + * the drizzle `extractEncryptionSchema` convention, where the schema is + * derived from a table like `{ email_encrypted: encryptedType(...) }`, + * this equals {@link encryptedColumn}. With a handwritten + * `encryptedTable('users', { email: … })` schema where there's only one + * column, this usually equals {@link plaintextColumn}. + */ + schemaColumnKey: string + /** + * Physical column that holds the plaintext being encrypted, e.g. `email`. + * The runner reads rows where this is `NOT NULL` and the target encrypted + * column is `NULL`. + */ + plaintextColumn: string + /** + * Physical column that receives the `eql_v2_encrypted` ciphertext JSON, + * e.g. `email_encrypted`. Must already exist (typically created by + * `drizzle-kit` / a prior migration) before backfill starts. + */ + encryptedColumn: string + /** + * Physical single-column primary key used for keyset pagination — the + * runner issues `WHERE pk > $after ORDER BY pk ASC LIMIT $n`. Must be + * comparable with `>` (bigint, integer, text, uuid all work). Composite + * primary keys are not yet supported. + */ + pkColumn: string + /** + * Rows per chunk / transaction. Default 1000. Tune down if you see lock + * contention, up for tables with small row payloads. A single chunk is + * one `BEGIN`/`UPDATE`/`INSERT checkpoint`/`COMMIT` cycle, so the value + * also bounds how much work is lost when you `Ctrl-C` mid-chunk. + */ + chunkSize?: number + /** + * Optional abort signal. Checked *between* chunks — the in-flight chunk + * always completes and checkpoints before the loop exits. Safe to wire + * to `SIGINT` / `SIGTERM`; the CLI does exactly this. + */ + signal?: AbortSignal + /** + * Invoked synchronously after each chunk has committed. Safe for logging + * and UI updates; throwing from this callback will kill the backfill. 
+ */ + onProgress?: (progress: BackfillProgress) => void + /** + * Optional coercion applied to each row's plaintext value before it is + * passed to {@link EncryptionClientLike.bulkEncryptModels}. Needed when + * the pg driver's native JS type doesn't match the schema's declared + * dataType — e.g. pg returns `numeric` as a string, but a schema + * declaring `dataType('number')` expects a JS number. The CLI + * builds an appropriate coercer from the schema's `cast_as`; library + * callers can supply their own or leave undefined (identity). + */ + transformPlaintext?: (value: unknown) => unknown +} + +/** + * Return value from {@link runBackfill}. + */ +export interface BackfillResult { + /** + * `true` if the run began from a previously-recorded checkpoint rather + * than starting fresh. Determined by the most recent event for this + * column being a `backfill_checkpoint`. + */ + resumed: boolean + /** + * Cumulative rows written to the encrypted column, including any from a + * prior run this invocation resumed from. + */ + rowsProcessed: number + /** + * Total rows the migration expects to process end-to-end (including + * resumed). Computed as `priorProcessed + currentRunTotal` at start. + */ + rowsTotal: number + /** + * `true` if this run drained all remaining rows. `false` means the run + * was aborted (via {@link BackfillOptions.signal}) or is otherwise + * paused and should be resumed by re-invoking with the same options. + */ + completed: boolean +} + +/** + * Run a chunked, resumable, idempotent backfill of plaintext → encrypted. + * + * Per chunk, in a single transaction: + * 1. `SELECT` the next page of rows where the encrypted column is `NULL` + * and the PK is greater than the cursor. + * 2. Encrypt the batch via {@link EncryptionClientLike.bulkEncryptModels}. + * 3. `UPDATE … FROM (VALUES …)` to write the ciphertext back. + * 4. `INSERT` a `backfill_checkpoint` event into `cipherstash.cs_migrations`. + * 5. `COMMIT`. 
+ *
+ * **Idempotency** — the `encrypted IS NULL` guard in both the SELECT and the
+ * UPDATE's `WHERE` clause means re-runs never double-write a row, even if
+ * the cursor is lost.
+ *
+ * **Resumability** — restarting with the same arguments will pick up from
+ * the last committed checkpoint. Use {@link BackfillOptions.signal} to
+ * abort cleanly on `SIGINT`.
+ *
+ * **Failure handling** — if any chunk fails (encrypt error or DB error),
+ * the transaction is rolled back, an `error` event is appended to
+ * `cs_migrations` for diagnostics, and the error is re-thrown.
+ *
+ * @example
+ * ```ts
+ * const db = await pool.connect()
+ * try {
+ *   const result = await runBackfill({
+ *     db,
+ *     encryptionClient,
+ *     tableSchema: usersTable,
+ *     tableName: 'users',
+ *     schemaColumnKey: 'email',
+ *     plaintextColumn: 'email',
+ *     encryptedColumn: 'email_encrypted',
+ *     pkColumn: 'id',
+ *     chunkSize: 1000,
+ *     onProgress: (p) => console.log(`${p.rowsProcessed}/${p.rowsTotal}`),
+ *   })
+ *   console.log(result.completed ? 'done' : 'paused — re-run to resume')
+ * } finally {
+ *   db.release()
+ * }
+ * ```
+ */
+export async function runBackfill(
+  options: BackfillOptions,
+): Promise<BackfillResult> {
+  const chunkSize = options.chunkSize ?? 1000
+  const { db, tableName, pkColumn, plaintextColumn, encryptedColumn } = options
+
+  const rowsTotal = await countUnencrypted(
+    db,
+    tableName,
+    plaintextColumn,
+    encryptedColumn,
+  )
+
+  const last = await progress(db, tableName, plaintextColumn)
+  const resumeCursor =
+    last?.event === 'backfill_checkpoint' ? last.cursorValue : null
+  const resumed = resumeCursor !== null
+  const priorProcessed =
+    last?.event === 'backfill_checkpoint' ? (last.rowsProcessed ?? 
0) : 0 + + await appendEvent(db, { + tableName, + columnName: plaintextColumn, + event: 'backfill_started', + phase: 'backfilling', + cursorValue: resumeCursor, + rowsProcessed: priorProcessed, + rowsTotal: priorProcessed + rowsTotal, + details: { chunkSize, resumed }, + }) + + let cursor = resumeCursor + let rowsProcessed = priorProcessed + const rowsTotalWithResumed = priorProcessed + rowsTotal + let chunkIndex = 0 + let completed = false + + try { + while (true) { + if (options.signal?.aborted) break + + const page = await fetchUnencryptedPage(db, { + tableName, + pkColumn, + plaintextColumn, + encryptedColumn, + after: cursor, + limit: chunkSize, + }) + + if (page.rows.length === 0) { + completed = true + break + } + + const coerce = options.transformPlaintext ?? ((v: unknown) => v) + const models = page.rows.map((row) => ({ + __pk: row.pk, + [options.schemaColumnKey]: coerce(row.plaintext), + })) + + const encryptResult = await options.encryptionClient.bulkEncryptModels( + models, + options.tableSchema, + ) + + if (encryptResult.failure) { + throw new Error( + `bulkEncryptModels failed: ${encryptResult.failure.message}`, + ) + } + + // Leak guard: every row's schemaColumnKey field must be a valid EQL + // payload ({ v, i, c|sv, … }). If any row comes back as a primitive + // or a mis-shaped object, the encryption client silently passed the + // plaintext through — typically because the schema is keyed by a + // different name than `schemaColumnKey`. Fail loudly before any + // write commits; this is what prevents `(82.60)`-shaped composite + // values from ending up in the encrypted column. + for (const [i, row] of encryptResult.data.entries()) { + const value = row[options.schemaColumnKey] + if (!isEncryptedPayload(value)) { + const pk = row.__pk ?? page.rows[i]?.pk + const preview = JSON.stringify(value)?.slice(0, 120) ?? 
String(value) + throw new Error( + `Encryption client returned a non-ciphertext value at model key "${options.schemaColumnKey}" for pk=${pk} (got: ${preview}). This usually means the schema column key does not match your EncryptedTable. Verify that your schema declares a column keyed "${options.schemaColumnKey}", or pass --schema-column-key to override.`, + ) + } + } + + await db.query('BEGIN') + try { + await writeEncryptedChunk(db, { + tableName, + pkColumn, + encryptedColumn, + schemaColumnKey: options.schemaColumnKey, + encryptedRows: encryptResult.data, + }) + rowsProcessed += page.rows.length + cursor = page.lastPk + await appendEvent(db, { + tableName, + columnName: plaintextColumn, + event: 'backfill_checkpoint', + phase: 'backfilling', + cursorValue: cursor, + rowsProcessed, + rowsTotal: rowsTotalWithResumed, + details: { chunkIndex, chunkRows: page.rows.length }, + }) + await db.query('COMMIT') + } catch (err) { + await db.query('ROLLBACK').catch(() => {}) + throw err + } + + options.onProgress?.({ + rowsProcessed, + rowsTotal: rowsTotalWithResumed, + lastPk: cursor, + chunkSize, + chunkIndex, + }) + chunkIndex += 1 + } + + if (completed) { + await appendEvent(db, { + tableName, + columnName: plaintextColumn, + event: 'backfilled', + phase: 'backfilled', + cursorValue: cursor, + rowsProcessed, + rowsTotal: rowsTotalWithResumed, + details: { chunkCount: chunkIndex }, + }) + } + } catch (err) { + await appendEvent(db, { + tableName, + columnName: plaintextColumn, + event: 'error', + phase: 'backfilling', + cursorValue: cursor, + rowsProcessed, + rowsTotal: rowsTotalWithResumed, + details: { + message: err instanceof Error ? 
err.message : String(err),
+        chunkIndex,
+      },
+    })
+    throw err
+  }
+
+  return {
+    resumed,
+    rowsProcessed,
+    rowsTotal: rowsTotalWithResumed,
+    completed,
+  }
+}
+
+interface WriteChunkOptions {
+  tableName: string
+  pkColumn: string
+  encryptedColumn: string
+  schemaColumnKey: string
+  encryptedRows: Array<Record<string, unknown>>
+}
+
+async function writeEncryptedChunk(
+  db: ClientBase,
+  opts: WriteChunkOptions,
+): Promise<void> {
+  if (opts.encryptedRows.length === 0) return
+
+  const table = qualifyTable(opts.tableName)
+  const pk = quoteIdent(opts.pkColumn)
+  const enc = quoteIdent(opts.encryptedColumn)
+
+  const params: unknown[] = []
+  const valuesSql = opts.encryptedRows
+    .map((row) => {
+      const pkValue = row.__pk
+      const encryptedValue = row[opts.schemaColumnKey]
+      params.push(pkValue)
+      const pkParam = `$${params.length}`
+      params.push(encryptedValue)
+      const encParam = `$${params.length}::jsonb`
+      return `(${pkParam}, ${encParam})`
+    })
+    .join(', ')
+
+  const sql = `
+    UPDATE ${table} AS t
+    SET ${enc} = v.enc
+    FROM (VALUES ${valuesSql}) AS v(pk, enc)
+    WHERE t.${pk}::text = v.pk::text AND t.${enc} IS NULL
+  `
+
+  await db.query(sql, params)
+}
diff --git a/packages/migrate/src/cursor.ts b/packages/migrate/src/cursor.ts
new file mode 100644
index 00000000..79cb5547
--- /dev/null
+++ b/packages/migrate/src/cursor.ts
@@ -0,0 +1,116 @@
+import type { ClientBase } from 'pg'
+import { quoteIdent } from './sql.js'
+
+/**
+ * Inputs to {@link fetchUnencryptedPage}.
+ */
+export interface KeysetPageOptions {
+  /** Physical table name. Supports `schema.table`. */
+  tableName: string
+  /**
+   * Primary-key column used for keyset pagination. Must be comparable with
+   * `>`. Cast to text by the query so any PK type works.
+   */
+  pkColumn: string
+  /** Column to read the plaintext from, e.g. `email`. */
+  plaintextColumn: string
+  /**
+   * Target encrypted column, e.g. `email_encrypted`. Rows where this is
+   * already non-null are skipped (idempotency guard).
+   */
+  encryptedColumn: string
+  /**
+   * Exclusive lower bound on `pkColumn`. Set to `null` to start from the
+   * beginning; pass the `lastPk` of the previous page to continue.
+   */
+  after: string | null
+  /** Maximum rows returned per call. */
+  limit: number
+}
+
+/**
+ * One page of rows from {@link fetchUnencryptedPage}. `lastPk` is `null`
+ * only when the page is empty — the caller's completion signal.
+ */
+export interface KeysetPage<Row = { pk: string; plaintext: unknown }> {
+  rows: Row[]
+  lastPk: string | null
+}
+
+/**
+ * Fetch the next page of rows that still need encryption for a given column.
+ *
+ * Guards with `plaintext_col IS NOT NULL AND encrypted_col IS NULL` so a
+ * concurrent backfill or a re-run never re-processes the same row, even if
+ * the pagination cursor is lost. The ORDER BY + LIMIT form gives keyset
+ * (seek) pagination rather than OFFSET, which keeps the scan bounded as
+ * the cursor advances.
+ */
+export async function fetchUnencryptedPage(
+  client: ClientBase,
+  opts: KeysetPageOptions,
+): Promise<KeysetPage> {
+  const pk = quoteIdent(opts.pkColumn)
+  const plain = quoteIdent(opts.plaintextColumn)
+  const enc = quoteIdent(opts.encryptedColumn)
+  const table = qualifyTable(opts.tableName)
+
+  const params: unknown[] = []
+  let where = `${plain} IS NOT NULL AND ${enc} IS NULL`
+  if (opts.after !== null) {
+    params.push(opts.after)
+    where += ` AND ${pk} > $${params.length}`
+  }
+  params.push(opts.limit)
+  const limitParam = `$${params.length}`
+
+  const sql = `
+    SELECT ${pk}::text AS pk, ${plain} AS plaintext
+    FROM ${table}
+    WHERE ${where}
+    ORDER BY ${pk} ASC
+    LIMIT ${limitParam}
+  `
+  const result = await client.query<{ pk: string; plaintext: unknown }>(
+    sql,
+    params,
+  )
+  const rows = result.rows
+  const lastPk = rows[rows.length - 1]?.pk ?? null
+  return { rows, lastPk }
+}
+
+/**
+ * Count rows that still need encryption: `plaintext IS NOT NULL AND
+ * encrypted IS NULL`. 
Called once at the start of a backfill to compute
+ * `rowsTotal` for progress reporting; does not hold a snapshot, so new
+ * rows inserted during the backfill are simply picked up on the next
+ * chunk's SELECT.
+ */
+export async function countUnencrypted(
+  client: ClientBase,
+  tableName: string,
+  plaintextColumn: string,
+  encryptedColumn: string,
+): Promise<number> {
+  const plain = quoteIdent(plaintextColumn)
+  const enc = quoteIdent(encryptedColumn)
+  const table = qualifyTable(tableName)
+  const result = await client.query<{ count: string }>(
+    `SELECT count(*)::text AS count FROM ${table} WHERE ${plain} IS NOT NULL AND ${enc} IS NULL`,
+  )
+  return Number(result.rows[0]?.count ?? 0)
+}
+
+/**
+ * Quote a possibly schema-qualified table name for use in a SQL statement.
+ * `"foo"` → `"foo"`; `"public.foo"` → `"public"."foo"`. Use for identifiers
+ * that cannot be parameterised.
+ */
+export function qualifyTable(tableName: string): string {
+  if (tableName.includes('.')) {
+    const parts = tableName.split('.')
+    return parts.map(quoteIdent).join('.')
+  }
+  return quoteIdent(tableName)
+}
diff --git a/packages/migrate/src/eql.ts b/packages/migrate/src/eql.ts
new file mode 100644
index 00000000..978939b6
--- /dev/null
+++ b/packages/migrate/src/eql.ts
@@ -0,0 +1,106 @@
+import type { ClientBase } from 'pg'
+
+/**
+ * Thin, typed wrappers around the EQL (Encrypt Query Language) functions
+ * installed by `stash db install`. These mirror the canonical SQL API that
+ * CipherStash Proxy also drives, so every action we take here stays
+ * visible to Proxy using the same column-level config.
+ *
+ * Defined by the EQL project at
+ * https://github.com/cipherstash/encrypt-query-language — see
+ * `src/config/functions.sql` and `src/encryptindex/functions.sql` for the
+ * source of truth.
+ */
+
+/**
+ * A column that has been registered in the `pending` EQL configuration but
+ * is not yet part of the `active` config. Returned by
+ * {@link selectPendingColumns}.
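+ *
+ * For example (a sketch — `client` is any connected `pg` client):
+ * ```ts
+ * const pending = await selectPendingColumns(client)
+ * for (const { tableName, columnName } of pending) {
+ *   console.log(`pending: ${tableName}.${columnName}`)
+ * }
+ * ```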
+ */
+export interface PendingColumn {
+  tableName: string
+  columnName: string
+}
+
+/**
+ * Return columns present in the `pending` EQL config but absent (or
+ * different) in the `active` one. Wraps `eql_v2.select_pending_columns()`.
+ * Useful for showing "what's about to change" before calling
+ * {@link readyForEncryption} + activating the pending config.
+ */
+export async function selectPendingColumns(
+  client: ClientBase,
+): Promise<PendingColumn[]> {
+  const result = await client.query<{
+    table_name: string
+    column_name: string
+  }>('SELECT table_name, column_name FROM eql_v2.select_pending_columns()')
+  return result.rows.map((row) => ({
+    tableName: row.table_name,
+    columnName: row.column_name,
+  }))
+}
+
+/**
+ * Check EQL's precondition for activating a pending configuration: every
+ * pending column must have a matching `eql_v2_encrypted`-typed target
+ * column in the schema. Returns `true` if activation is safe.
+ * Wraps `eql_v2.ready_for_encryption()`.
+ */
+export async function readyForEncryption(client: ClientBase): Promise<boolean> {
+  const result = await client.query<{ ready: boolean }>(
+    'SELECT eql_v2.ready_for_encryption() AS ready',
+  )
+  return result.rows[0]?.ready === true
+}
+
+/**
+ * Atomically rename every `<column>` → `<column>_plaintext` and
+ * `<column>_encrypted` → `<column>` across tables in the active EQL config.
+ * Wraps `eql_v2.rename_encrypted_columns()`.
+ *
+ * This is the **cut-over primitive**: after this returns, any SQL that
+ * reads `<column>` transparently receives the encrypted column (decrypted on
+ * read by Proxy or Protect). Call inside a transaction.
+ *
+ * Idempotency: the underlying EQL function is safe to call when no renames
+ * are pending; it simply returns without changes.
+ */
+export async function renameEncryptedColumns(
+  client: ClientBase,
+): Promise<void> {
+  await client.query('SELECT eql_v2.rename_encrypted_columns()')
+}
+
+/**
+ * Nudge Proxy to re-read its config immediately instead of waiting for its
+ * next 60-second refresh tick. 
Wraps `eql_v2.reload_config()`.
+ *
+ * **Must be executed through a CipherStash Proxy connection** — when
+ * connected directly to Postgres, `reload_config()` is a no-op (by design,
+ * per the EQL documentation). The CLI's `cutover` command accepts a
+ * `--proxy-url` flag and will connect to that separately to issue this.
+ */
+export async function reloadConfig(client: ClientBase): Promise<void> {
+  await client.query('SELECT eql_v2.reload_config()')
+}
+
+/**
+ * Return EQL's count of rows in `<table>.<column>` whose encrypted
+ * payload's config version matches the currently active config. Useful as
+ * a cheap sanity check — 0 after a backfill generally means something's
+ * wrong (wrong config active, or the backfill wrote with a stale version).
+ *
+ * Wraps `eql_v2.count_encrypted_with_active_config(table, column)`.
+ */
+export async function countEncryptedWithActiveConfig(
+  client: ClientBase,
+  tableName: string,
+  columnName: string,
+): Promise<number> {
+  const result = await client.query<{ count: string }>(
+    'SELECT eql_v2.count_encrypted_with_active_config($1, $2) AS count',
+    [tableName, columnName],
+  )
+  return Number(result.rows[0]?.count ?? 0)
+}
diff --git a/packages/migrate/src/index.ts b/packages/migrate/src/index.ts
new file mode 100644
index 00000000..e0375424
--- /dev/null
+++ b/packages/migrate/src/index.ts
@@ -0,0 +1,64 @@
+/**
+ * `@cipherstash/migrate` — primitives for migrating existing plaintext
+ * columns to `eql_v2_encrypted` in production Postgres databases.
+ *
+ * Powers the `stash encrypt` CLI command group, and is usable directly
+ * from a user's own worker/cron when they'd rather not pipe gigabytes
+ * through a CLI process.
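+ *
+ * A minimal embedded-usage sketch (assumes a connected `pg` client and a
+ * column already being tracked):
+ * ```ts
+ * import { latestByColumn } from '@cipherstash/migrate'
+ *
+ * const states = await latestByColumn(client)
+ * console.log(states.get('users.email')?.phase)
+ * ```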
+ * + * Per-column lifecycle: + * + * ``` + * schema-added → dual-writing → backfilling → backfilled → cut-over → dropped + * ``` + * + * State is split across three stores on purpose: + * - `.cipherstash/migrations.json` — repo-side intent ({@link Manifest}) + * - `eql_v2_configuration` — EQL intent (unchanged; Proxy's source of truth) + * - `cipherstash.cs_migrations` — append-only runtime state written here + * + * The primary entry point is {@link runBackfill}. The state DAO + * ({@link appendEvent}, {@link latestByColumn}, {@link progress}) lets you + * build your own UI on top of the same tracking table. + * + * @packageDocumentation + */ + +export { installMigrationsSchema, MIGRATIONS_SCHEMA_SQL } from './install.js' +export { + appendEvent, + latestByColumn, + progress, + type MigrationEvent, + type MigrationPhase, + type MigrationStateRow, + type ColumnKey, +} from './state.js' +export { + selectPendingColumns, + readyForEncryption, + renameEncryptedColumns, + reloadConfig, + countEncryptedWithActiveConfig, +} from './eql.js' +export { + fetchUnencryptedPage, + countUnencrypted, + qualifyTable, + type KeysetPage, + type KeysetPageOptions, +} from './cursor.js' +export { quoteIdent } from './sql.js' +export { + runBackfill, + type BackfillOptions, + type BackfillProgress, + type BackfillResult, +} from './backfill.js' +export { + readManifest, + writeManifest, + manifestPath, + type Manifest, + type ManifestColumn, +} from './manifest.js' diff --git a/packages/migrate/src/install.ts b/packages/migrate/src/install.ts new file mode 100644 index 00000000..8bcd7945 --- /dev/null +++ b/packages/migrate/src/install.ts @@ -0,0 +1,53 @@ +import type { ClientBase } from 'pg' + +/** + * DDL for `cipherstash.cs_migrations` — the append-only per-column event + * log that tracks encryption-migration runtime state (phase, backfill + * cursor, rows processed). Installed by `stash db install`. 
+ * + * All statements are `CREATE … IF NOT EXISTS` so running the installer + * multiple times or alongside an existing deployment is safe. + * + * This table is intentionally kept separate from `eql_v2_configuration`: + * - That table's `data` JSONB has a strict CHECK constraint that forbids + * custom metadata, so we cannot stuff backfill progress into it. + * - Its `state` enum is global (`pending`/`encrypting`/`active`/`inactive` + * — only one of the first three at a time), which cannot represent + * multiple columns in different phases simultaneously. + * - Checkpoint writes during backfill would collide with Proxy's 60s + * config refresh cycle. + */ +export const MIGRATIONS_SCHEMA_SQL = ` +CREATE SCHEMA IF NOT EXISTS cipherstash; + +CREATE TABLE IF NOT EXISTS cipherstash.cs_migrations ( + id bigint GENERATED ALWAYS AS IDENTITY PRIMARY KEY, + table_name text NOT NULL, + column_name text NOT NULL, + event text NOT NULL, + phase text NOT NULL, + cursor_value text, + rows_processed bigint, + rows_total bigint, + details jsonb, + created_at timestamptz NOT NULL DEFAULT now() +); + +CREATE INDEX IF NOT EXISTS cs_migrations_column_id_desc + ON cipherstash.cs_migrations (table_name, column_name, id DESC); +` + +/** + * Create the `cipherstash` schema and `cs_migrations` table if they do not + * already exist. Safe to call on every `stash db install` invocation. + * + * Requires `CREATE SCHEMA` privileges on the database. If the caller lacks + * them, the query will fail and the error bubbles up — the CLI currently + * warns but does not abort, on the theory that a human DBA can install + * this schema out-of-band using `MIGRATIONS_SCHEMA_SQL` directly. 
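+ *
+ * @example
+ * ```ts
+ * // Sketch — `pool` is your application's pg Pool.
+ * const client = await pool.connect()
+ * try {
+ *   await installMigrationsSchema(client)
+ * } finally {
+ *   client.release()
+ * }
+ * ```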
+ */
+export async function installMigrationsSchema(
+  client: ClientBase,
+): Promise<void> {
+  await client.query(MIGRATIONS_SCHEMA_SQL)
+}
diff --git a/packages/migrate/src/manifest.ts b/packages/migrate/src/manifest.ts
new file mode 100644
index 00000000..2998db0e
--- /dev/null
+++ b/packages/migrate/src/manifest.ts
@@ -0,0 +1,101 @@
+import * as fs from 'node:fs/promises'
+import * as path from 'node:path'
+import { z } from 'zod'
+
+/**
+ * The four EQL index kinds recognised by Proxy. Keep in sync with the
+ * `indexes` CHECK constraint in `eql_v2_configuration`.
+ */
+const IndexKind = z.enum(['unique', 'match', 'ore', 'ste_vec'])
+
+/**
+ * Intent for a single column within the manifest. Expresses *what the user
+ * wants the state of this column to be*, not the observed reality — the
+ * `status` / `plan` commands diff this against `cs_migrations` and EQL to
+ * surface drift.
+ */
+const ManifestColumnSchema = z.object({
+  /** Physical column name, e.g. `email`. */
+  column: z.string(),
+  /**
+   * EQL cast type. Text by default. See the EQL docs for the full list
+   * (`text | int | small_int | big_int | real | double | boolean | date |
+   * jsonb | json | float | decimal | timestamp`).
+   */
+  castAs: z.string().default('text'),
+  /** Desired EQL index set. Driver of the `indexes: {…}` block in EQL config. */
+  indexes: z.array(IndexKind).default([]),
+  /**
+   * The phase the user wants this column to reach. `cut-over` is the
+   * typical end state (reads transparently decrypted); advance to
+   * `dropped` only once you're confident the plaintext column is no
+   * longer needed.
+   */
+  targetPhase: z
+    .enum(['schema-added', 'dual-writing', 'backfilled', 'cut-over', 'dropped'])
+    .default('cut-over'),
+  /**
+   * Override for primary-key detection during backfill. Omit to let the
+   * CLI auto-detect via `information_schema`.
+   */
+  pkColumn: z.string().optional(),
+})
+
+/**
+ * Root manifest shape. Stored at `.cipherstash/migrations.json`. 
Versioned
+ * (currently `1`) so we can evolve it without breaking existing manifests.
+ */
+const ManifestSchema = z.object({
+  version: z.literal(1).default(1),
+  /** Map of table name → array of column intents for that table. */
+  tables: z.record(z.array(ManifestColumnSchema)),
+})
+
+export type Manifest = z.infer<typeof ManifestSchema>
+export type ManifestColumn = z.infer<typeof ManifestColumnSchema>
+
+/** Canonical on-disk location for the manifest. */
+export function manifestPath(cwd: string = process.cwd()): string {
+  return path.join(cwd, '.cipherstash', 'migrations.json')
+}
+
+/**
+ * Read and validate the manifest. Returns `null` when no manifest file
+ * exists (this is not an error — most commands still work without one;
+ * they just can't show intent-vs-observed drift).
+ *
+ * Throws on schema validation failures (zod errors).
+ */
+export async function readManifest(
+  cwd: string = process.cwd(),
+): Promise<Manifest | null> {
+  const filePath = manifestPath(cwd)
+  let raw: string
+  try {
+    raw = await fs.readFile(filePath, 'utf-8')
+  } catch (err) {
+    if ((err as NodeJS.ErrnoException).code === 'ENOENT') return null
+    throw err
+  }
+  const parsed = ManifestSchema.parse(JSON.parse(raw))
+  return parsed
+}
+
+/**
+ * Validate and write the manifest to `.cipherstash/migrations.json`,
+ * creating the directory if it doesn't exist. Rewrites the file
+ * atomically-enough for config purposes; not safe under concurrent writers.
+ */
+export async function writeManifest(
+  manifest: Manifest,
+  cwd: string = process.cwd(),
+): Promise<void> {
+  const filePath = manifestPath(cwd)
+  await fs.mkdir(path.dirname(filePath), { recursive: true })
+  const validated = ManifestSchema.parse(manifest)
+  await fs.writeFile(
+    filePath,
+    `${JSON.stringify(validated, null, 2)}\n`,
+    'utf-8',
+  )
+}
diff --git a/packages/migrate/src/sql.ts b/packages/migrate/src/sql.ts
new file mode 100644
index 00000000..2c5090f4
--- /dev/null
+++ b/packages/migrate/src/sql.ts
@@ -0,0 +1,7 @@
+/**
+ * Quote a PostgreSQL identifier. 
Doubles any embedded `"` and wraps in `"`. + * Use for table/column names that cannot be parameterised. + */ +export function quoteIdent(identifier: string): string { + return `"${identifier.replace(/"/g, '""')}"` +} diff --git a/packages/migrate/src/state.ts b/packages/migrate/src/state.ts new file mode 100644 index 00000000..b1eb2e2f --- /dev/null +++ b/packages/migrate/src/state.ts @@ -0,0 +1,201 @@ +import type { ClientBase } from 'pg' + +/** + * Discrete event types written to the `cipherstash.cs_migrations` event log. + * + * Events are snake_case (for SQL readability); phases are kebab-case (for + * user-facing CLI output). Most events correspond 1:1 with a phase, except: + * - `backfill_started` and `backfill_checkpoint` both live inside phase + * `backfilling`; the `_started` event records the initial intent (and + * whether we resumed), while each `_checkpoint` records a committed chunk. + * - `error` records a failure at whatever phase was current; it does not + * change the effective phase (so a retry resumes from where it failed). + */ +export type MigrationEvent = + | 'schema_added' + | 'dual_writing' + | 'backfill_started' + | 'backfill_checkpoint' + | 'backfilled' + | 'cut_over' + | 'dropped' + | 'error' + +/** + * The per-column lifecycle phase as surfaced in status/plan output and + * accepted by `stash encrypt advance --to `. + * + * ``` + * schema-added → the _encrypted column exists and is registered with EQL + * dual-writing → app writes both plaintext and encrypted on inserts/updates + * backfilling → runBackfill is (or has been) encrypting historical rows + * backfilled → all historical rows encrypted; safe to cut over reads + * cut-over → columns renamed (via eql_v2.rename_encrypted_columns) + * dropped → old plaintext column removed + * ``` + */ +export type MigrationPhase = + | 'schema-added' + | 'dual-writing' + | 'backfilling' + | 'backfilled' + | 'cut-over' + | 'dropped' + +/** + * Composite key of `
<table>.<column>`. Used as the map key by
+ * {@link latestByColumn}.
+ */
+export type ColumnKey = `${string}.${string}`
+
+/**
+ * A single row from `cipherstash.cs_migrations`, decoded with numeric bigints
+ * converted to `number` and column names camel-cased. `id` is a string to
+ * avoid JavaScript bigint precision loss for very large tables.
+ */
+export interface MigrationStateRow {
+  /** Row id, stringified from the bigint column. Monotonically increasing. */
+  id: string
+  tableName: string
+  columnName: string
+  event: MigrationEvent
+  /** Effective phase *after* this event. */
+  phase: MigrationPhase
+  /**
+   * Value of `pkColumn` for the last row processed in the most recent
+   * chunk. Set on `backfill_checkpoint` and `backfilled`; `null` on other
+   * event types. Stored as text so it works for any PK type.
+   */
+  cursorValue: string | null
+  /** Cumulative rows encrypted. `null` on non-backfill events. */
+  rowsProcessed: number | null
+  /** Target rows for this migration. `null` on non-backfill events. */
+  rowsTotal: number | null
+  /**
+   * Free-form event-specific metadata. Examples: `{ chunkSize, resumed }`
+   * on `backfill_started`; `{ message, chunkIndex }` on `error`.
+   */
+  details: Record<string, unknown> | null
+  createdAt: Date
+}
+
+/**
+ * Input to {@link appendEvent}. All fields other than `tableName`,
+ * `columnName`, `event`, and `phase` are optional and stored as `NULL` when
+ * omitted.
+ */
+export interface AppendEventInput {
+  tableName: string
+  columnName: string
+  event: MigrationEvent
+  phase: MigrationPhase
+  cursorValue?: string | null
+  rowsProcessed?: number | null
+  rowsTotal?: number | null
+  details?: Record<string, unknown> | null
+}
+
+/**
+ * Append a new event row to `cipherstash.cs_migrations`. The table is
+ * append-only — existing rows are never updated, so history is preserved
+ * and concurrent writers never clobber each other. 
+ *
+ * The "current state" of a column is derived by selecting the row with the
+ * greatest `id` for that `(tableName, columnName)` pair.
+ */
+export async function appendEvent(
+  client: ClientBase,
+  input: AppendEventInput,
+): Promise<MigrationStateRow> {
+  const result = await client.query(
+    `INSERT INTO cipherstash.cs_migrations
+       (table_name, column_name, event, phase, cursor_value, rows_processed, rows_total, details)
+     VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
+     RETURNING id, table_name, column_name, event, phase, cursor_value, rows_processed, rows_total, details, created_at`,
+    [
+      input.tableName,
+      input.columnName,
+      input.event,
+      input.phase,
+      input.cursorValue ?? null,
+      input.rowsProcessed ?? null,
+      input.rowsTotal ?? null,
+      input.details ?? null,
+    ],
+  )
+  return rowToState(result.rows[0])
+}
+
+/**
+ * Return the most recent event row for every column tracked in
+ * `cs_migrations`, keyed by `"<table>.<column>"`. Used by
+ * `stash encrypt status` and `plan` to render a table view.
+ *
+ * Columns with no recorded events are simply absent from the map — there
+ * is no synthetic "nothing happened yet" entry.
+ */
+export async function latestByColumn(
+  client: ClientBase,
+): Promise<Map<ColumnKey, MigrationStateRow>> {
+  const result = await client.query(
+    `SELECT DISTINCT ON (table_name, column_name)
+       id, table_name, column_name, event, phase, cursor_value, rows_processed, rows_total, details, created_at
+     FROM cipherstash.cs_migrations
+     ORDER BY table_name, column_name, id DESC`,
+  )
+  const map = new Map<ColumnKey, MigrationStateRow>()
+  for (const row of result.rows) {
+    const state = rowToState(row)
+    map.set(`${state.tableName}.${state.columnName}`, state)
+  }
+  return map
+}
+
+/**
+ * Latest event row for a single column, or `null` if the column has no
+ * recorded events. Used by the backfill runner to determine whether it is
+ * resuming a checkpoint or starting fresh. 
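+ *
+ * For example (a sketch):
+ * ```ts
+ * const last = await progress(client, 'users', 'email')
+ * if (last?.event === 'backfill_checkpoint') {
+ *   console.log(`resumable from cursor ${last.cursorValue}`)
+ * }
+ * ```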
+ */
+export async function progress(
+  client: ClientBase,
+  tableName: string,
+  columnName: string,
+): Promise<MigrationStateRow | null> {
+  const result = await client.query(
+    `SELECT id, table_name, column_name, event, phase, cursor_value, rows_processed, rows_total, details, created_at
+     FROM cipherstash.cs_migrations
+     WHERE table_name = $1 AND column_name = $2
+     ORDER BY id DESC
+     LIMIT 1`,
+    [tableName, columnName],
+  )
+  if (result.rows.length === 0) return null
+  return rowToState(result.rows[0])
+}
+
+function rowToState(row: {
+  id: string | number
+  table_name: string
+  column_name: string
+  event: MigrationEvent
+  phase: MigrationPhase
+  cursor_value: string | null
+  rows_processed: string | number | null
+  rows_total: string | number | null
+  details: Record<string, unknown> | null
+  created_at: Date
+}): MigrationStateRow {
+  return {
+    id: String(row.id),
+    tableName: row.table_name,
+    columnName: row.column_name,
+    event: row.event,
+    phase: row.phase,
+    cursorValue: row.cursor_value,
+    rowsProcessed:
+      row.rows_processed === null ? null : Number(row.rows_processed),
+    rowsTotal: row.rows_total === null ? 
null : Number(row.rows_total), + details: row.details, + createdAt: row.created_at, + } +} diff --git a/packages/migrate/tsconfig.json b/packages/migrate/tsconfig.json new file mode 100644 index 00000000..982c0785 --- /dev/null +++ b/packages/migrate/tsconfig.json @@ -0,0 +1,23 @@ +{ + "compilerOptions": { + "lib": ["ES2022"], + "target": "ES2022", + "module": "ESNext", + "moduleDetection": "force", + "allowJs": true, + "esModuleInterop": true, + + "moduleResolution": "bundler", + "allowImportingTsExtensions": true, + "verbatimModuleSyntax": true, + "noEmit": true, + + "strict": true, + "skipLibCheck": true, + "noFallthroughCasesInSwitch": true, + + "paths": { + "@/*": ["./src/*"] + } + } +} diff --git a/packages/migrate/tsup.config.ts b/packages/migrate/tsup.config.ts new file mode 100644 index 00000000..16e3080c --- /dev/null +++ b/packages/migrate/tsup.config.ts @@ -0,0 +1,14 @@ +import { defineConfig } from 'tsup' + +export default defineConfig([ + { + entry: ['src/index.ts'], + format: ['cjs', 'esm'], + sourcemap: true, + dts: true, + clean: true, + target: 'es2022', + tsconfig: './tsconfig.json', + external: ['pg', '@cipherstash/stack'], + }, +]) diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index ead9e3b6..970d4d6a 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -76,6 +76,9 @@ importers: '@cipherstash/auth': specifier: catalog:repo version: 0.36.0 + '@cipherstash/migrate': + specifier: workspace:* + version: link:../migrate '@clack/prompts': specifier: 0.10.1 version: 0.10.1 @@ -157,6 +160,31 @@ importers: specifier: catalog:repo version: 3.1.3(@types/node@22.19.3)(jiti@2.6.1)(lightningcss@1.30.2)(terser@5.44.1)(tsx@4.19.3) + packages/migrate: + dependencies: + zod: + specifier: ^3.24.2 + version: 3.24.2 + devDependencies: + '@cipherstash/stack': + specifier: workspace:* + version: link:../stack + '@types/pg': + specifier: ^8.11.11 + version: 8.16.0 + pg: + specifier: 8.13.1 + version: 8.13.1 + tsup: + specifier: catalog:repo + version: 
8.4.0(jiti@2.6.1)(postcss@8.5.6)(tsx@4.19.3)(typescript@5.6.3) + typescript: + specifier: catalog:repo + version: 5.6.3 + vitest: + specifier: catalog:repo + version: 3.1.3(@types/node@22.19.3)(jiti@2.6.1)(lightningcss@1.30.2)(terser@5.44.1)(tsx@4.19.3) + packages/nextjs: dependencies: jose: @@ -165,6 +193,10 @@ importers: next: specifier: ^14 || ^15 version: 15.5.10(react-dom@19.2.3(react@19.2.3))(react@19.2.3) + optionalDependencies: + '@rollup/rollup-linux-x64-gnu': + specifier: 4.24.0 + version: 4.24.0 devDependencies: '@clerk/nextjs': specifier: catalog:security @@ -181,10 +213,6 @@ importers: vitest: specifier: catalog:repo version: 3.1.3(@types/node@22.19.3)(jiti@2.6.1)(lightningcss@1.30.2)(terser@5.44.1)(tsx@4.19.3) - optionalDependencies: - '@rollup/rollup-linux-x64-gnu': - specifier: 4.24.0 - version: 4.24.0 packages/protect: dependencies: @@ -206,6 +234,10 @@ importers: zod: specifier: ^3.24.2 version: 3.25.76 + optionalDependencies: + '@rollup/rollup-linux-x64-gnu': + specifier: 4.24.0 + version: 4.24.0 devDependencies: '@supabase/supabase-js': specifier: ^2.47.10 @@ -231,10 +263,6 @@ importers: vitest: specifier: catalog:repo version: 3.1.3(@types/node@22.19.3)(jiti@2.6.1)(lightningcss@1.30.2)(terser@5.44.1)(tsx@4.19.3) - optionalDependencies: - '@rollup/rollup-linux-x64-gnu': - specifier: 4.24.0 - version: 4.24.0 packages/protect-dynamodb: dependencies: