improvement(tables): versioned CSV snapshot cache for table mounts + parallel multipart uploader#5108

Open
TheodoreSpeaks wants to merge 1 commit into staging from improvement/table-snapshot-cache

@TheodoreSpeaks

Collaborator

Summary

  • Mount Sim tables into code sandboxes by reference via a version-keyed CSV snapshot in object storage, instead of draining the whole table into web-process heap on every run (fixes a real prod OOM on large mounts).
  • Add a monotonic rows_version on user_table_definitions, bumped by a statement-level Postgres trigger on user_table_rows (INSERT/UPDATE/DELETE) — bypass-proof; reorders/edits invalidate the cache. Mirrors the 0224 statement-level trigger pattern (one bump per statement, no per-row contention).
  • Snapshot cache keyed table-snapshots/{workspaceId}/{tableId}/v{rows_version}.csv: headObject hit = no row read; miss = page rows → canonical export-format CSV → upload. Best-effort version recheck + previous-version cleanup. Capped at the 10MB mount ceiling (the table branch had no size guard before).
  • Add a parallel server-side multipart uploader (createMultipartUpload) to the storage layer — bounded-concurrency parts, single-PutObject fast path for small files — and refactor the table-export worker onto it so it no longer buffers the whole file in heap.
  • Gated behind a new table-snapshot-cache feature flag, default OFF. Flag-off keeps the existing inline-CSV path byte-for-byte.
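
The hit/miss behaviour of the version-keyed cache can be sketched as follows. This is a minimal illustration, not the PR's implementation: the in-memory `store`, the `getOrCreateSnapshot` name, and the `readRows` callback are hypothetical stand-ins for object storage, `getOrCreateTableSnapshot`, and the paged row read. Only the key layout comes from the description above. CSV quoting and the 10MB cap are omitted.

```typescript
// Hypothetical in-memory stand-in for object storage: key -> CSV text.
const store = new Map<string, string>()

// Key layout as given in the PR description.
function snapshotKey(workspaceId: string, tableId: string, rowsVersion: number): string {
  return `table-snapshots/${workspaceId}/${tableId}/v${rowsVersion}.csv`
}

interface Snapshot {
  key: string
  size: number // character count here; equals the byte count only for ASCII content
  hit: boolean
}

// `readRows` stands in for the paged row read and is only invoked on a miss.
function getOrCreateSnapshot(
  workspaceId: string,
  tableId: string,
  rowsVersion: number,
  readRows: () => string[][]
): Snapshot {
  const key = snapshotKey(workspaceId, tableId, rowsVersion)
  const cached = store.get(key)
  if (cached !== undefined) {
    // Hit: the object for this exact rows_version already exists, so no rows are read.
    return { key, size: cached.length, hit: true }
  }
  // Miss: build the CSV once and store it under the versioned key.
  const csv = readRows()
    .map((row) => row.join(','))
    .join('\n')
  store.set(key, csv)
  return { key, size: csv.length, hit: false }
}
```

Because the key embeds `rows_version`, any write that bumps the version changes the key, so the next mount misses and rebuilds instead of reading a stale object.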

Type of Change

  • Improvement / enhancement

Testing

  • Tested manually: multipart upload round-trips byte-for-byte against real S3; rows_version trigger verified bumping on update (applied locally, since db push doesn't emit trigger DDL).
  • Added focused Vitest: trigger-free unit coverage for the uploader (parts byte-identity, small-file path, abort), snapshot cache (hit/miss/cap/re-key/tenant), export-runner streaming, and the function-execute flag on/off + size-guard + tenant-isolation branches.
  • bun run check:migrations origin/staging ✓ (expand-safe: additive defaulted column + triggers), check:api-validation:strict ✓, lint:check ✓, tsc --noEmit ✓ (0 errors).

Checklist

  • Code follows project style guidelines
  • Self-reviewed my changes
  • Tests added/updated and passing
  • No new warnings introduced
  • I confirm that I have read and agree to the terms outlined in the Contributor License Agreement (CLA)

Follow-ups (not in this PR)

  • Set an S3 lifecycle rule on the table-snapshots/ prefix (e.g. 7d) to reap old versions — they're a pure regenerable cache.
  • Once the flag graduates, delete the legacy inline-CSV mount path.

@vercel

vercel Bot commented Jun 17, 2026


The latest updates on your projects. Learn more about Vercel for GitHub.

| Project | Deployment | Actions | Updated (UTC) |
|---|---|---|---|
| docs | Ready | Preview, Comment | Jun 17, 2026 3:12am |

Request Review

@cursor

cursor Bot commented Jun 17, 2026


PR Summary

Medium Risk
Touches copilot table mounts, background exports, and a new DB trigger on all row writes; default-off flag limits rollout but export streaming changes behavior for every export job.

Overview
Adds a version-keyed CSV snapshot cache so copilot function_execute can mount large tables from object storage instead of loading every row into the web process. Migration 0239 introduces rows_version on user_table_definitions with statement-level Postgres triggers on user_table_rows so any row change invalidates cached snapshots in one bump per statement.

New table-snapshot-cache flag (default off): when it is on and the table has ≥500 rows, mounts use getOrCreateTableSnapshot (a headObject hit, or on a miss a streamed materialize to table-snapshots/{workspace}/{table}/v{version}.csv), enforce the existing 10MB per-file limits, then download the snapshot for the sandbox. With the flag off, or for tables under 500 rows, mounts keep the inline queryRows CSV path.

Introduces createMultipartUpload in the storage layer (bounded multipart parts on S3/Azure, single PutObject for small payloads) plus server-side uploadS3Part, stageBlobPart, and commitBlobBlockList. Table export is refactored to stream pages into storage with abort/cleanup on cancel or failure instead of buffering the full file in heap.
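
The gating described in this overview can be sketched as a small decision function. This is an illustration only: `chooseMountPath`, `fitsMountLimit`, `SNAPSHOT_MIN_ROWS`, and `MOUNT_MAX_BYTES` are hypothetical names, and reading "10MB" as 10 * 1024 * 1024 bytes is an assumption.

```typescript
// Row threshold taken from the summary above; the constant name is hypothetical.
const SNAPSHOT_MIN_ROWS = 500
// Assumption: "10MB" is interpreted as 10 MiB.
const MOUNT_MAX_BYTES = 10 * 1024 * 1024

type MountPath = 'snapshot' | 'inline'

// Snapshot path only when the flag is on AND the table is large enough;
// every other combination keeps the inline CSV path.
function chooseMountPath(flagEnabled: boolean, rowCount: number): MountPath {
  return flagEnabled && rowCount >= SNAPSHOT_MIN_ROWS ? 'snapshot' : 'inline'
}

// Per-file size guard applied before a file is handed to the sandbox.
function fitsMountLimit(sizeBytes: number): boolean {
  return sizeBytes <= MOUNT_MAX_BYTES
}
```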

Reviewed by Cursor Bugbot for commit 03e7545. Bugbot is set up for automated code reviews on this repo. Configure here.

@TheodoreSpeaks

Collaborator Author

@greptile review

@cursor cursor Bot left a comment

Cursor Bugbot has reviewed your changes and found 1 potential issue.

const head = await headObject(key, SNAPSHOT_STORAGE_CONTEXT)
if (head) {
  logger.info(`[${requestId}] Snapshot hit`, { tableId: table.id, version, size: head.size })
  return { key, size: head.size, version }

Schema edits skip snapshot invalidation

Medium Severity

The snapshot cache treats a headObject hit on v{rows_version} as valid, but rows_version only advances when user_table_rows changes. Schema-only updates (for example adding a column via user_table_definitions) leave the old CSV mounted until some row is written.

@greptile-apps

greptile-apps Bot commented Jun 17, 2026

Contributor

Greptile Summary

This PR replaces the existing "drain entire table into heap on every mount" approach with a versioned CSV snapshot cache in object storage, and refactors the table-export worker onto a new streaming multipart uploader — fixing a real production OOM on large table mounts. Both features are gated behind a new table-snapshot-cache feature flag, preserving the existing inline-CSV path byte-for-byte when the flag is off.

  • Snapshot cache: a monotonic rows_version column on user_table_definitions is bumped by a statement-level Postgres trigger on user_table_rows (mirrors the 0224 pattern). getOrCreateTableSnapshot stores a CSV at table-snapshots/{workspaceId}/{tableId}/v{version}.csv, serving subsequent mounts from a headObject check rather than a full row scan. A best-effort version recheck after materialization handles concurrent mutations; snapshots are capped at the existing 10 MB mount ceiling.
  • Parallel multipart uploader: createMultipartUpload in the storage layer buffers writes into ≥ 5 MB parts with bounded concurrency (MULTIPART_MAX_INFLIGHT = 4), falls back to a single PutObject for small payloads, and propagates abort on any error. The table-export worker (runTableExport) is refactored onto this handle, eliminating the full-file in-memory buffer.
  • Tests: focused Vitest coverage for the uploader (parts byte-identity, small-file path, abort), snapshot cache (hit/miss/cap/re-key/tenant), export-runner streaming, and the function-execute flag on/off + size-guard + tenant-isolation branches.
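
The part-buffering idea behind the uploader can be sketched as below. This is a simplified, synchronous model under stated assumptions, not the PR's `createMultipartUpload`: `PartBuffer` and `UploadPlan` are hypothetical names, and the real handle uploads parts concurrently rather than collecting them in memory. The sketch only shows how writes are grouped into parts of at least a minimum size, and how a payload that never fills one part falls back to a single upload.

```typescript
type UploadPlan =
  | { mode: 'single'; body: Uint8Array }
  | { mode: 'multipart'; parts: Uint8Array[] }

class PartBuffer {
  private readonly minPartBytes: number
  private pending: Uint8Array[] = []
  private pendingBytes = 0
  private parts: Uint8Array[] = []

  constructor(minPartBytes: number) {
    this.minPartBytes = minPartBytes
  }

  // Accumulate chunks; emit a part as soon as the buffer reaches the minimum size.
  write(chunk: Uint8Array): void {
    this.pending.push(chunk)
    this.pendingBytes += chunk.length
    if (this.pendingBytes >= this.minPartBytes) this.parts.push(this.flush())
  }

  // Small payloads that never filled a part take the single-upload path;
  // otherwise the remainder becomes the final (possibly short) part.
  complete(): UploadPlan {
    if (this.parts.length === 0) return { mode: 'single', body: this.flush() }
    if (this.pendingBytes > 0) this.parts.push(this.flush())
    return { mode: 'multipart', parts: this.parts }
  }

  // Concatenate the pending chunks into one contiguous buffer and reset.
  private flush(): Uint8Array {
    const out = new Uint8Array(this.pendingBytes)
    let offset = 0
    for (const chunk of this.pending) {
      out.set(chunk, offset)
      offset += chunk.length
    }
    this.pending = []
    this.pendingBytes = 0
    return out
  }
}
```

Only the last part may be shorter than the minimum, which matches S3's rule that every part except the final one must be at least 5 MiB.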

Confidence Score: 5/5

Safe to merge behind the feature flag; the flag-off path is unchanged and the new snapshot/multipart paths are well-structured with correct error handling and tenant isolation.

The new snapshot cache and multipart uploader are thoroughly tested and the logic is sound. The only new finding in this pass is a minor API inconsistency in MultipartUploadHandle.complete() — it skips the aborted guard that write() already enforces — but no current caller reaches a state where complete() is invoked on an already-aborted handle, so there is no observable misbehavior today.

apps/sim/lib/uploads/core/storage-service.ts and apps/sim/lib/table/snapshot-cache.ts warrant a second look once the issues flagged in earlier review threads are addressed.

Important Files Changed

| Filename | Overview |
|---|---|
| apps/sim/lib/table/snapshot-cache.ts | New versioned CSV snapshot cache; materializes tables once per rows_version and reuses across executions. Three issues flagged in prior review threads (torn-snapshot orphan, wrong prior-version deletion in re-key path, and the dispatchPart abort race) are still present in the current code. |
| apps/sim/lib/uploads/core/storage-service.ts | Adds createMultipartUpload streaming handle with bounded-concurrency parts and a single-shot fast path. The dispatchPart missing aborted guard (prior thread) and the new complete() missing aborted guard (this review) are both present; otherwise the logic is sound. |
| apps/sim/lib/table/export-runner.ts | Refactored from single-shot uploadFile to streaming createMultipartUpload; ownership gates, abort-on-cancel, and delete-on-orphan all correctly updated. |
| apps/sim/lib/copilot/tools/handlers/function-execute.ts | Adds snapshot-cache mount path behind feature flag; per-file and total-size guards, tenant-isolation checks, and flag-off fallback are all correctly implemented. |
| packages/db/migrations/0239_table_rows_version.sql | Adds rows_version bigint DEFAULT 0 NOT NULL and three statement-level triggers mirroring the 0224 pattern; additive-only, expand-safe, and correctly uses transition tables to batch per-statement bumps. |
| packages/db/schema.ts | Adds rowsVersion as bigint with mode: 'number' and default(0).notNull(), consistent with the migration and safe at any plausible mutation rate. |

Sequence Diagram

%%{init: {'theme': 'neutral'}}%%
sequenceDiagram
    participant FE as function-execute
    participant SC as snapshot-cache
    participant DB as Postgres
    participant ST as Object Storage

    FE->>DB: isFeatureEnabled('table-snapshot-cache')
    DB-->>FE: true

    FE->>SC: getOrCreateTableSnapshot(table, requestId)
    SC->>DB: SELECT rows_version FROM user_table_definitions
    DB-->>SC: "version = 5"

    SC->>ST: headObject(table-snapshots/ws/tbl/v5.csv)
    alt Cache HIT
        ST-->>SC: "{ size: 42000 }"
        SC-->>FE: "{ key: v5.csv, size: 42000, version: 5 }"
    else Cache MISS
        ST-->>SC: null
        SC->>ST: createMultipartUpload(key)
        loop page rows (keyset-paginated)
            SC->>DB: selectExportRowPage(after, 5000)
            DB-->>SC: rows[]
            SC->>ST: handle.write(csvChunk)
        end
        SC->>ST: handle.complete()
        ST-->>SC: "{ key, size }"
        SC->>DB: SELECT rows_version (recheck)
        alt version unchanged
            DB-->>SC: "version = 5"
            SC->>ST: deleteFile(v4.csv)
            SC-->>FE: "{ key: v5.csv, size, version: 5 }"
        else version advanced mid-scan
            DB-->>SC: "version = 6"
            SC->>ST: materialize v6.csv
            SC->>ST: deleteFile(v5.csv)
            SC-->>FE: "{ key: v6.csv, size, version: 6 }"
        end
    end

    FE->>ST: "downloadFile(key, maxBytes=10MB)"
    ST-->>FE: Buffer
    FE->>FE: push sandbox file

Reviews (2): Last reviewed commit: "improvement(tables): versioned CSV snaps..." | Re-trigger Greptile

Comment on lines +157 to +171
if (after !== version) {
  // The table mutated mid-scan: the bytes under `key` may be torn. Re-key to the new version and
  // rebuild once (or reuse if a concurrent writer already stored it); drop the stale object.
  logger.info(`[${requestId}] rows_version advanced during materialize; re-keying`, {
    tableId: table.id,
    from: version,
    to: after,
  })
  const newKey = snapshotKey(table.workspaceId, table.id, after)
  const newHead = await headObject(newKey, SNAPSHOT_STORAGE_CONTEXT)
  const newSize = newHead ? newHead.size : await materialize(table, newKey)
  await deleteFile({ key, context: SNAPSHOT_STORAGE_CONTEXT }).catch(() => {})
  void deletePreviousVersion(table, after)
  return { key: newKey, size: newSize, version: after }
}

P1 Torn snapshot not cleaned when second materialize throws

When rows_version advances mid-scan (rare but possible), the torn snapshot is written to key (e.g., v3.csv) and then the code attempts a second materialize(table, newKey). If that second call throws — for example because a further bulk insert pushes the table past SNAPSHOT_MAX_BYTES — execution jumps out of the block before reaching deleteFile({ key, … }). The torn v3.csv is then permanently orphaned in object storage (serving stale/incorrect data until an S3 lifecycle rule reaps it).

The deleteFile for key should run before (or in a finally around) the second materialize to guarantee cleanup regardless of outcome.
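
One way to read the `finally` suggestion is sketched below. This is a simplified, synchronous model and not the PR's code: `objects` is a hypothetical in-memory stand-in for the keys present in object storage, `rekey` is a hypothetical helper, and the real `materialize` and `deleteFile` calls are asynchronous.

```typescript
// Hypothetical stand-in for the set of keys present in object storage.
const objects = new Set<string>()

// Rebuild under `newKey` and always drop the torn object under `tornKey`,
// whether the rebuild succeeds or throws.
function rekey(tornKey: string, newKey: string, materialize: (key: string) => number): number {
  try {
    return materialize(newKey)
  } finally {
    // Runs on both the return path and the throw path, so the torn object
    // cannot be left behind by a failed second materialize.
    objects.delete(tornKey)
  }
}
```

In the asynchronous original the same shape would apply with `await` inside the `try` and the best-effort `deleteFile(...).catch(() => {})` inside the `finally`.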

Comment on lines +169 to +170
void deletePreviousVersion(table, after)
return { key: newKey, size: newSize, version: after }

P2 Re-key path deletes the wrong prior version

deletePreviousVersion(table, after) computes snapshotKey(…, after - 1) — which is the torn v3.csv that was already explicitly deleted two lines above. The actual previous clean snapshot (v(version - 1), e.g., v2.csv) is silently leaked.

Consider calling deletePreviousVersion(table, version) here instead of (or in addition to) deletePreviousVersion(table, after), so the cleanup targets the last known-good snapshot that is now superseded by the newly stored v{after}.

Note: the first deleteFile({ key, … }) already handles the torn v3.csv, so the deletePreviousVersion(table, after) call is redundant as written.
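
The version arithmetic behind this comment can be checked with a small sketch. `versionKey` and `previousVersionKey` are hypothetical helpers that mirror the behaviour described above (targeting the current version minus one); the guard for version 0 is an assumption. The numbers follow the comment's example, where v3.csv is the torn object.

```typescript
function versionKey(version: number): string {
  return `v${version}.csv`
}

// Mirrors the described behaviour: the helper targets (current - 1).
// Assumption: version 0 has no predecessor.
function previousVersionKey(current: number): string | null {
  return current > 0 ? versionKey(current - 1) : null
}

const version = 3 // rows_version read before the scan; v3.csv is the torn object
const after = 4 // rows_version read at the recheck

// Passing `after` targets v3.csv, which the explicit deleteFile already removed.
const keyFromAfter = previousVersionKey(after)
// Passing `version` targets v2.csv, the last clean snapshot that is now superseded.
const keyFromVersion = previousVersionKey(version)
```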

Comment on lines +293 to +308
const dispatchPart = async (body: Buffer): Promise<void> => {
  // Bound concurrency: wait for a free slot before starting another part.
  while (inflight.size >= MULTIPART_MAX_INFLIGHT) await Promise.race(inflight)
  if (firstError) throw firstError
  const be = await ensureBackend()
  const partNo = ++partNumber
  const p = be
    .uploadPart(partNo, body)
    .catch((err) => {
      firstError ??= err
    })
    .finally(() => {
      inflight.delete(p)
    })
  inflight.add(p)
}

P2 dispatchPart does not check aborted before dispatching

abort() sets aborted = true and then awaits all current in-flight parts before calling backend.abort(). If dispatchPart is concurrently suspended in the while (inflight.size >= MULTIPART_MAX_INFLIGHT) await Promise.race(inflight) loop, it will resume after abort() returns, skip the firstError check (no error was set), and dispatch a new UploadPart on an already-aborted multipart upload. S3 would return a NoSuchUpload error, which would eventually surface as firstError, but the extra request and its error path could be confusing. Adding if (aborted) return (or throw) after the while loop mirrors the guard in write().
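
A reduced model of the suggested guard is sketched below. It is not the PR's uploader: `createUploader` is a hypothetical factory, `uploadPart` is injected so the race can be driven by hand, the backend abort call is omitted, and `MAX_INFLIGHT` is lowered to 2 (the PR uses `MULTIPART_MAX_INFLIGHT = 4`). `dispatchPart` returns whether a part was actually started so the effect of the guard is observable.

```typescript
const MAX_INFLIGHT = 2

function createUploader(uploadPart: (partNo: number) => Promise<void>) {
  const inflight = new Set<Promise<void>>()
  let aborted = false
  let partNumber = 0
  let firstError: unknown

  async function dispatchPart(): Promise<boolean> {
    // Bound concurrency: wait for a free slot before starting another part.
    while (inflight.size >= MAX_INFLIGHT) await Promise.race(inflight)
    if (firstError !== undefined) throw firstError
    // Re-check after the await: abort() may have run while this call was suspended.
    if (aborted) return false
    const partNo = ++partNumber
    const p: Promise<void> = uploadPart(partNo)
      .catch((err: unknown) => {
        if (firstError === undefined) firstError = err
      })
      .finally(() => {
        inflight.delete(p)
      })
    inflight.add(p)
    return true
  }

  async function abort(): Promise<void> {
    aborted = true
    // Tracked parts swallow their own errors, so Promise.all cannot reject here.
    await Promise.all(inflight)
  }

  return { dispatchPart, abort }
}
```

Without the `aborted` check, a call that was waiting for a slot when `abort()` ran would go on to start a new part once a slot freed up.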
