Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ export const POST = withRouteHandler(
contextId,
resumeInput,
userId,
allowedPauseKinds: ['human'],
})

if (enqueueResult.status === 'queued') {
Expand Down
161 changes: 161 additions & 0 deletions apps/sim/app/api/resume/poll/route.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
import { db } from '@sim/db'
import { pausedExecutions } from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import { toError } from '@sim/utils/errors'
import { generateShortId } from '@sim/utils/id'
import { and, eq, isNotNull, lte } from 'drizzle-orm'
import { type NextRequest, NextResponse } from 'next/server'
import { verifyCronAuth } from '@/lib/auth/internal'
import { acquireLock, releaseLock } from '@/lib/core/config/redis'
import { withRouteHandler } from '@/lib/core/utils/with-route-handler'
import { PauseResumeManager } from '@/lib/workflows/executor/human-in-the-loop-manager'

const logger = createLogger('TimePauseResumePoll')

export const dynamic = 'force-dynamic'
export const maxDuration = 120

const LOCK_KEY = 'time-pause-resume-poll-lock'
const LOCK_TTL_SECONDS = 120
const POLL_BATCH_LIMIT = 200

interface StoredPausePoint {
contextId?: string
resumeStatus?: string
pauseKind?: string
resumeAt?: string
}

export const GET = withRouteHandler(async (request: NextRequest) => {
const requestId = generateShortId()

const authError = verifyCronAuth(request, 'Time-pause resume poll')
if (authError) return authError

const lockAcquired = await acquireLock(LOCK_KEY, requestId, LOCK_TTL_SECONDS)
if (!lockAcquired) {
return NextResponse.json(
{ success: true, message: 'Polling already in progress – skipped', requestId },
{ status: 202 }
)
}

let claimedRows = 0
let dispatched = 0
const failures: { executionId: string; contextId: string; error: string }[] = []

try {
const now = new Date()

const dueRows = await db
.select({
id: pausedExecutions.id,
executionId: pausedExecutions.executionId,
workflowId: pausedExecutions.workflowId,
pausePoints: pausedExecutions.pausePoints,
metadata: pausedExecutions.metadata,
})
.from(pausedExecutions)
.where(
and(
eq(pausedExecutions.status, 'paused'),
isNotNull(pausedExecutions.nextResumeAt),
lte(pausedExecutions.nextResumeAt, now)
)
)
.limit(POLL_BATCH_LIMIT)
Comment on lines +56 to +66
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 No ORDER BY on batch query — high-volume queues risk row starvation

Without an explicit ORDER BY, PostgreSQL returns rows in an unspecified order. When the queue depth exceeds POLL_BATCH_LIMIT = 200, the same 200 rows may be returned on every invocation (e.g. lowest physical heap order), while later-inserted rows are perpetually skipped. Adding .orderBy(pausedExecutions.nextResumeAt) ensures the most-overdue entries are always processed first and that all rows are eventually drained.

.orderBy(pausedExecutions.nextResumeAt)
.limit(POLL_BATCH_LIMIT)


claimedRows = dueRows.length

for (const row of dueRows) {
const points = (row.pausePoints ?? {}) as Record<string, StoredPausePoint>
const metadata = (row.metadata ?? {}) as Record<string, unknown>
const userId = typeof metadata.executorUserId === 'string' ? metadata.executorUserId : ''

const duePoints: StoredPausePoint[] = []
let nextRemaining: Date | null = null

for (const point of Object.values(points)) {
if (point.pauseKind !== 'time' || !point.resumeAt) continue
if (point.resumeStatus && point.resumeStatus !== 'paused') continue

const resumeAt = new Date(point.resumeAt)
if (Number.isNaN(resumeAt.getTime())) continue

if (resumeAt <= now) {
duePoints.push(point)
} else if (!nextRemaining || resumeAt < nextRemaining) {
nextRemaining = resumeAt
}
}

for (const point of duePoints) {
const contextId = point.contextId
if (!contextId) continue
try {
const enqueueResult = await PauseResumeManager.enqueueOrStartResume({
executionId: row.executionId,
contextId,
resumeInput: {},
userId,
})

if (enqueueResult.status === 'starting') {
PauseResumeManager.startResumeExecution({
resumeEntryId: enqueueResult.resumeEntryId,
resumeExecutionId: enqueueResult.resumeExecutionId,
pausedExecution: enqueueResult.pausedExecution,
contextId: enqueueResult.contextId,
resumeInput: enqueueResult.resumeInput,
userId: enqueueResult.userId,
}).catch((error) => {
logger.error('Background time-pause resume failed', {
executionId: row.executionId,
contextId,
error: toError(error).message,
})
})
}
dispatched++
} catch (error) {
const message = toError(error).message
logger.warn('Failed to dispatch time-pause resume', {
executionId: row.executionId,
contextId,
error: message,
})
failures.push({ executionId: row.executionId, contextId, error: message })
}
}

// We never auto-retry a failed dispatch: workflow blocks aren't idempotent, and an
// operator must investigate stranded rows by hand. Setting nextResumeAt to the next
// future pause (or null) drops the row out of the poll, surfacing the failure.
await db
.update(pausedExecutions)
.set({ nextResumeAt: nextRemaining })
.where(eq(pausedExecutions.id, row.id))
Comment thread
TheodoreSpeaks marked this conversation as resolved.
}

logger.info('Time-pause resume poll completed', {
requestId,
claimedRows,
dispatched,
failureCount: failures.length,
Comment on lines +92 to +144
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Failed dispatches permanently strand executions

When enqueueOrStartResume throws for a due pause point, the error is caught and pushed to failures[], but nextRemaining is unaffected (it only tracks future points). The loop then runs UPDATE … SET next_resume_at = nextRemaining (effectively NULL when all points were due). After this update, the row no longer satisfies the cron query (isNotNull(nextResumeAt)), so it is silently abandoned and the workflow is permanently stuck in status = 'paused'.

Any transient failure — DB timeout, lock contention, network hiccup inside enqueueOrStartResume — turns into a permanent hang with no visible alert and no retry path.

A simple fix is to re-schedule failed points by putting their resumeAt back into nextRemaining:

for (const point of duePoints) {
  const contextId = point.contextId
  if (!contextId) continue
  try {
    // ... dispatch ...
    dispatched++
  } catch (error) {
    const message = toError(error).message
    logger.warn('Failed to dispatch time-pause resume', { ... })
    failures.push({ executionId: row.executionId, contextId, error: message })
    // Re-queue failed point
    if (point.resumeAt) {
      const retryAt = new Date(point.resumeAt)
      if (!Number.isNaN(retryAt.getTime())) {
        if (!nextRemaining || retryAt < nextRemaining) nextRemaining = retryAt
      }
    }
  }
}

Alternatively, schedule a short retry (e.g. new Date(Date.now() + 60_000)) to avoid hammering a bad point at full frequency.

})

return NextResponse.json({
success: true,
requestId,
claimedRows,
dispatched,
failures,
})
} catch (error) {
const message = toError(error).message
logger.error('Time-pause resume poll failed', { requestId, error: message })
return NextResponse.json({ success: false, requestId, error: message }, { status: 500 })
} finally {
await releaseLock(LOCK_KEY, requestId).catch(() => {})
}
})
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ interface PausePointWithQueue {
latestResumeEntry?: ResumeQueueEntrySummary | null
parallelScope?: any
loopScope?: any
pauseKind?: 'human' | 'time'
resumeAt?: string
}

interface PausedExecutionSummary {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ export const ShortInput = memo(function ShortInput({
<>
<Input
ref={ref as React.RefObject<HTMLInputElement>}
className='allow-scroll w-full overflow-auto text-transparent selection:text-transparent caret-foreground [-ms-overflow-style:none] [scrollbar-width:none] placeholder:text-muted-foreground/50 [&::-webkit-scrollbar]:hidden'
className='allow-scroll w-full overflow-auto text-transparent caret-foreground [-ms-overflow-style:none] [scrollbar-width:none] selection:text-transparent placeholder:text-muted-foreground/50 [&::-webkit-scrollbar]:hidden'
readOnly={readOnly}
placeholder={placeholder ?? ''}
type='text'
Expand Down
21 changes: 14 additions & 7 deletions apps/sim/blocks/blocks/wait.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,14 @@ const WaitIcon = (props: SVGProps<SVGSVGElement>) => createElement(PauseCircle,
export const WaitBlock: BlockConfig = {
type: 'wait',
name: 'Wait',
description: 'Pause workflow execution for a specified time delay',
description: 'Pause workflow execution for up to 30 days',
longDescription:
'Pauses workflow execution for a specified time interval. The wait executes a simple sleep for the configured duration.',
'Pauses workflow execution for a specified time interval. Waits up to five minutes are held in-process; longer waits suspend the workflow and resume automatically once the configured duration elapses.',
bestPractices: `
- Use for simple time delays (max 10 minutes)
- Configure the wait amount and unit (seconds or minutes)
- Time-based waits are interruptible via workflow cancellation
- Configure the wait amount and unit (seconds, minutes, hours, or days)
- Maximum wait duration is 30 days
- Waits up to 5 minutes execute in-process and are interruptible via workflow cancellation
- Longer waits suspend the workflow; the execution resumes automatically when the timer fires
- Enter a positive number for the wait amount
`,
category: 'blocks',
Expand All @@ -26,7 +27,7 @@ export const WaitBlock: BlockConfig = {
id: 'timeValue',
title: 'Wait Amount',
type: 'short-input',
description: 'Max: 600 seconds or 10 minutes',
description: 'Max: 30 days',
placeholder: '10',
value: () => '10',
required: true,
Expand All @@ -38,6 +39,8 @@ export const WaitBlock: BlockConfig = {
options: [
{ label: 'Seconds', id: 'seconds' },
{ label: 'Minutes', id: 'minutes' },
{ label: 'Hours', id: 'hours' },
{ label: 'Days', id: 'days' },
],
value: () => 'seconds',
required: true,
Expand All @@ -53,7 +56,7 @@ export const WaitBlock: BlockConfig = {
},
timeUnit: {
type: 'string',
description: 'Wait duration unit (seconds or minutes)',
description: 'Wait duration unit (seconds, minutes, hours, or days)',
},
},
outputs: {
Expand All @@ -65,5 +68,9 @@ export const WaitBlock: BlockConfig = {
type: 'string',
description: 'Status of the wait block (waiting, completed, cancelled)',
},
resumeAt: {
type: 'string',
description: 'ISO timestamp at which a suspended wait will resume (long waits only)',
},
},
}
2 changes: 2 additions & 0 deletions apps/sim/executor/execution/engine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,8 @@ export class ExecutionEngine {
parallelScope: pause.parallelScope,
loopScope: pause.loopScope,
resumeLinks: pause.resumeLinks,
pauseKind: pause.pauseKind,
resumeAt: pause.resumeAt,
}))

return {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ export class HumanInTheLoopBlockHandler implements BlockHandler {
parallelScope,
loopScope,
resumeLinks,
pauseKind: 'human',
}

const responseOutput: Record<string, any> = {
Expand Down
Loading
Loading