feat: add IngestSync for completion-aware ingest#19
Merged
Conversation
Ingest() is fire-and-forget into an in-memory buffer, so a caller (e.g. a
pub/sub subscription handler) has no signal for when the data is durably on
the server and cannot decide when to Ack: Ack too early and a crash loses
data; never Ack and the subscription stalls.
Add IngestSync(ctx, data...) error and IngestBatch(...) *IngestReceipt that
thread a per-item completion handle (ackBatch) through the existing batch
pipeline. The worker settles each item exactly once — nil on HTTP 200, an
*IngestError on a terminal drop (ReasonEncode / ReasonBufferFull /
ReasonClosed). A wrapped context error means "not confirmed" (still in
flight). Callers map nil->Ack, *IngestError->Ack-and-dead-letter or Nack,
ctx-error->Nack, and attach a dedup id (delivery is at-least-once).
Mechanics:
- ingestBuffer is now chan ingestItem{data, raw, ack}; fire-and-forget keeps
ack=nil so every settle site is a no-op and the hot path stays 0-alloc.
- Tracked items are JSON-encoded in the caller's goroutine (synchronous
encode errors; worker never re-encodes), so settle(nil) on 200 is the only
success site and a 200 always drops the chunk from the retry buffer — no
item is ever settled twice, including across the 413 oversize split.
- enqueue is ctx-aware and guarded by an RWMutex so a send cannot race
Close()'s channel close and panic; Close settles any undeliverable buffered
items with ReasonClosed instead of leaving waiters hanging.
- Encode failures now settle/discard at detection rather than after 200.
IngestSync takes a caller-supplied deadline ctx (never creates its own
timeout); for low-volume topics, lower SetMaxDelay so a lone item is not held
for the flush interval before its first POST.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Problem
Ingest()is fire-and-forget into an in-memory buffer — the HTTP POST to Quickwit happens later in a background worker. A caller consuming a pub/sub subscription has no signal for when the data is durably on the server, so it cannot decide when toAck():Ingest()returns → a crash before the worker flushes loses the data (the message was acked, never redelivered).Exactly-once is impossible here (two-generals: there's always a window between the server's 200 and the broker's Ack). The achievable, honest target is at-least-once with caller-side dedup.
Change
Add a completion-aware ingest API that threads a per-item handle through the existing batch pipeline (so batching, 413 auto-reduce, gzip, and connection pooling are all reused):
New public surface (purely additive;
Ingestbehavior unchanged):IngestSync(ctx, data...) error— blocking; returnsnilon HTTP 200,*IngestError{Reason}on a terminal drop, or a wrapped ctx error meaning "not confirmed".IngestBatch(data...) *IngestReceipt+Wait(ctx)/Done()— enqueue-now / wait-later.IngestError,DiscardReason(ReasonEncode/ReasonBufferFull/ReasonClosed).How it works
ingestBufferis nowchan ingestItem{data, raw, ack}. Fire-and-forget keepsack==nil, so every settle site is a no-op and the hot path stays 0-alloc (+~12ns for the close guard, invisible behind the POST).settle(nil)on HTTP 200 is the only success site, and a 200 always drops the chunk from the retry buffer — so no item is ever settled twice, including across the 413 oversize split.enqueueis ctx-aware and guarded by anRWMutexso a send cannot raceClose()'s channel close and panic.Close()settles undeliverable buffered items withReasonClosedrather than leaving waiters hanging.Caller contract
At-least-once at the server-200 boundary. The caller must attach a deterministic doc id and dedup (Quickwit has no native dedup). Pass a deadline ctx inside the subscription's ack-deadline —
IngestSyncnever creates its own timeout. For low-volume topics, lowerSetMaxDelayso a lone item isn't held for the flush interval before its first POST (the flush ticker becomes part of Ack latency). Prefer one document per call for a clean message↔verdict mapping.Tests
10 new tests (
ingest_sync_test.go): server-accepts, encode-poison (0 requests sent), ctx-deadline-while-failing, after-close, 413 split settles all items, concurrent calls are batched, close-while-failing settlesReasonClosed, batch receipt, empty/pre-cancelled ctx. Full suite (32 tests) green,go vetclean, race detector clean across repeated runs.🤖 Generated with Claude Code