improvement(mothership): user_table speed parity — limit bounds, keyset paging, background import/delete jobs#5012
Conversation
|
@greptile review |
|
The latest updates on your projects. Learn more about Vercel for GitHub. |
PR SummaryMedium Risk Overview Large CSV/TSV imports (≥8MB) and unbounded filter deletes (>1000 matches) now use the same background jobs as the UI (Trigger.dev or detached workers), with per-table job claiming so imports/deletes cannot race. Smaller imports still run inline but claim and release the write slot. Generated tool catalog/schemas are refreshed (computed keys + doc tweaks). Reviewed by Cursor Bugbot for commit 467d7ab. Bugbot is set up for automated code reviews on this repo. Configure here. |
Greptile SummaryThis PR brings Mothership's
Confidence Score: 4/5The core limit-clamping and job-slot logic is sound, but there is an asymmetry in the non-trigger.dev background import path that could leave a table's write-job slot permanently locked. The non-trigger.dev path of dispatchImportJob calls runDetached with no .catch(), so if runTableImport fails without internally calling markJobFailed, the table's job slot stays locked as running indefinitely. The parallel dispatchDeleteJob function explicitly wraps runTableDelete with .catch(markTableDeleteFailed), treating slot release as the dispatcher's responsibility. The new import-runner.test.ts tests only cover happy-path behavior, leaving the failure path unverified. The rest of the changes are correct and well-tested. apps/sim/lib/copilot/tools/server/table/user-table.ts — specifically dispatchImportJob (non-trigger.dev else branch) and the import_file background dispatch block. Important Files Changed
Flowchart%%{init: {'theme': 'neutral'}}%%
flowchart TD
A[import_file / create_from_file] --> B{shouldImportInBackground?}
B -- CSV/TSV >= 8MB --> C[markTableJobRunning]
C -- claimed=false --> D[Return: job already running]
C -- claimed=true --> E[dispatchImportJob]
E -- isTriggerDevEnabled --> F[tasks.trigger table-import]
F -- failure --> G[releaseJobClaim + throw]
F -- success --> H[Return: background started]
E -- else --> I[runDetached runTableImport NO catch]
I --> H
B -- small file --> J[markTableJobRunning inline]
J -- claimed=true --> K[download + parse + insert]
K --> L[releaseJobClaim finally]
M[delete_rows_by_filter] --> N{limit provided?}
N -- no --> P[queryRows count]
P -- count > 1000 --> Q[markTableJobRunning background]
Q -- claimed --> R[dispatchDeleteJob]
R -- else --> V[runDetached runTableDelete .catch markTableDeleteFailed]
%%{init: {'theme': 'base', 'themeVariables': {"darkMode": true, "background": "#0d1117", "primaryColor": "#21262d", "primaryTextColor": "#e6edf3", "primaryBorderColor": "#8b949e", "lineColor": "#8b949e", "textColor": "#e6edf3", "edgeLabelBackground": "#161b22", "actorBkg": "#21262d", "actorBorder": "#8b949e", "actorTextColor": "#e6edf3", "actorLineColor": "#8b949e", "signalColor": "#8b949e", "signalTextColor": "#e6edf3", "noteBkgColor": "#373320", "noteBorderColor": "#d4a72c", "noteTextColor": "#f0e6c0", "labelBoxBkgColor": "#21262d", "labelBoxBorderColor": "#8b949e", "labelTextColor": "#e6edf3", "loopTextColor": "#e6edf3", "activationBkgColor": "#30363d", "activationBorderColor": "#8b949e"}}}%%
flowchart TD
A[import_file / create_from_file] --> B{shouldImportInBackground?}
B -- CSV/TSV >= 8MB --> C[markTableJobRunning]
C -- claimed=false --> D[Return: job already running]
C -- claimed=true --> E[dispatchImportJob]
E -- isTriggerDevEnabled --> F[tasks.trigger table-import]
F -- failure --> G[releaseJobClaim + throw]
F -- success --> H[Return: background started]
E -- else --> I[runDetached runTableImport NO catch]
I --> H
B -- small file --> J[markTableJobRunning inline]
J -- claimed=true --> K[download + parse + insert]
K --> L[releaseJobClaim finally]
M[delete_rows_by_filter] --> N{limit provided?}
N -- no --> P[queryRows count]
P -- count > 1000 --> Q[markTableJobRunning background]
Q -- claimed --> R[dispatchDeleteJob]
R -- else --> V[runDetached runTableDelete .catch markTableDeleteFailed]
Reviews (9): Last reviewed commit: "improvement(mothership): user_table spee..." | Re-trigger Greptile |
| const tableMounts = await Promise.all( | ||
| inputTables.map(async (tableRef) => { | ||
| const tableId = | ||
| typeof tableRef === 'string' | ||
| ? tableRef | ||
| : tableRef && typeof tableRef === 'object' | ||
| ? (tableRef as CanonicalTableInput).tableId || (tableRef as CanonicalTableInput).path | ||
| : undefined | ||
| if (!tableId) return null | ||
| const table = await resolveTableRef(tableId, tablePathLookup) | ||
| if (!table || table.workspaceId !== workspaceId) { | ||
| throw new Error( | ||
| `Input table not found: "${tableId}". Pass the table id (tbl_...) from tables/{name}/meta.json, or a tables/{name}/meta.json path.` | ||
| ) | ||
| } | ||
| const csvContent = await buildTableCsvForMount(table) | ||
| const sandboxPath = | ||
| typeof tableRef === 'object' && tableRef !== null | ||
| ? (tableRef as CanonicalTableInput).sandboxPath | ||
| : undefined | ||
| if (!tableId) continue | ||
| const table = await resolveTableRef(tableId, tablePathLookup) | ||
| if (!table || table.workspaceId !== workspaceId) { | ||
| throw new Error( | ||
| `Input table not found: "${tableId}". Pass the table id (tbl_...) from tables/{name}/meta.json, or a tables/{name}/meta.json path.` | ||
| ) | ||
| } | ||
| const rows = await queryRows(table, {}, 'copilot-fn-exec') | ||
|
|
||
| const allKeys = new Set(table.schema.columns.map((column) => column.name)) | ||
| for (const row of rows.rows ?? []) { | ||
| if (row.data && typeof row.data === 'object') { | ||
| for (const key of Object.keys(row.data as Record<string, unknown>)) { | ||
| allKeys.add(key) | ||
| } | ||
| return { | ||
| path: sandboxPath || `/home/user/tables/${table.id}.csv`, | ||
| content: csvContent, | ||
| } | ||
| } | ||
| const headers = Array.from(allKeys) | ||
| const csvLines = [headers.join(',')] | ||
| for (const row of rows.rows ?? []) { | ||
| const data = (row.data || {}) as Record<string, unknown> | ||
| csvLines.push( | ||
| headers | ||
| .map((h) => { | ||
| const val = data[h] | ||
| const str = val === null || val === undefined ? '' : String(val) | ||
| return str.includes(',') || str.includes('"') || str.includes('\n') | ||
| ? `"${str.replace(/"/g, '""')}"` | ||
| : str | ||
| }) | ||
| .join(',') | ||
| }) | ||
| ) | ||
| for (const mount of tableMounts) { | ||
| if (!mount) continue | ||
| if (totalSize + mount.content.length > MAX_TOTAL_SIZE) { | ||
| throw new Error( | ||
| `Mounting table "${mount.path}" would exceed the ${MAX_TOTAL_SIZE / 1024 / 1024}MB total mount limit. Mount fewer or smaller tables.` | ||
| ) | ||
| } | ||
| const csvContent = csvLines.join('\n') | ||
| const sandboxPath = | ||
| typeof tableRef === 'object' && tableRef !== null | ||
| ? (tableRef as CanonicalTableInput).sandboxPath | ||
| : undefined | ||
| sandboxFiles.push({ | ||
| path: sandboxPath || `/home/user/tables/${table.id}.csv`, | ||
| content: csvContent, | ||
| }) | ||
| totalSize += mount.content.length | ||
| sandboxFiles.push(mount) | ||
| } |
There was a problem hiding this comment.
Full table CSVs built before budget enforcement
All input tables are serialized in parallel via Promise.all before any size check runs. For each table the buildTableCsvForMount loop fetches every row page-by-page and accumulates the entire CSV in memory — no size gate inside the loop. The outer budget check (totalSize + mount.content.length > MAX_TOTAL_SIZE) only fires after every table has been fully loaded.
Concrete failure: a user mounts two enterprise-plan tables each at ~40MB of rows. Both CSVs are built (80MB in memory) before the loop sees that the second one exceeds the 50MB cap and throws. File and directory mounts check record.size before fetching; tables lack that pre-flight guard. Under traffic, several concurrent requests doing this can exhaust the web container's heap.
Greptile SummaryThis PR closes three performance gaps in Mothership's table operations: sandbox table mounts now drain all rows via keyset pagination instead of silently truncating to 100,
Confidence Score: 3/5Safe to merge for most workloads; enterprise tables with hundreds of thousands of rows could exhaust Vercel function memory during a sandbox mount before the 50 MB budget check fires. The core job-slot migration, keyset pagination, background import/delete dispatch, and schema change are all well-structured and tested. The one concern worth resolving before a wide enterprise rollout is buildTableCsvForMount: all table CSVs are fully assembled in parallel memory before the budget check, so a user who mounts a 500k-row enterprise table gets a function OOM rather than a clean rejection. apps/sim/lib/copilot/tools/handlers/function-execute.ts — the buildTableCsvForMount loop and the parallel Promise.all mount assembly need a per-page budget check to avoid OOM on large enterprise tables. Important Files Changed
Flowchart%%{init: {'theme': 'neutral'}}%%
flowchart TD
A[Mothership: import_file / create_from_file] --> B{CSV/TSV 8 MB?}
B -- No --> C[markTableJobRunning claim slot]
C --> D[Inline import parseFileRows + batchInsert]
D --> E[releaseJobClaim]
B -- Yes --> F[createTable with jobStatus=running or markTableJobRunning]
F --> G{isTriggerDevEnabled?}
G -- Yes --> H[tasks.trigger tableImportTask]
G -- No --> I[runDetached runTableImport]
H -- dispatch error --> J[releaseJobClaim / deleteTable]
I --> K[runTableImport streaming keyset import]
H --> K
L[Mothership: delete_rows_by_filter] --> M{limit === undefined?}
M -- Yes --> N[queryRows count with pendingDeleteMask]
N --> O{matchCount > 1000?}
O -- No --> P[deleteRowsByFilter inline]
O -- Yes --> Q[markTableJobRunning claim]
Q --> R[dispatchDeleteJob]
R --> S[runTableDelete keyset page walk]
M -- No --> P
T[Mothership: query_rows] --> U{limit > 1000?}
U -- Yes --> V[return error]
U -- No --> W[queryRows with pendingDeleteMask + after cursor]
W --> X[return rows + nextCursor]
Y[Mothership: get_job] --> Z[getTableById latestJobForTable]
Z --> AA[return job status / rowCount]
Reviews (2): Last reviewed commit: "improvement(mothership): table speed par..." | Re-trigger Greptile |
| * would corrupt values. | ||
| */ | ||
| function formatMountCsvValue(value: unknown): string { | ||
| if (value === null || value === undefined) return '' | ||
| if (value instanceof Date) return value.toISOString() | ||
| if (typeof value === 'object') return JSON.stringify(value) | ||
| return String(value) | ||
| } | ||
|
|
||
| /** | ||
| * Serializes a full table to CSV for a sandbox mount. Walks the keyset export | ||
| * reader page by page so every row is included (`queryRows` with defaults | ||
| * silently truncated mounts to its 100-row page and paid for a count and | ||
| * execution metadata the CSV never used), and remaps stored column-id keys | ||
| * back to display names so headers line up with cell values. | ||
| */ | ||
| async function buildTableCsvForMount(table: TableDefinition): Promise<string> { | ||
| const nameById = buildNameById(table.schema) | ||
| const headers = table.schema.columns.map((c) => c.name) | ||
| const lines = [toCsvRow(headers)] | ||
| let after: { position: number; id: string } | null = null | ||
| while (true) { | ||
| const page = await selectExportRowPage(table, after, TABLE_MOUNT_PAGE_SIZE) | ||
| for (const row of page) { | ||
| const data = rowDataIdToName(row.data, nameById) | ||
| lines.push(toCsvRow(headers.map((header) => formatMountCsvValue(data[header])))) | ||
| } | ||
| if (page.length < TABLE_MOUNT_PAGE_SIZE) return lines.join('\n') | ||
| const last = page[page.length - 1] | ||
| after = { position: last.position, id: last.id } | ||
| } | ||
| } | ||
|
|
||
| async function resolveInputFiles( | ||
| workspaceId: string, | ||
| inputFiles?: unknown[], |
There was a problem hiding this comment.
Full CSV materialized in memory before the 50 MB budget check
buildTableCsvForMount drains every row into a lines[] array and only returns after the full table is assembled. The budget check in the caller happens after all table CSVs are built via Promise.all. For an enterprise table with 100k+ rows, the CSV string can be hundreds of megabytes or more before the check ever fires — the budget guard prevents the file from reaching the sandbox, but the memory damage is already done (and in the parallel case, all tables are materialized simultaneously).
The old queryRows path implicitly bounded this to 100 rows. A straightforward fix is to track a running byte count inside the drain loop and throw early once the caller's budget would be exceeded — or pass maxBytes as a parameter and abort the page walk.
| * would corrupt values. | ||
| */ | ||
| function formatMountCsvValue(value: unknown): string { | ||
| if (value === null || value === undefined) return '' | ||
| if (value instanceof Date) return value.toISOString() | ||
| if (typeof value === 'object') return JSON.stringify(value) | ||
| return String(value) | ||
| } | ||
|
|
||
| /** | ||
| * Serializes a full table to CSV for a sandbox mount. Walks the keyset export | ||
| * reader page by page so every row is included (`queryRows` with defaults | ||
| * silently truncated mounts to its 100-row page and paid for a count and | ||
| * execution metadata the CSV never used), and remaps stored column-id keys | ||
| * back to display names so headers line up with cell values. | ||
| */ | ||
| async function buildTableCsvForMount(table: TableDefinition): Promise<string> { | ||
| const nameById = buildNameById(table.schema) | ||
| const headers = table.schema.columns.map((c) => c.name) | ||
| const lines = [toCsvRow(headers)] | ||
| let after: { position: number; id: string } | null = null | ||
| while (true) { | ||
| const page = await selectExportRowPage(table, after, TABLE_MOUNT_PAGE_SIZE) | ||
| for (const row of page) { | ||
| const data = rowDataIdToName(row.data, nameById) | ||
| lines.push(toCsvRow(headers.map((header) => formatMountCsvValue(data[header])))) | ||
| } | ||
| if (page.length < TABLE_MOUNT_PAGE_SIZE) return lines.join('\n') | ||
| const last = page[page.length - 1] | ||
| after = { position: last.position, id: last.id } | ||
| } | ||
| } | ||
|
|
||
| async function resolveInputFiles( | ||
| workspaceId: string, | ||
| inputFiles?: unknown[], |
There was a problem hiding this comment.
pendingDeleteMask fetched on every page — N+1 DB round-trips during drain
selectExportRowPage calls pendingDeleteMask(table) unconditionally, so a 100k-row table drained in 20 pages makes 20 extra DB round-trips just to check whether a delete job is running. The mask is derived from table.id which is constant, and the delete job status won't change meaningfully mid-drain. Fetching it once before the loop and threading it through would eliminate the repeated queries.
|
@greptile review |
|
@greptile review |
0e47d6e to
440c1d6
Compare
|
@greptile review |
440c1d6 to
419894a
Compare
419894a to
45096a5
Compare
| table, | ||
| { | ||
| filter: filterNamesToIds(args.filter, idByName), | ||
| filter: idFilter, |
There was a problem hiding this comment.
Inline delete bypasses bulk cap
Medium Severity
For delete_rows_by_filter without an explicit limit, a query_rows count at or below 1000 chooses the inline path, but deleteRowsByFilter runs with no limit and loads every current match into memory. Rows matching the filter can grow before delete runs, exceeding the 1000-row bulk cap this change adds elsewhere.
Reviewed by Cursor Bugbot for commit 45096a5. Configure here.
45096a5 to
59efc85
Compare
59efc85 to
23af5b9
Compare
|
@greptile review |
23af5b9 to
5a3faf1
Compare
|
@greptile review |
There was a problem hiding this comment.
Cursor Bugbot has reviewed your changes and found 2 potential issues.
There are 3 total unresolved issues (including 1 from previous review).
❌ Bugbot Autofix is OFF. To automatically fix reported issues with cloud agents, enable autofix in the Cursor dashboard.
Reviewed by Cursor Bugbot for commit 5a3faf1. Configure here.
| sort: args.sort ? sortNamesToIds(args.sort, idByName) : undefined, | ||
| limit: args.limit, | ||
| offset: args.offset, | ||
| withExecutions: false, |
There was a problem hiding this comment.
Keyset paging not wired
Medium Severity
The query_rows handler still forwards only limit and offset to queryRows, so Mothership never passes an after cursor, rejects after with sort, or returns nextCursor on full default-order pages. Large table reads keep using offset paging despite the row service supporting keyset seeks.
Reviewed by Cursor Bugbot for commit 5a3faf1. Configure here.
There was a problem hiding this comment.
Intentional — keyset/after was deliberately removed from this tool. For Mothership's envelope (≤~5k rows, 1000/page cap, exports beyond that) the offset→keyset perf delta is single-digit ms over a full walk, while keyset carried a real correctness gap (the cursor seeks order_key but the default order is position when TABLES_FRACTIONAL_ORDERING is off — the default). So the model paginates with limit/offset, which is correct under any config. Not a missing wire.
5a3faf1 to
077f33c
Compare
|
@greptile review |
…et paging, async import/delete jobs
077f33c to
467d7ab
Compare
|
@greptile review |
| } else { | ||
| runDetached('table-import', () => runTableImport(payload)) | ||
| } | ||
| } |
There was a problem hiding this comment.
runDetached import path has no failure handler, risking a permanently locked job slot
dispatchDeleteJob's non-trigger path wraps runTableDelete with an explicit .catch() that calls markTableDeleteFailed so the slot is released when the worker fails. dispatchImportJob's non-trigger path passes no .catch() to runTableImport. If runTableImport throws an unhandled exception (or any code path that skips its internal markJobFailed call), the table's write-job slot stays locked as running indefinitely — every subsequent import or delete attempt returns "A job is already in progress" until the record is manually corrected.
The asymmetry between the two dispatch functions is the concern: dispatchDeleteJob treats failure handling as the dispatcher's responsibility (explicit .catch), while dispatchImportJob defers entirely to runTableImport's internals. The new import-runner.test.ts tests only cover the happy path, so there is no test confirming all failure branches inside runTableImport call markJobFailed.


Context
Mothership's
user_tabletool lagged the fast/safe paths the tables feature already has. This PR lands three of the four findings from the audit; the fourth (thefunction_executelarge-table CSV mount) is deferred — see note at the bottom.Changes
C — limit bounds.
query_rowstook an unboundedlimit(whole table into one tool result) and the filter ops took unbounded limits (every matching row's id+data into memory). Now rejected aboveMAX_QUERY_LIMIT(1000) /MAX_BULK_OPERATION_SIZE(1000) with the contract's message text.D —
query_rowswaste + paging. Dropped the always-on executions-metadata load (withExecutions: false), and added keyset pagination: accepts anaftercursor, returnsnextCursorwhen a default-order page fills (rejectsafter+sort). Offset paging was O(n²) walking a big table.E — async job parity for imports/deletes.
import_file/create_from_file(CSV/TSV ≥8MB) anddelete_rows_by_filter(>1000 matches, no explicit limit) now dispatch the same trigger.dev jobs the UI routes use, claiming the per-table job slot so they can't race a running background job. Smaller imports stay inline but claim/release the slot. The model polls progress viaquery_rows(rows appear as an import lands; the delete mask makesquery_rowsreflect the post-delete view immediately) — no dedicated job-status op.TableImportPayload.deleteSourceFileflag (default true) so Mothership imports of persistent workspace files don't delete the source.Tests
user-table.test.ts(limit clamps, keysetafter/nextCursor,after+sortrejection, slot claim/release, async dispatch payloads) andimport-runner.test.ts(source-file cleanup default vsdeleteSourceFile: false).bun run lint:check,type-check, andcheck:api-validationgreen.Deferred (not in this PR)
filter/limit/columnsselection, or by-reference signed-URL fetch — is a separate effort.Model-facing docs
Catalog change documenting
after/ limit maxes / background behavior is in simstudioai/copilot#312.🤖 Generated with Claude Code