diff --git a/src/datajoint/__init__.py b/src/datajoint/__init__.py index 4970b19d4..552e89a4c 100644 --- a/src/datajoint/__init__.py +++ b/src/datajoint/__init__.py @@ -57,6 +57,7 @@ # Other "errors", "migrate", + "deploy", "DataJointError", "ThreadSafetyError", "logger", @@ -69,6 +70,7 @@ # ============================================================================= from . import errors from . import migrate +from . import deploy from .codecs import ( Codec, get_codec, diff --git a/src/datajoint/adapters/postgres.py b/src/datajoint/adapters/postgres.py index 543e972d3..55ea189dc 100644 --- a/src/datajoint/adapters/postgres.py +++ b/src/datajoint/adapters/postgres.py @@ -1266,6 +1266,31 @@ def enum_type_ddl(self, type_name: str, values: list[str]) -> str | None: quoted_values = ", ".join(f"'{v}'" for v in values) return f"CREATE TYPE {self.quote_identifier(type_name)} AS ENUM ({quoted_values})" + def replica_identity_ddl(self, full_table_name: str, mode: str) -> str: + """ + Generate ALTER TABLE ... REPLICA IDENTITY statement. + + Controls how much of the old row PostgreSQL writes to WAL on UPDATE/DELETE. + ``"default"`` logs only primary-key columns; ``"full"`` logs the entire row. + Required by some CDC tools (e.g. Databricks Lakehouse Sync) that need the + full pre-image to drive Slowly-Changing-Dimension history. + + The ALTER is metadata-only, instant, and idempotent — re-applying the same + mode is a no-op at the storage layer. + + Examples + -------- + >>> adapter.replica_identity_ddl('"schema"."table"', 'full') + 'ALTER TABLE "schema"."table" REPLICA IDENTITY FULL' + >>> adapter.replica_identity_ddl('"schema"."table"', 'default') + 'ALTER TABLE "schema"."table" REPLICA IDENTITY DEFAULT' + """ + if mode not in ("default", "full"): + from ..errors import DataJointError + + raise DataJointError(f"Unsupported replica_identity mode: {mode!r}. Expected 'default' or 'full'.") + return f"ALTER TABLE {full_table_name} REPLICA IDENTITY {mode.upper()}" + def get_pending_enum_ddl(self, schema_name: str) -> list[str]: """ Get DDL statements for pending enum types and clear the pending list. diff --git a/src/datajoint/deploy.py b/src/datajoint/deploy.py new file mode 100644 index 000000000..73c4ab30b --- /dev/null +++ b/src/datajoint/deploy.py @@ -0,0 +1,185 @@ +""" +Deployment-time operations for configuring an existing DataJoint pipeline. + +This module hosts idempotent operational helpers — things you run as part of a +deploy hook to configure a schema for its environment, distinct from +:mod:`datajoint.migrate` which handles one-shot schema/state evolution. + +The boundary between the two: + +- :mod:`datajoint.migrate` — fix legacy state, evolve a schema definition, + retroactive corrections. Cadence: one-shot. Examples: ``migrate_columns``, + ``add_job_metadata_columns``, ``rebuild_lineage``. +- :mod:`datajoint.deploy` — configure an environment for a consumer's + requirements (CDC tools, replication, role grants, performance tuning). + Cadence: re-runnable, idempotent. Examples: :func:`set_replica_identity`. + +Functions in this module should be safe to call repeatedly from a deploy hook +without accumulating side effects. +""" + +from __future__ import annotations + +from typing import TYPE_CHECKING, Any, Literal, Union + +from .errors import DataJointError + +if TYPE_CHECKING: + from .schemas import _Schema + from .table import Table + + TargetType = Union["_Schema", type["Table"], "Table"] + + +def set_replica_identity( + target: "TargetType", + mode: Literal["default", "full"] = "full", + dry_run: bool = True, +) -> dict: + """ + Apply ``ALTER TABLE ... REPLICA IDENTITY `` to a schema or table on PostgreSQL. + + ``REPLICA IDENTITY`` controls how much of the **old row** PostgreSQL writes to + the write-ahead log on UPDATE/DELETE. Under ``DEFAULT``, only primary-key + columns appear in WAL; under ``FULL``, the entire old row does. + + Why this exists + --------------- + Some change-data-capture (CDC) consumers require the full row pre-image to + drive their downstream models. The canonical example is **Databricks + Lakehouse Sync**: tables without ``REPLICA IDENTITY FULL`` are silently + skipped by the sync — no error, just missing data downstream. Other CDC + tools (Debezium, ClickHouse ClickPipes, Azure CDC) work fine with + ``DEFAULT`` when tables have a primary key; only Databricks mandates + ``FULL``. + + This helper is the **operational** way to apply the setting. It is not a + migration: there's no legacy state being fixed; the setting is simply a + property of the deployment environment, and a fresh declare in a new + environment may need it re-applied. It is idempotent — re-applying the + same mode is a no-op at the storage layer — so it is safe to call from a + deploy hook on every release. + + Cost + ---- + The ALTER itself is metadata-only and instant, but requires a brief + ``AccessExclusiveLock`` on each table — it will block behind in-flight + writes/reads on a busy table. Run during a quiet window on actively- + ingested tables. + + The ongoing cost is in WAL volume after the change: UPDATE/DELETE on + tables with FULL log the entire old row, which can be sizable on tables + with TOASTed bytea columns. For DataJoint's typical insert-append + workload, this cost is negligible. The notable scenario is bulk + ``delete()`` on tables with ```` columns — a transient WAL burst + proportional to the deleted-row payload size. + + Partial-failure semantics + ------------------------- + If ``connection.query(ddl)`` raises on table N of M, the first N-1 + tables are already modified at the storage layer but the exception + propagates without returning the partial summary. The operation is + idempotent, so re-running brings the remaining tables into compliance. + + Compliance considerations + ------------------------- + Under ``DEFAULT``, only primary-key values appear in WAL. Under ``FULL``, + entire rows do — including any PHI/PII/sensitive columns. For self-hosted + PostgreSQL with unrestricted WAL access this is a real consideration; for + managed PostgreSQL with logical replication confined to a specific + subscriber (Lakebase, RDS), WAL stays inside the managed environment's + security boundary. Apply intentionally. + + Parameters + ---------- + target : Schema or Table + A :class:`datajoint.Schema` (all user tables) or a + :class:`datajoint.Table` class/instance (just that table). + mode : str, default ``"full"`` + ``"default"`` (PK only, minimal WAL) or ``"full"`` (entire row). + dry_run : bool, default ``True`` + If True, collect the DDL statements but do not execute. Set to False + to actually apply. + + Returns + ------- + dict + - ``tables_analyzed`` (int): number of tables considered. + - ``tables_modified`` (int): number of tables on which the ALTER ran. + Always 0 when ``dry_run=True``. + - ``ddl`` (list[str]): the DDL statements that were (or would be) executed. + + Raises + ------ + DataJointError + If the target's backend is not PostgreSQL, or if ``mode`` is not one of + ``"default"`` / ``"full"``. + + Examples + -------- + >>> from datajoint.deploy import set_replica_identity + >>> # Preview + >>> set_replica_identity(my_schema, mode="full", dry_run=True) + {'tables_analyzed': 12, 'tables_modified': 0, 'ddl': ['ALTER TABLE "ms"."t1" REPLICA IDENTITY FULL', ...]} + >>> # Apply + >>> set_replica_identity(my_schema, mode="full", dry_run=False) + {'tables_analyzed': 12, 'tables_modified': 12, 'ddl': [...]} + >>> # Single table + >>> set_replica_identity(MyTable, mode="full", dry_run=False) + + See Also + -------- + PostgreSQL: `Logical Replication — Replica Identity + `_. + Databricks: `Lakehouse Sync + `_. + """ + mode_normalized = mode.lower() if isinstance(mode, str) else mode + if mode_normalized not in ("default", "full"): + raise DataJointError(f"mode must be 'default' or 'full'; got {mode!r}") + mode = mode_normalized # type: ignore[assignment] + + from .schemas import _Schema + from .table import Table + + if isinstance(target, _Schema): + connection = target.connection + if connection is None: + raise DataJointError("Schema has no active connection.") + adapter = connection.adapter + if target.database is None: + raise DataJointError("Schema is not activated. Call schema.activate(...) before set_replica_identity().") + tables = [adapter.make_full_table_name(target.database, t) for t in target.list_tables()] + elif isinstance(target, type) and issubclass(target, Table): + instance = target() + connection = instance.connection + if connection is None: + raise DataJointError(f"Table {target.__name__} has no active connection.") + adapter = connection.adapter + tables = [instance.full_table_name] + elif isinstance(target, Table): + connection = target.connection + if connection is None: + raise DataJointError(f"Table {type(target).__name__} has no active connection.") + adapter = connection.adapter + tables = [target.full_table_name] + else: + raise DataJointError(f"target must be a Schema or Table class/instance; got {type(target).__name__}") + + if not hasattr(adapter, "replica_identity_ddl"): + raise DataJointError( + f"set_replica_identity is PostgreSQL-only; the {adapter.backend} adapter does not support REPLICA IDENTITY." + ) + + result: dict[str, Any] = { + "tables_analyzed": len(tables), + "tables_modified": 0, + "ddl": [], + } + for full_name in tables: + ddl = adapter.replica_identity_ddl(full_name, mode) # type: ignore[attr-defined] + result["ddl"].append(ddl) + if not dry_run: + connection.query(ddl) + result["tables_modified"] += 1 + return result diff --git a/tests/unit/test_adapters.py b/tests/unit/test_adapters.py index edbff9d52..5b7e6a96e 100644 --- a/tests/unit/test_adapters.py +++ b/tests/unit/test_adapters.py @@ -532,6 +532,23 @@ def test_enum_type_ddl_postgres(self, postgres_adapter): result = postgres_adapter.enum_type_ddl("status_type", ["active", "inactive"]) assert result == "CREATE TYPE \"status_type\" AS ENUM ('active', 'inactive')" + def test_replica_identity_ddl_full(self, postgres_adapter): + """Test PostgreSQL replica identity DDL for 'full' mode.""" + result = postgres_adapter.replica_identity_ddl('"schema"."table"', "full") + assert result == 'ALTER TABLE "schema"."table" REPLICA IDENTITY FULL' + + def test_replica_identity_ddl_default(self, postgres_adapter): + """Test PostgreSQL replica identity DDL for 'default' mode.""" + result = postgres_adapter.replica_identity_ddl('"schema"."table"', "default") + assert result == 'ALTER TABLE "schema"."table" REPLICA IDENTITY DEFAULT' + + def test_replica_identity_ddl_invalid_mode(self, postgres_adapter): + """Invalid mode raises DataJointError.""" + from datajoint.errors import DataJointError + + with pytest.raises(DataJointError, match="Unsupported replica_identity mode"): + postgres_adapter.replica_identity_ddl('"schema"."table"', "nothing") + def test_job_metadata_columns_postgres(self, postgres_adapter): """Test PostgreSQL job metadata columns.""" result = postgres_adapter.job_metadata_columns() diff --git a/tests/unit/test_deploy.py b/tests/unit/test_deploy.py new file mode 100644 index 000000000..3cca663a7 --- /dev/null +++ b/tests/unit/test_deploy.py @@ -0,0 +1,172 @@ +""" +Unit tests for :mod:`datajoint.deploy`. + +These tests do not require a live PostgreSQL connection — they cover dispatch, +validation, and DDL string generation against the actual ``PostgreSQLAdapter`` +and a stub adapter for the non-PG path. +""" + +from __future__ import annotations + +import pytest + +import datajoint as dj +from datajoint.deploy import set_replica_identity +from datajoint.errors import DataJointError + + +class _FakeAdapter: + """Bare-minimum adapter stub for testing dispatch (PostgreSQL-shaped).""" + + backend = "postgresql" + + def make_full_table_name(self, schema: str, table: str) -> str: + return f'"{schema}"."{table}"' + + def replica_identity_ddl(self, full_table_name: str, mode: str) -> str: + return f"ALTER TABLE {full_table_name} REPLICA IDENTITY {mode.upper()}" + + +class _MySQLLikeAdapter: + """Adapter without ``replica_identity_ddl`` (MySQL-shaped).""" + + backend = "mysql" + + def make_full_table_name(self, schema: str, table: str) -> str: + return f"`{schema}`.`{table}`" + + +class _FakeConnection: + """Connection stub that records queries instead of executing them.""" + + def __init__(self, adapter: object) -> None: + self.adapter = adapter + self.queries: list[str] = [] + + def query(self, sql: str) -> None: + self.queries.append(sql) + + +class _FakeSchema(dj.schemas._Schema): + """Schema stub bypassing __init__ wiring; sets just what set_replica_identity uses.""" + + def __init__(self, database: str, table_names: list[str], adapter: object) -> None: + # Skip dj.Schema.__init__ — fabricate the minimal attributes. + self.database = database + self._tables = table_names + self.connection = _FakeConnection(adapter) + + def list_tables(self) -> list[str]: + return self._tables + + +def test_set_replica_identity_rejects_invalid_mode(): + schema = _FakeSchema("ms", ["t1"], _FakeAdapter()) + with pytest.raises(DataJointError, match="mode must be 'default' or 'full'"): + set_replica_identity(schema, mode="nothing") + + +def test_set_replica_identity_rejects_bad_target(): + with pytest.raises(DataJointError, match="must be a Schema or Table"): + set_replica_identity("not a schema", mode="full") + + +def test_set_replica_identity_rejects_non_postgresql(): + schema = _FakeSchema("ms", ["t1", "t2"], _MySQLLikeAdapter()) + with pytest.raises(DataJointError, match="PostgreSQL-only"): + set_replica_identity(schema, mode="full") + + +def test_set_replica_identity_dry_run_no_execute(): + schema = _FakeSchema("ms", ["t1", "t2"], _FakeAdapter()) + result = set_replica_identity(schema, mode="full", dry_run=True) + assert result["tables_analyzed"] == 2 + assert result["tables_modified"] == 0 + assert result["ddl"] == [ + 'ALTER TABLE "ms"."t1" REPLICA IDENTITY FULL', + 'ALTER TABLE "ms"."t2" REPLICA IDENTITY FULL', + ] + assert schema.connection.queries == [] + + +def test_set_replica_identity_apply_runs_alters(): + schema = _FakeSchema("ms", ["t1", "t2"], _FakeAdapter()) + result = set_replica_identity(schema, mode="full", dry_run=False) + assert result["tables_analyzed"] == 2 + assert result["tables_modified"] == 2 + assert schema.connection.queries == result["ddl"] + + +def test_set_replica_identity_default_mode_emits_default_ddl(): + schema = _FakeSchema("ms", ["t1"], _FakeAdapter()) + result = set_replica_identity(schema, mode="default", dry_run=True) + assert result["ddl"] == ['ALTER TABLE "ms"."t1" REPLICA IDENTITY DEFAULT'] + + +def test_set_replica_identity_empty_schema(): + schema = _FakeSchema("ms", [], _FakeAdapter()) + result = set_replica_identity(schema, mode="full", dry_run=False) + assert result == {"tables_analyzed": 0, "tables_modified": 0, "ddl": []} + + +class _FakeTable(dj.Table): + """Table stub bypassing schema-decoration wiring.""" + + # Suppress dj.Table's class-construction checks + table_name = "fake_table" + + def __init__(self, full_table_name: str, adapter: object) -> None: + # Skip dj.Table.__init__ — fabricate the minimal attributes. + self._full_table_name = full_table_name + self._connection = _FakeConnection(adapter) + + @property + def full_table_name(self) -> str: + return self._full_table_name + + @property + def connection(self): + return self._connection + + +def test_set_replica_identity_table_instance_target(): + """Table instance dispatch (deploy.py: isinstance(target, Table) branch).""" + table = _FakeTable('"ms"."the_table"', _FakeAdapter()) + result = set_replica_identity(table, mode="full", dry_run=False) + assert result == { + "tables_analyzed": 1, + "tables_modified": 1, + "ddl": ['ALTER TABLE "ms"."the_table" REPLICA IDENTITY FULL'], + } + assert table.connection.queries == result["ddl"] + + +def test_set_replica_identity_table_class_target(monkeypatch): + """Table-class dispatch (deploy.py: issubclass(target, Table) branch).""" + # Build a class that instantiates a _FakeTable when called like target() + fake_adapter = _FakeAdapter() + + class _TableClass(dj.Table): + def __new__(cls): + return _FakeTable('"ms"."class_table"', fake_adapter) + + # `isinstance(_TableClass, type) and issubclass(_TableClass, dj.Table)` is True. + result = set_replica_identity(_TableClass, mode="full", dry_run=True) + assert result["tables_analyzed"] == 1 + assert result["tables_modified"] == 0 # dry_run + assert result["ddl"] == ['ALTER TABLE "ms"."class_table" REPLICA IDENTITY FULL'] + + +def test_set_replica_identity_case_insensitive_mode(): + """`mode='FULL'` (uppercase) should be accepted, matching adapter case-handling.""" + schema = _FakeSchema("ms", ["t1"], _FakeAdapter()) + result = set_replica_identity(schema, mode="FULL", dry_run=True) + assert result["ddl"] == ['ALTER TABLE "ms"."t1" REPLICA IDENTITY FULL'] + + +def test_set_replica_identity_unactivated_schema_raises(): + """Schema with database=None (never activated) must raise, not produce malformed DDL.""" + schema = _FakeSchema("ms", ["t1"], _FakeAdapter()) + schema.database = None + with pytest.raises(DataJointError, match="Schema is not activated"): + set_replica_identity(schema, mode="full", dry_run=True)