Skip to content

Commit de150f2

Browse files
waleedlatif1claude
andcommitted
fix(tables): serialize explicit-position inserts with advisory lock
The `(table_id, position)` index is non-unique. Concurrent explicit- position inserts at the same slot can both observe the slot empty, both skip the shift, then each INSERT — producing duplicate `(table_id, position)` rows. Take the per-table advisory lock in the explicit-position branches of `insertRow` and `batchInsertRows` (the auto-position branches already do this). Updated the test that asserted the explicit path skipped the lock, and added coverage for `batchInsertRows` with explicit positions. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
1 parent 8a0c851 commit de150f2

2 files changed

Lines changed: 52 additions & 4 deletions

File tree

apps/sim/lib/table/__tests__/update-row.test.ts

Lines changed: 41 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ describe('insertRow — position race safety (migration 0198 + advisory lock)',
170170
expect(findExecutedSqlContaining('hashtextextended')).toBe(true)
171171
})
172172

173-
it('explicit-position inserts skip the advisory lock (no max(position) race to guard)', async () => {
173+
it('explicit-position inserts also acquire the advisory lock to serialize position shifts', async () => {
174174
dbChainMockFns.limit.mockResolvedValueOnce([])
175175
dbChainMockFns.returning.mockResolvedValueOnce([
176176
{
@@ -190,7 +190,10 @@ describe('insertRow — position race safety (migration 0198 + advisory lock)',
190190
'req-1'
191191
)
192192

193-
expect(findExecutedSqlContaining('pg_advisory_xact_lock')).toBe(false)
193+
// `(table_id, position)` index is non-unique, so concurrent explicit-position
194+
// inserts at the same slot could both skip the shift and duplicate — lock
195+
// serializes them.
196+
expect(findExecutedSqlContaining('pg_advisory_xact_lock')).toBe(true)
194197
})
195198

196199
it('batchInsertRows acquires the advisory lock (always auto-positioned)', async () => {
@@ -205,6 +208,42 @@ describe('insertRow — position race safety (migration 0198 + advisory lock)',
205208
expect(findExecutedSqlContaining('pg_advisory_xact_lock')).toBe(true)
206209
})
207210

211+
it('batchInsertRows with explicit positions acquires the advisory lock', async () => {
212+
dbChainMockFns.returning.mockResolvedValueOnce([
213+
{
214+
id: 'row-1',
215+
tableId: 'tbl-1',
216+
workspaceId: 'ws-1',
217+
data: { name: 'a' },
218+
position: 3,
219+
createdAt: new Date(),
220+
updatedAt: new Date(),
221+
},
222+
{
223+
id: 'row-2',
224+
tableId: 'tbl-1',
225+
workspaceId: 'ws-1',
226+
data: { name: 'b' },
227+
position: 4,
228+
createdAt: new Date(),
229+
updatedAt: new Date(),
230+
},
231+
])
232+
233+
await batchInsertRows(
234+
{
235+
tableId: 'tbl-1',
236+
rows: [{ name: 'a' }, { name: 'b' }],
237+
workspaceId: 'ws-1',
238+
positions: [3, 4],
239+
},
240+
TABLE,
241+
'req-1'
242+
)
243+
244+
expect(findExecutedSqlContaining('pg_advisory_xact_lock')).toBe(true)
245+
})
246+
208247
it('upsertRow skips the advisory lock on the update path (match found)', async () => {
209248
vi.mocked(getUniqueColumns).mockReturnValue([{ name: 'name', type: 'string', unique: true }])
210249
dbChainMockFns.limit.mockResolvedValueOnce([

apps/sim/lib/table/service.ts

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -651,6 +651,13 @@ export async function insertRow(
651651

652652
let targetPosition: number
653653

654+
// The `(table_id, position)` index is non-unique, so we serialize all
655+
// position-aware writes (explicit and auto) through the per-table
656+
// advisory lock. Without this, two concurrent explicit-position inserts
657+
// at the same position can both observe an empty slot, both skip the
658+
// shift, and each INSERT a row with a duplicate `(table_id, position)`.
659+
await acquireTablePositionLock(trx, data.tableId)
660+
654661
if (data.position !== undefined) {
655662
targetPosition = data.position
656663

@@ -674,7 +681,6 @@ export async function insertRow(
674681
)
675682
}
676683
} else {
677-
await acquireTablePositionLock(trx, data.tableId)
678684
const [{ maxPos }] = await trx
679685
.select({
680686
maxPos: sql<number>`coalesce(max(${userTableRows.position}), -1)`.mapWith(Number),
@@ -772,6 +778,10 @@ export async function batchInsertRows(
772778
...(data.userId ? { createdBy: data.userId } : {}),
773779
})
774780

781+
// Serialize position-aware writes per-table. See `acquireTablePositionLock`
782+
// for why both explicit- and auto-position paths take this lock.
783+
await acquireTablePositionLock(trx, data.tableId)
784+
775785
if (data.positions && data.positions.length > 0) {
776786
// Position-aware insert: shift existing rows to create gaps, then insert.
777787
// Process positions ascending so each shift preserves gaps created by prior shifts.
@@ -791,7 +801,6 @@ export async function batchInsertRows(
791801
return trx.insert(userTableRows).values(rowsToInsert).returning()
792802
}
793803

794-
await acquireTablePositionLock(trx, data.tableId)
795804
const [{ maxPos }] = await trx
796805
.select({
797806
maxPos: sql<number>`coalesce(max(${userTableRows.position}), -1)`.mapWith(Number),

0 commit comments

Comments
 (0)