Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 12 additions & 19 deletions apps/webapp/app/env.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,13 @@ import { isValidDuration } from "./services/realtime/duration.server";
// `z.string()` constrained to a `parseDuration`-parseable string (e.g.
// `7d`, `1h`). Validated at boot so a typo'd duration fails fast.
function durationString() {
return z
.string()
.refine(isValidDuration, "must be a duration like 7d, 30d, 365d, 1h, 1y");
return z.string().refine(isValidDuration, "must be a duration like 7d, 30d, 365d, 1h, 1y");
}

// Parses a CSV of machine preset names (e.g. "small-1x,small-2x") into a
// non-empty array of MachinePresetName. Used by COMPUTE_TEMPLATE_MACHINE_PRESETS
// and its _REQUIRED variant. Adds zod issues for empty input or unknown names.
const parseMachinePresetCsv = (
raw: string,
ctx: z.RefinementCtx
): MachinePresetName[] => {
const parseMachinePresetCsv = (raw: string, ctx: z.RefinementCtx): MachinePresetName[] => {
const names = raw
.split(",")
.map((s) => s.trim())
Expand Down Expand Up @@ -496,10 +491,7 @@ const EnvironmentSchema = z
.string()
.optional()
.transform((v, ctx) =>
parseMachinePresetCsv(
v ?? process.env.COMPUTE_TEMPLATE_MACHINE_PRESETS ?? "small-1x",
ctx
)
parseMachinePresetCsv(v ?? process.env.COMPUTE_TEMPLATE_MACHINE_PRESETS ?? "small-1x", ctx)
),

DEPLOY_IMAGE_PLATFORM: z.string().default("linux/amd64"),
Expand Down Expand Up @@ -671,6 +663,7 @@ const EnvironmentSchema = z
ALERT_RATE_LIMITER_REDIS_CLUSTER_MODE_ENABLED: z.string().default("0"),

LOOPS_API_KEY: z.string().optional(),
ATTIO_API_KEY: z.string().optional(),
MARQS_DISABLE_REBALANCING: BoolEnv.default(false),
MARQS_VISIBILITY_TIMEOUT_MS: z.coerce
.number()
Expand Down Expand Up @@ -1154,7 +1147,9 @@ const EnvironmentSchema = z
// setting this to "1" while `TRIGGER_MOLLIFIER_ENABLED` is "0" is a
// no-op because the gate-side singleton refuses to construct a buffer
// when the system is off.
TRIGGER_MOLLIFIER_DRAINER_ENABLED: z.string().default(process.env.TRIGGER_MOLLIFIER_ENABLED ?? "0"),
TRIGGER_MOLLIFIER_DRAINER_ENABLED: z
.string()
.default(process.env.TRIGGER_MOLLIFIER_ENABLED ?? "0"),
TRIGGER_MOLLIFIER_SHADOW_MODE: z.string().default("0"),
TRIGGER_MOLLIFIER_REDIS_HOST: z
.string()
Expand All @@ -1164,7 +1159,7 @@ const EnvironmentSchema = z
.number()
.optional()
.transform(
(v) => v ?? (process.env.REDIS_PORT ? parseInt(process.env.REDIS_PORT) : undefined),
(v) => v ?? (process.env.REDIS_PORT ? parseInt(process.env.REDIS_PORT) : undefined)
),
TRIGGER_MOLLIFIER_REDIS_USERNAME: z
.string()
Expand All @@ -1174,7 +1169,9 @@ const EnvironmentSchema = z
.string()
.optional()
.transform((v) => v ?? process.env.REDIS_PASSWORD),
TRIGGER_MOLLIFIER_REDIS_TLS_DISABLED: z.string().default(process.env.REDIS_TLS_DISABLED ?? "false"),
TRIGGER_MOLLIFIER_REDIS_TLS_DISABLED: z
.string()
.default(process.env.REDIS_TLS_DISABLED ?? "false"),
TRIGGER_MOLLIFIER_TRIP_WINDOW_MS: z.coerce.number().int().positive().default(200),
TRIGGER_MOLLIFIER_TRIP_THRESHOLD: z.coerce.number().int().positive().default(100),
TRIGGER_MOLLIFIER_HOLD_MS: z.coerce.number().int().positive().default(500),
Expand Down Expand Up @@ -1238,11 +1235,7 @@ const EnvironmentSchema = z
// (retrieve, trace) have a safety net while PG replica lag settles.
TRIGGER_MOLLIFIER_ACK_GRACE_TTL_SECONDS: z.coerce.number().int().positive().default(30),
// ioredis per-request retry limit on the buffer's Redis client.
TRIGGER_MOLLIFIER_REDIS_MAX_RETRIES_PER_REQUEST: z.coerce
.number()
.int()
.positive()
.default(20),
TRIGGER_MOLLIFIER_REDIS_MAX_RETRIES_PER_REQUEST: z.coerce.number().int().positive().default(20),
// ioredis reconnect backoff envelope for the buffer client: the base
// grows by `STEP_MS` per attempt, capped at `MAX_MS`, then equal-jittered.
TRIGGER_MOLLIFIER_REDIS_RECONNECT_STEP_MS: z.coerce.number().int().positive().default(50),
Expand Down
11 changes: 11 additions & 0 deletions apps/webapp/app/models/organization.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import { env } from "~/env.server";
import { featuresForUrl } from "~/features.server";
import { createApiKeyForEnv, createPkApiKeyForEnv, envSlug } from "./api-key.server";
import { getDefaultEnvironmentConcurrencyLimit } from "~/services/platform.v3.server";
import { enqueueAttioWorkspaceSync } from "~/services/attio.server";
export type { Organization };

const nanoid = customAlphabet("1234567890abcdef", 4);
Expand Down Expand Up @@ -82,6 +83,16 @@ export async function createOrganization(
},
});

