improvement(tables): versioned CSV snapshot cache for table mounts + parallel multipart uploader#5108
improvement(tables): versioned CSV snapshot cache for table mounts + parallel multipart uploader#5108TheodoreSpeaks wants to merge 1 commit into
Conversation
…parallel multipart uploader
|
The latest updates on your projects. Learn more about Vercel for GitHub.
|
PR SummaryMedium Risk Overview New Introduces Reviewed by Cursor Bugbot for commit 03e7545. Bugbot is set up for automated code reviews on this repo. Configure here. |
|
@greptile review |
There was a problem hiding this comment.
Cursor Bugbot has reviewed your changes and found 1 potential issue.
❌ Bugbot Autofix is OFF. To automatically fix reported issues with cloud agents, enable autofix in the Cursor dashboard.
Reviewed by Cursor Bugbot for commit 03e7545. Configure here.
| const head = await headObject(key, SNAPSHOT_STORAGE_CONTEXT) | ||
| if (head) { | ||
| logger.info(`[${requestId}] Snapshot hit`, { tableId: table.id, version, size: head.size }) | ||
| return { key, size: head.size, version } |
There was a problem hiding this comment.
Schema edits skip snapshot invalidation
Medium Severity
The snapshot cache treats a headObject hit on v{rows_version} as valid, but rows_version only advances when user_table_rows changes. Schema-only updates (for example adding a column via user_table_definitions) leave the old CSV mounted until some row is written.
Reviewed by Cursor Bugbot for commit 03e7545. Configure here.
Greptile SummaryThis PR replaces the existing "drain entire table into heap on every mount" approach with a versioned CSV snapshot cache in object storage, and refactors the table-export worker onto a new streaming multipart uploader — fixing a real production OOM on large table mounts. Both features are gated behind a new
Confidence Score: 5/5Safe to merge behind the feature flag; the flag-off path is unchanged and the new snapshot/multipart paths are well-structured with correct error handling and tenant isolation. The new snapshot cache and multipart uploader are thoroughly tested and the logic is sound. The only new finding in this pass is a minor API inconsistency in MultipartUploadHandle.complete() — it skips the aborted guard that write() already enforces — but no current caller reaches a state where complete() is invoked on an already-aborted handle, so there is no observable misbehavior today. apps/sim/lib/uploads/core/storage-service.ts and apps/sim/lib/table/snapshot-cache.ts warrant a second look once the issues flagged in earlier review threads are addressed. Important Files Changed
Sequence Diagram%%{init: {'theme': 'neutral'}}%%
sequenceDiagram
participant FE as function-execute
participant SC as snapshot-cache
participant DB as Postgres
participant ST as Object Storage
FE->>DB: isFeatureEnabled('table-snapshot-cache')
DB-->>FE: true
FE->>SC: getOrCreateTableSnapshot(table, requestId)
SC->>DB: SELECT rows_version FROM user_table_definitions
DB-->>SC: "version = 5"
SC->>ST: headObject(table-snapshots/ws/tbl/v5.csv)
alt Cache HIT
ST-->>SC: "{ size: 42000 }"
SC-->>FE: "{ key: v5.csv, size: 42000, version: 5 }"
else Cache MISS
ST-->>SC: null
SC->>ST: createMultipartUpload(key)
loop page rows (keyset-paginated)
SC->>DB: selectExportRowPage(after, 5000)
DB-->>SC: rows[]
SC->>ST: handle.write(csvChunk)
end
SC->>ST: handle.complete()
ST-->>SC: "{ key, size }"
SC->>DB: SELECT rows_version (recheck)
alt version unchanged
DB-->>SC: "version = 5"
SC->>ST: deleteFile(v4.csv)
SC-->>FE: "{ key: v5.csv, size, version: 5 }"
else version advanced mid-scan
DB-->>SC: "version = 6"
SC->>ST: materialize v6.csv
SC->>ST: deleteFile(v5.csv)
SC-->>FE: "{ key: v6.csv, size, version: 6 }"
end
end
FE->>ST: "downloadFile(key, maxBytes=10MB)"
ST-->>FE: Buffer
FE->>FE: push sandbox file
%%{init: {'theme': 'base', 'themeVariables': {"darkMode": true, "background": "#0d1117", "primaryColor": "#21262d", "primaryTextColor": "#e6edf3", "primaryBorderColor": "#8b949e", "lineColor": "#8b949e", "textColor": "#e6edf3", "edgeLabelBackground": "#161b22", "actorBkg": "#21262d", "actorBorder": "#8b949e", "actorTextColor": "#e6edf3", "actorLineColor": "#8b949e", "signalColor": "#8b949e", "signalTextColor": "#e6edf3", "noteBkgColor": "#373320", "noteBorderColor": "#d4a72c", "noteTextColor": "#f0e6c0", "labelBoxBkgColor": "#21262d", "labelBoxBorderColor": "#8b949e", "labelTextColor": "#e6edf3", "loopTextColor": "#e6edf3", "activationBkgColor": "#30363d", "activationBorderColor": "#8b949e"}}}%%
sequenceDiagram
participant FE as function-execute
participant SC as snapshot-cache
participant DB as Postgres
participant ST as Object Storage
FE->>DB: isFeatureEnabled('table-snapshot-cache')
DB-->>FE: true
FE->>SC: getOrCreateTableSnapshot(table, requestId)
SC->>DB: SELECT rows_version FROM user_table_definitions
DB-->>SC: "version = 5"
SC->>ST: headObject(table-snapshots/ws/tbl/v5.csv)
alt Cache HIT
ST-->>SC: "{ size: 42000 }"
SC-->>FE: "{ key: v5.csv, size: 42000, version: 5 }"
else Cache MISS
ST-->>SC: null
SC->>ST: createMultipartUpload(key)
loop page rows (keyset-paginated)
SC->>DB: selectExportRowPage(after, 5000)
DB-->>SC: rows[]
SC->>ST: handle.write(csvChunk)
end
SC->>ST: handle.complete()
ST-->>SC: "{ key, size }"
SC->>DB: SELECT rows_version (recheck)
alt version unchanged
DB-->>SC: "version = 5"
SC->>ST: deleteFile(v4.csv)
SC-->>FE: "{ key: v5.csv, size, version: 5 }"
else version advanced mid-scan
DB-->>SC: "version = 6"
SC->>ST: materialize v6.csv
SC->>ST: deleteFile(v5.csv)
SC-->>FE: "{ key: v6.csv, size, version: 6 }"
end
end
FE->>ST: "downloadFile(key, maxBytes=10MB)"
ST-->>FE: Buffer
FE->>FE: push sandbox file
Reviews (2): Last reviewed commit: "improvement(tables): versioned CSV snaps..." | Re-trigger Greptile |
| if (after !== version) { | ||
| // The table mutated mid-scan: the bytes under `key` may be torn. Re-key to the new version and | ||
| // rebuild once (or reuse if a concurrent writer already stored it); drop the stale object. | ||
| logger.info(`[${requestId}] rows_version advanced during materialize; re-keying`, { | ||
| tableId: table.id, | ||
| from: version, | ||
| to: after, | ||
| }) | ||
| const newKey = snapshotKey(table.workspaceId, table.id, after) | ||
| const newHead = await headObject(newKey, SNAPSHOT_STORAGE_CONTEXT) | ||
| const newSize = newHead ? newHead.size : await materialize(table, newKey) | ||
| await deleteFile({ key, context: SNAPSHOT_STORAGE_CONTEXT }).catch(() => {}) | ||
| void deletePreviousVersion(table, after) | ||
| return { key: newKey, size: newSize, version: after } | ||
| } |
There was a problem hiding this comment.
Torn snapshot not cleaned when second materialize throws
When rows_version advances mid-scan (rare but possible), the torn snapshot is written to key (e.g., v3.csv) and then the code attempts a second materialize(table, newKey). If that second call throws — for example because a further bulk insert pushes the table past SNAPSHOT_MAX_BYTES — execution jumps out of the block before reaching deleteFile({ key, … }). The torn v3.csv is then permanently orphaned in object storage (serving stale/incorrect data until an S3 lifecycle rule reaps it).
The deleteFile for key should run before (or in a finally around) the second materialize to guarantee cleanup regardless of outcome.
| void deletePreviousVersion(table, after) | ||
| return { key: newKey, size: newSize, version: after } |
There was a problem hiding this comment.
Re-key path deletes the wrong prior version
deletePreviousVersion(table, after) computes snapshotKey(…, after - 1) — which is the torn v3.csv that was already explicitly deleted two lines above. The actual previous clean snapshot (v(version - 1), e.g., v2.csv) is silently leaked.
Consider calling deletePreviousVersion(table, version) here instead of (or in addition to) deletePreviousVersion(table, after), so the cleanup targets the last known-good snapshot that is now superseded by the newly stored v{after}.
Note: the first deleteFile({ key, … }) already handles the torn v3.csv, so the deletePreviousVersion(table, after) call is redundant as written.
| const dispatchPart = async (body: Buffer): Promise<void> => { | ||
| // Bound concurrency: wait for a free slot before starting another part. | ||
| while (inflight.size >= MULTIPART_MAX_INFLIGHT) await Promise.race(inflight) | ||
| if (firstError) throw firstError | ||
| const be = await ensureBackend() | ||
| const partNo = ++partNumber | ||
| const p = be | ||
| .uploadPart(partNo, body) | ||
| .catch((err) => { | ||
| firstError ??= err | ||
| }) | ||
| .finally(() => { | ||
| inflight.delete(p) | ||
| }) | ||
| inflight.add(p) | ||
| } |
There was a problem hiding this comment.
dispatchPart does not check aborted before dispatching
abort() sets aborted = true and then awaits all current in-flight parts before calling backend.abort(). If dispatchPart is concurrently suspended in the while (inflight.size >= MULTIPART_MAX_INFLIGHT) await Promise.race(inflight) loop, it will resume after abort() returns, skip the firstError check (no error was set), and dispatch a new UploadPart on an already-aborted multipart upload. S3 would return a NoSuchUpload error, which would eventually surface as firstError, but the extra request and its error path could be confusing. Adding if (aborted) return (or throw) after the while loop mirrors the guard in write().


Summary
rows_versiononuser_table_definitions, bumped by a statement-level Postgres trigger onuser_table_rows(INSERT/UPDATE/DELETE) — bypass-proof; reorders/edits invalidate the cache. Mirrors the 0224 statement-level trigger pattern (one bump per statement, no per-row contention).table-snapshots/{workspaceId}/{tableId}/v{rows_version}.csv:headObjecthit = no row read; miss = page rows → canonical export-format CSV → upload. Best-effort version recheck + previous-version cleanup. Capped at the 10MB mount ceiling (the table branch had no size guard before).createMultipartUpload) to the storage layer — bounded-concurrency parts, single-PutObject fast path for small files — and refactor the table-export worker onto it so it no longer buffers the whole file in heap.table-snapshot-cachefeature flag, default OFF. Flag-off keeps the existing inline-CSV path byte-for-byte.Type of Change
Testing
rows_versiontrigger verified bumping on update (applied locally, sincedb pushdoesn't emit trigger DDL).bun run check:migrations origin/staging✓ (expand-safe: additive defaulted column + triggers),check:api-validation:strict✓,lint:check✓,tsc --noEmit✓ (0 errors).Checklist
Follow-ups (not in this PR)
table-snapshots/prefix (e.g. 7d) to reap old versions — they're a pure regenerable cache.