Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions .server-changes/otel-attribute-utf16-sanitization.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
---
area: webapp
type: fix
---

Recover from ClickHouse `JSONEachRow` parse failures caused by lone
UTF-16 surrogates in OTel attribute strings (`Cannot parse JSON object
here ... ParallelParsingBlockInputFormat`).

`ClickhouseEventRepository.#flushBatch` and `#flushLlmMetricsBatch` now
retry once after sanitizing every row in the batch: any string value
containing a lone surrogate is replaced with `"[invalid-utf16]"`. If
the sanitizer touched no fields (the parse error isn't a surrogate
issue), the batch is dropped without another ClickHouse round-trip; if
the retry fails with another JSON parse error, the batch is dropped as
well. In both cases `permanentlyDroppedBatches` increments and an error
log with a sample row (truncated to 1,024 characters) is emitted.
Non-parse errors, on the first attempt or on the retry, propagate
unchanged.

Detection reuses `detectBadJsonStrings` via `JSON.stringify(value)`,
with a latent regex bug fixed: the low-surrogate hex nibble matched
`[cd]` instead of `[c-f]`, so low surrogates in U+DE00–U+DFFF were not
recognized. That missed lone low surrogates in that half of the range
and false-flagged valid pairs such as common emoji (e.g. `\ud83d\ude00`).
Healthy batches pay zero scan cost — the check only runs after
ClickHouse has already rejected the batch.
22 changes: 18 additions & 4 deletions apps/webapp/app/utils/detectBadJsonStrings.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,17 @@
/**
* Detects unpaired UTF-16 surrogate escape sequences in JSON-encoded text.
*
* Returns true if the input contains a `\uD8XX`/`\uD9XX`/`\uDAXX`/`\uDBXX`
* high-surrogate escape not immediately followed by a `\uDC..`–`\uDF..` low
* surrogate, or a `\uDC..`–`\uDF..` low surrogate not immediately preceded by
* a high surrogate. Strict JSON parsers (e.g. ClickHouse `JSONEachRow`)
* reject input containing such sequences.
*
* Surrogate hex ranges (shown in uppercase; the checks below compare against
* lowercase hex digits only, which is the form `JSON.stringify` emits):
* - High surrogate (U+D800–U+DBFF): `\uD[8-B][0-9A-F][0-9A-F]`
* - Low surrogate (U+DC00–U+DFFF): `\uD[C-F][0-9A-F][0-9A-F]`
*/
export function detectBadJsonStrings(jsonString: string): boolean {
// Fast path: skip everything if no \u
let idx = jsonString.indexOf("\\u");
Expand All @@ -13,7 +27,7 @@ export function detectBadJsonStrings(jsonString: string): boolean {
if (jsonString[idx + 1] === "u" && jsonString[idx + 2] === "d") {
const third = jsonString[idx + 3];

// High surrogate check
// High surrogate check — third nibble is 8, 9, a, or b (U+D800–U+DBFF)
if (
/[89ab]/.test(third) &&
/[0-9a-f]/.test(jsonString[idx + 4]) &&
Expand All @@ -28,17 +42,17 @@ export function detectBadJsonStrings(jsonString: string): boolean {
jsonString[idx + 6] !== "\\" ||
jsonString[idx + 7] !== "u" ||
jsonString[idx + 8] !== "d" ||
!/[cd]/.test(jsonString[idx + 9]) ||
!/[c-f]/.test(jsonString[idx + 9]) ||
!/[0-9a-f]/.test(jsonString[idx + 10]) ||
!/[0-9a-f]/.test(jsonString[idx + 11])
) {
return true; // Incomplete high surrogate
}
}

// Low surrogate check
// Low surrogate check — third nibble is c, d, e, or f (U+DC00–U+DFFF)
if (
(third === "c" || third === "d") &&
/[c-f]/.test(third) &&
/[0-9a-f]/.test(jsonString[idx + 4]) &&
/[0-9a-f]/.test(jsonString[idx + 5])
) {
Expand Down
172 changes: 157 additions & 15 deletions apps/webapp/app/v3/eventRepository/clickhouseEventRepository.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ import {
removePrivateProperties,
isEmptyObject,
} from "./common.server";
import {
isClickHouseJsonParseError,
parseRowNumberFromError,
sanitizeRows,
} from "./sanitizeRowsOnParseError.server";
import type {
CompleteableTaskRun,
CreateEventInput,
Expand Down Expand Up @@ -104,6 +109,13 @@ export class ClickhouseEventRepository implements IEventRepository {
private readonly _llmMetricsFlushScheduler: DynamicFlushScheduler<LlmMetricsV1Input>;
private _tracer: Tracer;
private _version: "v1" | "v2";
/**
* Counts batches dropped because of a ClickHouse JSON parse failure that
* could not be recovered: either the sanitizer found nothing to fix, or the
* single sanitize-retry hit another parse error. These batches are dropped on
* the floor (the scheduler is told the flush "succeeded" so its queue counter
* doesn't leak), and we track the drop count for observability.
*/
private _permanentlyDroppedBatches = 0;

constructor(config: ClickhouseEventRepositoryConfig) {
this._clickhouse = config.clickhouse;
Expand Down Expand Up @@ -147,6 +159,11 @@ export class ClickhouseEventRepository implements IEventRepository {
return this._config.maximumLiveReloadingSetting ?? 1000;
}

/**
 * Running total of batches this repository has dropped because a ClickHouse
 * JSON parse error could not be recovered. Read-only; exposed for tests and
 * metrics.
 */
get permanentlyDroppedBatches(): number {
  return this._permanentlyDroppedBatches;
}

/**
* Clamps a start time (in nanoseconds) to now if it's too far in the past.
* Returns the clamped value as a bigint.
Expand Down Expand Up @@ -215,19 +232,32 @@ export class ClickhouseEventRepository implements IEventRepository {
? this._clickhouse.taskEventsV2.insert
: this._clickhouse.taskEvents.insert;

const [insertError, insertResult] = await insertFn(events, {
params: {
clickhouse_settings: this.#getClickhouseInsertSettings(),
},
});
const doInsert = async () => {
const [insertError, insertResult] = await insertFn(events, {
params: {
clickhouse_settings: this.#getClickhouseInsertSettings(),
},
});
if (insertError) throw insertError;
return insertResult;
};

const outcome = await this.#insertWithJsonParseRecovery(
flushId,
events,
doInsert,
`task_events_${this._version}`
);

if (insertError) {
throw insertError;
if (outcome.kind === "dropped") {
// Loud log already emitted; nothing landed in ClickHouse — don't publish to Redis.
return;
}

logger.info("ClickhouseEventRepository.flushBatch Inserted batch into clickhouse", {
events: events.length,
insertResult,
insertResult: outcome.insertResult,
sanitized: outcome.kind === "sanitized",
version: this._version,
});

Expand All @@ -236,22 +266,134 @@ export class ClickhouseEventRepository implements IEventRepository {
}

/**
 * Flushes one batch of LLM metrics rows to ClickHouse.
 *
 * The insert is issued only through `#insertWithJsonParseRecovery`. A second,
 * direct `llmMetrics.insert` call here would write the rows twice and bypass
 * the parse-error recovery path, so there is exactly one insert path.
 *
 * @param flushId - Identifier of this flush; forwarded to the recovery wrapper.
 * @param rows - LLM metrics rows to insert.
 * @returns Resolves once the batch was inserted (possibly after sanitizing)
 *   or was dropped by the recovery wrapper.
 * @throws Whatever the recovery wrapper rethrows (errors it does not classify
 *   as ClickHouse JSON parse errors).
 */
async #flushLlmMetricsBatch(flushId: string, rows: LlmMetricsV1Input[]) {
  // Adapts the `[error, result]` tuple returned by the client into
  // throw/return so the recovery wrapper can classify failures.
  const doInsert = async () => {
    const [insertError, insertResult] = await this._clickhouse.llmMetrics.insert(rows, {
      params: {
        clickhouse_settings: this.#getClickhouseInsertSettings(),
      },
    });
    if (insertError) throw insertError;
    return insertResult;
  };

  const outcome = await this.#insertWithJsonParseRecovery(
    flushId,
    rows,
    doInsert,
    "llm_metrics_v1"
  );

  // The wrapper has already logged the drop; nothing was inserted.
  if (outcome.kind === "dropped") {
    return;
  }

  logger.info("ClickhouseEventRepository.flushLlmMetricsBatch Inserted LLM metrics batch", {
    rows: rows.length,
    sanitized: outcome.kind === "sanitized",
  });
}

/**
 * Wraps a ClickHouse insert callable with reactive UTF-16 sanitization.
 *
 * When the first insert fails with an error that
 * `isClickHouseJsonParseError` recognizes:
 *  1. Sanitize the whole batch with `sanitizeRows`. The row number parsed
 *     from the error message is logged for observability only; it is not
 *     used to skip rows.
 *  2. If the sanitizer changed no fields, drop the batch immediately —
 *     retrying identical rows would hit the same deterministic failure.
 *  3. Otherwise retry the insert once.
 *  4. If the retry fails with another JSON parse error, drop the batch.
 *
 * Every drop increments `permanentlyDroppedBatches`, emits an error log with
 * a sample row, and resolves with `{ kind: "dropped" }` instead of throwing,
 * so the caller's retry path does not repeat the same deterministic failure.
 *
 * Non-parse errors (from either attempt) propagate unchanged so the
 * scheduler's existing backoff/retry behaviour still handles transient
 * network or CH issues.
 *
 * @param flushId - Identifier of the flush, included in every log entry.
 * @param rows - The batch being inserted. NOTE(review): the retry calls the
 *   same `doInsert` again, so `sanitizeRows` presumably mutates these rows in
 *   place — confirm against its implementation.
 * @param doInsert - Performs the insert; must throw on failure.
 * @param contextLabel - Label identifying the target table in log entries.
 * @returns `inserted` when the first attempt succeeded, `sanitized` when the
 *   retry after sanitizing succeeded, `dropped` when the batch was given up on.
 * @throws Any error from `doInsert` that is not a ClickHouse JSON parse error.
 */
async #insertWithJsonParseRecovery<T extends object>(
  flushId: string,
  rows: T[],
  doInsert: () => Promise<unknown>,
  contextLabel: string
): Promise<
  | { kind: "inserted"; insertResult: unknown }
  | { kind: "sanitized"; insertResult: unknown }
  | { kind: "dropped" }
> {
  // Extracts a printable message from whatever value was thrown.
  const messageOf = (error: unknown): string =>
    typeof error === "object" && error !== null && "message" in error
      ? String((error as { message?: unknown }).message ?? "")
      : String(error);

  // Best-effort sample of the first row for the drop logs, capped at 1024
  // characters. Serialization must never throw here: that would turn a
  // deliberate drop into an error handed back to the caller.
  const sampleRow = (): string => {
    try {
      return (JSON.stringify(rows[0] ?? null) ?? "").slice(0, 1024);
    } catch {
      return "[unserializable row]";
    }
  };

  try {
    return { kind: "inserted", insertResult: await doInsert() };
  } catch (firstError) {
    if (!isClickHouseJsonParseError(firstError)) throw firstError;

    const firstMessage = messageOf(firstError);
    // Only the first line of the ClickHouse error is logged.
    const firstErrorLine = firstMessage.split("\n")[0];

    // Sanitize the whole batch. ClickHouse's `at row N` index is logged
    // for observability but not used to slice — its semantics under
    // parallel parsing are not stable enough to safely skip rows.
    const rowHint = parseRowNumberFromError(firstMessage);
    const { rowsTouched, fieldsSanitized } = sanitizeRows(rows);

    // Sanitizer found nothing to fix → retrying the exact same batch is
    // guaranteed to hit the same deterministic parse failure. Skip the
    // wasted ClickHouse round-trip and drop loudly instead of throwing,
    // which would hand the failure back to the scheduler's retry loop.
    if (fieldsSanitized === 0) {
      this._permanentlyDroppedBatches += 1;
      logger.error(
        "Dropped batch — ClickHouse JSON parse error but sanitizer found nothing to fix",
        {
          flushId,
          contextLabel,
          batchSize: rows.length,
          clickhouseRowHint: rowHint,
          permanentlyDroppedBatches: this._permanentlyDroppedBatches,
          sampleRow: sampleRow(),
          clickhouseError: firstErrorLine,
        }
      );
      return { kind: "dropped" };
    }

    logger.warn("Sanitizing batch after ClickHouse JSON parse error", {
      flushId,
      contextLabel,
      batchSize: rows.length,
      clickhouseRowHint: rowHint,
      rowsTouched,
      fieldsSanitized,
      clickhouseError: firstErrorLine,
    });

    try {
      return { kind: "sanitized", insertResult: await doInsert() };
    } catch (retryError) {
      if (!isClickHouseJsonParseError(retryError)) throw retryError;

      this._permanentlyDroppedBatches += 1;
      logger.error(
        "Dropped batch after sanitize-retry still hit ClickHouse JSON parse error",
        {
          flushId,
          contextLabel,
          batchSize: rows.length,
          permanentlyDroppedBatches: this._permanentlyDroppedBatches,
          sampleRow: sampleRow(),
          firstError: firstErrorLine,
          retryError: messageOf(retryError).split("\n")[0],
        }
      );

      return { kind: "dropped" };
    }
  }
}

#createLlmMetricsInput(event: CreateEventInput): LlmMetricsV1Input {
const llmMetrics = event._llmMetrics!;

Expand Down
Loading