// Fire-and-forget; never blocks org creation.
void enqueueAttioWorkspaceSync({
orgId: organization.id,
title: organization.title,
slug: organization.slug,
companySize: organization.companySize,
createdAt: organization.createdAt,
adminUserId: userId,
});

return { ...organization };
}

Expand Down
132 changes: 132 additions & 0 deletions apps/webapp/app/services/attio.server.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
import { z } from "zod";
import { prisma } from "~/db.server";
import { env } from "~/env.server";
import { logger } from "./logger.server";

// Syncs new orgs/users into Attio (workspaces/users objects) at signup, via the
// common worker so a slow Attio never blocks signup. Ongoing field updates are
// handled by the scheduled sync, not here. No-op without ATTIO_API_KEY.

const ATTIO_API = "https://api.attio.com/v2";
const IS_TEST = env.APP_ENV !== "production";

export const AttioWorkspaceSyncSchema = z.object({
orgId: z.string(),
title: z.string(),
slug: z.string(),
companySize: z.string().nullish(),
createdAt: z.coerce.date(),
adminUserId: z.string(),
});
export type AttioWorkspaceSync = z.infer<typeof AttioWorkspaceSyncSchema>;

export const AttioUserSyncSchema = z.object({
userId: z.string(),
email: z.string(),
referralSource: z.string().nullish(),
marketingEmails: z.boolean(),
createdAt: z.coerce.date(),
});
export type AttioUserSync = z.infer<typeof AttioUserSyncSchema>;

class AttioClient {
constructor(private readonly apiKey: string) {}

// Create-or-update by unique attribute; returns the record id. Throws on failure so the worker retries.
async #assert(object: string, matchingAttribute: string, values: Record<string, unknown>): Promise<string> {
const url = `${ATTIO_API}/objects/${object}/records?matching_attribute=${matchingAttribute}`;
const response = await fetch(url, {
method: "PUT",
headers: { Authorization: `Bearer ${this.apiKey}`, "Content-Type": "application/json" },
body: JSON.stringify({ data: { values } }),
});

if (!response.ok) {
const body = await response.text();
logger.error("Attio assert failed", { object, matchingAttribute, status: response.status, body });
throw new Error(`Attio assert ${object} failed with status ${response.status}`);
}

return ((await response.json()) as any).data?.id?.record_id as string;
}

async upsertWorkspace(payload: AttioWorkspaceSync, emailDomain?: string) {
// The creating user is an admin of the new org — set their role and link them to the workspace.
const adminRecordId = await this.#assert("users", "user_id", {
user_id: payload.adminUserId,
role: "Admin",
is_test: IS_TEST,
});

await this.#assert("workspaces", "workspace_id", {
workspace_id: payload.orgId,
name: payload.title,
org_slug: payload.slug,
company_size: payload.companySize ?? undefined,
email_domain: emailDomain,
signup_date: toDate(payload.createdAt),
plan: "Free",
account_status: "Active",
is_test: IS_TEST,
users: [{ target_object: "users", target_record_id: adminRecordId }],
});
}

async upsertUser(payload: AttioUserSync) {
await this.#assert("users", "user_id", {
user_id: payload.userId,
primary_email_address: payload.email,
marketing_opt_in: payload.marketingEmails,
referral_source: payload.referralSource ?? undefined,
signup_date: toDate(payload.createdAt),
is_test: IS_TEST,
});
}
}

// Attio `date` attributes want a bare YYYY-MM-DD value.
function toDate(date: Date): string {
return date.toISOString().slice(0, 10);
}

// Domain from an email; the cloud-side matcher normalizes it further.
function domainFromEmail(email: string | undefined): string | undefined {
return email?.split("@")[1]?.toLowerCase().trim() || undefined;
}

export const attioClient = env.ATTIO_API_KEY ? new AttioClient(env.ATTIO_API_KEY) : null;

