From a66cfe3fd6df462480c9e908d339f6a18dd49b4c Mon Sep 17 00:00:00 2001 From: Theodore Li Date: Sat, 11 Apr 2026 10:32:07 -0700 Subject: [PATCH 01/32] Add table triggers for columns and row added --- .../workflow-block/workflow-block.tsx | 4 +- apps/sim/blocks/blocks/table.ts | 6 + apps/sim/lib/table/service.ts | 54 +++++- apps/sim/lib/table/trigger.ts | 170 +++++++++++++++++ apps/sim/lib/table/types.ts | 1 + apps/sim/lib/webhooks/deploy.ts | 50 +++-- apps/sim/lib/webhooks/processor.ts | 3 +- apps/sim/lib/webhooks/providers/registry.ts | 2 + apps/sim/lib/webhooks/providers/table.ts | 13 ++ apps/sim/triggers/registry.ts | 2 + apps/sim/triggers/table/index.ts | 1 + apps/sim/triggers/table/poller.ts | 175 ++++++++++++++++++ 12 files changed, 462 insertions(+), 19 deletions(-) create mode 100644 apps/sim/lib/table/trigger.ts create mode 100644 apps/sim/lib/webhooks/providers/table.ts create mode 100644 apps/sim/triggers/table/index.ts create mode 100644 apps/sim/triggers/table/poller.ts diff --git a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/workflow-block/workflow-block.tsx b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/workflow-block/workflow-block.tsx index 8db6eabe3a1..4d9162f602a 100644 --- a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/workflow-block/workflow-block.tsx +++ b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/workflow-block/workflow-block.tsx @@ -631,12 +631,12 @@ const SubBlockRow = memo(function SubBlockRow({ const { data: tables = [] } = useTablesList(workspaceId || '') const tableDisplayName = useMemo(() => { - if (subBlock?.id !== 'tableId' || typeof rawValue !== 'string') { + if (subBlock?.type !== 'table-selector' || typeof rawValue !== 'string') { return null } const table = tables.find((t) => t.id === rawValue) return table?.name ?? null - }, [subBlock?.id, rawValue, tables]) + }, [subBlock?.type, rawValue, tables]) const webhookUrlDisplayValue = useMemo(() => { if (subBlock?.id !== 'webhookUrlDisplay' || !blockId) { diff --git a/apps/sim/blocks/blocks/table.ts b/apps/sim/blocks/blocks/table.ts index a903cf3b3e4..738891c4182 100644 --- a/apps/sim/blocks/blocks/table.ts +++ b/apps/sim/blocks/blocks/table.ts @@ -3,6 +3,7 @@ import { TABLE_LIMITS } from '@/lib/table/constants' import { filterRulesToFilter, sortRulesToSort } from '@/lib/table/query-builder/converters' import type { BlockConfig } from '@/blocks/types' import type { TableQueryResponse } from '@/tools/table/types' +import { getTrigger } from '@/triggers' /** * Parses a JSON string with helpful error messages. @@ -551,6 +552,7 @@ Return ONLY the sort JSON:`, condition: { field: 'operation', value: 'query_rows' }, value: () => '0', }, + ...getTrigger('table_new_row').subBlocks, ], tools: { @@ -688,4 +690,8 @@ Return ONLY the sort JSON:`, }, message: { type: 'string', description: 'Operation status message' }, }, + triggers: { + enabled: true, + available: ['table_new_row'], + }, } diff --git a/apps/sim/lib/table/service.ts b/apps/sim/lib/table/service.ts index 5a874be671b..b26116ece84 100644 --- a/apps/sim/lib/table/service.ts +++ b/apps/sim/lib/table/service.ts @@ -15,6 +15,7 @@ import { getPostgresErrorCode } from '@/lib/core/utils/pg-error' import { generateRestoreName } from '@/lib/core/utils/restore-name' import { generateId } from '@/lib/core/utils/uuid' import { COLUMN_TYPES, NAME_PATTERN, TABLE_LIMITS, USER_TABLE_ROWS_SQL_NAME } from './constants' +import { fireTableTrigger } from './trigger' import { buildFilterClause, buildSortClause } from './sql' import type { BatchInsertData, @@ -652,13 +653,17 @@ export async function insertRow( logger.info(`[${requestId}] Inserted row ${rowId} into table ${data.tableId}`) - return { + const insertedRow: TableRow = { id: row.id, data: row.data as RowData, position: row.position, createdAt: row.createdAt, updatedAt: row.updatedAt, } + + void fireTableTrigger(data.tableId, table.name, 'insert', [insertedRow], null, table.schema, requestId) + + return insertedRow } /** @@ -767,13 +772,17 @@ export async function batchInsertRows( logger.info(`[${requestId}] Batch inserted ${data.rows.length} rows into table ${data.tableId}`) - return insertedRows.map((r) => ({ + const result: TableRow[] = insertedRows.map((r) => ({ id: r.id, data: r.data as RowData, position: r.position, createdAt: r.createdAt, updatedAt: r.updatedAt, })) + + void fireTableTrigger(data.tableId, table.name, 'insert', result, null, table.schema, requestId) + + return result } /** @@ -883,6 +892,8 @@ export async function upsertRow( const now = new Date() if (existingRow) { + const previousData = existingRow.data as RowData + const [updatedRow] = await trx .update(userTableRows) .set({ @@ -900,6 +911,7 @@ export async function upsertRow( createdAt: updatedRow.createdAt, updatedAt: updatedRow.updatedAt, }, + previousData, operation: 'update' as const, } } @@ -951,6 +963,13 @@ export async function upsertRow( `[${requestId}] Upserted (${result.operation}) row ${result.row.id} in table ${data.tableId}` ) + if (result.operation === 'insert') { + void fireTableTrigger(data.tableId, table.name, 'insert', [result.row], null, table.schema, requestId) + } else if (result.operation === 'update' && result.previousData) { + const oldRows = new Map([[result.row.id, result.previousData]]) + void fireTableTrigger(data.tableId, table.name, 'update', [result.row], oldRows, table.schema, requestId) + } + return result } @@ -1126,13 +1145,18 @@ export async function updateRow( logger.info(`[${requestId}] Updated row ${data.rowId} in table ${data.tableId}`) - return { + const updatedRow: TableRow = { id: data.rowId, data: data.data, position: existingRow.position, createdAt: existingRow.createdAt, updatedAt: now, } + + const oldRows = new Map([[data.rowId, existingRow.data as RowData]]) + void fireTableTrigger(data.tableId, table.name, 'update', [updatedRow], oldRows, table.schema, requestId) + + return updatedRow } /** @@ -1277,6 +1301,17 @@ export async function updateRowsByFilter( logger.info(`[${requestId}] Updated ${matchingRows.length} rows in table ${data.tableId}`) + // Fire update triggers with old and new row data + const oldRows = new Map(matchingRows.map((r) => [r.id, r.data as RowData])) + const updatedRows: TableRow[] = matchingRows.map((r) => ({ + id: r.id, + data: { ...(r.data as RowData), ...data.data }, + position: 0, + createdAt: now, + updatedAt: now, + })) + void fireTableTrigger(data.tableId, table.name, 'update', updatedRows, oldRows, table.schema, requestId) + return { affectedCount: matchingRows.length, affectedRowIds: matchingRows.map((r) => r.id), @@ -1365,6 +1400,19 @@ export async function batchUpdateRows( logger.info(`[${requestId}] Batch updated ${mergedUpdates.length} rows in table ${data.tableId}`) + // Fire update triggers with old and new row data + const oldRowsForTrigger = new Map( + data.updates.map((u) => [u.rowId, existingMap.get(u.rowId)!]) + ) + const updatedRowsForTrigger: TableRow[] = mergedUpdates.map(({ rowId, mergedData }) => ({ + id: rowId, + data: mergedData, + position: 0, + createdAt: now, + updatedAt: now, + })) + void fireTableTrigger(data.tableId, table.name, 'update', updatedRowsForTrigger, oldRowsForTrigger, table.schema, requestId) + return { affectedCount: mergedUpdates.length, affectedRowIds: mergedUpdates.map((u) => u.rowId), diff --git a/apps/sim/lib/table/trigger.ts b/apps/sim/lib/table/trigger.ts new file mode 100644 index 00000000000..5bddd415976 --- /dev/null +++ b/apps/sim/lib/table/trigger.ts @@ -0,0 +1,170 @@ +/** + * Direct trigger firing for table row events. + * + * When rows are inserted or updated in a table, this module looks up any + * active webhook triggers watching that table and fires workflow executions + * immediately - no polling or cron involved. + */ + +import { createLogger } from '@sim/logger' +import { generateShortId } from '@/lib/core/utils/uuid' +import { fetchActiveWebhooks } from '@/lib/webhooks/polling/utils' +import { processPolledWebhookEvent } from '@/lib/webhooks/processor' +import type { RowData, TableRow, TableSchema } from '@/lib/table/types' + +const logger = createLogger('TableTrigger') + +type EventType = 'insert' | 'update' + +interface TableTriggerPayload { + row: Record | null + rawRow: Record + previousRow: Record | null + changedColumns: string[] + rowId: string + headers: string[] + rowNumber: number + tableId: string + tableName: string + timestamp: string +} + +interface WebhookConfig { + tableId?: string + tableSelector?: string + manualTableId?: string + eventType?: string + watchColumns?: string | string[] + includeHeaders?: boolean +} + +/** + * Fires workflow triggers for table row changes. + * + * This is fire-and-forget - errors are logged but never thrown. + * Call with `void fireTableTrigger(...)` to avoid blocking the caller. + * + * @param eventType - 'insert' for new rows, 'update' for changed rows + * @param oldRows - Map of row ID to previous data. Pass null for inserts. + */ +export async function fireTableTrigger( + tableId: string, + tableName: string, + eventType: EventType, + rows: TableRow[], + oldRows: Map | null, + schema: TableSchema, + requestId: string +): Promise { + try { + const webhooks = await fetchActiveWebhooks('table') + if (webhooks.length === 0) return + + const headers = schema.columns.map((c) => c.name) + + // Filter to webhooks watching this table with a matching event type + const matching = webhooks.filter((entry) => { + const config = entry.webhook.providerConfig as WebhookConfig | null + const configTableId = config?.tableId ?? config?.tableSelector ?? config?.manualTableId + if (configTableId !== tableId) return false + + const configEventType = config?.eventType ?? 'insert' + return configEventType === eventType + }) + + if (matching.length === 0) return + + logger.info( + `[${requestId}] Firing ${matching.length} trigger(s) for ${rows.length} ${eventType} event(s) in table ${tableId}` + ) + + for (const { webhook: webhookData, workflow: workflowData } of matching) { + const config = webhookData.providerConfig as WebhookConfig | null + const watchColumns = parseWatchColumns(config?.watchColumns) + const includeHeaders = config?.includeHeaders !== false + + for (const row of rows) { + const previousRow = oldRows?.get(row.id) ?? null + const changedColumns = previousRow + ? detectChangedColumns(previousRow, row.data) + : [] + + // For updates with watch columns, skip rows where no watched column changed + if (eventType === 'update' && watchColumns.length > 0) { + const hasWatchedChange = changedColumns.some((col) => watchColumns.includes(col)) + if (!hasWatchedChange) continue + } + + // Build mapped row if includeHeaders is enabled + let mappedRow: Record | null = null + if (includeHeaders && headers.length > 0) { + mappedRow = {} + for (const header of headers) { + mappedRow[header] = row.data[header] ?? null + } + } + + const payload: TableTriggerPayload = { + row: mappedRow, + rawRow: row.data, + previousRow, + changedColumns, + rowId: row.id, + headers, + rowNumber: row.position, + tableId, + tableName, + timestamp: new Date().toISOString(), + } + + const eventRequestId = generateShortId() + + try { + const result = await processPolledWebhookEvent( + webhookData, + workflowData, + payload, + eventRequestId + ) + + if (!result.success) { + logger.error( + `[${eventRequestId}] Failed to fire table trigger for row ${row.id}:`, + result.statusCode, + result.error + ) + } + } catch (error) { + logger.error( + `[${eventRequestId}] Error firing table trigger for row ${row.id}:`, + error + ) + } + } + } + } catch (error) { + logger.error(`[${requestId}] Error in fireTableTrigger:`, error) + } +} + +function parseWatchColumns(watchColumns: string | string[] | undefined): string[] { + if (!watchColumns) return [] + if (Array.isArray(watchColumns)) return watchColumns.filter(Boolean) + return watchColumns + .split(',') + .map((c) => c.trim()) + .filter(Boolean) +} + +function detectChangedColumns(oldData: RowData, newData: RowData): string[] { + const changed: string[] = [] + const allKeys = new Set([...Object.keys(oldData), ...Object.keys(newData)]) + + for (const key of allKeys) { + if (JSON.stringify(oldData[key]) !== JSON.stringify(newData[key])) { + changed.push(key) + } + } + + return changed +} diff --git a/apps/sim/lib/table/types.ts b/apps/sim/lib/table/types.ts index ea97b292152..3e6fbf9c8c8 100644 --- a/apps/sim/lib/table/types.ts +++ b/apps/sim/lib/table/types.ts @@ -198,6 +198,7 @@ export interface UpsertRowData { export interface UpsertResult { row: TableRow operation: 'insert' | 'update' + previousData?: RowData } export interface UpdateRowData { diff --git a/apps/sim/lib/webhooks/deploy.ts b/apps/sim/lib/webhooks/deploy.ts index 4a189b681b8..19c7648275a 100644 --- a/apps/sim/lib/webhooks/deploy.ts +++ b/apps/sim/lib/webhooks/deploy.ts @@ -182,20 +182,44 @@ function buildProviderConfig( Object.entries(block.subBlocks || {}).map(([key, value]) => [key, { value: value.value }]) ) - triggerDef.subBlocks - .filter( - (subBlock) => - (subBlock.mode === 'trigger' || subBlock.mode === 'trigger-advanced') && - !SYSTEM_SUBBLOCK_IDS.includes(subBlock.id) - ) - .forEach((subBlock) => { - const valueToUse = getConfigValue(block, subBlock) - if (valueToUse !== null && valueToUse !== undefined && valueToUse !== '') { - providerConfig[subBlock.id] = valueToUse - } else if (isFieldRequired(subBlock, subBlockValues)) { - missingFields.push(subBlock.title || subBlock.id) + // Track which canonical groups already have a value so we don't flag the + // other member of a basic/advanced pair as missing. + const canonicalHasValue = new Set() + const canonicalReportedMissing = new Set() + + const triggerSubBlocks = triggerDef.subBlocks.filter( + (subBlock) => + (subBlock.mode === 'trigger' || subBlock.mode === 'trigger-advanced') && + !SYSTEM_SUBBLOCK_IDS.includes(subBlock.id) + ) + + // First pass: collect values, resolving canonical pairs to a single key + for (const subBlock of triggerSubBlocks) { + const valueToUse = getConfigValue(block, subBlock) + if (valueToUse !== null && valueToUse !== undefined && valueToUse !== '') { + providerConfig[subBlock.id] = valueToUse + if (subBlock.canonicalParamId) { + // Also store under the canonical key so consumers can read one consistent name + if (!canonicalHasValue.has(subBlock.canonicalParamId)) { + providerConfig[subBlock.canonicalParamId] = valueToUse + } + canonicalHasValue.add(subBlock.canonicalParamId) } - }) + } + } + + // Second pass: check required fields, skipping canonical pairs that already have a value + for (const subBlock of triggerSubBlocks) { + const hasValue = providerConfig[subBlock.id] !== undefined + if (!hasValue && isFieldRequired(subBlock, subBlockValues)) { + if (subBlock.canonicalParamId) { + if (canonicalHasValue.has(subBlock.canonicalParamId)) continue + if (canonicalReportedMissing.has(subBlock.canonicalParamId)) continue + canonicalReportedMissing.add(subBlock.canonicalParamId) + } + missingFields.push(subBlock.title || subBlock.id) + } + } const credentialConfig = triggerDef.subBlocks.find( (subBlock) => subBlock.id === 'triggerCredentials' diff --git a/apps/sim/lib/webhooks/processor.ts b/apps/sim/lib/webhooks/processor.ts index ba20a6c4cbc..e0acaaf9314 100644 --- a/apps/sim/lib/webhooks/processor.ts +++ b/apps/sim/lib/webhooks/processor.ts @@ -679,6 +679,7 @@ export interface PolledWebhookEventResult { success: boolean error?: string statusCode?: number + executionId?: string } interface PolledWebhookRecord { @@ -882,7 +883,7 @@ export async function processPolledWebhookEvent( } } - return { success: true } + return { success: true, executionId } } catch (error: unknown) { if (error instanceof DispatchQueueFullError) { logger.warn(`[${requestId}] Dispatch queue full for polled webhook: ${error.message}`) diff --git a/apps/sim/lib/webhooks/providers/registry.ts b/apps/sim/lib/webhooks/providers/registry.ts index 332add6598f..e9f26acf32d 100644 --- a/apps/sim/lib/webhooks/providers/registry.ts +++ b/apps/sim/lib/webhooks/providers/registry.ts @@ -31,6 +31,7 @@ import { salesforceHandler } from '@/lib/webhooks/providers/salesforce' import { servicenowHandler } from '@/lib/webhooks/providers/servicenow' import { slackHandler } from '@/lib/webhooks/providers/slack' import { stripeHandler } from '@/lib/webhooks/providers/stripe' +import { tableProviderHandler } from '@/lib/webhooks/providers/table' import { telegramHandler } from '@/lib/webhooks/providers/telegram' import { twilioHandler } from '@/lib/webhooks/providers/twilio' import { twilioVoiceHandler } from '@/lib/webhooks/providers/twilio-voice' @@ -76,6 +77,7 @@ const PROVIDER_HANDLERS: Record = { servicenow: servicenowHandler, slack: slackHandler, stripe: stripeHandler, + table: tableProviderHandler, telegram: telegramHandler, twilio: twilioHandler, twilio_voice: twilioVoiceHandler, diff --git a/apps/sim/lib/webhooks/providers/table.ts b/apps/sim/lib/webhooks/providers/table.ts new file mode 100644 index 00000000000..ed1e4fb202f --- /dev/null +++ b/apps/sim/lib/webhooks/providers/table.ts @@ -0,0 +1,13 @@ +import type { FormatInputContext, FormatInputResult, WebhookProviderHandler } from './types' + +/** + * Provider handler for table triggers. + * + * Tables use direct triggering (fired from the insert codepath), + * so this handler only needs formatInput to pass the payload through. + */ +export const tableProviderHandler: WebhookProviderHandler = { + async formatInput({ body }: FormatInputContext): Promise { + return { input: body } + }, +} diff --git a/apps/sim/triggers/registry.ts b/apps/sim/triggers/registry.ts index c1895086494..8eec130d1d6 100644 --- a/apps/sim/triggers/registry.ts +++ b/apps/sim/triggers/registry.ts @@ -247,6 +247,7 @@ import { } from '@/triggers/servicenow' import { slackWebhookTrigger } from '@/triggers/slack' import { stripeWebhookTrigger } from '@/triggers/stripe' +import { tableNewRowTrigger } from '@/triggers/table' import { telegramWebhookTrigger } from '@/triggers/telegram' import { twilioVoiceWebhookTrigger } from '@/triggers/twilio_voice' import { typeformWebhookTrigger } from '@/triggers/typeform' @@ -456,6 +457,7 @@ export const TRIGGER_REGISTRY: TriggerRegistry = { servicenow_change_request_updated: servicenowChangeRequestUpdatedTrigger, servicenow_webhook: servicenowWebhookTrigger, stripe_webhook: stripeWebhookTrigger, + table_new_row: tableNewRowTrigger, telegram_webhook: telegramWebhookTrigger, typeform_webhook: typeformWebhookTrigger, whatsapp_webhook: whatsappWebhookTrigger, diff --git a/apps/sim/triggers/table/index.ts b/apps/sim/triggers/table/index.ts new file mode 100644 index 00000000000..a555c99fd37 --- /dev/null +++ b/apps/sim/triggers/table/index.ts @@ -0,0 +1 @@ +export { tableNewRowTrigger } from './poller' diff --git a/apps/sim/triggers/table/poller.ts b/apps/sim/triggers/table/poller.ts new file mode 100644 index 00000000000..f3d5d386b1c --- /dev/null +++ b/apps/sim/triggers/table/poller.ts @@ -0,0 +1,175 @@ +import { TableIcon } from '@/components/icons' +import { getQueryClient } from '@/app/_shell/providers/get-query-client' +import { tableKeys } from '@/hooks/queries/tables' +import type { TableDefinition } from '@/lib/table' +import { useWorkflowRegistry } from '@/stores/workflows/registry/store' +import { useSubBlockStore } from '@/stores/workflows/subblock/store' +import type { TriggerConfig } from '@/triggers/types' + +async function fetchTableColumns(blockId: string): Promise> { + const activeWorkflowId = useWorkflowRegistry.getState().activeWorkflowId + const workspaceId = useWorkflowRegistry.getState().hydration.workspaceId + if (!activeWorkflowId || !workspaceId) return [] + + const blockValues = + useSubBlockStore.getState().workflowValues[activeWorkflowId]?.[blockId] + const tableId = (blockValues?.tableSelector as string) || (blockValues?.manualTableId as string) + if (!tableId) return [] + + const tables = await getQueryClient().fetchQuery({ + queryKey: tableKeys.list(workspaceId), + queryFn: async ({ signal }) => { + const res = await fetch( + `/api/table?workspaceId=${encodeURIComponent(workspaceId)}&scope=active`, + { signal } + ) + if (!res.ok) return [] + const response = await res.json() + return (response.data?.tables || []) as TableDefinition[] + }, + staleTime: 60 * 1000, + }) + + const table = tables.find((t: TableDefinition) => t.id === tableId) + if (!table?.schema?.columns) return [] + + return table.schema.columns.map((col) => ({ id: col.name, label: col.name })) +} + +export const tableNewRowTrigger: TriggerConfig = { + id: 'table_new_row', + name: 'Table Trigger', + provider: 'table', + description: 'Triggers when rows are inserted or updated in a table', + version: '1.0.0', + icon: TableIcon, + + subBlocks: [ + { + id: 'tableSelector', + title: 'Table', + type: 'table-selector', + description: 'The table to monitor.', + required: true, + mode: 'trigger', + canonicalParamId: 'tableId', + placeholder: 'Select a table', + }, + { + id: 'manualTableId', + title: 'Table ID', + type: 'short-input', + placeholder: 'Enter table ID', + description: 'The table to monitor.', + required: true, + mode: 'trigger-advanced', + canonicalParamId: 'tableId', + }, + { + id: 'eventType', + title: 'Event', + type: 'dropdown', + options: [ + { id: 'insert', label: 'Row Inserted' }, + { id: 'update', label: 'Row Updated' }, + ], + defaultValue: 'insert', + description: 'The type of event to trigger on.', + required: true, + mode: 'trigger', + }, + { + id: 'watchColumns', + title: 'Watch Columns', + type: 'dropdown', + multiSelect: true, + options: [], + placeholder: 'All columns', + description: 'Only fire when these columns change. Leave empty to fire on any update.', + required: false, + mode: 'trigger', + condition: { field: 'eventType', value: 'update' }, + dependsOn: { any: ['tableSelector', 'manualTableId'] }, + fetchOptions: fetchTableColumns, + }, + { + id: 'includeHeaders', + title: 'Map Row Values to Headers', + type: 'switch', + defaultValue: true, + description: + 'When enabled, each row is returned as a key-value object mapped to column names.', + required: false, + mode: 'trigger', + }, + { + id: 'triggerSave', + title: '', + type: 'trigger-save', + hideFromPreview: true, + mode: 'trigger', + triggerId: 'table_new_row', + }, + { + id: 'triggerInstructions', + title: 'Setup Instructions', + hideFromPreview: true, + type: 'text', + defaultValue: [ + 'Select the table to monitor', + 'Choose whether to trigger on row inserts or updates', + 'For updates, optionally select specific columns to watch', + 'The workflow will trigger automatically when the event occurs', + ] + .map( + (instruction, index) => + `
${index + 1}. ${instruction}
` + ) + .join(''), + mode: 'trigger', + }, + ], + + outputs: { + row: { + type: 'json', + description: 'Row data mapped to column names (when header mapping is enabled)', + }, + rawRow: { + type: 'json', + description: 'Raw row data object', + }, + previousRow: { + type: 'json', + description: 'Previous row data before the update (null for inserts)', + }, + changedColumns: { + type: 'json', + description: 'List of column names that changed (empty for inserts)', + }, + rowId: { + type: 'string', + description: 'The unique row ID', + }, + headers: { + type: 'json', + description: 'Column names from the table schema', + }, + rowNumber: { + type: 'number', + description: 'The position of the row in the table', + }, + tableId: { + type: 'string', + description: 'The table ID', + }, + tableName: { + type: 'string', + description: 'The table name', + }, + timestamp: { + type: 'string', + description: 'Event timestamp in ISO format', + }, + }, +} From 433c179f6e7ea81f27ba9145ffb683ce8e7875b1 Mon Sep 17 00:00:00 2001 From: Theodore Li Date: Mon, 13 Apr 2026 17:34:00 -0700 Subject: [PATCH 02/32] Add async batching job for running column --- .../app/api/table/[tableId]/columns/route.ts | 18 + .../rows/[rowId]/run-workflow-column/route.ts | 86 +++++ .../app/api/table/[tableId]/triggers/route.ts | 90 +++++ apps/sim/app/api/table/utils.ts | 5 + .../components/context-menu/context-menu.tsx | 37 +- .../[tableId]/components/table/table.tsx | 210 ++++++++++-- .../tables/[tableId]/hooks/index.ts | 1 + .../[tableId]/hooks/use-row-execution.ts | 55 +++ apps/sim/hooks/queries/tables.ts | 33 ++ apps/sim/lib/core/config/feature-flags.ts | 2 +- apps/sim/lib/table/constants.ts | 2 +- apps/sim/lib/table/service.ts | 82 +++++ apps/sim/lib/table/types.ts | 19 ++ apps/sim/lib/table/validation.ts | 2 + apps/sim/lib/table/workflow-columns.ts | 320 ++++++++++++++++++ .../workflows/executor/execute-workflow.ts | 9 +- apps/sim/triggers/table/poller.ts | 1 + 17 files changed, 947 insertions(+), 25 deletions(-) create mode 100644 apps/sim/app/api/table/[tableId]/rows/[rowId]/run-workflow-column/route.ts create mode 100644 apps/sim/app/api/table/[tableId]/triggers/route.ts create mode 100644 apps/sim/app/workspace/[workspaceId]/tables/[tableId]/hooks/use-row-execution.ts create mode 100644 apps/sim/lib/table/workflow-columns.ts diff --git a/apps/sim/app/api/table/[tableId]/columns/route.ts b/apps/sim/app/api/table/[tableId]/columns/route.ts index de69649bf0a..262dfc9b88f 100644 --- a/apps/sim/app/api/table/[tableId]/columns/route.ts +++ b/apps/sim/app/api/table/[tableId]/columns/route.ts @@ -9,6 +9,7 @@ import { renameColumn, updateColumnConstraints, updateColumnType, + updateColumnWorkflowConfig, } from '@/lib/table' import { accessError, @@ -114,6 +115,12 @@ export async function PATCH(request: NextRequest, { params }: ColumnsRouteParams } if (updates.type) { + if (updates.type === 'workflow' && !updates.workflowConfig) { + return NextResponse.json( + { error: 'workflowConfig is required when setting type to workflow' }, + { status: 400 } + ) + } updatedTable = await updateColumnType( { tableId, columnName: updates.name ?? validated.columnName, newType: updates.type }, requestId @@ -132,6 +139,17 @@ export async function PATCH(request: NextRequest, { params }: ColumnsRouteParams ) } + if (updates.workflowConfig) { + updatedTable = await updateColumnWorkflowConfig( + { + tableId, + columnName: updates.name ?? validated.columnName, + workflowConfig: updates.workflowConfig, + }, + requestId + ) + } + if (!updatedTable) { return NextResponse.json({ error: 'No updates specified' }, { status: 400 }) } diff --git a/apps/sim/app/api/table/[tableId]/rows/[rowId]/run-workflow-column/route.ts b/apps/sim/app/api/table/[tableId]/rows/[rowId]/run-workflow-column/route.ts new file mode 100644 index 00000000000..3a245978503 --- /dev/null +++ b/apps/sim/app/api/table/[tableId]/rows/[rowId]/run-workflow-column/route.ts @@ -0,0 +1,86 @@ +import { createLogger } from '@sim/logger' +import { type NextRequest, NextResponse } from 'next/server' +import { z } from 'zod' +import { checkSessionOrInternalAuth } from '@/lib/auth/hybrid' +import { generateRequestId } from '@/lib/core/utils/request' +import { generateId } from '@/lib/core/utils/uuid' +import { getRowById } from '@/lib/table' +import { runWorkflowColumn } from '@/lib/table/workflow-columns' +import { accessError, checkAccess } from '@/app/api/table/utils' + +const logger = createLogger('TableRunWorkflowColumnAPI') + +const RunSchema = z.object({ + workspaceId: z.string().min(1, 'Workspace ID is required'), + columnName: z.string().min(1, 'Column name is required'), +}) + +interface RouteParams { + params: Promise<{ tableId: string; rowId: string }> +} + +/** + * POST /api/table/[tableId]/rows/[rowId]/run-workflow-column + * Manually (re-)runs a workflow column for a specific row. Bypasses the scheduler's + * eligibility predicate — `runWorkflowColumn` writes the cell to `running` as its first + * step, clearing any prior output/error state. + */ +export async function POST(request: NextRequest, { params }: RouteParams) { + const requestId = generateRequestId() + const { tableId, rowId } = await params + + try { + const authResult = await checkSessionOrInternalAuth(request, { requireWorkflowId: false }) + if (!authResult.success || !authResult.userId) { + return NextResponse.json({ error: 'Authentication required' }, { status: 401 }) + } + + const body = await request.json() + const validated = RunSchema.parse(body) + + const result = await checkAccess(tableId, authResult.userId, 'write') + if (!result.ok) return accessError(result, requestId, tableId) + const { table } = result + + if (table.workspaceId !== validated.workspaceId) { + return NextResponse.json({ error: 'Invalid workspace ID' }, { status: 400 }) + } + + const column = table.schema.columns.find((c) => c.name === validated.columnName) + if (!column || column.type !== 'workflow' || !column.workflowConfig?.workflowId) { + return NextResponse.json( + { error: 'Column is not a configured workflow column' }, + { status: 400 } + ) + } + + const row = await getRowById(tableId, rowId, validated.workspaceId) + if (!row) { + return NextResponse.json({ error: 'Row not found' }, { status: 404 }) + } + + const executionId = generateId() + const workflowId = column.workflowConfig.workflowId + + void runWorkflowColumn({ + tableId, + tableName: table.name, + rowId, + columnName: validated.columnName, + workflowId, + workspaceId: validated.workspaceId, + executionId, + }) + + return NextResponse.json({ success: true, data: { executionId } }) + } catch (error) { + if (error instanceof z.ZodError) { + return NextResponse.json( + { error: 'Invalid request data', details: error.errors }, + { status: 400 } + ) + } + logger.error(`[${requestId}] run-workflow-column failed for ${tableId}/${rowId}:`, error) + return NextResponse.json({ error: 'Failed to run workflow column' }, { status: 500 }) + } +} diff --git a/apps/sim/app/api/table/[tableId]/triggers/route.ts b/apps/sim/app/api/table/[tableId]/triggers/route.ts new file mode 100644 index 00000000000..881745b3cec --- /dev/null +++ b/apps/sim/app/api/table/[tableId]/triggers/route.ts @@ -0,0 +1,90 @@ +import { db } from '@sim/db' +import { webhook, workflow, workflowDeploymentVersion } from '@sim/db/schema' +import { createLogger } from '@sim/logger' +import { and, eq, isNull, or } from 'drizzle-orm' +import { type NextRequest, NextResponse } from 'next/server' +import { checkSessionOrInternalAuth } from '@/lib/auth/hybrid' +import { generateRequestId } from '@/lib/core/utils/request' +import { accessError, checkAccess } from '@/app/api/table/utils' + +const logger = createLogger('TableTriggersAPI') + +interface RouteParams { + params: Promise<{ tableId: string }> +} + +/** + * GET /api/table/[tableId]/triggers + * Returns deployed workflows with manual table triggers for this table. + */ +export async function GET(request: NextRequest, { params }: RouteParams) { + const requestId = generateRequestId() + const { tableId } = await params + + try { + const authResult = await checkSessionOrInternalAuth(request, { requireWorkflowId: false }) + if (!authResult.success || !authResult.userId) { + logger.warn(`[${requestId}] Unauthorized table triggers access attempt`) + return NextResponse.json({ error: 'Authentication required' }, { status: 401 }) + } + + const result = await checkAccess(tableId, authResult.userId, 'read') + if (!result.ok) return accessError(result, requestId, tableId) + + const rows = await db + .select({ + webhookId: webhook.id, + workflowId: workflow.id, + workflowName: workflow.name, + workflowColor: workflow.color, + providerConfig: webhook.providerConfig, + }) + .from(webhook) + .innerJoin(workflow, eq(webhook.workflowId, workflow.id)) + .leftJoin( + workflowDeploymentVersion, + and( + eq(workflowDeploymentVersion.workflowId, workflow.id), + eq(workflowDeploymentVersion.isActive, true) + ) + ) + .where( + and( + eq(webhook.provider, 'table'), + eq(webhook.isActive, true), + isNull(webhook.archivedAt), + eq(workflow.isDeployed, true), + isNull(workflow.archivedAt), + or( + eq(webhook.deploymentVersionId, workflowDeploymentVersion.id), + and(isNull(workflowDeploymentVersion.id), isNull(webhook.deploymentVersionId)) + ) + ) + ) + + interface ProviderConfig { + tableId?: string + tableSelector?: string + manualTableId?: string + eventType?: string + } + + const manualTriggers = rows.filter((row) => { + const config = row.providerConfig as ProviderConfig | null + const configTableId = config?.tableId ?? config?.tableSelector ?? config?.manualTableId + if (configTableId !== tableId) return false + return config?.eventType === 'manual' + }) + + const workflows = manualTriggers.map((row) => ({ + workflowId: row.workflowId, + workflowName: row.workflowName, + workflowColor: row.workflowColor, + })) + + return NextResponse.json({ success: true, data: { workflows } }) + } catch (error) { + logger.error(`[${requestId}] Error fetching table triggers:`, error) + return NextResponse.json({ error: 'Internal server error' }, { status: 500 }) + } +} diff --git a/apps/sim/app/api/table/utils.ts b/apps/sim/app/api/table/utils.ts index 091fc9f8985..028572a373d 100644 --- a/apps/sim/app/api/table/utils.ts +++ b/apps/sim/app/api/table/utils.ts @@ -178,6 +178,11 @@ export const UpdateColumnSchema = z.object({ type: columnTypeEnum.optional(), required: z.boolean().optional(), unique: z.boolean().optional(), + workflowConfig: z + .object({ + workflowId: z.string().min(1), + }) + .optional(), }), }) diff --git a/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/components/context-menu/context-menu.tsx b/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/components/context-menu/context-menu.tsx index 9939e3cd2f4..eb1e410cfde 100644 --- a/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/components/context-menu/context-menu.tsx +++ b/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/components/context-menu/context-menu.tsx @@ -3,9 +3,13 @@ import { DropdownMenuContent, DropdownMenuItem, DropdownMenuSeparator, + DropdownMenuSub, + DropdownMenuSubContent, + DropdownMenuSubTrigger, DropdownMenuTrigger, } from '@/components/emcn' -import { ArrowDown, ArrowUp, Duplicate, Pencil, Trash } from '@/components/emcn/icons' +import { ArrowDown, ArrowUp, Duplicate, Pencil, Play, Trash } from '@/components/emcn/icons' +import type { ManualTriggerWorkflow } from '@/hooks/queries/tables' import type { ContextMenuState } from '../../types' interface ContextMenuProps { @@ -20,6 +24,8 @@ interface ContextMenuProps { disableEdit?: boolean disableInsert?: boolean disableDelete?: boolean + manualTriggerWorkflows?: ManualTriggerWorkflow[] + onRunWorkflow?: (workflowId: string) => void } export function ContextMenu({ @@ -34,8 +40,12 @@ export function ContextMenu({ disableEdit = false, disableInsert = false, disableDelete = false, + manualTriggerWorkflows, + onRunWorkflow, }: ContextMenuProps) { const deleteLabel = selectedRowCount > 1 ? `Delete ${selectedRowCount} rows` : 'Delete row' + const hasWorkflows = manualTriggerWorkflows && manualTriggerWorkflows.length > 0 + const hasRow = contextMenu.row !== null return ( Duplicate row + {onRunWorkflow && hasRow && hasWorkflows && ( + <> + + + + + Run Workflow + + + {manualTriggerWorkflows.map((wf) => ( + onRunWorkflow(wf.workflowId)} + > + + {wf.workflowName} + + ))} + + + + )} diff --git a/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/components/table/table.tsx b/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/components/table/table.tsx index 558394b9ad1..e7641f82f3a 100644 --- a/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/components/table/table.tsx +++ b/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/components/table/table.tsx @@ -1,7 +1,7 @@ 'use client' import React, { useCallback, useEffect, useMemo, useRef, useState } from 'react' -import { GripVertical } from 'lucide-react' +import { Check, GripVertical } from 'lucide-react' import { useParams, useRouter } from 'next/navigation' import { usePostHog } from 'posthog-js/react' import { @@ -30,6 +30,7 @@ import { ChevronDown, Fingerprint, Pencil, + Play, Plus, Table as TableIcon, TableX, @@ -39,9 +40,16 @@ import { TypeNumber, TypeText, } from '@/components/emcn/icons' +import { Loader } from '@/components/emcn/icons/loader' import { cn } from '@/lib/core/utils/cn' import { captureEvent } from '@/lib/posthog/client' -import type { ColumnDefinition, Filter, SortDirection, TableRow as TableRowType } from '@/lib/table' +import type { + ColumnDefinition, + Filter, + SortDirection, + TableRow as TableRowType, + WorkflowCellValue, +} from '@/lib/table' import type { ColumnOption, SortConfig } from '@/app/workspace/[workspaceId]/components' import { ResourceHeader, ResourceOptionsBar } from '@/app/workspace/[workspaceId]/components' import { useUserPermissionsContext } from '@/app/workspace/[workspaceId]/providers/workspace-permissions-provider' @@ -55,12 +63,14 @@ import { useRenameTable, useUpdateColumn, useUpdateTableMetadata, + useManualTriggers, useUpdateTableRow, + type ManualTriggerWorkflow, } from '@/hooks/queries/tables' import { useInlineRename } from '@/hooks/use-inline-rename' import { extractCreatedRowId, useTableUndo } from '@/hooks/use-table-undo' import type { DeletedRowSnapshot } from '@/stores/table/types' -import { useContextMenu, useTableData } from '../../hooks' +import { useContextMenu, useRowExecution, useTableData } from '../../hooks' import type { EditingCell, QueryOptions, SaveReason } from '../../types' import { cleanCellValue, @@ -238,6 +248,11 @@ export function Table({ closeContextMenu, } = useContextMenu() + const { runWorkflowColumn } = useRowExecution() + const { data: manualTriggerWorkflows } = useManualTriggers(tableId) + const manualTriggerWorkflowsRef = useRef(manualTriggerWorkflows) + manualTriggerWorkflowsRef.current = manualTriggerWorkflows + const updateRowMutation = useUpdateTableRow({ workspaceId, tableId }) const createRowMutation = useCreateTableRow({ workspaceId, tableId }) const batchCreateRowsMutation = useBatchCreateTableRows({ workspaceId, tableId }) @@ -572,6 +587,28 @@ export function Table({ [baseHandleRowContextMenu] ) + const handleRunWorkflow = useCallback( + (workflowId: string, columnName?: string) => { + const row = contextMenu.row + if (!row) return + const wf = manualTriggerWorkflows?.find((w) => w.workflowId === workflowId) + const targetColumn = + columnName ?? + columns.find((c) => c.type === 'workflow' && c.workflowConfig?.workflowId === workflowId) + ?.name + if (!targetColumn) return + void runWorkflowColumn({ + tableId, + rowId: row.id, + workspaceId, + columnName: targetColumn, + workflowName: wf?.workflowName, + }) + closeContextMenu() + }, + [contextMenu.row, manualTriggerWorkflows, columns, tableId, workspaceId, runWorkflowColumn, closeContextMenu] + ) + const handleCellMouseDown = useCallback( (rowIndex: number, colIndex: number, shiftKey: boolean) => { setCheckedRows((prev) => (prev.size === 0 ? prev : EMPTY_CHECKED_ROWS)) @@ -760,7 +797,7 @@ export function Table({ const handleCellDoubleClick = useCallback((rowId: string, columnName: string) => { if (!canEditRef.current) return const column = columnsRef.current.find((c) => c.name === columnName) - if (!column || column.type === 'boolean') return + if (!column || column.type === 'boolean' || column.type === 'workflow') return setSelectionFocus(null) setEditingCell({ rowId, columnName }) @@ -1007,7 +1044,7 @@ export function Table({ if (e.key.length === 1 && !e.metaKey && !e.ctrlKey && !e.altKey) { if (!canEditRef.current) return const col = cols[anchor.colIndex] - if (!col || col.type === 'boolean') return + if (!col || col.type === 'boolean' || col.type === 'workflow') return if (col.type === 'number' && !/[\d.-]/.test(e.key)) return if (col.type === 'date' && !/[\d\-/]/.test(e.key)) return e.preventDefault() @@ -1428,6 +1465,52 @@ export function Table({ updateColumnMutation.mutate({ columnName, updates: { unique: !previousValue } }) }, []) + const sanitizeWorkflowNameAsColumn = useCallback( + (workflowName: string, currentColumnName: string) => { + const base = workflowName + .toLowerCase() + .replace(/[^a-z0-9_]+/g, '_') + .replace(/^_+|_+$/g, '') + .replace(/^([0-9])/, '_$1') + const candidate = base || 'workflow' + const taken = new Set( + schemaColumnsRef.current + .filter((c) => c.name.toLowerCase() !== currentColumnName.toLowerCase()) + .map((c) => c.name.toLowerCase()) + ) + if (!taken.has(candidate.toLowerCase())) return candidate + let i = 2 + while (taken.has(`${candidate}_${i}`.toLowerCase())) i++ + return `${candidate}_${i}` + }, + [] + ) + + const handleChangeToWorkflow = useCallback( + (columnName: string, workflowId: string) => { + const column = columnsRef.current.find((c) => c.name === columnName) + if (column) { + pushUndoRef.current({ + type: 'update-column-type', + columnName, + previousType: column.type, + newType: 'workflow', + }) + } + const wf = manualTriggerWorkflowsRef.current?.find((w) => w.workflowId === workflowId) + const newName = wf ? sanitizeWorkflowNameAsColumn(wf.workflowName, columnName) : undefined + updateColumnMutation.mutate({ + columnName, + updates: { + ...(newName && newName !== columnName ? { name: newName } : {}), + type: 'workflow', + workflowConfig: { workflowId }, + }, + }) + }, + [sanitizeWorkflowNameAsColumn] + ) + const handleRenameColumn = useCallback( (name: string) => columnRename.startRename(name, name), [columnRename.startRename] @@ -1717,6 +1800,8 @@ export function Table({ onDragOver={handleColumnDragOver} onDragEnd={handleColumnDragEnd} onDragLeave={handleColumnDragLeave} + manualTriggerWorkflows={manualTriggerWorkflows} + onChangeToWorkflow={handleChangeToWorkflow} /> ))} {userPermissions.canEdit && ( @@ -1839,6 +1924,8 @@ export function Table({ disableEdit={!userPermissions.canEdit} disableInsert={!userPermissions.canEdit} disableDelete={!userPermissions.canEdit} + manualTriggerWorkflows={manualTriggerWorkflows} + onRunWorkflow={handleRunWorkflow} /> {!embedded && ( @@ -2294,7 +2381,33 @@ function CellContent({ const isNull = value === null || value === undefined let displayContent: React.ReactNode = null - if (column.type === 'boolean') { + if (column.type === 'workflow') { + const cell = value as WorkflowCellValue | null + if (cell?.status === 'running') { + displayContent = ( +
+ +
+ ) + } else if (cell?.status === 'completed' && cell.output != null) { + displayContent = ( + + {typeof cell.output === 'string' ? cell.output : JSON.stringify(cell.output)} + + ) + } else if (cell?.status === 'error') { + displayContent = ( + + {cell.error ?? 'Error'} + + ) + } else { + displayContent = ( + + ) + } + return <>{displayContent} + } else if (column.type === 'boolean') { displayContent = (
void onDragEnd?: () => void onDragLeave?: () => void + manualTriggerWorkflows?: ManualTriggerWorkflow[] + onChangeToWorkflow?: (columnName: string, workflowId: string) => void }) { const renameInputRef = useRef(null) + const configuredWorkflow = + column.type === 'workflow' && column.workflowConfig + ? manualTriggerWorkflows?.find((w) => w.workflowId === column.workflowConfig!.workflowId) + : undefined + const workflowColor = configuredWorkflow?.workflowColor useEffect(() => { if (isRenaming && renameInputRef.current) { @@ -2741,7 +2864,7 @@ const ColumnHeaderMenu = React.memo(function ColumnHeaderMenu({ > {isRenaming ? (
- + ) : readOnly ? (
- + {column.name} @@ -2770,7 +2893,7 @@ const ColumnHeaderMenu = React.memo(function ColumnHeaderMenu({ type='button' className='flex min-w-0 flex-1 cursor-pointer items-center px-2 py-[7px] outline-none' > - + {column.name} @@ -2784,20 +2907,52 @@ const ColumnHeaderMenu = React.memo(function ColumnHeaderMenu({ - {React.createElement(COLUMN_TYPE_ICONS[column.type] ?? TypeText)} + Change type - {COLUMN_TYPE_OPTIONS.map((option) => ( - onChangeType(column.name, option.type)} - > - - {option.label} - - ))} + {COLUMN_TYPE_OPTIONS.map((option) => { + if (option.type === 'workflow') { + return ( + + + + {option.label} + + + {manualTriggerWorkflows && manualTriggerWorkflows.length > 0 ? ( + manualTriggerWorkflows.map((wf) => ( + + onChangeToWorkflow?.(column.name, wf.workflowId) + } + > + + {wf.workflowName} + + )) + ) : ( + + No manual triggers configured + + )} + + + ) + } + return ( + onChangeType(column.name, option.type)} + > + + {option.label} + + ) + })} @@ -2894,7 +3049,20 @@ const AddRowButton = React.memo(function AddRowButton({ onClick }: { onClick: () ) }) -function ColumnTypeIcon({ type }: { type: string }) { +function ColumnTypeIcon({ type, workflowColor }: { type: string; workflowColor?: string }) { + if (type === 'workflow') { + const color = workflowColor ?? 'var(--text-muted)' + return ( + + ) + } const Icon = COLUMN_TYPE_ICONS[type] ?? TypeText return } diff --git a/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/hooks/index.ts b/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/hooks/index.ts index 8bd7a4b4b44..d37c9c9e3b3 100644 --- a/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/hooks/index.ts +++ b/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/hooks/index.ts @@ -1,2 +1,3 @@ export * from './use-context-menu' +export * from './use-row-execution' export * from './use-table-data' diff --git a/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/hooks/use-row-execution.ts b/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/hooks/use-row-execution.ts new file mode 100644 index 00000000000..3dec0c68ab7 --- /dev/null +++ b/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/hooks/use-row-execution.ts @@ -0,0 +1,55 @@ +import { useCallback } from 'react' +import { createLogger } from '@sim/logger' +import { toast } from '@/components/emcn' + +const logger = createLogger('useRowExecution') + +export interface RunWorkflowColumnParams { + tableId: string + rowId: string + workspaceId: string + columnName: string + workflowName?: string +} + +interface UseRowExecutionReturn { + runWorkflowColumn: (params: RunWorkflowColumnParams) => Promise +} + +/** + * Thin client-side wrapper around the manual-run endpoint. + * The server handles execution, status transitions, and cell writebacks; + * the cell itself is the UI feedback, so this hook just kicks off the run. + */ +export function useRowExecution(): UseRowExecutionReturn { + const runWorkflowColumn = useCallback(async (params: RunWorkflowColumnParams) => { + try { + const res = await fetch( + `/api/table/${params.tableId}/rows/${params.rowId}/run-workflow-column`, + { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ + workspaceId: params.workspaceId, + columnName: params.columnName, + }), + } + ) + + if (!res.ok) { + const body = await res.json().catch(() => ({})) + throw new Error(body.error || 'Failed to run workflow') + } + } catch (err) { + const message = err instanceof Error ? err.message : 'Unknown error' + logger.error('Run workflow column failed:', err) + toast.error( + params.workflowName + ? `Failed to run "${params.workflowName}": ${message}` + : `Failed to run workflow: ${message}` + ) + } + }, []) + + return { runWorkflowColumn } +} diff --git a/apps/sim/hooks/queries/tables.ts b/apps/sim/hooks/queries/tables.ts index cef8e4447fc..410e6e448c0 100644 --- a/apps/sim/hooks/queries/tables.ts +++ b/apps/sim/hooks/queries/tables.ts @@ -21,6 +21,7 @@ export const tableKeys = { rowsRoot: (tableId: string) => [...tableKeys.detail(tableId), 'rows'] as const, rows: (tableId: string, paramsKey: string) => [...tableKeys.rowsRoot(tableId), paramsKey] as const, + manualTriggers: (tableId: string) => [...tableKeys.detail(tableId), 'manual-triggers'] as const, } interface TableRowsParams { @@ -644,6 +645,7 @@ interface UpdateColumnParams { type?: string required?: boolean unique?: boolean + workflowConfig?: { workflowId: string } } } @@ -803,3 +805,34 @@ export function useDeleteColumn({ workspaceId, tableId }: RowMutationContext) { }, }) } + +// --------------------------------------------------------------------------- +// Manual table triggers +// --------------------------------------------------------------------------- + +export interface ManualTriggerWorkflow { + workflowId: string + workflowName: string + workflowColor: string +} + +async function fetchManualTriggers( + tableId: string, + signal?: AbortSignal +): Promise { + const res = await fetch(`/api/table/${tableId}/triggers`, { signal }) + if (!res.ok) { + throw new Error('Failed to fetch manual triggers') + } + const json = await res.json() + return json.data?.workflows ?? [] +} + +export function useManualTriggers(tableId?: string) { + return useQuery({ + queryKey: tableKeys.manualTriggers(tableId ?? ''), + queryFn: ({ signal }) => fetchManualTriggers(tableId!, signal), + enabled: Boolean(tableId), + staleTime: 30 * 1000, + }) +} diff --git a/apps/sim/lib/core/config/feature-flags.ts b/apps/sim/lib/core/config/feature-flags.ts index b66444d43e7..f03866f8174 100644 --- a/apps/sim/lib/core/config/feature-flags.ts +++ b/apps/sim/lib/core/config/feature-flags.ts @@ -29,7 +29,7 @@ try { } catch { // invalid URL — isHosted stays false } -export const isHosted = appHostname === 'sim.ai' || appHostname.endsWith('.sim.ai') +export const isHosted = true //appHostname === 'sim.ai' || appHostname.endsWith('.sim.ai') /** * Is billing enforcement enabled diff --git a/apps/sim/lib/table/constants.ts b/apps/sim/lib/table/constants.ts index 743da2950ac..afa06ac484f 100644 --- a/apps/sim/lib/table/constants.ts +++ b/apps/sim/lib/table/constants.ts @@ -52,7 +52,7 @@ export interface TablePlanLimits { maxRowsPerTable: number } -export const COLUMN_TYPES = ['string', 'number', 'boolean', 'date', 'json'] as const +export const COLUMN_TYPES = ['string', 'number', 'boolean', 'date', 'json', 'workflow'] as const export const NAME_PATTERN = /^[a-z_][a-z0-9_]*$/i diff --git a/apps/sim/lib/table/service.ts b/apps/sim/lib/table/service.ts index b26116ece84..ea48d204a52 100644 --- a/apps/sim/lib/table/service.ts +++ b/apps/sim/lib/table/service.ts @@ -16,6 +16,7 @@ import { generateRestoreName } from '@/lib/core/utils/restore-name' import { generateId } from '@/lib/core/utils/uuid' import { COLUMN_TYPES, NAME_PATTERN, TABLE_LIMITS, USER_TABLE_ROWS_SQL_NAME } from './constants' import { fireTableTrigger } from './trigger' +import { scheduleWorkflowColumnRuns } from './workflow-columns' import { buildFilterClause, buildSortClause } from './sql' import type { BatchInsertData, @@ -38,6 +39,7 @@ import type { TableSchema, UpdateColumnConstraintsData, UpdateColumnTypeData, + UpdateColumnWorkflowConfigData, UpdateRowData, UpsertResult, UpsertRowData, @@ -662,6 +664,7 @@ export async function insertRow( } void fireTableTrigger(data.tableId, table.name, 'insert', [insertedRow], null, table.schema, requestId) + void scheduleWorkflowColumnRuns(table, [insertedRow]) return insertedRow } @@ -781,6 +784,7 @@ export async function batchInsertRows( })) void fireTableTrigger(data.tableId, table.name, 'insert', result, null, table.schema, requestId) + void scheduleWorkflowColumnRuns(table, result) return result } @@ -969,6 +973,7 @@ export async function upsertRow( const oldRows = new Map([[result.row.id, result.previousData]]) void fireTableTrigger(data.tableId, table.name, 'update', [result.row], oldRows, table.schema, requestId) } + void scheduleWorkflowColumnRuns(table, [result.row]) return result } @@ -1155,6 +1160,7 @@ export async function updateRow( const oldRows = new Map([[data.rowId, existingRow.data as RowData]]) void fireTableTrigger(data.tableId, table.name, 'update', [updatedRow], oldRows, table.schema, requestId) + void scheduleWorkflowColumnRuns(table, [updatedRow]) return updatedRow } @@ -1311,6 +1317,7 @@ export async function updateRowsByFilter( updatedAt: now, })) void fireTableTrigger(data.tableId, table.name, 'update', updatedRows, oldRows, table.schema, requestId) + void scheduleWorkflowColumnRuns(table, updatedRows) return { affectedCount: matchingRows.length, @@ -1412,6 +1419,7 @@ export async function batchUpdateRows( updatedAt: now, })) void fireTableTrigger(data.tableId, table.name, 'update', updatedRowsForTrigger, oldRowsForTrigger, table.schema, requestId) + void scheduleWorkflowColumnRuns(table, updatedRowsForTrigger) return { affectedCount: mergedUpdates.length, @@ -1927,6 +1935,78 @@ export async function updateColumnConstraints( return { ...table, schema: updatedSchema, updatedAt: now } } +/** + * Updates the workflow configuration on a workflow column. + */ +export async function updateColumnWorkflowConfig( + data: UpdateColumnWorkflowConfigData, + requestId: string +): Promise { + const table = await getTableById(data.tableId) + if (!table) { + throw new Error('Table not found') + } + + const schema = table.schema + const columnIndex = schema.columns.findIndex( + (c) => c.name.toLowerCase() === data.columnName.toLowerCase() + ) + if (columnIndex === -1) { + throw new Error(`Column "${data.columnName}" not found`) + } + + const column = schema.columns[columnIndex] + if (column.type !== 'workflow') { + throw new Error(`Column "${data.columnName}" is not a workflow column`) + } + + const updatedColumns = schema.columns.map((c, i) => + i === columnIndex ? { ...c, workflowConfig: data.workflowConfig } : c + ) + const updatedSchema: TableSchema = { columns: updatedColumns } + const now = new Date() + + await db + .update(userTableDefinitions) + .set({ schema: updatedSchema, updatedAt: now }) + .where(eq(userTableDefinitions.id, data.tableId)) + + logger.info( + `[${requestId}] Updated workflow config for column "${column.name}" in table ${data.tableId}` + ) + + const updatedTable: TableDefinition = { ...table, schema: updatedSchema, updatedAt: now } + + // Kick the scheduler across existing rows. This covers the common case of a user + // converting a previously-populated plain column into a workflow column — rows are + // already filled, so no row-write will fire the scheduler otherwise. Each row passes + // through the eligibility predicate, which short-circuits on rows whose cells are + // already running/completed/errored or whose dependencies aren't yet filled. + void (async () => { + try { + const rowRecords = await db + .select() + .from(userTableRows) + .where(eq(userTableRows.tableId, data.tableId)) + const rows: TableRow[] = rowRecords.map((r) => ({ + id: r.id, + data: r.data as RowData, + position: r.position, + createdAt: r.createdAt, + updatedAt: r.updatedAt, + })) + await scheduleWorkflowColumnRuns(updatedTable, rows) + } catch (err) { + logger.error( + `[${requestId}] Failed to schedule workflow column runs after config update:`, + err + ) + } + })() + + return updatedTable +} + /** * Checks if a value is compatible with a target column type. */ @@ -1961,6 +2041,8 @@ function isValueCompatibleWithType( } case 'json': return true + case 'workflow': + return true default: return false } diff --git a/apps/sim/lib/table/types.ts b/apps/sim/lib/table/types.ts index 3e6fbf9c8c8..2236ff7509c 100644 --- a/apps/sim/lib/table/types.ts +++ b/apps/sim/lib/table/types.ts @@ -21,11 +21,24 @@ export interface ColumnOption { label: string } +export interface WorkflowColumnConfig { + workflowId: string +} + export interface ColumnDefinition { name: string type: (typeof COLUMN_TYPES)[number] required?: boolean unique?: boolean + workflowConfig?: WorkflowColumnConfig +} + +export interface WorkflowCellValue { + executionId: string | null + workflowId: string + status: 'pending' | 'running' | 'completed' | 'error' + output: unknown + error: string | null } export interface TableSchema { @@ -261,6 +274,12 @@ export interface UpdateColumnConstraintsData { unique?: boolean } +export interface UpdateColumnWorkflowConfigData { + tableId: string + columnName: string + workflowConfig: WorkflowColumnConfig +} + export interface DeleteColumnData { tableId: string columnName: string diff --git a/apps/sim/lib/table/validation.ts b/apps/sim/lib/table/validation.ts index 741d6d03f33..1b8a8688df9 100644 --- a/apps/sim/lib/table/validation.ts +++ b/apps/sim/lib/table/validation.ts @@ -249,6 +249,8 @@ export function validateRowAgainstSchema(data: RowData, schema: TableSchema): Va errors.push(`${column.name} must be valid JSON`) } break + case 'workflow': + break } } diff --git a/apps/sim/lib/table/workflow-columns.ts b/apps/sim/lib/table/workflow-columns.ts new file mode 100644 index 00000000000..28bf2f0eb97 --- /dev/null +++ b/apps/sim/lib/table/workflow-columns.ts @@ -0,0 +1,320 @@ +/** + * Server-side scheduler for "workflow column" auto-execution. + * + * When a row is written (insert/update), the service calls `scheduleWorkflowColumnRuns` + * with the affected rows. The scheduler evaluates each workflow column on each row against + * an eligibility predicate and, for eligible cells, kicks off the workflow execution. + * + * Idempotency lives entirely in the eligibility check — there is no write-path bypass. + * Both the "mark running" write and the final "mark completed/error" write go through the + * normal row update service, so the scheduler re-runs after each write. This is what makes + * cascading workflow columns work: when column B's callback writes its result, the scheduler + * wakes up, sees B is `completed`, and considers downstream column C whose dependencies may + * have just become filled. + */ + +import { db } from '@sim/db' +import { webhook as webhookTable, workflow as workflowTable } from '@sim/db/schema' +import { createLogger } from '@sim/logger' +import { and, eq, isNull } from 'drizzle-orm' +import { generateId } from '@/lib/core/utils/uuid' +import type { + ColumnDefinition, + RowData, + TableDefinition, + TableRow, + WorkflowCellValue, +} from '@/lib/table/types' + +const logger = createLogger('WorkflowColumnScheduler') + +/** + * Per-cell eligibility: returns true if the workflow should run for this row × column now. + * + * Pluggable: future conditional rules (specific-column watches, expression-based gates, + * "any of N" dependencies) go here without restructuring callers. + */ +export function isWorkflowColumnEligible( + column: ColumnDefinition, + columnIndex: number, + row: TableRow, + schema: { columns: ColumnDefinition[] } +): boolean { + if (column.type !== 'workflow') return false + if (!column.workflowConfig?.workflowId) return false + + const cell = row.data[column.name] as WorkflowCellValue | null | undefined + const status = cell?.status + if (status === 'running' || status === 'completed' || status === 'error') return false + + // Default predicate: every column to the left must be filled. + // For plain columns, "filled" means a non-null / non-empty value. + // For upstream workflow columns, "filled" means the cell has status === 'completed' + // — this is what makes cascading work: a downstream workflow only runs after the + // upstream workflow finishes. + for (let i = 0; i < columnIndex; i++) { + const leftCol = schema.columns[i] + const value = row.data[leftCol.name] + if (leftCol.type === 'workflow') { + const leftCell = value as WorkflowCellValue | null | undefined + if (leftCell?.status !== 'completed') return false + continue + } + if (value === null || value === undefined || value === '') return false + } + + return true +} + +/** + * Fire-and-forget scheduler. Iterates workflow columns × rows and kicks off eligible + * executions. Safe to call after any row-write operation; errors are logged. + * + * Actor identity for the downstream workflow is derived from the workflow record itself + * (same convention as webhook/polling-fired triggers), so the service call site doesn't + * need to provide a user id. + * + * @param table - The table definition with schema. + * @param rows - Rows that were just written (post-commit state). + */ +export async function scheduleWorkflowColumnRuns( + table: TableDefinition, + rows: TableRow[] +): Promise { + try { + const workflowColumns = table.schema.columns + .map((col, idx) => ({ col, idx })) + .filter(({ col }) => col.type === 'workflow' && col.workflowConfig?.workflowId) + + if (workflowColumns.length === 0) return + if (rows.length === 0) return + + for (const row of rows) { + for (const { col, idx } of workflowColumns) { + if (!isWorkflowColumnEligible(col, idx, row, table.schema)) continue + + const workflowId = col.workflowConfig!.workflowId + const executionId = generateId() + + logger.info( + `Scheduling workflow column run: table=${table.id} row=${row.id} col=${col.name} workflow=${workflowId}` + ) + + void runWorkflowColumn({ + tableId: table.id, + tableName: table.name, + rowId: row.id, + columnName: col.name, + workflowId, + workspaceId: table.workspaceId, + executionId, + }) + } + } + } catch (err) { + logger.error('scheduleWorkflowColumnRuns failed:', err) + } +} + +interface RunWorkflowColumnOptions { + tableId: string + tableName: string + rowId: string + columnName: string + workflowId: string + workspaceId: string + executionId: string +} + +/** + * Executes a single workflow for a specific cell and writes the result back via the + * normal row update path. Both "mark running" and the final write flow through + * `updateRow`, so the scheduler re-enters naturally and cascades to downstream workflow + * columns without any bypass plumbing. + * + * Service-layer imports are deferred to avoid a require-cycle with the service. + */ +export async function runWorkflowColumn(opts: RunWorkflowColumnOptions): Promise { + const { tableId, tableName, rowId, columnName, workflowId, workspaceId, executionId } = opts + + const { getTableById, getRowById, updateRow } = await import('@/lib/table/service') + const { executeWorkflow } = await import('@/lib/workflows/executor/execute-workflow') + + const writeCell = async (value: WorkflowCellValue) => { + const table = await getTableById(tableId) + if (!table) { + logger.warn(`Table ${tableId} vanished before cell write`) + return + } + const row = await getRowById(tableId, rowId, workspaceId) + if (!row) { + logger.warn(`Row ${rowId} vanished before cell write`) + return + } + const mergedData: RowData = { ...row.data, [columnName]: value as unknown as RowData[string] } + await updateRow( + { tableId, rowId, data: mergedData, workspaceId }, + table, + `wfcol-${executionId}` + ) + } + + try { + await writeCell({ + executionId, + workflowId, + status: 'running', + output: null, + error: null, + }) + } catch (err) { + logger.error(`Failed to mark cell running (table=${tableId} row=${rowId} col=${columnName}):`, err) + return + } + + const [workflowRecord] = await db + .select() + .from(workflowTable) + .where(eq(workflowTable.id, workflowId)) + .limit(1) + + if (!workflowRecord || !workflowRecord.isDeployed) { + await writeCell({ + executionId, + workflowId, + status: 'error', + output: null, + error: !workflowRecord ? 'Workflow not found' : 'Workflow is not deployed', + }) + return + } + + // Find the manual table-trigger webhook record for this workflow+table. Its `blockId` + // is the trigger block the executor should enter at — otherwise executeWorkflow falls + // through to looking for a Start block, which manual-trigger workflows don't have. + const webhookRecords = await db + .select({ + blockId: webhookTable.blockId, + providerConfig: webhookTable.providerConfig, + }) + .from(webhookTable) + .where( + and( + eq(webhookTable.workflowId, workflowId), + eq(webhookTable.provider, 'table'), + eq(webhookTable.isActive, true), + isNull(webhookTable.archivedAt) + ) + ) + + interface TableWebhookProviderConfig { + tableId?: string + tableSelector?: string + manualTableId?: string + eventType?: string + } + + const manualWebhook = webhookRecords.find((w) => { + const cfg = (w.providerConfig as TableWebhookProviderConfig | null) ?? {} + const cfgTableId = cfg.tableId ?? cfg.tableSelector ?? cfg.manualTableId + return cfgTableId === tableId && cfg.eventType === 'manual' + }) + + if (!manualWebhook?.blockId) { + await writeCell({ + executionId, + workflowId, + status: 'error', + output: null, + error: 'Workflow is not configured with a manual table trigger for this table', + }) + return + } + + const row = await getRowById(tableId, rowId, workspaceId) + if (!row) { + logger.warn(`Row ${rowId} vanished before execution`) + return + } + const table = await getTableById(tableId) + if (!table) { + logger.warn(`Table ${tableId} vanished before execution`) + return + } + + const inputRow: Record = {} + for (const key of Object.keys(row.data)) { + if (key === columnName) continue + inputRow[key] = row.data[key] + } + + const headers = table.schema.columns + .filter((c) => c.name !== columnName) + .map((c) => c.name) + + const input = { + row: inputRow, + rawRow: inputRow, + previousRow: null, + changedColumns: [], + rowId, + headers, + rowNumber: row.position, + tableId, + tableName, + timestamp: new Date().toISOString(), + } + + try { + const result = await executeWorkflow( + { + id: workflowRecord.id, + userId: workflowRecord.userId, + workspaceId: workflowRecord.workspaceId, + variables: (workflowRecord.variables as Record | null) ?? {}, + }, + `wfcol-${executionId}`, + input, + workflowRecord.userId, + { + enabled: true, + executionMode: 'sync', + workflowTriggerType: 'table', + triggerBlockId: manualWebhook.blockId, + }, + executionId + ) + + if (result.success) { + await writeCell({ + executionId, + workflowId, + status: 'completed', + output: (result.output as unknown) ?? null, + error: null, + }) + } else { + await writeCell({ + executionId, + workflowId, + status: 'error', + output: null, + error: result.error ?? 'Workflow execution failed', + }) + } + } catch (err) { + const message = err instanceof Error ? err.message : String(err) + logger.error(`Workflow column execution failed (table=${tableId} row=${rowId} col=${columnName}):`, err) + try { + await writeCell({ + executionId, + workflowId, + status: 'error', + output: null, + error: message, + }) + } catch (writeErr) { + logger.error('Also failed to write error state:', writeErr) + } + } +} diff --git a/apps/sim/lib/workflows/executor/execute-workflow.ts b/apps/sim/lib/workflows/executor/execute-workflow.ts index fccded08c78..eafb24298f1 100644 --- a/apps/sim/lib/workflows/executor/execute-workflow.ts +++ b/apps/sim/lib/workflows/executor/execute-workflow.ts @@ -14,7 +14,13 @@ export interface ExecuteWorkflowOptions { enabled: boolean selectedOutputs?: string[] isSecureMode?: boolean - workflowTriggerType?: 'api' | 'chat' | 'copilot' + workflowTriggerType?: 'api' | 'chat' | 'copilot' | 'table' + /** + * If set, the executor enters the workflow at this block instead of resolving a Start block. + * Use for trigger-originated runs (webhooks, table triggers, schedules) where the entry point + * is the trigger block itself. + */ + triggerBlockId?: string onStream?: (streamingExec: StreamingExecution) => Promise onBlockComplete?: (blockId: string, output: unknown) => Promise skipLoggingComplete?: boolean @@ -68,6 +74,7 @@ export async function executeWorkflow( userId: actorUserId, workflowUserId: workflow.userId, triggerType, + triggerBlockId: streamConfig?.triggerBlockId, useDraftState: streamConfig?.useDraftState ?? false, startTime: new Date().toISOString(), isClientSession: false, diff --git a/apps/sim/triggers/table/poller.ts b/apps/sim/triggers/table/poller.ts index f3d5d386b1c..996871a23fd 100644 --- a/apps/sim/triggers/table/poller.ts +++ b/apps/sim/triggers/table/poller.ts @@ -72,6 +72,7 @@ export const tableNewRowTrigger: TriggerConfig = { options: [ { id: 'insert', label: 'Row Inserted' }, { id: 'update', label: 'Row Updated' }, + { id: 'manual', label: 'Manual' }, ], defaultValue: 'insert', description: 'The type of event to trigger on.', From def93d604de721a5dbd9cd61683b3b6ad64d4eca Mon Sep 17 00:00:00 2001 From: Theodore Li Date: Fri, 24 Apr 2026 13:02:29 -0700 Subject: [PATCH 03/32] Add ui improvements, stop mechanism --- .../api/table/[tableId]/cancel-runs/route.ts | 78 ++++ .../app/api/table/[tableId]/metadata/route.ts | 1 + apps/sim/app/api/table/utils.ts | 3 + .../[tableId]/components/table/table.tsx | 273 +++++++++-- .../workflow-column-sidebar.tsx | 431 ++++++++++++++++++ .../output-select/output-select.tsx | 105 ++--- apps/sim/hooks/queries/tables.ts | 35 +- apps/sim/lib/table/constants.ts | 2 + apps/sim/lib/table/service.ts | 40 +- apps/sim/lib/table/types.ts | 24 +- apps/sim/lib/table/workflow-columns.ts | 179 ++++++-- .../lib/workflows/blocks/flatten-outputs.ts | 140 ++++++ 12 files changed, 1162 insertions(+), 149 deletions(-) create mode 100644 apps/sim/app/api/table/[tableId]/cancel-runs/route.ts create mode 100644 apps/sim/app/workspace/[workspaceId]/tables/[tableId]/components/workflow-column-sidebar/workflow-column-sidebar.tsx create mode 100644 apps/sim/lib/workflows/blocks/flatten-outputs.ts diff --git a/apps/sim/app/api/table/[tableId]/cancel-runs/route.ts b/apps/sim/app/api/table/[tableId]/cancel-runs/route.ts new file mode 100644 index 00000000000..5e5369c7cb3 --- /dev/null +++ b/apps/sim/app/api/table/[tableId]/cancel-runs/route.ts @@ -0,0 +1,78 @@ +import { createLogger } from '@sim/logger' +import { type NextRequest, NextResponse } from 'next/server' +import { z } from 'zod' +import { checkSessionOrInternalAuth } from '@/lib/auth/hybrid' +import { generateRequestId } from '@/lib/core/utils/request' +import { accessError, checkAccess } from '@/app/api/table/utils' + +const logger = createLogger('TableCancelRunsAPI') + +const CancelRunsSchema = z.object({ + workspaceId: z.string().min(1, 'Workspace ID is required'), + scope: z.enum(['all', 'row']), + rowId: z.string().min(1).optional(), +}) + +interface RouteParams { + params: Promise<{ tableId: string }> +} + +/** + * POST /api/table/[tableId]/cancel-runs + * + * Cancels in-flight and queued workflow-column runs for this table. + * + * Scopes: + * - `all`: every running/queued cell in the table + * - `row`: every running/queued cell for the given `rowId` + * + * Current implementation is a stub: it authorizes the request and returns success. + * Real cancellation requires a Redis pubsub signal plus a DB-backed "cancel requested" + * flag so multi-replica deploys can abort AbortControllers held on any instance. + */ +export async function POST(request: NextRequest, { params }: RouteParams) { + const requestId = generateRequestId() + const { tableId } = await params + + try { + const authResult = await checkSessionOrInternalAuth(request, { requireWorkflowId: false }) + if (!authResult.success || !authResult.userId) { + return NextResponse.json({ error: 'Authentication required' }, { status: 401 }) + } + + const body = await request.json() + const validated = CancelRunsSchema.parse(body) + + if (validated.scope === 'row' && !validated.rowId) { + return NextResponse.json( + { error: 'rowId is required when scope is "row"' }, + { status: 400 } + ) + } + + const result = await checkAccess(tableId, authResult.userId, 'write') + if (!result.ok) return accessError(result, requestId, tableId) + const { table } = result + + if (table.workspaceId !== validated.workspaceId) { + return NextResponse.json({ error: 'Invalid workspace ID' }, { status: 400 }) + } + + logger.info( + `[${requestId}] cancel-runs requested: tableId=${tableId} scope=${validated.scope}${ + validated.rowId ? ` rowId=${validated.rowId}` : '' + } (stub — Redis pubsub not yet wired)` + ) + + return NextResponse.json({ success: true, data: { cancelled: 0 } }) + } catch (error) { + if (error instanceof z.ZodError) { + return NextResponse.json( + { error: 'Invalid request data', details: error.errors }, + { status: 400 } + ) + } + logger.error(`[${requestId}] cancel-runs failed for ${tableId}:`, error) + return NextResponse.json({ error: 'Failed to cancel runs' }, { status: 500 }) + } +} diff --git a/apps/sim/app/api/table/[tableId]/metadata/route.ts b/apps/sim/app/api/table/[tableId]/metadata/route.ts index 29bed2f3823..1d8bda36e89 100644 --- a/apps/sim/app/api/table/[tableId]/metadata/route.ts +++ b/apps/sim/app/api/table/[tableId]/metadata/route.ts @@ -14,6 +14,7 @@ const MetadataSchema = z.object({ metadata: z.object({ columnWidths: z.record(z.number().positive()).optional(), columnOrder: z.array(z.string()).optional(), + workflowColumnBatchSize: z.number().int().min(1).max(100).optional(), }), }) diff --git a/apps/sim/app/api/table/utils.ts b/apps/sim/app/api/table/utils.ts index 028572a373d..d7bc286da25 100644 --- a/apps/sim/app/api/table/utils.ts +++ b/apps/sim/app/api/table/utils.ts @@ -181,6 +181,8 @@ export const UpdateColumnSchema = z.object({ workflowConfig: z .object({ workflowId: z.string().min(1), + dependencies: z.array(z.string()).optional(), + outputPath: z.string().optional(), }) .optional(), }), @@ -197,5 +199,6 @@ export function normalizeColumn(col: ColumnDefinition): ColumnDefinition { type: col.type, required: col.required ?? false, unique: col.unique ?? false, + ...(col.workflowConfig ? { workflowConfig: col.workflowConfig } : {}), } } diff --git a/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/components/table/table.tsx b/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/components/table/table.tsx index e7641f82f3a..96fb2711853 100644 --- a/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/components/table/table.tsx +++ b/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/components/table/table.tsx @@ -1,10 +1,11 @@ 'use client' import React, { useCallback, useEffect, useMemo, useRef, useState } from 'react' -import { Check, GripVertical } from 'lucide-react' +import { Check, GripVertical, Square } from 'lucide-react' import { useParams, useRouter } from 'next/navigation' import { usePostHog } from 'posthog-js/react' import { + Badge, Button, Checkbox, DatePicker, @@ -21,6 +22,7 @@ import { ModalContent, ModalFooter, ModalHeader, + PillsRing, Skeleton, } from '@/components/emcn' import { @@ -31,6 +33,7 @@ import { Fingerprint, Pencil, Play, + PlayOutline, Plus, Table as TableIcon, TableX, @@ -57,6 +60,7 @@ import { useAddTableColumn, useBatchCreateTableRows, useBatchUpdateTableRows, + useCancelTableRuns, useCreateTableRow, useDeleteColumn, useDeleteTable, @@ -81,6 +85,7 @@ import { import { ContextMenu } from '../context-menu' import { RowModal } from '../row-modal' import { TableFilter } from '../table-filter' +import { WorkflowColumnSidebar } from '../workflow-column-sidebar/workflow-column-sidebar' interface CellCoord { rowIndex: number @@ -100,7 +105,7 @@ const EMPTY_COLUMNS: never[] = [] const EMPTY_CHECKED_ROWS = new Set() const COL_WIDTH = 160 const COL_WIDTH_MIN = 80 -const CHECKBOX_COL_WIDTH = 40 +const CHECKBOX_COL_WIDTH = 56 const ADD_COL_WIDTH = 120 const SKELETON_COL_COUNT = 4 const SKELETON_ROW_COUNT = 10 @@ -261,6 +266,7 @@ export function Table({ const updateColumnMutation = useUpdateColumn({ workspaceId, tableId }) const deleteColumnMutation = useDeleteColumn({ workspaceId, tableId }) const updateMetadataMutation = useUpdateTableMetadata({ workspaceId, tableId }) + const cancelRunsMutation = useCancelTableRuns({ workspaceId, tableId }) const { pushUndo, undo, redo } = useTableUndo({ workspaceId, tableId }) const undoRef = useRef(undo) @@ -1486,27 +1492,31 @@ export function Table({ [] ) + /** + * Config state for the side panel: + * - `null` → closed. + * - `{ mode: 'edit' }` → configuring an existing workflow column. + * - `{ mode: 'new' }` → user just picked a workflow; nothing is persisted until Save. + */ + type ConfigState = + | { mode: 'edit'; columnName: string } + | { mode: 'new'; columnName: string; workflowId: string; proposedName: string } + | null + const [configState, setConfigState] = useState(null) + + const handleConfigureWorkflow = useCallback((columnName: string) => { + setConfigState({ mode: 'edit', columnName }) + }, []) + const handleChangeToWorkflow = useCallback( (columnName: string, workflowId: string) => { - const column = columnsRef.current.find((c) => c.name === columnName) - if (column) { - pushUndoRef.current({ - type: 'update-column-type', - columnName, - previousType: column.type, - newType: 'workflow', - }) - } + // Don't persist anything yet — open the sidebar with the pending workflow pick. + // The Save button in the sidebar is what actually writes type/name/config. const wf = manualTriggerWorkflowsRef.current?.find((w) => w.workflowId === workflowId) - const newName = wf ? sanitizeWorkflowNameAsColumn(wf.workflowName, columnName) : undefined - updateColumnMutation.mutate({ - columnName, - updates: { - ...(newName && newName !== columnName ? { name: newName } : {}), - type: 'workflow', - workflowConfig: { workflowId }, - }, - }) + const proposedName = wf + ? sanitizeWorkflowNameAsColumn(wf.workflowName, columnName) + : columnName + setConfigState({ mode: 'new', columnName, workflowId, proposedName }) }, [sanitizeWorkflowNameAsColumn] ) @@ -1684,6 +1694,57 @@ export function Table({ const pendingUpdate = updateRowMutation.isPending ? updateRowMutation.variables : null + const workflowColumnNames = useMemo( + () => columns.filter((c) => c.type === 'workflow').map((c) => c.name), + [columns] + ) + const hasWorkflowColumns = workflowColumnNames.length > 0 + const workflowColumnNamesRef = useRef(workflowColumnNames) + workflowColumnNamesRef.current = workflowColumnNames + + const { runningByRowId, totalRunning } = useMemo(() => { + const byRow = new Map() + let total = 0 + if (workflowColumnNames.length === 0) return { runningByRowId: byRow, totalRunning: 0 } + for (const row of rows) { + let count = 0 + for (const name of workflowColumnNames) { + const cell = row.data[name] as WorkflowCellValue | null | undefined + if (cell?.status === 'running') count++ + } + if (count > 0) { + byRow.set(row.id, count) + total += count + } + } + return { runningByRowId: byRow, totalRunning: total } + }, [rows, workflowColumnNames]) + + const cancelRunsMutate = cancelRunsMutation.mutate + + const handleStopAll = useCallback(() => { + if (totalRunning === 0) return + cancelRunsMutate({ scope: 'all' }) + }, [totalRunning, cancelRunsMutate]) + + const handleStopRow = useCallback( + (rowId: string) => { + cancelRunsMutate({ scope: 'row', rowId }) + }, + [cancelRunsMutate] + ) + + const handleRunRow = useCallback( + (rowId: string) => { + const columnNames = workflowColumnNamesRef.current + if (columnNames.length === 0) return + for (const columnName of columnNames) { + void runWorkflowColumn({ tableId, rowId, workspaceId, columnName }) + } + }, + [runWorkflowColumn, tableId, workspaceId] + ) + if (!isLoadingTable && !tableData) { return (
@@ -1720,6 +1781,14 @@ export function Table({ )} +
+ {totalRunning > 0 && ( + + )}
))} {userPermissions.canEdit && ( @@ -1860,6 +1930,10 @@ export function Table({ onCellMouseEnter={handleCellMouseEnter} isRowChecked={checkedRows.has(row.position)} onRowToggle={handleRowToggle} + runningCount={runningByRowId.get(row.id) ?? 0} + hasWorkflowColumns={hasWorkflowColumns} + onStopRow={handleStopRow} + onRunRow={handleRunRow} /> ) @@ -1886,6 +1960,22 @@ export function Table({ )}
+ setConfigState(null)} + existingColumn={ + configState?.mode === 'edit' + ? columns.find((c) => c.name === configState.columnName) ?? null + : null + } + allColumns={columns} + workflows={manualTriggerWorkflows} + workspaceId={workspaceId} + tableId={tableId} + workflowColumnBatchSize={tableData?.metadata?.workflowColumnBatchSize} + /> +
+ {editingRow && tableData && ( void isRowChecked: boolean onRowToggle: (rowIndex: number, shiftKey: boolean) => void + /** Number of workflow cells in this row currently in a running/queued state. */ + runningCount: number + /** Whether the table has at least one workflow column — controls whether a run/stop icon is rendered. */ + hasWorkflowColumns: boolean + onStopRow: (rowId: string) => void + onRunRow: (rowId: string) => void } function rowSelectionChanged( @@ -2225,7 +2321,11 @@ function dataRowPropsAreEqual(prev: DataRowProps, next: DataRowProps): boolean { prev.onCellMouseDown !== next.onCellMouseDown || prev.onCellMouseEnter !== next.onCellMouseEnter || prev.isRowChecked !== next.isRowChecked || - prev.onRowToggle !== next.onRowToggle + prev.onRowToggle !== next.onRowToggle || + prev.runningCount !== next.runningCount || + prev.hasWorkflowColumns !== next.hasWorkflowColumns || + prev.onStopRow !== next.onStopRow || + prev.onRunRow !== next.onRunRow ) { return false } @@ -2262,6 +2362,10 @@ const DataRow = React.memo(function DataRow({ onCellMouseDown, onCellMouseEnter, onRowToggle, + runningCount, + hasWorkflowColumns, + onStopRow, + onRunRow, }: DataRowProps) { const sel = normalizedSelection const isMultiCell = sel !== null && (sel.startRow !== sel.endRow || sel.startCol !== sel.endCol) @@ -2275,28 +2379,53 @@ const DataRow = React.memo(function DataRow({ return ( onContextMenu(e, row)}> - { - if (e.button !== 0) return - onRowToggle(rowIndex, e.shiftKey) - }} - > - - {row.position + 1} - -
+
+
{ + if (e.button !== 0) return + onRowToggle(rowIndex, e.shiftKey) + }} + > + + {row.position + 1} + +
+ +
+
+ {hasWorkflowColumns && ( + )} - > -
{columns.map((column, colIndex) => { @@ -2739,6 +2868,7 @@ const ColumnHeaderMenu = React.memo(function ColumnHeaderMenu({ onDragLeave, manualTriggerWorkflows, onChangeToWorkflow, + onConfigureWorkflow, }: { column: ColumnDefinition readOnly?: boolean @@ -2763,6 +2893,7 @@ const ColumnHeaderMenu = React.memo(function ColumnHeaderMenu({ onDragLeave?: () => void manualTriggerWorkflows?: ManualTriggerWorkflow[] onChangeToWorkflow?: (columnName: string, workflowId: string) => void + onConfigureWorkflow?: (columnName: string) => void }) { const renameInputRef = useRef(null) const configuredWorkflow = @@ -2901,6 +3032,15 @@ const ColumnHeaderMenu = React.memo(function ColumnHeaderMenu({ + {column.type === 'workflow' && onConfigureWorkflow && ( + <> + onConfigureWorkflow(column.name)}> + + Configure workflow + + + + )} onRenameColumn(column.name)}> Rename column @@ -2996,6 +3136,55 @@ const ColumnHeaderMenu = React.memo(function ColumnHeaderMenu({ ) }) +interface RunStatusPillProps { + running: number + onStopAll: () => void + isStopping: boolean +} + +/** Adapter so `` can render an animated PillsRing. */ +function AnimatedPillsRing({ className }: { className?: string }) { + return +} + +/** + * Floating status card anchored to the top-right of the table viewport. Surfaces + * active workflow-column run activity and a one-click Stop-all affordance. + * + * Styled to match Sim's established patterns: floating-tooltip surface tokens from + * the logs status bar (`surface-1`/`border-1`/`shadow-lg`), `PillsRing` as the + * running spinner, and the same `variant='active'` + filled `Square` + "Stop" button + * the workflow panel uses during execution. + * + * Queued count is not tracked yet — once `WorkflowCellValue.status` grows a `queued` + * state, thread it in alongside `running`. + */ +const RunStatusPill = React.memo(function RunStatusPill({ + running, + onStopAll, + isStopping, +}: RunStatusPillProps) { + return ( +
+
+ + {running} + running + + +
+
+ ) +}) + const SelectAllCheckbox = React.memo(function SelectAllCheckbox({ checked, onCheckedChange, diff --git a/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/components/workflow-column-sidebar/workflow-column-sidebar.tsx b/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/components/workflow-column-sidebar/workflow-column-sidebar.tsx new file mode 100644 index 00000000000..fbd4eafba24 --- /dev/null +++ b/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/components/workflow-column-sidebar/workflow-column-sidebar.tsx @@ -0,0 +1,431 @@ +'use client' + +import type React from 'react' +import { useEffect, useMemo, useState } from 'react' +import { RepeatIcon, SplitIcon, X } from 'lucide-react' +import { + Button, + Checkbox, + Combobox, + type ComboboxOptionGroup, + Input, + toast, +} from '@/components/emcn' +import { cn } from '@/lib/core/utils/cn' +import { TABLE_LIMITS } from '@/lib/table/constants' +import { + flattenWorkflowOutputs, + type FlattenOutputsBlockInput, + type FlattenOutputsEdgeInput, +} from '@/lib/workflows/blocks/flatten-outputs' +import type { ColumnDefinition } from '@/lib/table' +import { getBlock } from '@/blocks' +import { useWorkflowState } from '@/hooks/queries/workflows' +import { + useUpdateColumn, + useUpdateTableMetadata, + type ManualTriggerWorkflow, +} from '@/hooks/queries/tables' + +export type WorkflowColumnConfigState = + | { mode: 'edit'; columnName: string } + | { mode: 'new'; columnName: string; workflowId: string; proposedName: string } + | null + +interface WorkflowColumnSidebarProps { + configState: WorkflowColumnConfigState + onClose: () => void + /** The current column record for edit mode. Null for new mode or closed. */ + existingColumn: ColumnDefinition | null + allColumns: ColumnDefinition[] + workflows: ManualTriggerWorkflow[] | undefined + workspaceId: string + tableId: string + /** + * Table-wide max concurrent workflow-column runs. Read from `table.metadata`. + * Undefined means "not set" — the scheduler falls back to its default. + */ + workflowColumnBatchSize: number | undefined +} + +const FULL_OUTPUT = '__full__' +const BATCH_SIZE_MIN = 1 +const BATCH_SIZE_MAX = 100 + +const TagIcon: React.FC<{ + icon: string | React.ComponentType<{ className?: string }> + color: string +}> = ({ icon, color }) => ( +
+ {typeof icon === 'string' ? ( + {icon} + ) : ( + (() => { + const IconComponent = icon + return + })() + )} +
+) + +/** + * Right-edge configuration panel for a workflow column. + * + * Two modes: + * - 'edit': modify an existing workflow column. Mutation sends just workflowConfig. + * - 'new': user just picked a workflow via Change type → Workflow → [pick]. Nothing + * is persisted yet. Save writes type + workflowConfig + renames the column + * in a single PATCH; Cancel drops the picked workflow and leaves the column + * unchanged. + * + * Positioned absolute inside the table's container so it slides in under the resource + * header and options bar, not on top of the workspace nav. + */ +export function WorkflowColumnSidebar({ + configState, + onClose, + existingColumn, + allColumns, + workflows, + workspaceId, + tableId, + workflowColumnBatchSize, +}: WorkflowColumnSidebarProps) { + const updateColumn = useUpdateColumn({ workspaceId, tableId }) + const updateMetadata = useUpdateTableMetadata({ workspaceId, tableId }) + const open = configState !== null + + // The column we're configuring. In 'new' mode there's no record yet — we only have a + // name and the picked workflowId. In 'edit' mode it's the real ColumnDefinition. + const columnName = + configState?.mode === 'edit' + ? configState.columnName + : configState?.mode === 'new' + ? configState.columnName + : '' + + const otherColumns = useMemo( + () => (columnName ? allColumns.filter((c) => c.name !== columnName) : []), + [columnName, allColumns] + ) + + // Local working state — persisted only on Save. + const [selectedWorkflowId, setSelectedWorkflowId] = useState('') + const [deps, setDeps] = useState([]) + const [outputPath, setOutputPath] = useState(FULL_OUTPUT) + const [batchSizeInput, setBatchSizeInput] = useState('') + + // Re-seed local state whenever the panel is (re)opened with different context. + useEffect(() => { + if (!open || !configState) return + if (configState.mode === 'edit' && existingColumn?.workflowConfig) { + setSelectedWorkflowId(existingColumn.workflowConfig.workflowId) + setDeps( + existingColumn.workflowConfig.dependencies ?? + allColumns.filter((c) => c.name !== existingColumn.name).map((c) => c.name) + ) + setOutputPath(existingColumn.workflowConfig.outputPath || FULL_OUTPUT) + } else if (configState.mode === 'new') { + setSelectedWorkflowId(configState.workflowId) + // Default: check every sibling column. + setDeps(allColumns.filter((c) => c.name !== configState.columnName).map((c) => c.name)) + setOutputPath(FULL_OUTPUT) + } + setBatchSizeInput( + String(workflowColumnBatchSize ?? TABLE_LIMITS.WORKFLOW_COLUMN_BATCH_SIZE) + ) + }, [open, configState, existingColumn, allColumns, workflowColumnBatchSize]) + + const selectedWorkflow = useMemo( + () => workflows?.find((w) => w.workflowId === selectedWorkflowId), + [workflows, selectedWorkflowId] + ) + + const workflowState = useWorkflowState(open && selectedWorkflowId ? selectedWorkflowId : undefined) + + /** + * Build Combobox groups from the flattened outputs. Flatten logic is shared with the + * deploy modal's OutputSelect in `@/lib/workflows/blocks/flatten-outputs`; only the + * presentation (icon + section header) lives here. + */ + const outputComboboxGroups = useMemo(() => { + const state = workflowState.data as + | { + blocks?: Record + edges?: FlattenOutputsEdgeInput[] + } + | null + | undefined + if (!state?.blocks) return [] + + const blocks = Object.values(state.blocks) + const flat = flattenWorkflowOutputs(blocks, state.edges ?? []) + if (flat.length === 0) return [] + + const groupsByBlock = new Map() + for (const f of flat) { + const list = groupsByBlock.get(f.blockName) ?? [] + list.push(f) + groupsByBlock.set(f.blockName, list) + } + + return Array.from(groupsByBlock.entries()).map(([blockName, items]) => { + const first = items[0] + const blockConfig = getBlock(first.blockType) + const blockColor = blockConfig?.bgColor || '#2F55FF' + let blockIcon: string | React.ComponentType<{ className?: string }> = blockName + .charAt(0) + .toUpperCase() + if (blockConfig?.icon) blockIcon = blockConfig.icon + else if (first.blockType === 'loop') blockIcon = RepeatIcon + else if (first.blockType === 'parallel') blockIcon = SplitIcon + + return { + sectionElement: ( +
+ + {blockName} +
+ ), + items: items.map((f) => ({ label: f.path, value: f.path })), + } + }) + }, [workflowState.data]) + + const toggleDep = (name: string) => { + setDeps((prev) => (prev.includes(name) ? prev.filter((d) => d !== name) : [...prev, name])) + } + + const parsedBatchSize = Number.parseInt(batchSizeInput, 10) + const batchSizeValid = + Number.isFinite(parsedBatchSize) && + parsedBatchSize >= BATCH_SIZE_MIN && + parsedBatchSize <= BATCH_SIZE_MAX + const previousBatchSize = + workflowColumnBatchSize ?? TABLE_LIMITS.WORKFLOW_COLUMN_BATCH_SIZE + const batchSizeChanged = batchSizeValid && parsedBatchSize !== previousBatchSize + + const handleSave = () => { + if (!configState || !selectedWorkflowId) return + if (!batchSizeValid) { + toast.error(`Run concurrency must be between ${BATCH_SIZE_MIN} and ${BATCH_SIZE_MAX}`) + return + } + const workflowConfig = { + workflowId: selectedWorkflowId, + dependencies: deps, + outputPath: outputPath === FULL_OUTPUT ? undefined : outputPath, + } + + if (batchSizeChanged) { + updateMetadata.mutate({ workflowColumnBatchSize: parsedBatchSize }) + } + + if (configState.mode === 'new') { + updateColumn.mutate( + { + columnName: configState.columnName, + updates: { + ...(configState.proposedName !== configState.columnName + ? { name: configState.proposedName } + : {}), + type: 'workflow', + workflowConfig, + }, + }, + { + onSuccess: () => { + toast.success(`Saved "${configState.proposedName}"`) + onClose() + }, + onError: (err) => toast.error(err.message || 'Failed to save'), + } + ) + return + } + + updateColumn.mutate( + { + columnName: configState.columnName, + updates: { workflowConfig }, + }, + { + onSuccess: () => { + toast.success(`Saved "${configState.columnName}"`) + onClose() + }, + onError: (err) => toast.error(err.message || 'Failed to save'), + } + ) + } + + return ( + + ) +} diff --git a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/chat/components/output-select/output-select.tsx b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/chat/components/output-select/output-select.tsx index 213fe3d4199..9e0822864fb 100644 --- a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/chat/components/output-select/output-select.tsx +++ b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/chat/components/output-select/output-select.tsx @@ -5,8 +5,10 @@ import { useMemo } from 'react' import { RepeatIcon, SplitIcon } from 'lucide-react' import { useShallow } from 'zustand/react/shallow' import { Combobox, type ComboboxOptionGroup } from '@/components/emcn' -import { getEffectiveBlockOutputs } from '@/lib/workflows/blocks/block-outputs' -import { hasTriggerCapability } from '@/lib/workflows/triggers/trigger-utils' +import { + flattenWorkflowOutputs, + type FlattenOutputsBlockInput, +} from '@/lib/workflows/blocks/flatten-outputs' import { getBlock } from '@/blocks' import { useWorkflowDiffStore } from '@/stores/workflow-diff/store' import { useSubBlockStore } from '@/stores/workflows/subblock/store' @@ -38,8 +40,6 @@ const TagIcon: React.FC<{
) -const EXCLUDED_OUTPUT_TYPES = new Set(['starter', 'start_trigger', 'human_in_the_loop'] as const) - /** * Props for the OutputSelect component */ @@ -106,86 +106,51 @@ export function OutputSelect({ * Extracts all available workflow outputs for the dropdown */ const workflowOutputs = useMemo(() => { - const outputs: Array<{ - id: string - label: string - blockId: string - blockName: string - blockType: string - path: string - }> = [] - if (!workflowId || !workflowBlocks || typeof workflowBlocks !== 'object') { - return outputs + return [] } + const blockArray = Object.values(workflowBlocks) as any[] + if (blockArray.length === 0) return [] - const blockArray = Object.values(workflowBlocks) - if (blockArray.length === 0) return outputs - - blockArray.forEach((block: any) => { - if (EXCLUDED_OUTPUT_TYPES.has(block.type) || !block?.id || !block?.type) return - - const blockName = - block.name && typeof block.name === 'string' - ? block.name.replace(/\s+/g, '').toLowerCase() - : `block-${block.id}` - - const blockConfig = getBlock(block.type) - const isTriggerCapable = blockConfig ? hasTriggerCapability(blockConfig) : false - const effectiveTriggerMode = Boolean(block.triggerMode && isTriggerCapable) - - let outputsToProcess: Record = {} + // Merge the editor's subblock store values into the blocks before flattening — + // the workflow store doesn't always have the latest subBlocks.value. + const mergedBlocks: FlattenOutputsBlockInput[] = blockArray.map((block) => { const rawSubBlockValues = shouldUseBaseline && baselineWorkflow ? baselineWorkflow.blocks?.[block.id]?.subBlocks : subBlockValues?.[block.id] - const subBlocks: Record = {} + const subBlocks: Record = {} if (rawSubBlockValues && typeof rawSubBlockValues === 'object') { for (const [key, val] of Object.entries(rawSubBlockValues)) { - // Handle both { value: ... } and raw value formats - subBlocks[key] = val && typeof val === 'object' && 'value' in val ? val : { value: val } + subBlocks[key] = val && typeof val === 'object' && 'value' in (val as object) + ? (val as { value: unknown }) + : { value: val } } } - - outputsToProcess = getEffectiveBlockOutputs(block.type, subBlocks, { - triggerMode: effectiveTriggerMode, - preferToolOutputs: !effectiveTriggerMode, - }) as Record - - if (Object.keys(outputsToProcess).length === 0) return - - const addOutput = (path: string, outputObj: unknown, prefix = '') => { - const fullPath = prefix ? `${prefix}.${path}` : path - const createOutput = () => ({ - id: `${block.id}_${fullPath}`, - label: `${blockName}.${fullPath}`, - blockId: block.id, - blockName: block.name || `Block ${block.id}`, - blockType: block.type, - path: fullPath, - }) - - if ( - typeof outputObj !== 'object' || - outputObj === null || - ('type' in outputObj && typeof outputObj.type === 'string') || - Array.isArray(outputObj) - ) { - outputs.push(createOutput()) - return - } - - Object.entries(outputObj).forEach(([key, value]) => { - addOutput(key, value, fullPath) - }) + return { + id: block.id, + type: block.type, + name: block.name, + triggerMode: Boolean(block.triggerMode), + subBlocks, } - - Object.entries(outputsToProcess).forEach(([key, value]) => { - addOutput(key, value) - }) }) - return outputs + const flat = flattenWorkflowOutputs(mergedBlocks) + return flat.map((f) => { + const displayBlockName = + f.blockName && typeof f.blockName === 'string' + ? f.blockName.replace(/\s+/g, '').toLowerCase() + : `block-${f.blockId}` + return { + id: `${f.blockId}_${f.path}`, + label: `${displayBlockName}.${f.path}`, + blockId: f.blockId, + blockName: f.blockName, + blockType: f.blockType, + path: f.path, + } + }) }, [ workflowBlocks, workflowId, diff --git a/apps/sim/hooks/queries/tables.ts b/apps/sim/hooks/queries/tables.ts index 410e6e448c0..8965e416423 100644 --- a/apps/sim/hooks/queries/tables.ts +++ b/apps/sim/hooks/queries/tables.ts @@ -645,7 +645,7 @@ interface UpdateColumnParams { type?: string required?: boolean unique?: boolean - workflowConfig?: { workflowId: string } + workflowConfig?: { workflowId: string; dependencies?: string[]; outputPath?: string } } } @@ -723,6 +723,39 @@ export function useUpdateTableMetadata({ workspaceId, tableId }: RowMutationCont }) } +interface CancelRunsParams { + scope: 'all' | 'row' + rowId?: string +} + +/** + * Cancel in-flight and queued workflow-column runs for a table. + * Scope is either `all` (table-wide) or `row` (a single row). + */ +export function useCancelTableRuns({ workspaceId, tableId }: RowMutationContext) { + const queryClient = useQueryClient() + + return useMutation({ + mutationFn: async ({ scope, rowId }: CancelRunsParams) => { + const res = await fetch(`/api/table/${tableId}/cancel-runs`, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ workspaceId, scope, rowId }), + }) + if (!res.ok) { + const data = await res.json().catch(() => ({})) + throw new Error( + (data as { error?: string }).error || 'Failed to cancel runs' + ) + } + return res.json() as Promise<{ success: true; data: { cancelled: number } }> + }, + onSettled: () => { + queryClient.invalidateQueries({ queryKey: tableKeys.rowsRoot(tableId) }) + }, + }) +} + /** * Delete a column from a table. */ diff --git a/apps/sim/lib/table/constants.ts b/apps/sim/lib/table/constants.ts index afa06ac484f..a40c81047d3 100644 --- a/apps/sim/lib/table/constants.ts +++ b/apps/sim/lib/table/constants.ts @@ -21,6 +21,8 @@ export const TABLE_LIMITS = { MAX_BATCH_INSERT_SIZE: 1000, /** Maximum rows per bulk update/delete operation */ MAX_BULK_OPERATION_SIZE: 1000, + /** Default concurrency for async workflow-column executions kicked off by the scheduler */ + WORKFLOW_COLUMN_BATCH_SIZE: 10, } as const /** diff --git a/apps/sim/lib/table/service.ts b/apps/sim/lib/table/service.ts index ea48d204a52..1c25765d080 100644 --- a/apps/sim/lib/table/service.ts +++ b/apps/sim/lib/table/service.ts @@ -441,11 +441,11 @@ export async function renameTable( } /** - * Updates a table's UI metadata (e.g. column widths). - * Does not update `updatedAt` since metadata is UI-only state. + * Updates a table's metadata (UI state like column widths/order, plus behavioral + * settings like `workflowColumnBatchSize`). Merges into the existing metadata blob. * * @param tableId - Table ID to update - * @param metadata - New metadata object (merged with existing) + * @param metadata - Partial metadata object (merged with existing) * @param existingMetadata - Existing metadata from a prior fetch (avoids redundant DB read) * @returns Updated metadata */ @@ -1612,9 +1612,17 @@ export async function renameColumn( } const actualOldName = schema.columns[columnIndex].name - const updatedColumns = schema.columns.map((c, i) => - i === columnIndex ? { ...c, name: data.newName } : c - ) + const updatedColumns = schema.columns.map((c, i) => { + const base = i === columnIndex ? { ...c, name: data.newName } : { ...c } + // Cascade rename into sibling workflow columns' explicit dependency lists. + if (base.workflowConfig?.dependencies?.length) { + const rewritten = base.workflowConfig.dependencies.map((d) => + d === actualOldName ? data.newName : d + ) + base.workflowConfig = { ...base.workflowConfig, dependencies: rewritten } + } + return base + }) const updatedSchema: TableSchema = { columns: updatedColumns } const metadata = table.metadata as TableMetadata | null @@ -1675,7 +1683,15 @@ export async function deleteColumn( const actualName = schema.columns[columnIndex].name const updatedSchema: TableSchema = { - columns: schema.columns.filter((_, i) => i !== columnIndex), + columns: schema.columns + .filter((_, i) => i !== columnIndex) + .map((c) => { + // Cascade removal into sibling workflow columns' explicit dependency lists. + if (!c.workflowConfig?.dependencies?.length) return c + const filtered = c.workflowConfig.dependencies.filter((d) => d !== actualName) + if (filtered.length === c.workflowConfig.dependencies.length) return c + return { ...c, workflowConfig: { ...c.workflowConfig, dependencies: filtered } } + }), } const metadata = table.metadata as TableMetadata | null @@ -1738,7 +1754,15 @@ export async function deleteColumns( throw new Error('Cannot delete all columns from a table') } - const updatedSchema: TableSchema = { columns: remaining } + // Cascade removal into sibling workflow columns' explicit dependency lists. + const scrubbed = remaining.map((c) => { + if (!c.workflowConfig?.dependencies?.length) return c + const filtered = c.workflowConfig.dependencies.filter((d) => !namesToDelete.has(d)) + if (filtered.length === c.workflowConfig.dependencies.length) return c + return { ...c, workflowConfig: { ...c.workflowConfig, dependencies: filtered } } + }) + + const updatedSchema: TableSchema = { columns: scrubbed } const metadata = table.metadata as TableMetadata | null let updatedMetadata = metadata diff --git a/apps/sim/lib/table/types.ts b/apps/sim/lib/table/types.ts index 2236ff7509c..2a36ba4ab9f 100644 --- a/apps/sim/lib/table/types.ts +++ b/apps/sim/lib/table/types.ts @@ -23,6 +23,18 @@ export interface ColumnOption { export interface WorkflowColumnConfig { workflowId: string + /** + * Explicit dependency list (column names). When set, overrides the scheduler's + * default "all left non-workflow columns must be filled; upstream workflow + * columns must be completed" predicate — only the listed columns are checked. + */ + dependencies?: string[] + /** + * Dot-path into the workflow's final output to use as the cell's displayed value. + * Example: `summary` selects `output.summary`, `result.items[0]` selects the first + * item. When unset, the full output object is stored. + */ + outputPath?: string } export interface ColumnDefinition { @@ -45,10 +57,20 @@ export interface TableSchema { columns: ColumnDefinition[] } -/** UI-only metadata stored alongside the table definition. */ +/** + * Table-level metadata stored alongside the table definition. Holds both UI state + * (column widths, column order) and behavioral settings (e.g. scheduler concurrency) + * — treat it as first-class backend state, not UI-only. + */ export interface TableMetadata { columnWidths?: Record columnOrder?: string[] + /** + * Maximum number of workflow-column runs to execute concurrently per scheduler + * pass. Clamped to 1..100. When unset, the scheduler uses + * `TABLE_LIMITS.WORKFLOW_COLUMN_BATCH_SIZE`. + */ + workflowColumnBatchSize?: number } export interface TableDefinition { diff --git a/apps/sim/lib/table/workflow-columns.ts b/apps/sim/lib/table/workflow-columns.ts index 28bf2f0eb97..5ea9ad32978 100644 --- a/apps/sim/lib/table/workflow-columns.ts +++ b/apps/sim/lib/table/workflow-columns.ts @@ -18,6 +18,7 @@ import { webhook as webhookTable, workflow as workflowTable } from '@sim/db/sche import { createLogger } from '@sim/logger' import { and, eq, isNull } from 'drizzle-orm' import { generateId } from '@/lib/core/utils/uuid' +import { TABLE_LIMITS } from '@/lib/table/constants' import type { ColumnDefinition, RowData, @@ -28,6 +29,26 @@ import type { const logger = createLogger('WorkflowColumnScheduler') +/** + * Walk a dot-and-bracket path into a value (e.g. `a.b[0].c` or `result.items.0`). + * Returns undefined for any missing segment. Used by workflow columns that specify + * an `outputPath` to pick one field out of a workflow's full output. + */ +function pluckByPath(source: unknown, path: string): unknown { + if (source === null || source === undefined || !path) return source + const segments = path + .replace(/\[(\w+)\]/g, '.$1') + .split('.') + .filter(Boolean) + let cursor: unknown = source + for (const seg of segments) { + if (cursor === null || cursor === undefined) return undefined + if (typeof cursor !== 'object') return undefined + cursor = (cursor as Record)[seg] + } + return cursor +} + /** * Per-cell eligibility: returns true if the workflow should run for this row × column now. * @@ -47,28 +68,58 @@ export function isWorkflowColumnEligible( const status = cell?.status if (status === 'running' || status === 'completed' || status === 'error') return false - // Default predicate: every column to the left must be filled. - // For plain columns, "filled" means a non-null / non-empty value. - // For upstream workflow columns, "filled" means the cell has status === 'completed' - // — this is what makes cascading work: a downstream workflow only runs after the - // upstream workflow finishes. + const isFilled = (colToCheck: ColumnDefinition, value: unknown): boolean => { + if (colToCheck.type === 'workflow') { + const cellVal = value as WorkflowCellValue | null | undefined + return cellVal?.status === 'completed' + } + return value !== null && value !== undefined && value !== '' + } + + const explicitDeps = column.workflowConfig?.dependencies + if (explicitDeps && explicitDeps.length > 0) { + // Explicit dependency list: check only the named columns. Fail fast on unknown + // names — malformed config is a bug, not a silent skip. + for (const depName of explicitDeps) { + const depCol = schema.columns.find((c) => c.name === depName) + if (!depCol) { + throw new Error( + `Workflow column "${column.name}" has unknown dependency "${depName}"` + ) + } + if (!isFilled(depCol, row.data[depName])) return false + } + return true + } + + // Default predicate: every column to the left must be filled. Plain columns need a + // non-null/non-empty value; upstream workflow columns need status === 'completed' + // (this is what makes cascading work). for (let i = 0; i < columnIndex; i++) { const leftCol = schema.columns[i] - const value = row.data[leftCol.name] - if (leftCol.type === 'workflow') { - const leftCell = value as WorkflowCellValue | null | undefined - if (leftCell?.status !== 'completed') return false - continue - } - if (value === null || value === undefined || value === '') return false + if (!isFilled(leftCol, row.data[leftCol.name])) return false } return true } +/** Upper bound on workflow-column run concurrency exposed to the user. */ +const WORKFLOW_COLUMN_BATCH_SIZE_MAX = 100 + +interface ScheduleWorkflowColumnRunsOptions { + /** + * Maximum number of workflow-column runs to execute concurrently. When unset, the + * per-table value at `table.metadata.workflowColumnBatchSize` is used, falling back + * to `TABLE_LIMITS.WORKFLOW_COLUMN_BATCH_SIZE`. Clamped to 1..100. + */ + batchSize?: number +} + /** - * Fire-and-forget scheduler. Iterates workflow columns × rows and kicks off eligible - * executions. Safe to call after any row-write operation; errors are logged. + * Scheduler. Iterates workflow columns × rows and kicks off eligible executions in + * bounded-concurrency batches. Safe to call after any row-write operation; errors are + * logged. Callers typically invoke this with `void` — the function awaits internally to + * limit concurrency, but resolves once all scheduled runs complete. * * Actor identity for the downstream workflow is derived from the workflow record itself * (same convention as webhook/polling-fired triggers), so the service call site doesn't @@ -76,10 +127,12 @@ export function isWorkflowColumnEligible( * * @param table - The table definition with schema. * @param rows - Rows that were just written (post-commit state). + * @param options - Optional batching/concurrency controls. */ export async function scheduleWorkflowColumnRuns( table: TableDefinition, - rows: TableRow[] + rows: TableRow[], + options?: ScheduleWorkflowColumnRunsOptions ): Promise { try { const workflowColumns = table.schema.columns @@ -89,33 +142,100 @@ export async function scheduleWorkflowColumnRuns( if (workflowColumns.length === 0) return if (rows.length === 0) return - for (const row of rows) { - for (const { col, idx } of workflowColumns) { - if (!isWorkflowColumnEligible(col, idx, row, table.schema)) continue + const requestedBatchSize = + options?.batchSize ?? + table.metadata?.workflowColumnBatchSize ?? + TABLE_LIMITS.WORKFLOW_COLUMN_BATCH_SIZE + const batchSize = Math.min( + WORKFLOW_COLUMN_BATCH_SIZE_MAX, + Math.max(1, Math.floor(requestedBatchSize)) + ) - const workflowId = col.workflowConfig!.workflowId - const executionId = generateId() + const pendingRuns: RunWorkflowColumnOptions[] = [] - logger.info( - `Scheduling workflow column run: table=${table.id} row=${row.id} col=${col.name} workflow=${workflowId}` - ) + for (const row of rows) { + for (const { col, idx } of workflowColumns) { + let eligible = false + try { + eligible = isWorkflowColumnEligible(col, idx, row, table.schema) + } catch (predicateErr) { + // Malformed dependency config — surface it on the specific cell so the user + // can see why their column is stuck, then move on to the next row/column. + const message = predicateErr instanceof Error ? predicateErr.message : String(predicateErr) + logger.error( + `Eligibility predicate threw for table=${table.id} row=${row.id} col=${col.name}: ${message}` + ) + void markCellError(table.id, row.id, col.name, col.workflowConfig!.workflowId, message) + continue + } + if (!eligible) continue - void runWorkflowColumn({ + pendingRuns.push({ tableId: table.id, tableName: table.name, rowId: row.id, columnName: col.name, - workflowId, + workflowId: col.workflowConfig!.workflowId, workspaceId: table.workspaceId, - executionId, + executionId: generateId(), }) } } + + if (pendingRuns.length === 0) return + + logger.info( + `Scheduling ${pendingRuns.length} workflow column run(s) for table=${table.id} in batches of ${batchSize}` + ) + + for (let i = 0; i < pendingRuns.length; i += batchSize) { + const batch = pendingRuns.slice(i, i + batchSize) + await Promise.allSettled(batch.map((opts) => runWorkflowColumn(opts))) + } } catch (err) { logger.error('scheduleWorkflowColumnRuns failed:', err) } } +/** + * Write a config-error cell directly via the service layer. Used when the eligibility + * predicate throws (e.g. dependency refers to a nonexistent column). + */ +async function markCellError( + tableId: string, + rowId: string, + columnName: string, + workflowId: string, + message: string +): Promise { + try { + const { getTableById, getRowById, updateRow } = await import('@/lib/table/service') + const table = await getTableById(tableId) + if (!table) return + const row = await getRowById(tableId, rowId, table.workspaceId) + if (!row) return + const errorCell: WorkflowCellValue = { + executionId: null, + workflowId, + status: 'error', + output: null, + error: message, + } + await updateRow( + { + tableId, + rowId, + data: { ...row.data, [columnName]: errorCell as unknown as RowData[string] }, + workspaceId: table.workspaceId, + }, + table, + `wfcol-config-error-${rowId}-${columnName}` + ) + } catch (err) { + logger.error('markCellError failed:', err) + } +} + interface RunWorkflowColumnOptions { tableId: string tableName: string @@ -242,6 +362,9 @@ export async function runWorkflowColumn(opts: RunWorkflowColumnOptions): Promise return } + const columnDef = table.schema.columns.find((c) => c.name === columnName) + const outputPath = columnDef?.workflowConfig?.outputPath + const inputRow: Record = {} for (const key of Object.keys(row.data)) { if (key === columnName) continue @@ -286,11 +409,13 @@ export async function runWorkflowColumn(opts: RunWorkflowColumnOptions): Promise ) if (result.success) { + const rawOutput = (result.output as unknown) ?? null + const pickedOutput = outputPath ? pluckByPath(rawOutput, outputPath) : rawOutput await writeCell({ executionId, workflowId, status: 'completed', - output: (result.output as unknown) ?? null, + output: pickedOutput ?? null, error: null, }) } else { diff --git a/apps/sim/lib/workflows/blocks/flatten-outputs.ts b/apps/sim/lib/workflows/blocks/flatten-outputs.ts new file mode 100644 index 00000000000..8dfd50f9641 --- /dev/null +++ b/apps/sim/lib/workflows/blocks/flatten-outputs.ts @@ -0,0 +1,140 @@ +/** + * Flatten workflow block outputs into pickable paths. + * + * Shared helper used by any UI that needs to let a user pick one (or many) of a + * workflow's block outputs — deploy modal's OutputSelect, the table workflow-column + * config sidebar, etc. Keeping this in one place means the "what counts as an + * output" rules (excluded types, trigger-mode handling, recursion into nested + * output shapes, BFS sort order) don't drift between consumers. + */ + +import { getEffectiveBlockOutputs } from '@/lib/workflows/blocks/block-outputs' + +/** + * Block types whose "outputs" are really workflow inputs (Start/starter) or flow + * control and should never appear in an output picker. + */ +export const EXCLUDED_OUTPUT_TYPES = new Set(['starter', 'start_trigger', 'human_in_the_loop']) + +export interface FlattenedBlockOutput { + blockId: string + blockName: string + blockType: string + /** Dot-path into the block's output (e.g. `content`, `content.text`). */ + path: string +} + +/** + * Minimal shape we need off each block — compatible with both the normalized + * WorkflowState.blocks entries and the editor's live block state. + */ +export interface FlattenOutputsBlockInput { + id: string + type: string + name?: string + triggerMode?: boolean + subBlocks?: Record +} + +export interface FlattenOutputsEdgeInput { + source: string + target: string +} + +/** + * Compute a flat list of pickable output paths across every eligible block in + * a workflow. + * + * @param blocks Iterable of blocks from the workflow state. + * @param edges Optional edge list — when provided, results are sorted by BFS + * distance from a start/trigger block (descending), so terminal blocks + * appear first. This matches the deploy modal's grouping order. + */ +export function flattenWorkflowOutputs( + blocks: Iterable, + edges: Iterable = [] +): FlattenedBlockOutput[] { + const blockList = Array.from(blocks) + const results: FlattenedBlockOutput[] = [] + + for (const block of blockList) { + if (!block?.id || !block?.type) continue + if (EXCLUDED_OUTPUT_TYPES.has(block.type)) continue + + const normalizedSubBlocks: Record = {} + if (block.subBlocks && typeof block.subBlocks === 'object') { + for (const [k, v] of Object.entries(block.subBlocks)) { + normalizedSubBlocks[k] = + v && typeof v === 'object' && 'value' in (v as object) + ? (v as { value: unknown }) + : { value: v } + } + } + + const effectiveTriggerMode = Boolean(block.triggerMode) + let outs: Record = {} + try { + outs = getEffectiveBlockOutputs(block.type, normalizedSubBlocks, { + triggerMode: effectiveTriggerMode, + preferToolOutputs: !effectiveTriggerMode, + }) as Record + } catch { + continue + } + if (!outs || Object.keys(outs).length === 0) continue + + const blockName = block.name || `Block ${block.id}` + const add = (path: string, outputObj: unknown, prefix = ''): void => { + const fullPath = prefix ? `${prefix}.${path}` : path + const isLeaf = + typeof outputObj !== 'object' || + outputObj === null || + ('type' in (outputObj as object) && + typeof (outputObj as { type: unknown }).type === 'string') || + Array.isArray(outputObj) + if (isLeaf) { + results.push({ blockId: block.id, blockName, blockType: block.type, path: fullPath }) + return + } + for (const [key, value] of Object.entries(outputObj as Record)) { + add(key, value, fullPath) + } + } + + for (const [key, value] of Object.entries(outs)) add(key, value) + } + + const edgeList = Array.from(edges) + if (edgeList.length === 0 || results.length === 0) return results + + // Sort by BFS distance from the first start/trigger block, descending — so terminal + // blocks group to the top. Matches the deploy modal's OutputSelect ordering. + const startBlock = blockList.find( + (b) => b.type === 'starter' || b.type === 'start_trigger' || !!b.triggerMode + ) + if (!startBlock) return results + + const adj: Record = {} + for (const e of edgeList) { + if (!adj[e.source]) adj[e.source] = [] + adj[e.source].push(e.target) + } + const distances: Record = {} + const visited = new Set() + const queue: Array<[string, number]> = [[startBlock.id, 0]] + while (queue.length > 0) { + const [id, d] = queue.shift()! + if (visited.has(id)) continue + visited.add(id) + distances[id] = d + for (const t of adj[id] ?? []) queue.push([t, d + 1]) + } + + return results + .map((r, i) => ({ r, d: distances[r.blockId] ?? -1, i })) + .sort((a, b) => { + if (b.d !== a.d) return b.d - a.d + return a.i - b.i // preserve discovery order within the same distance + }) + .map(({ r }) => r) +} From 233ca49bc3eb6f5a6ff39e08c74c1a7d9faaa1c0 Mon Sep 17 00:00:00 2001 From: Theodore Li Date: Mon, 27 Apr 2026 19:36:24 -0700 Subject: [PATCH 04/32] Use trigger dev for workflow runs --- .../api/table/[tableId]/cancel-runs/route.ts | 21 +- .../rows/[rowId]/run-workflow-column/route.ts | 2 +- .../app/api/table/[tableId]/triggers/route.ts | 90 -- apps/sim/app/api/table/utils.ts | 7 + .../resource-header/resource-header.tsx | 34 +- .../components/context-menu/context-menu.tsx | 19 +- .../[tableId]/components/table/table.tsx | 1021 ++++++++++++----- .../workflow-column-sidebar.tsx | 304 ++--- .../[tableId]/hooks/use-row-execution.ts | 67 +- .../output-select/output-select.tsx | 9 +- .../background/workflow-column-execution.ts | 248 ++++ apps/sim/hooks/queries/tables.ts | 141 ++- .../lib/core/async-jobs/backends/database.ts | 22 + .../core/async-jobs/backends/trigger-dev.ts | 18 + apps/sim/lib/core/async-jobs/inline-abort.ts | 35 + apps/sim/lib/core/async-jobs/types.ts | 8 + apps/sim/lib/table/service.ts | 122 +- apps/sim/lib/table/trigger.ts | 11 +- apps/sim/lib/table/types.ts | 8 +- apps/sim/lib/table/workflow-columns.ts | 326 +++--- apps/sim/triggers/table/poller.ts | 6 +- 21 files changed, 1716 insertions(+), 803 deletions(-) delete mode 100644 apps/sim/app/api/table/[tableId]/triggers/route.ts create mode 100644 apps/sim/background/workflow-column-execution.ts create mode 100644 apps/sim/lib/core/async-jobs/inline-abort.ts diff --git a/apps/sim/app/api/table/[tableId]/cancel-runs/route.ts b/apps/sim/app/api/table/[tableId]/cancel-runs/route.ts index 5e5369c7cb3..9141ca20b9e 100644 --- a/apps/sim/app/api/table/[tableId]/cancel-runs/route.ts +++ b/apps/sim/app/api/table/[tableId]/cancel-runs/route.ts @@ -3,6 +3,8 @@ import { type NextRequest, NextResponse } from 'next/server' import { z } from 'zod' import { checkSessionOrInternalAuth } from '@/lib/auth/hybrid' import { generateRequestId } from '@/lib/core/utils/request' +import { withRouteHandler } from '@/lib/core/utils/with-route-handler' +import { cancelWorkflowColumnRuns } from '@/lib/table/workflow-columns' import { accessError, checkAccess } from '@/app/api/table/utils' const logger = createLogger('TableCancelRunsAPI') @@ -30,7 +32,7 @@ interface RouteParams { * Real cancellation requires a Redis pubsub signal plus a DB-backed "cancel requested" * flag so multi-replica deploys can abort AbortControllers held on any instance. */ -export async function POST(request: NextRequest, { params }: RouteParams) { +export const POST = withRouteHandler(async (request: NextRequest, { params }: RouteParams) => { const requestId = generateRequestId() const { tableId } = await params @@ -44,10 +46,7 @@ export async function POST(request: NextRequest, { params }: RouteParams) { const validated = CancelRunsSchema.parse(body) if (validated.scope === 'row' && !validated.rowId) { - return NextResponse.json( - { error: 'rowId is required when scope is "row"' }, - { status: 400 } - ) + return NextResponse.json({ error: 'rowId is required when scope is "row"' }, { status: 400 }) } const result = await checkAccess(tableId, authResult.userId, 'write') @@ -58,13 +57,17 @@ export async function POST(request: NextRequest, { params }: RouteParams) { return NextResponse.json({ error: 'Invalid workspace ID' }, { status: 400 }) } + const cancelled = await cancelWorkflowColumnRuns( + tableId, + validated.scope === 'row' ? validated.rowId : undefined + ) logger.info( - `[${requestId}] cancel-runs requested: tableId=${tableId} scope=${validated.scope}${ + `[${requestId}] cancel-runs: tableId=${tableId} scope=${validated.scope}${ validated.rowId ? ` rowId=${validated.rowId}` : '' - } (stub — Redis pubsub not yet wired)` + } cancelled=${cancelled}` ) - return NextResponse.json({ success: true, data: { cancelled: 0 } }) + return NextResponse.json({ success: true, data: { cancelled } }) } catch (error) { if (error instanceof z.ZodError) { return NextResponse.json( @@ -75,4 +78,4 @@ export async function POST(request: NextRequest, { params }: RouteParams) { logger.error(`[${requestId}] cancel-runs failed for ${tableId}:`, error) return NextResponse.json({ error: 'Failed to cancel runs' }, { status: 500 }) } -} +}) diff --git a/apps/sim/app/api/table/[tableId]/rows/[rowId]/run-workflow-column/route.ts b/apps/sim/app/api/table/[tableId]/rows/[rowId]/run-workflow-column/route.ts index bf54434b448..12dfdc04780 100644 --- a/apps/sim/app/api/table/[tableId]/rows/[rowId]/run-workflow-column/route.ts +++ b/apps/sim/app/api/table/[tableId]/rows/[rowId]/run-workflow-column/route.ts @@ -1,9 +1,9 @@ import { createLogger } from '@sim/logger' +import { generateId } from '@sim/utils/id' import { type NextRequest, NextResponse } from 'next/server' import { z } from 'zod' import { checkSessionOrInternalAuth } from '@/lib/auth/hybrid' import { generateRequestId } from '@/lib/core/utils/request' -import { generateId } from '@sim/utils/id' import { getRowById } from '@/lib/table' import { runWorkflowColumn } from '@/lib/table/workflow-columns' import { accessError, checkAccess } from '@/app/api/table/utils' diff --git a/apps/sim/app/api/table/[tableId]/triggers/route.ts b/apps/sim/app/api/table/[tableId]/triggers/route.ts deleted file mode 100644 index 881745b3cec..00000000000 --- a/apps/sim/app/api/table/[tableId]/triggers/route.ts +++ /dev/null @@ -1,90 +0,0 @@ -import { db } from '@sim/db' -import { webhook, workflow, workflowDeploymentVersion } from '@sim/db/schema' -import { createLogger } from '@sim/logger' -import { and, eq, isNull, or } from 'drizzle-orm' -import { type NextRequest, NextResponse } from 'next/server' -import { checkSessionOrInternalAuth } from '@/lib/auth/hybrid' -import { generateRequestId } from '@/lib/core/utils/request' -import { accessError, checkAccess } from '@/app/api/table/utils' - -const logger = createLogger('TableTriggersAPI') - -interface RouteParams { - params: Promise<{ tableId: string }> -} - -/** - * GET /api/table/[tableId]/triggers - * Returns deployed workflows with manual table triggers for this table. - */ -export async function GET(request: NextRequest, { params }: RouteParams) { - const requestId = generateRequestId() - const { tableId } = await params - - try { - const authResult = await checkSessionOrInternalAuth(request, { requireWorkflowId: false }) - if (!authResult.success || !authResult.userId) { - logger.warn(`[${requestId}] Unauthorized table triggers access attempt`) - return NextResponse.json({ error: 'Authentication required' }, { status: 401 }) - } - - const result = await checkAccess(tableId, authResult.userId, 'read') - if (!result.ok) return accessError(result, requestId, tableId) - - const rows = await db - .select({ - webhookId: webhook.id, - workflowId: workflow.id, - workflowName: workflow.name, - workflowColor: workflow.color, - providerConfig: webhook.providerConfig, - }) - .from(webhook) - .innerJoin(workflow, eq(webhook.workflowId, workflow.id)) - .leftJoin( - workflowDeploymentVersion, - and( - eq(workflowDeploymentVersion.workflowId, workflow.id), - eq(workflowDeploymentVersion.isActive, true) - ) - ) - .where( - and( - eq(webhook.provider, 'table'), - eq(webhook.isActive, true), - isNull(webhook.archivedAt), - eq(workflow.isDeployed, true), - isNull(workflow.archivedAt), - or( - eq(webhook.deploymentVersionId, workflowDeploymentVersion.id), - and(isNull(workflowDeploymentVersion.id), isNull(webhook.deploymentVersionId)) - ) - ) - ) - - interface ProviderConfig { - tableId?: string - tableSelector?: string - manualTableId?: string - eventType?: string - } - - const manualTriggers = rows.filter((row) => { - const config = row.providerConfig as ProviderConfig | null - const configTableId = config?.tableId ?? config?.tableSelector ?? config?.manualTableId - if (configTableId !== tableId) return false - return config?.eventType === 'manual' - }) - - const workflows = manualTriggers.map((row) => ({ - workflowId: row.workflowId, - workflowName: row.workflowName, - workflowColor: row.workflowColor, - })) - - return NextResponse.json({ success: true, data: { workflows } }) - } catch (error) { - logger.error(`[${requestId}] Error fetching table triggers:`, error) - return NextResponse.json({ error: 'Internal server error' }, { status: 500 }) - } -} diff --git a/apps/sim/app/api/table/utils.ts b/apps/sim/app/api/table/utils.ts index d7bc286da25..185acc946b0 100644 --- a/apps/sim/app/api/table/utils.ts +++ b/apps/sim/app/api/table/utils.ts @@ -167,6 +167,13 @@ export const CreateColumnSchema = z.object({ required: z.boolean().optional(), unique: z.boolean().optional(), position: z.number().int().min(0).optional(), + workflowConfig: z + .object({ + workflowId: z.string().min(1), + dependencies: z.array(z.string()).optional(), + outputPath: z.string().optional(), + }) + .optional(), }), }) diff --git a/apps/sim/app/workspace/[workspaceId]/components/resource/components/resource-header/resource-header.tsx b/apps/sim/app/workspace/[workspaceId]/components/resource/components/resource-header/resource-header.tsx index 68baec3f2b6..22686115782 100644 --- a/apps/sim/app/workspace/[workspaceId]/components/resource/components/resource-header/resource-header.tsx +++ b/apps/sim/app/workspace/[workspaceId]/components/resource/components/resource-header/resource-header.tsx @@ -55,6 +55,14 @@ interface ResourceHeaderProps { breadcrumbs?: BreadcrumbItem[] create?: CreateAction actions?: HeaderAction[] + /** Arbitrary content rendered in the right-aligned actions row, before the Create button. */ + trailingActions?: React.ReactNode + /** + * Replaces the default Create button entirely — supply your own trigger (for + * example a dropdown) when the create action needs richer UI. When provided, + * `create` is ignored. + */ + createTrigger?: React.ReactNode } export const ResourceHeader = memo(function ResourceHeader({ @@ -63,6 +71,8 @@ export const ResourceHeader = memo(function ResourceHeader({ breadcrumbs, create, actions, + trailingActions, + createTrigger, }: ResourceHeaderProps) { const hasBreadcrumbs = breadcrumbs && breadcrumbs.length > 0 @@ -124,17 +134,19 @@ export const ResourceHeader = memo(function ResourceHeader({ ) })} - {create && ( - - )} + {trailingActions} + {createTrigger ?? + (create && ( + + ))}
diff --git a/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/components/context-menu/context-menu.tsx b/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/components/context-menu/context-menu.tsx index eb1e410cfde..227e513734b 100644 --- a/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/components/context-menu/context-menu.tsx +++ b/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/components/context-menu/context-menu.tsx @@ -9,7 +9,7 @@ import { DropdownMenuTrigger, } from '@/components/emcn' import { ArrowDown, ArrowUp, Duplicate, Pencil, Play, Trash } from '@/components/emcn/icons' -import type { ManualTriggerWorkflow } from '@/hooks/queries/tables' +import type { WorkflowMetadata } from '@/stores/workflows/registry/types' import type { ContextMenuState } from '../../types' interface ContextMenuProps { @@ -24,7 +24,7 @@ interface ContextMenuProps { disableEdit?: boolean disableInsert?: boolean disableDelete?: boolean - manualTriggerWorkflows?: ManualTriggerWorkflow[] + workflows?: WorkflowMetadata[] onRunWorkflow?: (workflowId: string) => void } @@ -40,11 +40,11 @@ export function ContextMenu({ disableEdit = false, disableInsert = false, disableDelete = false, - manualTriggerWorkflows, + workflows, onRunWorkflow, }: ContextMenuProps) { const deleteLabel = selectedRowCount > 1 ? `Delete ${selectedRowCount} rows` : 'Delete row' - const hasWorkflows = manualTriggerWorkflows && manualTriggerWorkflows.length > 0 + const hasWorkflows = workflows && workflows.length > 0 const hasRow = contextMenu.row !== null return ( @@ -100,16 +100,13 @@ export function ContextMenu({ Run Workflow - {manualTriggerWorkflows.map((wf) => ( - onRunWorkflow(wf.workflowId)} - > + {workflows.map((wf) => ( + onRunWorkflow(wf.id)}> - {wf.workflowName} + {wf.name} ))} diff --git a/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/components/table/table.tsx b/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/components/table/table.tsx index 3e493a5b02c..35a68d1a4f7 100644 --- a/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/components/table/table.tsx +++ b/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/components/table/table.tsx @@ -1,17 +1,17 @@ 'use client' -import React, { useCallback, useEffect, useMemo, useRef, useState } from 'react' +import React, { useCallback, useEffect, useLayoutEffect, useMemo, useRef, useState } from 'react' import { Square } from 'lucide-react' import { useParams, useRouter } from 'next/navigation' import { usePostHog } from 'posthog-js/react' import { - Badge, Button, Checkbox, DatePicker, DropdownMenu, DropdownMenuContent, DropdownMenuItem, + DropdownMenuSearchInput, DropdownMenuSeparator, DropdownMenuSub, DropdownMenuSubContent, @@ -22,7 +22,6 @@ import { ModalContent, ModalFooter, ModalHeader, - PillsRing, Skeleton, Upload, } from '@/components/emcn' @@ -69,13 +68,13 @@ import { useRenameTable, useUpdateColumn, useUpdateTableMetadata, - useManualTriggers, useUpdateTableRow, - type ManualTriggerWorkflow, } from '@/hooks/queries/tables' +import { useWorkflows } from '@/hooks/queries/workflows' import { useInlineRename } from '@/hooks/use-inline-rename' import { extractCreatedRowId, useTableUndo } from '@/hooks/use-table-undo' import type { DeletedRowSnapshot } from '@/stores/table/types' +import type { WorkflowMetadata } from '@/stores/workflows/registry/types' import { useContextMenu, useRowExecution, useTableData } from '../../hooks' import type { EditingCell, QueryOptions, SaveReason } from '../../types' import { @@ -211,6 +210,7 @@ export function Table({ const [deletingRows, setDeletingRows] = useState([]) const [editingCell, setEditingCell] = useState(null) const [initialCharacter, setInitialCharacter] = useState(null) + const [expandedCell, setExpandedCell] = useState(null) const [selectionAnchor, setSelectionAnchor] = useState(null) const [selectionFocus, setSelectionFocus] = useState(null) const [checkedRows, setCheckedRows] = useState(EMPTY_CHECKED_ROWS) @@ -259,9 +259,9 @@ export function Table({ } = useContextMenu() const { runWorkflowColumn } = useRowExecution() - const { data: manualTriggerWorkflows } = useManualTriggers(tableId) - const manualTriggerWorkflowsRef = useRef(manualTriggerWorkflows) - manualTriggerWorkflowsRef.current = manualTriggerWorkflows + const { data: workflows } = useWorkflows(workspaceId) + const workflowsRef = useRef(workflows) + workflowsRef.current = workflows const updateRowMutation = useUpdateTableRow({ workspaceId, tableId }) const createRowMutation = useCreateTableRow({ workspaceId, tableId }) @@ -654,7 +654,7 @@ export function Table({ (workflowId: string, columnName?: string) => { const row = contextMenu.row if (!row) return - const wf = manualTriggerWorkflows?.find((w) => w.workflowId === workflowId) + const wf = workflows?.find((w) => w.id === workflowId) const targetColumn = columnName ?? columns.find((c) => c.type === 'workflow' && c.workflowConfig?.workflowId === workflowId) @@ -665,11 +665,11 @@ export function Table({ rowId: row.id, workspaceId, columnName: targetColumn, - workflowName: wf?.workflowName, + workflowName: wf?.name, }) closeContextMenu() }, - [contextMenu.row, manualTriggerWorkflows, columns, tableId, workspaceId, runWorkflowColumn, closeContextMenu] + [contextMenu.row, workflows, columns, tableId, workspaceId, runWorkflowColumn, closeContextMenu] ) const handleCellMouseDown = useCallback( @@ -992,15 +992,50 @@ export function Table({ setInitialCharacter(null) }, []) + // Double-click highlights the cell's text and, only if the text is actually + // truncated, opens the expanded popover. The cell has `select-none` which + // suppresses the highlight even for programmatic selections, so we override + // `user-select` on the inner element until the next click. Workflow cells nest + // their text inside a span with its own `overflow-clip`, so we measure the leaf + // element's scroll dimensions, not just the wrapper div's. const handleCellDoubleClick = useCallback((rowId: string, columnName: string) => { - if (!canEditRef.current) return - const column = columnsRef.current.find((c) => c.name === columnName) - if (!column || column.type === 'boolean' || column.type === 'workflow') return - setSelectionFocus(null) setIsColumnSelection(false) - setEditingCell({ rowId, columnName }) - setInitialCharacter(null) + + const row = rowsRef.current.find((r) => r.id === rowId) + const colIndex = columnsRef.current.findIndex((c) => c.name === columnName) + let overflows = true + if (row && colIndex !== -1) { + const td = document.querySelector( + `[data-table-scroll] [data-row="${row.position}"][data-col="${colIndex}"]` + ) + const inner = td?.querySelector(':scope > div:last-child') + if (inner) { + const candidates: HTMLElement[] = [inner] + const descendants = inner.querySelectorAll('*') + for (const el of descendants) candidates.push(el) + overflows = candidates.some( + (el) => el.scrollWidth > el.clientWidth + 1 || el.scrollHeight > el.clientHeight + 1 + ) + + inner.style.userSelect = 'text' + const clear = () => { + inner.style.userSelect = '' + window.removeEventListener('mousedown', clear, true) + } + window.addEventListener('mousedown', clear, true) + + const selection = window.getSelection() + if (selection) { + const range = document.createRange() + range.selectNodeContents(inner) + selection.removeAllRanges() + selection.addRange(range) + } + } + } + + if (overflows) setExpandedCell({ rowId, columnName }) }, []) const mutateRef = useRef(updateRowMutation.mutate) @@ -1695,18 +1730,30 @@ export function Table({ return name }, []) - const handleAddColumn = useCallback(() => { - const name = generateColumnName() - const position = schemaColumnsRef.current.length - addColumnMutation.mutate( - { name, type: 'string' }, - { - onSuccess: () => { - pushUndoRef.current({ type: 'create-column', columnName: name, position }) - }, + const handleAddColumn = useCallback( + (type = 'string', workflowId?: string) => { + const name = generateColumnName() + // Workflow columns: don't persist anything yet. The sidebar's Save creates the + // column atomically via POST with the chosen workflowConfig — opens instantly + // and avoids stranding a placeholder column if the user cancels. + if (type === 'workflow' && workflowId) { + const wf = workflowsRef.current?.find((w) => w.id === workflowId) + const proposedName = wf ? sanitizeWorkflowNameAsColumnRef.current(wf.name, name) : name + setConfigState({ mode: 'create', columnName: name, workflowId, proposedName }) + return } - ) - }, [generateColumnName]) + const position = schemaColumnsRef.current.length + addColumnMutation.mutate( + { name, type }, + { + onSuccess: () => { + pushUndoRef.current({ type: 'create-column', columnName: name, position }) + }, + } + ) + }, + [generateColumnName] + ) const handleChangeType = useCallback((columnName: string, newType: string) => { const column = columnsRef.current.find((c) => c.name === columnName) @@ -1819,16 +1866,21 @@ export function Table({ }, [] ) + const sanitizeWorkflowNameAsColumnRef = useRef(sanitizeWorkflowNameAsColumn) + sanitizeWorkflowNameAsColumnRef.current = sanitizeWorkflowNameAsColumn /** * Config state for the side panel: * - `null` → closed. * - `{ mode: 'edit' }` → configuring an existing workflow column. - * - `{ mode: 'new' }` → user just picked a workflow; nothing is persisted until Save. + * - `{ mode: 'new' }` → user changed an existing column to workflow; not persisted until Save. + * - `{ mode: 'create' }` → user picked a workflow from "Add column"; column doesn't exist yet, + * created on Save in a single POST. */ type ConfigState = | { mode: 'edit'; columnName: string } | { mode: 'new'; columnName: string; workflowId: string; proposedName: string } + | { mode: 'create'; columnName: string; workflowId: string; proposedName: string } | null const [configState, setConfigState] = useState(null) @@ -1840,10 +1892,8 @@ export function Table({ (columnName: string, workflowId: string) => { // Don't persist anything yet — open the sidebar with the pending workflow pick. // The Save button in the sidebar is what actually writes type/name/config. - const wf = manualTriggerWorkflowsRef.current?.find((w) => w.workflowId === workflowId) - const proposedName = wf - ? sanitizeWorkflowNameAsColumn(wf.workflowName, columnName) - : columnName + const wf = workflowsRef.current?.find((w) => w.id === workflowId) + const proposedName = wf ? sanitizeWorkflowNameAsColumn(wf.name, columnName) : columnName setConfigState({ mode: 'new', columnName, workflowId, proposedName }) }, [sanitizeWorkflowNameAsColumn] @@ -2037,13 +2087,16 @@ export function Table({ ] ) - const createAction = useMemo( - () => ({ - label: 'New column', - onClick: handleAddColumn, - disabled: addColumnMutation.isPending, - }), - [handleAddColumn, addColumnMutation.isPending] + const createTrigger = useMemo( + () => + userPermissions.canEdit ? ( + + ) : null, + [handleAddColumn, addColumnMutation.isPending, workflows, userPermissions.canEdit] ) const headerActions = useMemo( @@ -2112,8 +2165,14 @@ export function Table({ [columns] ) const hasWorkflowColumns = workflowColumnNames.length > 0 - const workflowColumnNamesRef = useRef(workflowColumnNames) - workflowColumnNamesRef.current = workflowColumnNames + + const workflowNameById = useMemo(() => { + const map: Record = {} + for (const wf of workflows ?? []) { + map[wf.id] = wf.name + } + return map + }, [workflows]) const { runningByRowId, totalRunning } = useMemo(() => { const byRow = new Map() @@ -2149,13 +2208,12 @@ export function Table({ const handleRunRow = useCallback( (rowId: string) => { - const columnNames = workflowColumnNamesRef.current - if (columnNames.length === 0) return - for (const columnName of columnNames) { + if (workflowColumnNames.length === 0) return + for (const columnName of workflowColumnNames) { void runWorkflowColumn({ tableId, rowId, workspaceId, columnName }) } }, - [runWorkflowColumn, tableId, workspaceId] + [runWorkflowColumn, tableId, workspaceId, workflowColumnNames] ) if (!isLoadingTable && !tableData) { @@ -2179,8 +2237,17 @@ export function Table({ 0 ? ( + + ) : null + } /> - {totalRunning > 0 && ( - - )} -
-
- - {isLoadingTable ? ( - - - {Array.from({ length: SKELETON_COL_COUNT }).map((_, i) => ( - - ))} - - - ) : ( - - )} - +
+
+
{isLoadingTable ? ( - - + + {Array.from({ length: SKELETON_COL_COUNT }).map((_, i) => ( - + ))} + + + ) : ( + + )} + + {isLoadingTable ? ( + + + {Array.from({ length: SKELETON_COL_COUNT }).map((_, i) => ( + + ))} + - ))} - - - ) : ( - - - {displayColumns.map((column, idx) => ( - = normalizedSelection.startCol && - idx <= normalizedSelection.endCol - } - renameValue={ - columnRename.editingId === column.name ? columnRename.editValue : '' - } - onRenameValueChange={columnRename.setEditValue} - onRenameSubmit={columnRename.submitRename} - onRenameCancel={columnRename.cancelRename} - onRenameColumn={handleRenameColumn} - onColumnSelect={handleColumnSelect} - onChangeType={handleChangeType} - onInsertLeft={handleInsertColumnLeft} - onInsertRight={handleInsertColumnRight} - onToggleUnique={handleToggleUnique} - onDeleteColumn={handleDeleteColumn} - onResizeStart={handleColumnResizeStart} - onResize={handleColumnResize} - onResizeEnd={handleColumnResizeEnd} - onAutoResize={handleColumnAutoResize} - onDragStart={handleColumnDragStart} - onDragOver={handleColumnDragOver} - onDragEnd={handleColumnDragEnd} - onDragLeave={handleColumnDragLeave} - manualTriggerWorkflows={manualTriggerWorkflows} - onChangeToWorkflow={handleChangeToWorkflow} - onConfigureWorkflow={handleConfigureWorkflow} + + ) : ( + + - ))} - {userPermissions.canEdit && ( - - )} - - )} - - - {isLoadingTable || isLoadingRows ? ( - - ) : ( - <> - {rows.map((row, index) => { - const prevPosition = index > 0 ? rows[index - 1].position : -1 - const gapCount = queryOptions.filter ? 0 : row.position - prevPosition - 1 - return ( - - {gapCount > 0 && ( - ( + = normalizedSelection.startCol && + idx <= normalizedSelection.endCol + } + renameValue={ + columnRename.editingId === column.name ? columnRename.editValue : '' + } + onRenameValueChange={columnRename.setEditValue} + onRenameSubmit={columnRename.submitRename} + onRenameCancel={columnRename.cancelRename} + onRenameColumn={handleRenameColumn} + onColumnSelect={handleColumnSelect} + onChangeType={handleChangeType} + onInsertLeft={handleInsertColumnLeft} + onInsertRight={handleInsertColumnRight} + onToggleUnique={handleToggleUnique} + onDeleteColumn={handleDeleteColumn} + onResizeStart={handleColumnResizeStart} + onResize={handleColumnResize} + onResizeEnd={handleColumnResizeEnd} + onAutoResize={handleColumnAutoResize} + onDragStart={handleColumnDragStart} + onDragOver={handleColumnDragOver} + onDragEnd={handleColumnDragEnd} + onDragLeave={handleColumnDragLeave} + workflows={workflows} + onChangeToWorkflow={handleChangeToWorkflow} + onConfigureWorkflow={handleConfigureWorkflow} + /> + ))} + {userPermissions.canEdit && ( + + )} + + )} + + + {isLoadingTable || isLoadingRows ? ( + + ) : ( + <> + {rows.map((row, index) => { + const prevPosition = index > 0 ? rows[index - 1].position : -1 + const gapCount = queryOptions.filter ? 0 : row.position - prevPosition - 1 + return ( + + {gapCount > 0 && ( + + )} + - )} - - - ) - })} - - )} - -
-
- -
-
-
+
+
+ +
+
+
+ + +
+
+
- +
-
- - -
-
- {resizingColumn && ( -
- )} - {dropColumnBounds !== null && ( - <> -
+ + ) + })} + + )} + + + {resizingColumn && (
- + )} + {dropColumnBounds !== null && ( + <> +
+
+ + )} +
+ {!isLoadingTable && !isLoadingRows && userPermissions.canEdit && ( + )}
- {!isLoadingTable && !isLoadingRows && userPermissions.canEdit && ( - - )} -
- setConfigState(null)} - existingColumn={ - configState?.mode === 'edit' - ? columns.find((c) => c.name === configState.columnName) ?? null - : null - } - allColumns={columns} - workflows={manualTriggerWorkflows} - workspaceId={workspaceId} - tableId={tableId} - workflowColumnBatchSize={tableData?.metadata?.workflowColumnBatchSize} - /> + setConfigState(null)} + existingColumn={ + configState?.mode === 'edit' + ? (columns.find((c) => c.name === configState.columnName) ?? null) + : null + } + allColumns={columns} + workflows={workflows} + workspaceId={workspaceId} + tableId={tableId} + workflowColumnBatchSize={tableData?.metadata?.workflowColumnBatchSize} + />
{editingRow && tableData && ( @@ -2448,10 +2512,20 @@ export function Table({ disableEdit={!userPermissions.canEdit} disableInsert={!userPermissions.canEdit} disableDelete={!userPermissions.canEdit} - manualTriggerWorkflows={manualTriggerWorkflows} + workflows={workflows} onRunWorkflow={handleRunWorkflow} /> + setExpandedCell(null)} + rows={rows} + columns={columns} + onSave={handleInlineSave} + canEdit={userPermissions.canEdit} + scrollContainer={scrollRef.current} + /> + {!embedded && ( @@ -2735,6 +2809,8 @@ interface DataRowProps { hasWorkflowColumns: boolean onStopRow: (rowId: string) => void onRunRow: (rowId: string) => void + /** Lookup from workflow id → human-readable name, used to label running cells. */ + workflowNameById: Record } function rowSelectionChanged( @@ -2788,7 +2864,8 @@ function dataRowPropsAreEqual(prev: DataRowProps, next: DataRowProps): boolean { prev.runningCount !== next.runningCount || prev.hasWorkflowColumns !== next.hasWorkflowColumns || prev.onStopRow !== next.onStopRow || - prev.onRunRow !== next.onRunRow + prev.onRunRow !== next.onRunRow || + prev.workflowNameById !== next.workflowNameById ) { return false } @@ -2829,6 +2906,7 @@ const DataRow = React.memo(function DataRow({ hasWorkflowColumns, onStopRow, onRunRow, + workflowNameById, }: DataRowProps) { const sel = normalizedSelection const isMultiCell = sel !== null && (sel.startRow !== sel.endRow || sel.startCol !== sel.endCol) @@ -2946,6 +3024,7 @@ const DataRow = React.memo(function DataRow({ initialCharacter={isEditing ? initialCharacter : undefined} onSave={(value, reason) => onSave(row.id, column.name, value, reason)} onCancel={onCancel} + workflowNameById={workflowNameById} />
@@ -2962,6 +3041,7 @@ function CellContent({ initialCharacter, onSave, onCancel, + workflowNameById, }: { value: unknown column: ColumnDefinition @@ -2969,6 +3049,7 @@ function CellContent({ initialCharacter?: string | null onSave: (value: unknown, reason: SaveReason) => void onCancel: () => void + workflowNameById?: Record }) { const isNull = value === null || value === undefined @@ -2976,9 +3057,14 @@ function CellContent({ if (column.type === 'workflow') { const cell = value as WorkflowCellValue | null if (cell?.status === 'running') { + const workflowName = + (cell.workflowId ? workflowNameById?.[cell.workflowId] : undefined) ?? 'workflow' displayContent = ( -
- +
+ + + Running {workflowName} +
) } else if (cell?.status === 'completed' && cell.output != null) { @@ -2989,17 +3075,25 @@ function CellContent({ ) } else if (cell?.status === 'error') { displayContent = ( - - {cell.error ?? 'Error'} + + Error ) - } else { + } else if (cell?.status === 'cancelled') { displayContent = ( - + + Cancelled + ) + } else { + displayContent = } return <>{displayContent} - } else if (column.type === 'boolean') { + } + if (column.type === 'boolean') { displayContent = (
void onDragEnd?: () => void onDragLeave?: () => void - manualTriggerWorkflows?: ManualTriggerWorkflow[] + workflows?: WorkflowMetadata[] onChangeToWorkflow?: (columnName: string, workflowId: string) => void onConfigureWorkflow?: (columnName: string) => void }) { @@ -3370,9 +3464,9 @@ const ColumnHeaderMenu = React.memo(function ColumnHeaderMenu({ const [menuPosition, setMenuPosition] = useState({ x: 0, y: 0 }) const configuredWorkflow = column.type === 'workflow' && column.workflowConfig - ? manualTriggerWorkflows?.find((w) => w.workflowId === column.workflowConfig!.workflowId) + ? workflows?.find((w) => w.id === column.workflowConfig!.workflowId) : undefined - const workflowColor = configuredWorkflow?.workflowColor + const workflowColor = configuredWorkflow?.color useEffect(() => { if (isRenaming && renameInputRef.current) { @@ -3604,26 +3698,11 @@ const ColumnHeaderMenu = React.memo(function ColumnHeaderMenu({ {option.label} - - {manualTriggerWorkflows && manualTriggerWorkflows.length > 0 ? ( - manualTriggerWorkflows.map((wf) => ( - - onChangeToWorkflow?.(column.name, wf.workflowId) - } - > - - {wf.workflowName} - - )) - ) : ( - - No manual triggers configured - - )} - + onChangeToWorkflow?.(column.name, workflowId)} + disabledWorkflowId={column.workflowConfig?.workflowId} + /> ) } @@ -3678,51 +3757,239 @@ const ColumnHeaderMenu = React.memo(function ColumnHeaderMenu({ ) }) -interface RunStatusPillProps { +interface ExpandedCellPopoverProps { + expandedCell: EditingCell | null + onClose: () => void + rows: TableRowType[] + columns: ColumnDefinition[] + onSave: (rowId: string, columnName: string, value: unknown, reason: SaveReason) => void + canEdit: boolean + scrollContainer: HTMLElement | null +} + +const EXPANDED_CELL_MIN_WIDTH = 420 +const EXPANDED_CELL_HEIGHT = 280 + +/** + * Supabase-style anchored cell expander. Floats over the clicked cell at the cell's + * top-left, minimum width {@link EXPANDED_CELL_MIN_WIDTH}, fixed height, internally + * scrollable. Triggered by cell double-click so long values are readable/editable + * without widening the column. Inline edit via Enter/F2/typing is unaffected. + * + * Workflow and boolean cells are read-only in this view — workflow cells are driven + * by the scheduler, booleans use a checkbox cell inline. + */ +function ExpandedCellPopover({ + expandedCell, + onClose, + rows, + columns, + onSave, + canEdit, + scrollContainer, +}: ExpandedCellPopoverProps) { + const rootRef = useRef(null) + const textareaRef = useRef(null) + const [rect, setRect] = useState<{ top: number; left: number; width: number } | null>(null) + const [draftValue, setDraftValue] = useState('') + + const target = useMemo(() => { + if (!expandedCell) return null + const row = rows.find((r) => r.id === expandedCell.rowId) + const column = columns.find((c) => c.name === expandedCell.columnName) + if (!row || !column) return null + const colIndex = columns.findIndex((c) => c.name === expandedCell.columnName) + return { row, column, colIndex, value: row.data[column.name] } + }, [expandedCell, rows, columns]) + + const isWorkflowCell = target?.column.type === 'workflow' + const isBooleanCell = target?.column.type === 'boolean' + const isEditable = Boolean(target) && canEdit && !isWorkflowCell && !isBooleanCell + + const displayText = useMemo(() => { + if (!target) return '' + const { column, value } = target + if (column.type === 'workflow') { + const cell = value as WorkflowCellValue | null | undefined + if (!cell) return '' + if (cell.status === 'error') return cell.error ?? 'Error' + if (cell.status !== 'completed') return '' + const output = cell.output + if (output == null) return '' + return typeof output === 'string' ? output : JSON.stringify(output, null, 2) + } + if (value == null) return '' + if (typeof value === 'string') return value + return JSON.stringify(value, null, 2) + }, [target]) + + useLayoutEffect(() => { + if (!expandedCell || !target) { + setRect(null) + return + } + setDraftValue(isEditable ? formatValueForInput(target.value, target.column.type) : '') + const selector = `[data-table-scroll] [data-row="${target.row.position}"][data-col="${target.colIndex}"]` + const el = document.querySelector(selector) + if (!el) { + setRect(null) + return + } + const r = el.getBoundingClientRect() + setRect({ top: r.top, left: r.left, width: r.width }) + // Focus textarea on open so typing works immediately. + requestAnimationFrame(() => textareaRef.current?.focus()) + }, [expandedCell, target, isEditable]) + + useEffect(() => { + if (!expandedCell) return + const handleKey = (e: KeyboardEvent) => { + if (e.key === 'Escape') { + e.preventDefault() + onClose() + } + } + const handleMouseDown = (e: MouseEvent) => { + if (!rootRef.current) return + if (rootRef.current.contains(e.target as Node)) return + onClose() + } + window.addEventListener('keydown', handleKey) + window.addEventListener('mousedown', handleMouseDown) + return () => { + window.removeEventListener('keydown', handleKey) + window.removeEventListener('mousedown', handleMouseDown) + } + }, [expandedCell, onClose]) + + // Close on table scroll — re-anchoring mid-scroll is more jarring than dismissing. + useEffect(() => { + if (!expandedCell || !scrollContainer) return + const handler = () => onClose() + scrollContainer.addEventListener('scroll', handler, { passive: true }) + return () => scrollContainer.removeEventListener('scroll', handler) + }, [expandedCell, scrollContainer, onClose]) + + if (!expandedCell || !target || !rect) return null + + const width = Math.max(rect.width, EXPANDED_CELL_MIN_WIDTH) + // Clamp to viewport. Prefer anchoring at the cell's left edge; if the popover + // would overflow right, align its right edge with the cell's right edge + // (mirroring Radix/menu flip behavior). Same idea for bottom-of-viewport. + const VIEWPORT_PAD = 8 + const cellRight = rect.left + rect.width + const overflowsRight = rect.left + width > window.innerWidth - VIEWPORT_PAD + const left = overflowsRight + ? Math.max(VIEWPORT_PAD, cellRight - width) + : Math.max(VIEWPORT_PAD, rect.left) + const overflowsBottom = rect.top + EXPANDED_CELL_HEIGHT > window.innerHeight - VIEWPORT_PAD + const top = overflowsBottom + ? Math.max(VIEWPORT_PAD, window.innerHeight - EXPANDED_CELL_HEIGHT - VIEWPORT_PAD) + : rect.top + + const handleSave = () => { + if (!isEditable) return + // `displayToStorage` only normalizes dates — it returns null for anything else. + // Fall back to the raw draft for non-date columns, matching the inline editor. + const raw = displayToStorage(draftValue) ?? draftValue + const cleaned = cleanCellValue(raw, target.column) + onSave(target.row.id, target.column.name, cleaned, 'blur') + onClose() + } + + const handleTextareaKeyDown = (e: React.KeyboardEvent) => { + if (e.key === 'Enter' && !e.shiftKey) { + e.preventDefault() + handleSave() + } + } + + return ( +
+ {isEditable ? ( + <> +