feat(block): Allow wait block to wait up to 30 days #4331
TheodoreSpeaks wants to merge 4 commits into staging
Conversation
@BugBot review
PR Summary — Medium Risk
Overview: Introduces long-duration time-based waits; tightens manual resume to human-only pauses.
Reviewed by Cursor Bugbot for commit 7678245. Bugbot is set up for automated code reviews on this repo.
Greptile Summary
This PR extends the Wait block to support durations up to 30 days by reusing the human-in-the-loop pause/resume infrastructure. Waits ≤ 5 minutes continue to execute in-process; longer waits suspend the workflow by writing a nextResumeAt timestamp to the pausedExecutions row, and a cron job polling /api/resume/poll every minute dispatches resumes for due time pause points.
Confidence Score: 3/5 — not safe to merge until the failed-dispatch silent-strand bug is fixed. A confirmed P1 defect in the new poll route means any transient DB or lock error during dispatch will permanently orphan a paused execution with no retry and no observability. The rest of the implementation (schema migration, in-process vs. suspended branching, UI filtering, cron config) is well-structured and correct.

Important Files Changed: apps/sim/app/api/resume/poll/route.ts — the failed-dispatch no-retry bug and the missing ORDER BY.
Sequence Diagram

```mermaid
sequenceDiagram
    participant W as WaitBlockHandler
    participant E as ExecutionEngine
    participant M as PauseResumeManager
    participant DB as pausedExecutions DB
    participant C as CronJob (/api/resume/poll)
    W->>W: execute(inputs)
    alt waitMs ≤ 5 min (in-process)
        W->>W: sleep(waitMs)
        W-->>E: {status: 'completed'}
    else waitMs > 5 min (suspended)
        W-->>E: {status: 'waiting', _pauseMetadata: {pauseKind: 'time', resumeAt}}
        E->>M: persistPauseResult(pausePoints)
        M->>M: compute nextResumeAt (earliest time pause point)
        M->>DB: INSERT/UPDATE pausedExecutions {nextResumeAt}
    end
    loop Every 1 minute
        C->>DB: SELECT WHERE status='paused' AND nextResumeAt <= now LIMIT 200
        DB-->>C: dueRows[]
        loop for each dueRow
            loop for each duePoint (pauseKind='time', resumeAt <= now)
                C->>M: enqueueOrStartResume({executionId, contextId})
                M-->>C: {status: 'starting', ...}
                C->>M: startResumeExecution() [fire and forget]
            end
            C->>DB: UPDATE SET nextResumeAt = nextRemaining (null if all done)
        end
    end
```
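The branch the diagram describes can be sketched roughly as follows. This is a minimal sketch, not the PR's actual handler: the threshold constant, the `executeWait` and `sleep` helpers, and the exact return shapes are assumptions inferred from the summaries and diagram above.

```typescript
// Hypothetical constant name; the 5-minute threshold comes from the PR summary.
const IN_PROCESS_WAIT_LIMIT_MS = 5 * 60 * 1000

const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms))

async function executeWait(waitMs: number): Promise<Record<string, unknown>> {
  if (waitMs <= IN_PROCESS_WAIT_LIMIT_MS) {
    // Short waits block the running execution and complete normally.
    await sleep(waitMs)
    return { status: 'completed' }
  }
  // Long waits suspend: the engine persists the pause point with a
  // nextResumeAt timestamp, and the cron poller resumes the execution
  // once resumeAt has passed.
  return {
    status: 'waiting',
    _pauseMetadata: {
      pauseKind: 'time',
      resumeAt: new Date(Date.now() + waitMs).toISOString(),
    },
  }
}
```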
Reviews (1): Last reviewed commit: "restore ff"
```typescript
    for (const point of duePoints) {
      const contextId = point.contextId
      if (!contextId) continue
      try {
        const enqueueResult = await PauseResumeManager.enqueueOrStartResume({
          executionId: row.executionId,
          contextId,
          resumeInput: {},
          userId,
        })

        if (enqueueResult.status === 'starting') {
          PauseResumeManager.startResumeExecution({
            resumeEntryId: enqueueResult.resumeEntryId,
            resumeExecutionId: enqueueResult.resumeExecutionId,
            pausedExecution: enqueueResult.pausedExecution,
            contextId: enqueueResult.contextId,
            resumeInput: enqueueResult.resumeInput,
            userId: enqueueResult.userId,
          }).catch((error) => {
            logger.error('Background time-pause resume failed', {
              executionId: row.executionId,
              contextId,
              error: toError(error).message,
            })
          })
        }
        dispatched++
      } catch (error) {
        const message = toError(error).message
        logger.warn('Failed to dispatch time-pause resume', {
          executionId: row.executionId,
          contextId,
          error: message,
        })
        failures.push({ executionId: row.executionId, contextId, error: message })
      }
    }

    await db
      .update(pausedExecutions)
      .set({ nextResumeAt: nextRemaining })
      .where(eq(pausedExecutions.id, row.id))
  }

  logger.info('Time-pause resume poll completed', {
    requestId,
    claimedRows,
    dispatched,
    failureCount: failures.length,
  })
```
Failed dispatches permanently strand executions
When enqueueOrStartResume throws for a due pause point, the error is caught and pushed to failures[], but nextRemaining is unaffected (it only tracks future points). The loop then runs UPDATE … SET next_resume_at = nextRemaining (effectively NULL when all points were due). After this update, the row no longer satisfies the cron query (isNotNull(nextResumeAt)), so it is silently abandoned and the workflow is permanently stuck in status = 'paused'.
Any transient failure — DB timeout, lock contention, network hiccup inside enqueueOrStartResume — turns into a permanent hang with no visible alert and no retry path.
A simple fix is to re-schedule failed points by putting their resumeAt back into nextRemaining:
```typescript
for (const point of duePoints) {
  const contextId = point.contextId
  if (!contextId) continue
  try {
    // ... dispatch ...
    dispatched++
  } catch (error) {
    const message = toError(error).message
    logger.warn('Failed to dispatch time-pause resume', { ... })
    failures.push({ executionId: row.executionId, contextId, error: message })
    // Re-queue failed point
    if (point.resumeAt) {
      const retryAt = new Date(point.resumeAt)
      if (!Number.isNaN(retryAt.getTime())) {
        if (!nextRemaining || retryAt < nextRemaining) nextRemaining = retryAt
      }
    }
  }
}
```

Alternatively, schedule a short retry (e.g. `new Date(Date.now() + 60_000)`) to avoid hammering a bad point at full frequency.
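A minimal sketch of that backoff variant, reusing the names from the snippet above; the one-minute delay is illustrative, not a value from the PR:

```typescript
try {
  // ... dispatch as above ...
} catch (error) {
  failures.push({ executionId: row.executionId, contextId, error: toError(error).message })
  // Back off: schedule this row for a later poll instead of retrying the
  // bad point on every one-minute cron tick.
  const retryAt = new Date(Date.now() + 60_000) // illustrative delay
  if (!nextRemaining || retryAt < nextRemaining) nextRemaining = retryAt
}
```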
```typescript
      metadata: pausedExecutions.metadata,
    })
    .from(pausedExecutions)
    .where(
      and(
        eq(pausedExecutions.status, 'paused'),
        isNotNull(pausedExecutions.nextResumeAt),
        lte(pausedExecutions.nextResumeAt, now)
      )
    )
    .limit(POLL_BATCH_LIMIT)
```
No ORDER BY on batch query — high-volume queues risk row starvation
Without an explicit ORDER BY, PostgreSQL returns rows in an unspecified order. When the queue depth exceeds POLL_BATCH_LIMIT = 200, the same 200 rows may be returned on every invocation (e.g. lowest physical heap order), while later-inserted rows are perpetually skipped. Adding .orderBy(pausedExecutions.nextResumeAt) ensures the most-overdue entries are always processed first and that all rows are eventually drained:

```typescript
    .orderBy(pausedExecutions.nextResumeAt)
    .limit(POLL_BATCH_LIMIT)
```

```typescript
  async executeWithNode(
    ctx: ExecutionContext,
    block: SerializedBlock,
    inputs: Record<string, any>,
    nodeMetadata: {
      nodeId: string
      loopId?: string
      parallelId?: string
      branchIndex?: number
      branchTotal?: number
    }
  ): Promise<BlockOutput> {
```
executeWithNode signature is narrower than the BlockHandler interface
BlockHandler.executeWithNode in types.ts declares nodeMetadata with three additional optional fields (originalBlockId, isLoopNode, executionOrder). The WaitBlockHandler implementation omits all three, so the method technically does not satisfy the declared interface contract. While TypeScript currently allows this (the extra fields are optional and ignored at runtime), it means callers that pass full nodeMetadata objects will silently drop fields the handler might need in a future iteration. Widening the implementation's parameter type to match the interface definition prevents this drift.
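A sketch of the widened parameter, assuming the three extra optional fields named in types.ts; the field types shown are assumptions for illustration, not taken from the PR:

```typescript
  // Widened to match BlockHandler.executeWithNode in types.ts; the three
  // extra fields are optional, so existing call sites are unaffected.
  async executeWithNode(
    ctx: ExecutionContext,
    block: SerializedBlock,
    inputs: Record<string, any>,
    nodeMetadata: {
      nodeId: string
      loopId?: string
      parallelId?: string
      branchIndex?: number
      branchTotal?: number
      originalBlockId?: string // assumed type
      isLoopNode?: boolean // assumed type
      executionOrder?: number // assumed type
    }
  ): Promise<BlockOutput> {
```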
There was a problem hiding this comment.
Cursor Bugbot has reviewed your changes and found 1 potential issue.
Reviewed by Cursor Bugbot for commit 0c32dd4.

Summary
Reuse the human-in-the-loop logic to allow wait blocks to wait up to 30 days. This could be useful for things like email automation, where you want to send follow-ups after x days.
Type of Change
Testing
Checklist
Screenshots/Videos