feat(mollifier): trigger burst smoothing — Phase 1 (monitoring) #3614
Merged
59 commits
c00b148
feat: trigger mollifier phase 1 scaffolding
d-cs ae05184
feat(mollifier): trigger burst smoothing — Phase 1 (trip evaluator + …
d-cs 31aefa1
chore(mollifier): address CodeRabbit review for phase-1 PR
d-cs f4bac2d
fix(mollifier): guard drainer shutdown registration against listener …
d-cs cfa6e9e
feat(mollifier): make resolveOrgFlag actually org-scoped via Organiza…
d-cs 2dee88c
fix(mollifier): mock db.server in gate test to avoid eager prisma con…
d-cs 4f5978a
chore(mollifier): address review follow-ups for phase-1 PR
d-cs 6c55bf8
fix(mollifier): extend MollifierEvaluateGate input to carry orgFeatur…
d-cs 74fe441
fix(mollifier): raise mollifierGate test timeout to 30s for postgresT…
d-cs 2afbe12
fix(mollifier): keep trigger hot path DB-free and fail open on flag e…
d-cs 8469561
fix(mollifier): bound drainer shutdown so a hung handler can't block …
d-cs d76bb9e
fix(mollifier): keep drainer loop alive across transient redis errors
d-cs f1efc41
fix(mollifier): add missing imports to readFallback.server.ts
d-cs 275a5ba
chore(mollifier): fix misleading rate-counter comment + symmetric eva…
d-cs e699034
chore(mollifier): merge phase-1 and phase-2 server-changes into one file
d-cs e734490
chore(mollifier): drop fuzz tests to keep phase-1 PR focused
d-cs 0bf53e7
chore(mollifier): drop drive-by enqueueSystem comment change
d-cs edd3250
chore(mollifier): rewrite server-changes note for external readers
d-cs 1e087e2
chore(mollifier): clarify server-changes note — monitoring only, no d…
d-cs 7d74b8a
refactor(mollifier): move DI seam types to the modules that define them
d-cs 5f06709
fix(mollifier): degrade to disabled when redis host is unset, no main…
d-cs f91cbf2
fix(mollifier): bound drainer per-tick env fan-out via maxEnvsPerTick
d-cs b7e2655
refactor(mollifier): align drainer stop semantics with FairQueue / Ba…
d-cs 24407fa
fix(mollifier): preserve env fairness when drainer slices
d-cs adc29fc
test(mollifier): pin no-starvation property for light env behind heav…
d-cs cb8a54d
fix(mollifier): typecheck — destructure popsPerTick to satisfy noUnch…
d-cs 3daee33
test(mollifier): cover six previously-untested drainer behaviours
d-cs 2cad05f
feat(mollifier): two-level org→env rotation in drainer for tenant-lev…
d-cs 2348bf2
chore(mollifier): rewrite changeset as feature intro (drop delta-lang…
d-cs 5610099
feat(mollifier): track org→envs in the buffer for clean org-level fai…
d-cs a1a0a85
revert(mollifier): use standard REDIS_* fallback and fail loud on mis…
d-cs 650f025
refactor(mollifier): drop the redundant mollifier:envs SET
d-cs 5163a65
refactor(mollifier): drop global FeatureFlag fallback in hot-path res…
d-cs c31eb22
fix(mollifier): pipeline per-tick org→env fan-out and reconcile shutd…
d-cs ed0c468
chore(mollifier): refresh redis-worker changeset for buffer-side org …
d-cs a467e9e
Merge branch 'main' into mollifier-phase-2
d-cs 7344211
test(redis-worker): drop vi.fn handler spies from drainer tests
d-cs 673c7d0
fix(webapp): validate mollifier drain shutdown timeout before startin…
d-cs 60f2fb9
fix(webapp): validate mollifier drain shutdown timeout before startin…
d-cs 9007053
switch info logging to debug
d-cs 6487461
refactor(webapp): split mollifier drainer factory into create + start
d-cs 02c0b71
refactor(webapp): move mollifier drainer bootstrap out of legacy work…
d-cs ad90fe3
feat(webapp): MOLLIFIER_DRAINER_ENABLED for per-service drainer control
d-cs e5d403e
refactor(webapp): prefix mollifier env vars with TRIGGER_
d-cs 50868ff
docs(review): clarify what the no-mocking rule is actually for
d-cs ee474b5
Merge branch 'main' into mollifier-phase-2
d-cs 92d0841
fix(redis-worker): clear MollifierDrainer.stop() timeout timer when l…
d-cs 0d12e7b
refactor(webapp): wire mollifier drainer shutdown through signalsEmitter
d-cs f2f4ba6
chore(review): revert the no-mocking-rule clarification
d-cs 5255c47
perf(webapp): short-circuit mollifier gate when globally disabled
d-cs 5c729a4
refactor(webapp): move the mollifier-globally-enabled check behind a …
d-cs f8c4077
Merge branch 'main' into mollifier-phase-2
d-cs c95e141
fix(webapp): fail loud on mollifier drainer misconfiguration
d-cs 68ae8b0
test(webapp): pin mollifier drainer worker error-classification policy
d-cs 83c6933
Merge branch 'main' into mollifier-phase-2
d-cs b96bae2
fix(redis-worker): catch processEntry errors in mollifier drainer to …
d-cs b512583
test(redis-worker): allow timer jitter in mollifier drainer stop-time…
d-cs 01433f5
chore(webapp): drop mollifier gate divert logs to debug
d-cs e26f6ed
docs(webapp): note shadow-mode INCRs the per-env mollifier rate counter
d-cs
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,9 @@ | ||
| --- | ||
| "@trigger.dev/redis-worker": patch | ||
| --- | ||
|
|
||
| Add MollifierBuffer and MollifierDrainer primitives for trigger burst smoothing. | ||
|
|
||
| MollifierBuffer (`accept`, `pop`, `ack`, `requeue`, `fail`, `evaluateTrip`) is a per-env FIFO over Redis with atomic Lua transitions for status tracking. `evaluateTrip` is a sliding-window trip evaluator the webapp gate uses to detect per-env trigger bursts. | ||
|
|
||
| MollifierDrainer pops entries through a polling loop with a user-supplied handler. The loop survives transient Redis errors via capped exponential backoff (up to 5s), and per-env pop failures don't poison the rest of the batch — one env's blip is logged and counted as failed for that tick. Rotation is two-level: orgs at the top, envs within each org. The buffer maintains `mollifier:orgs` and `mollifier:org-envs:${orgId}` atomically with per-env queues, so the drainer walks orgs → envs directly without an in-memory cache. The `maxOrgsPerTick` option (default 500) caps how many orgs are scheduled per tick; for each picked org, one env is popped (rotating round-robin within the org). An org with N envs gets the same per-tick scheduling slot as an org with 1 env, so tenant-level drainage throughput is determined by org count rather than env count. |
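The two-level rotation described in the changeset can be sketched in memory as follows. This is a minimal illustration, not the drainer's actual code: the `TwoLevelRotation` class and its `pickForTick` method are hypothetical names, and the real buffer keeps the org and env sets in Redis (`mollifier:orgs`, `mollifier:org-envs:${orgId}`) rather than in a `Map`.

```typescript
// Hypothetical in-memory model of org -> env rotation. The real state lives
// in Redis; only the scheduling idea is shown here.
type OrgId = string;
type EnvId = string;
type Pick = { orgId: OrgId; envId: EnvId };

class TwoLevelRotation {
  private orgCursor = 0;
  private envCursors = new Map<OrgId, number>();

  constructor(private readonly maxOrgsPerTick: number) {}

  // Pick at most `maxOrgsPerTick` orgs, and at most one env per picked org.
  pickForTick(orgEnvs: Map<OrgId, EnvId[]>): Pick[] {
    // Sorted so the cursors refer to a stable ordering between ticks.
    const orgs = [...orgEnvs.keys()].sort();
    if (orgs.length === 0) return [];

    const count = Math.min(this.maxOrgsPerTick, orgs.length);
    const picks: Pick[] = [];

    for (let i = 0; i < count; i++) {
      const orgId = orgs[(this.orgCursor + i) % orgs.length];
      if (orgId === undefined) continue;
      const envs = orgEnvs.get(orgId) ?? [];
      if (envs.length === 0) continue;

      // Round-robin within the org: one env now, the next env next time.
      const envCursor = (this.envCursors.get(orgId) ?? 0) % envs.length;
      const envId = envs[envCursor];
      if (envId === undefined) continue;
      picks.push({ orgId, envId });
      this.envCursors.set(orgId, (envCursor + 1) % envs.length);
    }

    // Advance the org cursor so a capped tick resumes where it left off.
    this.orgCursor = (this.orgCursor + count) % orgs.length;
    return picks;
  }
}
```

Under this model an org with three envs still contributes a single pick per tick, so it cannot crowd out an org with one env. That is the property the changeset describes as throughput being determined by org count rather than env count.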
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,6 @@ | ||
| --- | ||
| area: webapp | ||
| type: feature | ||
| --- | ||
|
|
||
| Lay the groundwork for an opt-in burst-protection layer on the trigger hot path. This release ships **monitoring only** — operators can observe per-env trigger storms via two opt-in modes, but no trigger calls are diverted or rate-limited yet (active burst smoothing follows in a later release). All new env vars are prefixed `TRIGGER_MOLLIFIER_*` and default off, so existing deployments see no behaviour change. With `TRIGGER_MOLLIFIER_SHADOW_MODE=1`, each trigger evaluates a per-env rate counter and logs `mollifier.would_mollify` when the threshold is crossed. With `TRIGGER_MOLLIFIER_ENABLED=1` plus a per-org `mollifierEnabled` flag, over-threshold triggers are also recorded in a Redis audit buffer alongside the normal `engine.trigger` call, drained by a background no-op consumer. The drainer has its own switch (`TRIGGER_MOLLIFIER_DRAINER_ENABLED`) so multi-replica deployments can pin the polling loop to a single worker service while every replica still produces into the buffer; unset, it inherits `TRIGGER_MOLLIFIER_ENABLED` so single-container self-hosters need only one flag. Drainer misconfiguration (shutdown-timeout reconciliation against `GRACEFUL_SHUTDOWN_TIMEOUT`, or `TRIGGER_MOLLIFIER_ENABLED=1` with no buffer Redis) now throws `MollifierConfigurationError` at boot and crashes the process, so the misconfig surfaces to the orchestrator instead of disappearing into a log line; transient init failures (Redis blip) are still logged-and-swallowed. Emits the `mollifier.decisions` OTel counter for per-env rate visibility. |
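The flag semantics in the note above can be summarised with a small sketch. It assumes that `"1"` is the only value treated as "on", and that the buffer Redis host is passed in as a plain argument. The function name `resolveMollifierFlags`, the `bufferRedisHost` parameter, and the local `MollifierConfigurationError` class are illustrative stand-ins, not the webapp's actual code.

```typescript
// Hypothetical stand-in for the error class named in the server-changes note.
class MollifierConfigurationError extends Error {
  constructor(message: string) {
    super(message);
    this.name = "MollifierConfigurationError";
  }
}

type MollifierFlags = {
  shadowMode: boolean;
  enabled: boolean;
  drainerEnabled: boolean;
};

// Assumption: only the literal "1" switches a flag on.
const isOn = (value: string | undefined): boolean => value === "1";

function resolveMollifierFlags(
  env: Record<string, string | undefined>,
  bufferRedisHost: string | undefined // hypothetical: however the buffer Redis host is resolved
): MollifierFlags {
  const enabled = isOn(env["TRIGGER_MOLLIFIER_ENABLED"]);

  // Unset drainer flag inherits the main flag; an explicit value wins.
  const drainerRaw = env["TRIGGER_MOLLIFIER_DRAINER_ENABLED"];
  const drainerEnabled = drainerRaw === undefined ? enabled : isOn(drainerRaw);

  // Misconfiguration fails loudly at boot instead of being logged and ignored.
  if (enabled && !bufferRedisHost) {
    throw new MollifierConfigurationError(
      "TRIGGER_MOLLIFIER_ENABLED=1 requires a buffer Redis host"
    );
  }

  return {
    shadowMode: isOn(env["TRIGGER_MOLLIFIER_SHADOW_MODE"]),
    enabled,
    drainerEnabled,
  };
}
```

With an empty environment every flag resolves to off, which matches the note's claim that existing deployments see no behaviour change. Setting only `TRIGGER_MOLLIFIER_ENABLED=1` turns the drainer on as well, while an explicit `TRIGGER_MOLLIFIER_DRAINER_ENABLED=0` keeps a replica producing into the buffer without polling it.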
107 changes: 107 additions & 0 deletions
apps/webapp/app/v3/mollifier/bufferedTriggerPayload.server.ts
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,107 @@ | ||
| import type { TriggerTaskRequestBody } from "@trigger.dev/core/v3"; | ||
| import type { TriggerTaskServiceOptions } from "~/v3/services/triggerTask.server"; | ||
|
|
||
| // Canonical payload shape written to the mollifier buffer when the gate | ||
| // decides to mollify a trigger. Phase 1 ALSO calls engine.trigger directly | ||
| // (dual-write) so this is currently an audit/preview record. Phase 2 will | ||
| // make the buffer the primary write path: the drainer's handler will read | ||
| // this payload and replay it through engine.trigger to create the run in | ||
| // Postgres, and read-fallback endpoints will synthesise a Run view from it | ||
| // while it is still QUEUED. | ||
| // | ||
| // CONTRACT: this shape must contain everything needed for Phase 2's | ||
| // drainer-replay to reconstruct an equivalent engine.trigger call. Phase 1 | ||
| // emits it to logs; Phase 2 will serialise it into Redis and rebuild it on | ||
| // the drain side. Keep it serialisable — no functions, no class instances. | ||
| export type BufferedTriggerPayload = { | ||
| runFriendlyId: string; | ||
|
|
||
| // Routing identifiers — let the drainer re-fetch full AuthenticatedEnvironment | ||
| // at replay time rather than embedding it in the payload. | ||
| envId: string; | ||
| envType: string; | ||
| envSlug: string; | ||
| orgId: string; | ||
| orgSlug: string; | ||
| projectId: string; | ||
| projectRef: string; | ||
|
|
||
| // Task identifier — looked up against the locked BackgroundWorkerTask | ||
| // at replay time to recover task-defaults. | ||
| taskId: string; | ||
|
|
||
| // Customer-supplied trigger body (payload, options, context). | ||
| body: TriggerTaskRequestBody; | ||
|
|
||
| // Resolved values from upstream concerns. The drainer should NOT re-resolve | ||
| // these — that would create a second idempotency-key check, etc. | ||
| idempotencyKey: string | null; | ||
| idempotencyKeyExpiresAt: string | null; | ||
| tags: string[]; | ||
|
|
||
| // Parent/root linkage for nested triggers. | ||
| parentRunFriendlyId: string | null; | ||
|
|
||
| // Trace context — propagates the original triggering span across the | ||
| // buffer→drain boundary so the run's lifecycle stays under one trace. | ||
| traceContext: Record<string, unknown>; | ||
|
|
||
| // Annotations + service options that influence routing/replay. | ||
| triggerSource: string; | ||
| triggerAction: string; | ||
| serviceOptions: TriggerTaskServiceOptions; | ||
|
|
||
| // Wall-clock instants relevant to the run. | ||
| createdAt: string; | ||
| }; | ||
|
|
||
| // Assemble the canonical payload from the inputs available at the point | ||
| // `evaluateGate` returns "mollify" in `RunEngineTriggerTaskService.call`. | ||
| // All fields must be derivable from data already in scope at that call site; | ||
| // nothing should require an extra DB lookup. | ||
| export function buildBufferedTriggerPayload(input: { | ||
| runFriendlyId: string; | ||
| taskId: string; | ||
| envId: string; | ||
| envType: string; | ||
| envSlug: string; | ||
| orgId: string; | ||
| orgSlug: string; | ||
| projectId: string; | ||
| projectRef: string; | ||
| body: TriggerTaskRequestBody; | ||
| idempotencyKey: string | null; | ||
| idempotencyKeyExpiresAt: Date | null; | ||
| tags: string[]; | ||
| parentRunFriendlyId: string | null; | ||
| traceContext: Record<string, unknown>; | ||
| triggerSource: string; | ||
| triggerAction: string; | ||
| serviceOptions: TriggerTaskServiceOptions; | ||
| createdAt: Date; | ||
| }): BufferedTriggerPayload { | ||
| return { | ||
| runFriendlyId: input.runFriendlyId, | ||
| envId: input.envId, | ||
| envType: input.envType, | ||
| envSlug: input.envSlug, | ||
| orgId: input.orgId, | ||
| orgSlug: input.orgSlug, | ||
| projectId: input.projectId, | ||
| projectRef: input.projectRef, | ||
| taskId: input.taskId, | ||
| body: input.body, | ||
| idempotencyKey: input.idempotencyKey, | ||
| idempotencyKeyExpiresAt: | ||
| input.idempotencyKey && input.idempotencyKeyExpiresAt | ||
| ? input.idempotencyKeyExpiresAt.toISOString() | ||
| : null, | ||
| tags: input.tags, | ||
| parentRunFriendlyId: input.parentRunFriendlyId, | ||
| traceContext: input.traceContext, | ||
| triggerSource: input.triggerSource, | ||
| triggerAction: input.triggerAction, | ||
| serviceOptions: input.serviceOptions, | ||
| createdAt: input.createdAt.toISOString(), | ||
| }; | ||
| } |
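The "keep it serialisable" contract can be checked with a JSON round trip. The sketch below uses a deliberately reduced payload so that it runs without the `@trigger.dev/core` types. `MiniInput`, `MiniPayload`, and `buildMiniPayload` are hypothetical names that mirror only the date-handling part of `buildBufferedTriggerPayload`.

```typescript
// Reduced, hypothetical version of the payload: just the fields whose
// representation changes between the call site (Date) and the buffer (string).
type MiniInput = {
  runFriendlyId: string;
  idempotencyKey: string | null;
  idempotencyKeyExpiresAt: Date | null;
  createdAt: Date;
};

type MiniPayload = {
  runFriendlyId: string;
  idempotencyKey: string | null;
  idempotencyKeyExpiresAt: string | null;
  createdAt: string;
};

function buildMiniPayload(input: MiniInput): MiniPayload {
  return {
    runFriendlyId: input.runFriendlyId,
    idempotencyKey: input.idempotencyKey,
    // Same rule as the diff: an expiry is only kept when a key is present.
    idempotencyKeyExpiresAt:
      input.idempotencyKey && input.idempotencyKeyExpiresAt
        ? input.idempotencyKeyExpiresAt.toISOString()
        : null,
    createdAt: input.createdAt.toISOString(),
  };
}

const payload = buildMiniPayload({
  runFriendlyId: "run_example", // illustrative value only
  idempotencyKey: null,
  idempotencyKeyExpiresAt: new Date("2024-01-02T00:00:00.000Z"),
  createdAt: new Date("2024-01-01T00:00:00.000Z"),
});

// Because every field is a string or null, JSON round-trips it losslessly.
const roundTripped = JSON.parse(JSON.stringify(payload)) as MiniPayload;
```

Converting `Date` values to ISO strings at build time means the value read back on the drain side has the same shape as the value that was written, which would not hold if `Date` instances were stored directly.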