export async function enqueueAttioWorkspaceSync(payload: AttioWorkspaceSync) {
if (!attioClient) return;
try {
// Lazy import to avoid a circular dependency with commonWorker (which imports this module's schemas).
const { commonWorker } = await import("~/v3/commonWorker.server");
await commonWorker.enqueue({ id: `attio:workspace:${payload.orgId}`, job: "attio.syncWorkspace", payload });
} catch (error) {
logger.error("Failed to enqueue Attio workspace sync", { orgId: payload.orgId, error });
}
}

export async function enqueueAttioUserSync(payload: AttioUserSync) {
if (!attioClient) return;
try {
const { commonWorker } = await import("~/v3/commonWorker.server");
await commonWorker.enqueue({ id: `attio:user:${payload.userId}`, job: "attio.syncUser", payload });
} catch (error) {
logger.error("Failed to enqueue Attio user sync", { userId: payload.userId, error });
}
}

export async function runAttioWorkspaceSync(payload: AttioWorkspaceSync) {
if (!attioClient) return;
const admin = await prisma.user.findUnique({
where: { id: payload.adminUserId },
select: { email: true },
});
Comment on lines +122 to +125

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 Uses findUnique instead of findFirst, violating mandatory repository rule

The runAttioWorkspaceSync function uses prisma.user.findUnique at apps/webapp/app/services/attio.server.ts:122, which violates the mandatory rule in apps/webapp/CLAUDE.md: "Always use findFirst instead of findUnique. Prisma's findUnique has an implicit DataLoader that batches concurrent calls into a single IN query. This batching cannot be disabled and has active bugs even in Prisma 6.x: uppercase UUIDs returning null (#25484), composite key SQL correctness issues (#22202), and 5-10x worse performance than manual DataLoader (#6573)." Since this runs inside the common worker where concurrent job executions can trigger the DataLoader batching, it's susceptible to the known Prisma bugs.

Suggested change
const admin = await prisma.user.findUnique({
where: { id: payload.adminUserId },
select: { email: true },
});
const admin = await prisma.user.findFirst({
where: { id: payload.adminUserId },
select: { email: true },
});
Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.

await attioClient.upsertWorkspace(payload, domainFromEmail(admin?.email));
}

export async function runAttioUserSync(payload: AttioUserSync) {
if (!attioClient) return;
await attioClient.upsertUser(payload);
}
9 changes: 9 additions & 0 deletions apps/webapp/app/services/telemetry.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import type { Organization } from "~/models/organization.server";
import type { Project } from "~/models/project.server";
import type { User } from "~/models/user.server";
import { singleton } from "~/utils/singleton";
import { enqueueAttioUserSync } from "./attio.server";
import { loopsClient } from "./loops.server";

type Options = {
Expand Down Expand Up @@ -74,6 +75,14 @@ class Telemetry {
email: user.email,
name: user.name,
});

enqueueAttioUserSync({
userId: user.id,
email: user.email,
referralSource: referralSource ?? user.referralSource,
marketingEmails: user.marketingEmails,
createdAt: user.createdAt,
});
}
},
};
Expand Down
26 changes: 26 additions & 0 deletions apps/webapp/app/v3/commonWorker.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@ import { z } from "zod";
import { env } from "~/env.server";
import { RunEngineBatchTriggerService } from "~/runEngine/services/batchTrigger.server";
import { sendEmail } from "~/services/email.server";
import {
AttioUserSyncSchema,
AttioWorkspaceSyncSchema,
runAttioUserSync,
runAttioWorkspaceSync,
} from "~/services/attio.server";
import { logger } from "~/services/logger.server";
import { singleton } from "~/utils/singleton";
import { DeliverAlertService } from "./services/alerts/deliverAlert.server";
Expand Down Expand Up @@ -46,6 +52,20 @@ function initializeWorker() {
maxAttempts: 3,
},
},
"attio.syncWorkspace": {
schema: AttioWorkspaceSyncSchema,
visibilityTimeoutMs: 30_000,
retry: {
maxAttempts: 3,
},
},
"attio.syncUser": {
schema: AttioUserSyncSchema,
visibilityTimeoutMs: 30_000,
retry: {
maxAttempts: 3,
},
},
"v3.resumeBatchRun": {
schema: z.object({
batchRunId: z.string(),
Expand Down Expand Up @@ -213,6 +233,12 @@ function initializeWorker() {
scheduleEmail: async ({ payload }) => {
await sendEmail(payload);
},
"attio.syncWorkspace": async ({ payload }) => {
await runAttioWorkspaceSync(payload);
},
"attio.syncUser": async ({ payload }) => {
await runAttioUserSync(payload);
},
"v3.resumeBatchRun": async ({ payload }) => {
const service = new ResumeBatchRunService();
await service.call(payload.batchRunId);
Expand Down