Skip to content

feat: add IngestSync for completion-aware ingest#19

Merged
acoshift merged 1 commit into
mainfrom
feat/ingest-sync
Jun 13, 2026
Merged

feat: add IngestSync for completion-aware ingest#19
acoshift merged 1 commit into
mainfrom
feat/ingest-sync

Conversation

@acoshift

Copy link
Copy Markdown
Member

Problem

Ingest() is fire-and-forget into an in-memory buffer — the HTTP POST to Quickwit happens later in a background worker. A caller consuming a pub/sub subscription has no signal for when the data is durably on the server, so it cannot decide when to Ack():

  • Ack right after Ingest() returns → a crash before the worker flushes loses the data (the message was acked, never redelivered).
  • Never ack → the subscription stalls and redelivers.

Exactly-once is impossible here (two-generals: there's always a window between the server's 200 and the broker's Ack). The achievable, honest target is at-least-once with caller-side dedup.

Change

Add a completion-aware ingest API that threads a per-item handle through the existing batch pipeline (so batching, 413 auto-reduce, gzip, and connection pooling are all reused):

err := c.IngestSync(ctx, doc)   // blocks until durably accepted, or ctx/drop
switch {
case err == nil:                          msg.Ack()                  // HTTP 200
case isReason(err, quickwit.ReasonEncode): deadLetter(msg); msg.Ack() // poison
default:                                   msg.Nack()                 // transient / not-confirmed
}

New public surface (purely additive; Ingest behavior unchanged):

  • IngestSync(ctx, data...) error — blocking; returns nil on HTTP 200, *IngestError{Reason} on a terminal drop, or a wrapped ctx error meaning "not confirmed".
  • IngestBatch(data...) *IngestReceipt + Wait(ctx)/Done() — enqueue-now / wait-later.
  • IngestError, DiscardReason (ReasonEncode / ReasonBufferFull / ReasonClosed).

How it works

  • ingestBuffer is now chan ingestItem{data, raw, ack}. Fire-and-forget keeps ack==nil, so every settle site is a no-op and the hot path stays 0-alloc (+~12ns for the close guard, invisible behind the POST).
  • Tracked items are JSON-encoded in the caller's goroutine (synchronous encode errors; the worker never re-encodes). settle(nil) on HTTP 200 is the only success site, and a 200 always drops the chunk from the retry buffer — so no item is ever settled twice, including across the 413 oversize split.
  • enqueue is ctx-aware and guarded by an RWMutex so a send cannot race Close()'s channel close and panic. Close() settles undeliverable buffered items with ReasonClosed rather than leaving waiters hanging.
  • Encode failures now settle/discard at detection rather than after a 200.

Caller contract

At-least-once at the server-200 boundary. The caller must attach a deterministic doc id and dedup (Quickwit has no native dedup). Pass a deadline ctx inside the subscription's ack-deadline — IngestSync never creates its own timeout. For low-volume topics, lower SetMaxDelay so a lone item isn't held for the flush interval before its first POST (the flush ticker becomes part of Ack latency). Prefer one document per call for a clean message↔verdict mapping.

Tests

10 new tests (ingest_sync_test.go): server-accepts, encode-poison (0 requests sent), ctx-deadline-while-failing, after-close, 413 split settles all items, concurrent calls are batched, close-while-failing settles ReasonClosed, batch receipt, empty/pre-cancelled ctx. Full suite (32 tests) green, go vet clean, race detector clean across repeated runs.

🤖 Generated with Claude Code

Ingest() is fire-and-forget into an in-memory buffer, so a caller (e.g. a
pub/sub subscription handler) has no signal for when the data is durably on
the server and cannot decide when to Ack: Ack too early and a crash loses
data; never Ack and the subscription stalls.

Add IngestSync(ctx, data...) error and IngestBatch(...) *IngestReceipt that
thread a per-item completion handle (ackBatch) through the existing batch
pipeline. The worker settles each item exactly once — nil on HTTP 200, an
*IngestError on a terminal drop (ReasonEncode / ReasonBufferFull /
ReasonClosed). A wrapped context error means "not confirmed" (still in
flight). Callers map nil->Ack, *IngestError->Ack-and-dead-letter or Nack,
ctx-error->Nack, and attach a dedup id (delivery is at-least-once).

Mechanics:
- ingestBuffer is now chan ingestItem{data, raw, ack}; fire-and-forget keeps
  ack=nil so every settle site is a no-op and the hot path stays 0-alloc.
- Tracked items are JSON-encoded in the caller's goroutine (synchronous
  encode errors; worker never re-encodes), so settle(nil) on 200 is the only
  success site and a 200 always drops the chunk from the retry buffer — no
  item is ever settled twice, including across the 413 oversize split.
- enqueue is ctx-aware and guarded by an RWMutex so a send cannot race
  Close()'s channel close and panic; Close settles any undeliverable buffered
  items with ReasonClosed instead of leaving waiters hanging.
- Encode failures now settle/discard at detection rather than after 200.

IngestSync takes a caller-supplied deadline ctx (never creates its own
timeout); for low-volume topics, lower SetMaxDelay so a lone item is not held
for the flush interval before its first POST.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
@acoshift acoshift merged commit e6e8484 into main Jun 13, 2026
1 check passed
@acoshift acoshift deleted the feat/ingest-sync branch June 13, 2026 05:43
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant