Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/datajoint/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
# Other
"errors",
"migrate",
"deploy",
"DataJointError",
"ThreadSafetyError",
"logger",
Expand All @@ -69,6 +70,7 @@
# =============================================================================
from . import errors
from . import migrate
from . import deploy
from .codecs import (
Codec,
get_codec,
Expand Down
25 changes: 25 additions & 0 deletions src/datajoint/adapters/postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -1266,6 +1266,31 @@ def enum_type_ddl(self, type_name: str, values: list[str]) -> str | None:
quoted_values = ", ".join(f"'{v}'" for v in values)
return f"CREATE TYPE {self.quote_identifier(type_name)} AS ENUM ({quoted_values})"

def replica_identity_ddl(self, full_table_name: str, mode: str) -> str:
"""
Generate ALTER TABLE ... REPLICA IDENTITY statement.

Controls how much of the old row PostgreSQL writes to WAL on UPDATE/DELETE.
``"default"`` logs only primary-key columns; ``"full"`` logs the entire row.
Required by some CDC tools (e.g. Databricks Lakehouse Sync) that need the
full pre-image to drive Slowly-Changing-Dimension history.

The ALTER is metadata-only, instant, and idempotent — re-applying the same
mode is a no-op at the storage layer.

Examples
--------
>>> adapter.replica_identity_ddl('"schema"."table"', 'full')
'ALTER TABLE "schema"."table" REPLICA IDENTITY FULL'
>>> adapter.replica_identity_ddl('"schema"."table"', 'default')
'ALTER TABLE "schema"."table" REPLICA IDENTITY DEFAULT'
"""
if mode not in ("default", "full"):
from ..errors import DataJointError

raise DataJointError(f"Unsupported replica_identity mode: {mode!r}. Expected 'default' or 'full'.")
return f"ALTER TABLE {full_table_name} REPLICA IDENTITY {mode.upper()}"

def get_pending_enum_ddl(self, schema_name: str) -> list[str]:
"""
Get DDL statements for pending enum types and clear the pending list.
Expand Down
185 changes: 185 additions & 0 deletions src/datajoint/deploy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
"""
Deployment-time operations for configuring an existing DataJoint pipeline.

This module hosts idempotent operational helpers — things you run as part of a
deploy hook to configure a schema for its environment, distinct from
:mod:`datajoint.migrate` which handles one-shot schema/state evolution.

The boundary between the two:

- :mod:`datajoint.migrate` — fix legacy state, evolve a schema definition,
retroactive corrections. Cadence: one-shot. Examples: ``migrate_columns``,
``add_job_metadata_columns``, ``rebuild_lineage``.
- :mod:`datajoint.deploy` — configure an environment for a consumer's
requirements (CDC tools, replication, role grants, performance tuning).
Cadence: re-runnable, idempotent. Examples: :func:`set_replica_identity`.

Functions in this module should be safe to call repeatedly from a deploy hook
without accumulating side effects.
"""

from __future__ import annotations

from typing import TYPE_CHECKING, Any, Literal, Union

from .errors import DataJointError

if TYPE_CHECKING:
from .schemas import _Schema
from .table import Table

TargetType = Union["_Schema", type["Table"], "Table"]


def set_replica_identity(
target: "TargetType",
mode: Literal["default", "full"] = "full",
dry_run: bool = True,
) -> dict:
"""
Apply ``ALTER TABLE ... REPLICA IDENTITY <mode>`` to a schema or table on PostgreSQL.

``REPLICA IDENTITY`` controls how much of the **old row** PostgreSQL writes to
the write-ahead log on UPDATE/DELETE. Under ``DEFAULT``, only primary-key
columns appear in WAL; under ``FULL``, the entire old row does.

Why this exists
---------------
Some change-data-capture (CDC) consumers require the full row pre-image to
drive their downstream models. The canonical example is **Databricks
Lakehouse Sync**: tables without ``REPLICA IDENTITY FULL`` are silently
skipped by the sync — no error, just missing data downstream. Other CDC
tools (Debezium, ClickHouse ClickPipes, Azure CDC) work fine with
``DEFAULT`` when tables have a primary key; only Databricks mandates
``FULL``.

This helper is the **operational** way to apply the setting. It is not a
migration: there's no legacy state being fixed; the setting is simply a
property of the deployment environment, and a fresh declare in a new
environment may need it re-applied. It is idempotent — re-applying the
same mode is a no-op at the storage layer — so it is safe to call from a
deploy hook on every release.

Cost
----
The ALTER itself is metadata-only and instant, but requires a brief
``AccessExclusiveLock`` on each table — it will block behind in-flight
writes/reads on a busy table. Run during a quiet window on actively-
ingested tables.

The ongoing cost is in WAL volume after the change: UPDATE/DELETE on
tables with FULL log the entire old row, which can be sizable on tables
with TOASTed bytea columns. For DataJoint's typical insert-append
workload, this cost is negligible. The notable scenario is bulk
``delete()`` on tables with ``<blob>`` columns — a transient WAL burst
proportional to the deleted-row payload size.

Partial-failure semantics
-------------------------
If ``connection.query(ddl)`` raises on table N of M, the first N-1
tables are already modified at the storage layer but the exception
propagates without returning the partial summary. The operation is
idempotent, so re-running brings the remaining tables into compliance.

Compliance considerations
-------------------------
Under ``DEFAULT``, only primary-key values appear in WAL. Under ``FULL``,
entire rows do — including any PHI/PII/sensitive columns. For self-hosted
PostgreSQL with unrestricted WAL access this is a real consideration; for
managed PostgreSQL with logical replication confined to a specific
subscriber (Lakebase, RDS), WAL stays inside the managed environment's
security boundary. Apply intentionally.

Parameters
----------
target : Schema or Table
A :class:`datajoint.Schema` (all user tables) or a
:class:`datajoint.Table` class/instance (just that table).
mode : str, default ``"full"``
``"default"`` (PK only, minimal WAL) or ``"full"`` (entire row).
dry_run : bool, default ``True``
If True, collect the DDL statements but do not execute. Set to False
to actually apply.

Returns
-------
dict
- ``tables_analyzed`` (int): number of tables considered.
- ``tables_modified`` (int): number of tables on which the ALTER ran.
Always 0 when ``dry_run=True``.
- ``ddl`` (list[str]): the DDL statements that were (or would be) executed.

Raises
------
DataJointError
If the target's backend is not PostgreSQL, or if ``mode`` is not one of
``"default"`` / ``"full"``.

Examples
--------
>>> from datajoint.deploy import set_replica_identity
>>> # Preview
>>> set_replica_identity(my_schema, mode="full", dry_run=True)
{'tables_analyzed': 12, 'tables_modified': 0, 'ddl': ['ALTER TABLE "ms"."t1" REPLICA IDENTITY FULL', ...]}
>>> # Apply
>>> set_replica_identity(my_schema, mode="full", dry_run=False)
{'tables_analyzed': 12, 'tables_modified': 12, 'ddl': [...]}
>>> # Single table
>>> set_replica_identity(MyTable, mode="full", dry_run=False)

See Also
--------
PostgreSQL: `Logical Replication — Replica Identity
<https://www.postgresql.org/docs/current/logical-replication-publication.html>`_.
Databricks: `Lakehouse Sync
<https://docs.databricks.com/aws/en/oltp/projects/lakehouse-sync>`_.
"""
mode_normalized = mode.lower() if isinstance(mode, str) else mode
if mode_normalized not in ("default", "full"):
raise DataJointError(f"mode must be 'default' or 'full'; got {mode!r}")
mode = mode_normalized # type: ignore[assignment]

from .schemas import _Schema
from .table import Table

if isinstance(target, _Schema):
connection = target.connection
if connection is None:
raise DataJointError("Schema has no active connection.")
adapter = connection.adapter
if target.database is None:
raise DataJointError("Schema is not activated. Call schema.activate(...) before set_replica_identity().")
tables = [adapter.make_full_table_name(target.database, t) for t in target.list_tables()]
elif isinstance(target, type) and issubclass(target, Table):
instance = target()
connection = instance.connection
if connection is None:
raise DataJointError(f"Table {target.__name__} has no active connection.")
adapter = connection.adapter
tables = [instance.full_table_name]
elif isinstance(target, Table):
connection = target.connection
if connection is None:
raise DataJointError(f"Table {type(target).__name__} has no active connection.")
adapter = connection.adapter
tables = [target.full_table_name]
else:
raise DataJointError(f"target must be a Schema or Table class/instance; got {type(target).__name__}")

if not hasattr(adapter, "replica_identity_ddl"):
raise DataJointError(
f"set_replica_identity is PostgreSQL-only; the {adapter.backend} adapter does not support REPLICA IDENTITY."
)

result: dict[str, Any] = {
"tables_analyzed": len(tables),
"tables_modified": 0,
"ddl": [],
}
for full_name in tables:
ddl = adapter.replica_identity_ddl(full_name, mode) # type: ignore[attr-defined]
result["ddl"].append(ddl)
if not dry_run:
connection.query(ddl)
result["tables_modified"] += 1
return result
17 changes: 17 additions & 0 deletions tests/unit/test_adapters.py
Original file line number Diff line number Diff line change
Expand Up @@ -532,6 +532,23 @@ def test_enum_type_ddl_postgres(self, postgres_adapter):
result = postgres_adapter.enum_type_ddl("status_type", ["active", "inactive"])
assert result == "CREATE TYPE \"status_type\" AS ENUM ('active', 'inactive')"

def test_replica_identity_ddl_full(self, postgres_adapter):
"""Test PostgreSQL replica identity DDL for 'full' mode."""
result = postgres_adapter.replica_identity_ddl('"schema"."table"', "full")
assert result == 'ALTER TABLE "schema"."table" REPLICA IDENTITY FULL'

def test_replica_identity_ddl_default(self, postgres_adapter):
"""Test PostgreSQL replica identity DDL for 'default' mode."""
result = postgres_adapter.replica_identity_ddl('"schema"."table"', "default")
assert result == 'ALTER TABLE "schema"."table" REPLICA IDENTITY DEFAULT'

def test_replica_identity_ddl_invalid_mode(self, postgres_adapter):
"""Invalid mode raises DataJointError."""
from datajoint.errors import DataJointError

with pytest.raises(DataJointError, match="Unsupported replica_identity mode"):
postgres_adapter.replica_identity_ddl('"schema"."table"', "nothing")

def test_job_metadata_columns_postgres(self, postgres_adapter):
"""Test PostgreSQL job metadata columns."""
result = postgres_adapter.job_metadata_columns()
Expand Down
Loading
Loading