From 637adb5a21c3b28b037a32788cca21e114d5a15b Mon Sep 17 00:00:00 2001 From: Dimitri Yatsenko Date: Wed, 10 Jun 2026 16:24:51 -0500 Subject: [PATCH 1/2] feat(deploy): set_replica_identity for PostgreSQL CDC (#1447) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds dj.deploy.set_replica_identity(target, mode, dry_run) — an idempotent deployment-time operation that applies ALTER TABLE ... REPLICA IDENTITY DEFAULT|FULL across a Schema or single Table on PostgreSQL. Required by some CDC consumers (Databricks Lakehouse Sync mandates FULL; silently skips tables without it). The ALTER is metadata-only and instant; the cost is in WAL volume on subsequent UPDATE/DELETE. Why a new module rather than dj.migrate or auto-emit in declare(): - Not a migration. Nothing legacy is being fixed; this configures an existing schema for an environment-specific consumer requirement. - Not a declare-time concern. Mixing auto-emit-on-declare with a separate utility for existing tables produces mixed state (new tables FULL, old tables DEFAULT) until both run. Migration-only is one coherent path. - Future operational helpers (publication membership, vacuum/reindex, grants) belong with set_replica_identity, not in migrate.py. dj.deploy is the home for that category. Shape: - adapters/postgres.py — replica_identity_ddl(full_name, mode) emits the ALTER TABLE statement. Pure DDL emitter; "default" and "full" both produce explicit ALTERs. Invalid mode raises DataJointError. - deploy.py (new) — set_replica_identity dispatches Schema vs Table, routes through the PG adapter, raises on non-PG backends, supports dry_run, returns {tables_analyzed, tables_modified, ddl}. - __init__.py — exports dj.deploy. Tests: - Unit tests for the adapter DDL (full, default, invalid mode). - Unit tests for deploy.set_replica_identity (mode validation, target validation, non-PG rejection, dry_run, apply, default mode, empty schema). 
All use stub adapters so no live PG is required. Slated for DataJoint 2.3. --- src/datajoint/__init__.py | 2 + src/datajoint/adapters/postgres.py | 25 +++++ src/datajoint/deploy.py | 157 +++++++++++++++++++++++++++++ tests/unit/test_adapters.py | 17 ++++ tests/unit/test_deploy.py | 109 ++++++++++++++++++++ 5 files changed, 310 insertions(+) create mode 100644 src/datajoint/deploy.py create mode 100644 tests/unit/test_deploy.py diff --git a/src/datajoint/__init__.py b/src/datajoint/__init__.py index 4970b19d4..552e89a4c 100644 --- a/src/datajoint/__init__.py +++ b/src/datajoint/__init__.py @@ -57,6 +57,7 @@ # Other "errors", "migrate", + "deploy", "DataJointError", "ThreadSafetyError", "logger", @@ -69,6 +70,7 @@ # ============================================================================= from . import errors from . import migrate +from . import deploy from .codecs import ( Codec, get_codec, diff --git a/src/datajoint/adapters/postgres.py b/src/datajoint/adapters/postgres.py index 543e972d3..55ea189dc 100644 --- a/src/datajoint/adapters/postgres.py +++ b/src/datajoint/adapters/postgres.py @@ -1266,6 +1266,31 @@ def enum_type_ddl(self, type_name: str, values: list[str]) -> str | None: quoted_values = ", ".join(f"'{v}'" for v in values) return f"CREATE TYPE {self.quote_identifier(type_name)} AS ENUM ({quoted_values})" + def replica_identity_ddl(self, full_table_name: str, mode: str) -> str: + """ + Generate ALTER TABLE ... REPLICA IDENTITY statement. + + Controls how much of the old row PostgreSQL writes to WAL on UPDATE/DELETE. + ``"default"`` logs only primary-key columns; ``"full"`` logs the entire row. + Required by some CDC tools (e.g. Databricks Lakehouse Sync) that need the + full pre-image to drive Slowly-Changing-Dimension history. + + The ALTER is metadata-only, instant, and idempotent — re-applying the same + mode is a no-op at the storage layer. 
+ + Examples + -------- + >>> adapter.replica_identity_ddl('"schema"."table"', 'full') + 'ALTER TABLE "schema"."table" REPLICA IDENTITY FULL' + >>> adapter.replica_identity_ddl('"schema"."table"', 'default') + 'ALTER TABLE "schema"."table" REPLICA IDENTITY DEFAULT' + """ + if mode not in ("default", "full"): + from ..errors import DataJointError + + raise DataJointError(f"Unsupported replica_identity mode: {mode!r}. Expected 'default' or 'full'.") + return f"ALTER TABLE {full_table_name} REPLICA IDENTITY {mode.upper()}" + def get_pending_enum_ddl(self, schema_name: str) -> list[str]: """ Get DDL statements for pending enum types and clear the pending list. diff --git a/src/datajoint/deploy.py b/src/datajoint/deploy.py new file mode 100644 index 000000000..14fd2992c --- /dev/null +++ b/src/datajoint/deploy.py @@ -0,0 +1,157 @@ +""" +Deployment-time operations for configuring an existing DataJoint pipeline. + +This module hosts idempotent operational helpers — things you run as part of a +deploy hook to configure a schema for its environment, distinct from +:mod:`datajoint.migrate` which handles one-shot schema/state evolution. + +The boundary between the two: + +- :mod:`datajoint.migrate` — fix legacy state, evolve a schema definition, + retroactive corrections. Cadence: one-shot. Examples: ``migrate_columns``, + ``add_job_metadata_columns``, ``rebuild_lineage``. +- :mod:`datajoint.deploy` — configure an environment for a consumer's + requirements (CDC tools, replication, role grants, performance tuning). + Cadence: re-runnable, idempotent. Examples: :func:`set_replica_identity`. + +Functions in this module should be safe to call repeatedly from a deploy hook +without accumulating side effects. +""" + +from __future__ import annotations + +from typing import Any + +from .errors import DataJointError + + +def set_replica_identity(target: Any, mode: str = "full", dry_run: bool = True) -> dict: + """ + Apply ``ALTER TABLE ... 
REPLICA IDENTITY <mode>`` to a schema or table on PostgreSQL. + + ``REPLICA IDENTITY`` controls how much of the **old row** PostgreSQL writes to + the write-ahead log on UPDATE/DELETE. Under ``DEFAULT``, only primary-key + columns appear in WAL; under ``FULL``, the entire old row does. + + Why this exists + --------------- + Some change-data-capture (CDC) consumers require the full row pre-image to + drive their downstream models. The canonical example is **Databricks + Lakehouse Sync**: tables without ``REPLICA IDENTITY FULL`` are silently + skipped by the sync — no error, just missing data downstream. Other CDC + tools (Debezium, ClickHouse ClickPipes, Azure CDC) work fine with + ``DEFAULT`` when tables have a primary key; only Databricks mandates + ``FULL``. + + This helper is the **operational** way to apply the setting. It is not a + migration: there's no legacy state being fixed; the setting is simply a + property of the deployment environment, and a fresh declare in a new + environment may need it re-applied. It is idempotent — re-applying the + same mode is a no-op at the storage layer — so it is safe to call from a + deploy hook on every release. + + Cost + ---- + The ALTER itself is metadata-only and instant. The cost is in WAL volume + after the change: UPDATE/DELETE on tables with FULL log the entire old row, + which can be sizable on tables with TOASTed bytea columns. For DataJoint's + typical insert-append workload, this cost is negligible. The notable + scenario is bulk ``delete()`` on tables with ``<blob>`` columns — a + transient WAL burst proportional to the deleted-row payload size. + + Compliance considerations + ------------------------- + Under ``DEFAULT``, only primary-key values appear in WAL. Under ``FULL``, + entire rows do — including any PHI/PII/sensitive columns.
For self-hosted + PostgreSQL with unrestricted WAL access this is a real consideration; for + managed PostgreSQL with logical replication confined to a specific + subscriber (Lakebase, RDS), WAL stays inside the managed environment's + security boundary. Apply intentionally. + + Parameters + ---------- + target : Schema or Table + A :class:`datajoint.Schema` (all user tables) or a + :class:`datajoint.Table` class/instance (just that table). + mode : str, default ``"full"`` + ``"default"`` (PK only, minimal WAL) or ``"full"`` (entire row). + dry_run : bool, default ``True`` + If True, collect the DDL statements but do not execute. Set to False + to actually apply. + + Returns + ------- + dict + - ``tables_analyzed`` (int): number of tables considered. + - ``tables_modified`` (int): number of tables on which the ALTER ran. + Always 0 when ``dry_run=True``. + - ``ddl`` (list[str]): the DDL statements that were (or would be) executed. + + Raises + ------ + DataJointError + If the target's backend is not PostgreSQL, or if ``mode`` is not one of + ``"default"`` / ``"full"``. + + Examples + -------- + >>> from datajoint.deploy import set_replica_identity + >>> # Preview + >>> set_replica_identity(my_schema, mode="full", dry_run=True) + {'tables_analyzed': 12, 'tables_modified': 0, 'ddl': ['ALTER TABLE "ms"."t1" REPLICA IDENTITY FULL', ...]} + >>> # Apply + >>> set_replica_identity(my_schema, mode="full", dry_run=False) + {'tables_analyzed': 12, 'tables_modified': 12, 'ddl': [...]} + >>> # Single table + >>> set_replica_identity(MyTable, mode="full", dry_run=False) + + See Also + -------- + PostgreSQL: `Logical Replication — Replica Identity + `_. + Databricks: `Lakehouse Sync + `_. 
+ """ + if mode not in ("default", "full"): + raise DataJointError(f"mode must be 'default' or 'full'; got {mode!r}") + + from .schemas import _Schema + from .table import Table + + if isinstance(target, _Schema): + connection = target.connection + assert connection is not None, "Schema has no active connection" + adapter = connection.adapter + assert target.database is not None, "Schema is not activated" + tables = [adapter.make_full_table_name(target.database, t) for t in target.list_tables()] + elif isinstance(target, type) and issubclass(target, Table): + instance = target() + connection = instance.connection + assert connection is not None, "Table has no active connection" + adapter = connection.adapter + tables = [instance.full_table_name] + elif isinstance(target, Table): + connection = target.connection + assert connection is not None, "Table has no active connection" + adapter = connection.adapter + tables = [target.full_table_name] + else: + raise DataJointError(f"target must be a Schema or Table class/instance; got {type(target).__name__}") + + if not hasattr(adapter, "replica_identity_ddl"): + raise DataJointError( + f"set_replica_identity is PostgreSQL-only; the {adapter.backend} adapter " "does not support REPLICA IDENTITY." 
+ ) + + result: dict[str, Any] = { + "tables_analyzed": len(tables), + "tables_modified": 0, + "ddl": [], + } + for full_name in tables: + ddl = adapter.replica_identity_ddl(full_name, mode) # type: ignore[attr-defined] + result["ddl"].append(ddl) + if not dry_run: + connection.query(ddl) + result["tables_modified"] += 1 + return result diff --git a/tests/unit/test_adapters.py b/tests/unit/test_adapters.py index edbff9d52..5b7e6a96e 100644 --- a/tests/unit/test_adapters.py +++ b/tests/unit/test_adapters.py @@ -532,6 +532,23 @@ def test_enum_type_ddl_postgres(self, postgres_adapter): result = postgres_adapter.enum_type_ddl("status_type", ["active", "inactive"]) assert result == "CREATE TYPE \"status_type\" AS ENUM ('active', 'inactive')" + def test_replica_identity_ddl_full(self, postgres_adapter): + """Test PostgreSQL replica identity DDL for 'full' mode.""" + result = postgres_adapter.replica_identity_ddl('"schema"."table"', "full") + assert result == 'ALTER TABLE "schema"."table" REPLICA IDENTITY FULL' + + def test_replica_identity_ddl_default(self, postgres_adapter): + """Test PostgreSQL replica identity DDL for 'default' mode.""" + result = postgres_adapter.replica_identity_ddl('"schema"."table"', "default") + assert result == 'ALTER TABLE "schema"."table" REPLICA IDENTITY DEFAULT' + + def test_replica_identity_ddl_invalid_mode(self, postgres_adapter): + """Invalid mode raises DataJointError.""" + from datajoint.errors import DataJointError + + with pytest.raises(DataJointError, match="Unsupported replica_identity mode"): + postgres_adapter.replica_identity_ddl('"schema"."table"', "nothing") + def test_job_metadata_columns_postgres(self, postgres_adapter): """Test PostgreSQL job metadata columns.""" result = postgres_adapter.job_metadata_columns() diff --git a/tests/unit/test_deploy.py b/tests/unit/test_deploy.py new file mode 100644 index 000000000..18c54519d --- /dev/null +++ b/tests/unit/test_deploy.py @@ -0,0 +1,109 @@ +""" +Unit tests for 
:mod:`datajoint.deploy`. + +These tests do not require a live PostgreSQL connection — they cover dispatch, +validation, and DDL string generation against a PostgreSQL-shaped stub adapter +and a MySQL-shaped stub adapter for the non-PG path. +""" + +from __future__ import annotations + +import pytest + +import datajoint as dj +from datajoint.deploy import set_replica_identity +from datajoint.errors import DataJointError + + +class _FakeAdapter: + """Bare-minimum adapter stub for testing dispatch (PostgreSQL-shaped).""" + + backend = "postgresql" + + def make_full_table_name(self, schema: str, table: str) -> str: + return f'"{schema}"."{table}"' + + def replica_identity_ddl(self, full_table_name: str, mode: str) -> str: + return f"ALTER TABLE {full_table_name} REPLICA IDENTITY {mode.upper()}" + + +class _MySQLLikeAdapter: + """Adapter without ``replica_identity_ddl`` (MySQL-shaped).""" + + backend = "mysql" + + def make_full_table_name(self, schema: str, table: str) -> str: + return f"`{schema}`.`{table}`" + + +class _FakeConnection: + """Connection stub that records queries instead of executing them.""" + + def __init__(self, adapter: object) -> None: + self.adapter = adapter + self.queries: list[str] = [] + + def query(self, sql: str) -> None: + self.queries.append(sql) + + +class _FakeSchema(dj.schemas._Schema): + """Schema stub bypassing __init__ wiring; sets just what set_replica_identity uses.""" + + def __init__(self, database: str, table_names: list[str], adapter: object) -> None: + # Skip dj.Schema.__init__ — fabricate the minimal attributes.
+ self.database = database + self._tables = table_names + self.connection = _FakeConnection(adapter) + + def list_tables(self) -> list[str]: + return self._tables + + +def test_set_replica_identity_rejects_invalid_mode(): + schema = _FakeSchema("ms", ["t1"], _FakeAdapter()) + with pytest.raises(DataJointError, match="mode must be 'default' or 'full'"): + set_replica_identity(schema, mode="nothing") + + +def test_set_replica_identity_rejects_bad_target(): + with pytest.raises(DataJointError, match="must be a Schema or Table"): + set_replica_identity("not a schema", mode="full") + + +def test_set_replica_identity_rejects_non_postgresql(): + schema = _FakeSchema("ms", ["t1", "t2"], _MySQLLikeAdapter()) + with pytest.raises(DataJointError, match="PostgreSQL-only"): + set_replica_identity(schema, mode="full") + + +def test_set_replica_identity_dry_run_no_execute(): + schema = _FakeSchema("ms", ["t1", "t2"], _FakeAdapter()) + result = set_replica_identity(schema, mode="full", dry_run=True) + assert result["tables_analyzed"] == 2 + assert result["tables_modified"] == 0 + assert result["ddl"] == [ + 'ALTER TABLE "ms"."t1" REPLICA IDENTITY FULL', + 'ALTER TABLE "ms"."t2" REPLICA IDENTITY FULL', + ] + assert schema.connection.queries == [] + + +def test_set_replica_identity_apply_runs_alters(): + schema = _FakeSchema("ms", ["t1", "t2"], _FakeAdapter()) + result = set_replica_identity(schema, mode="full", dry_run=False) + assert result["tables_analyzed"] == 2 + assert result["tables_modified"] == 2 + assert schema.connection.queries == result["ddl"] + + +def test_set_replica_identity_default_mode_emits_default_ddl(): + schema = _FakeSchema("ms", ["t1"], _FakeAdapter()) + result = set_replica_identity(schema, mode="default", dry_run=True) + assert result["ddl"] == ['ALTER TABLE "ms"."t1" REPLICA IDENTITY DEFAULT'] + + +def test_set_replica_identity_empty_schema(): + schema = _FakeSchema("ms", [], _FakeAdapter()) + result = set_replica_identity(schema, mode="full", 
dry_run=False) + assert result == {"tables_analyzed": 0, "tables_modified": 0, "ddl": []} From 26bda4c7f6efc78b503dfa0efa821a06c23c8809 Mon Sep 17 00:00:00 2001 From: Dimitri Yatsenko Date: Mon, 22 Jun 2026 13:52:33 -0500 Subject: [PATCH 2/2] fix(deploy): address MilagrosMarin review on #1466 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Replace `assert` statements with explicit `if … raise DataJointError(…)` in the dispatch ladder. Project convention is to avoid assertions for runtime guarantees because Python's -O strips them; deploy-hook environments may run under -O and would otherwise see AttributeError on None instead of the intended DataJointError. The unactivated-schema case (database is None) is the most concerning since it would have emitted a malformed ALTER TABLE. - Add four unit tests covering previously-untested branches: - `test_set_replica_identity_table_instance_target` — Table-instance dispatch (deploy.py: isinstance(target, Table) branch). - `test_set_replica_identity_table_class_target` — Table-class dispatch (deploy.py: issubclass(target, Table) branch). - `test_set_replica_identity_case_insensitive_mode` — mode="FULL" accepted, matching adapter's case-tolerant handling. - `test_set_replica_identity_unactivated_schema_raises` — Schema with database=None raises rather than emitting malformed DDL. - Accept uppercase mode strings (`mode.lower()` normalization) for parity with the PG adapter's case-tolerant DDL emission. - Tighten type hints: `target: TargetType` (Union of Schema/Table-class/ Table-instance via TYPE_CHECKING forward references), `mode: Literal[ "default", "full"]`. Runtime validation unchanged. - Docstring additions: - Cost section now flags the brief AccessExclusiveLock the ALTER takes. 
- New "Partial-failure semantics" paragraph explaining that exceptions on table N of M leave the first N-1 tables modified but propagate without returning the partial summary; idempotency makes re-running safe. - Style: merge the adjacent-string concatenation on the "PostgreSQL-only" error into a single literal. All 20 unit tests pass (was 16 + 4 new). --- src/datajoint/deploy.py | 56 +++++++++++++++++++++++++--------- tests/unit/test_deploy.py | 63 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 105 insertions(+), 14 deletions(-) diff --git a/src/datajoint/deploy.py b/src/datajoint/deploy.py index 14fd2992c..73c4ab30b 100644 --- a/src/datajoint/deploy.py +++ b/src/datajoint/deploy.py @@ -20,12 +20,22 @@ from __future__ import annotations -from typing import Any +from typing import TYPE_CHECKING, Any, Literal, Union from .errors import DataJointError +if TYPE_CHECKING: + from .schemas import _Schema + from .table import Table + + TargetType = Union["_Schema", type["Table"], "Table"] -def set_replica_identity(target: Any, mode: str = "full", dry_run: bool = True) -> dict: + +def set_replica_identity( + target: "TargetType", + mode: Literal["default", "full"] = "full", + dry_run: bool = True, +) -> dict: """ Apply ``ALTER TABLE ... REPLICA IDENTITY <mode>`` to a schema or table on PostgreSQL. @@ -52,12 +62,24 @@ def set_replica_identity(target: Any, mode: str = "full", dry_run: bool = True) Cost ---- - The ALTER itself is metadata-only and instant. The cost is in WAL volume - after the change: UPDATE/DELETE on tables with FULL log the entire old row, - which can be sizable on tables with TOASTed bytea columns. For DataJoint's - typical insert-append workload, this cost is negligible. The notable - scenario is bulk ``delete()`` on tables with ``<blob>`` columns — a - transient WAL burst proportional to the deleted-row payload size.
+ The ALTER itself is metadata-only and instant, but requires a brief + ``AccessExclusiveLock`` on each table — it will block behind in-flight + writes/reads on a busy table. Run during a quiet window on actively- + ingested tables. + + The ongoing cost is in WAL volume after the change: UPDATE/DELETE on + tables with FULL log the entire old row, which can be sizable on tables + with TOASTed bytea columns. For DataJoint's typical insert-append + workload, this cost is negligible. The notable scenario is bulk + ``delete()`` on tables with ``<blob>`` columns — a transient WAL burst + proportional to the deleted-row payload size. + + Partial-failure semantics + ------------------------- + If ``connection.query(ddl)`` raises on table N of M, the first N-1 + tables are already modified at the storage layer but the exception + propagates without returning the partial summary. The operation is + idempotent, so re-running brings the remaining tables into compliance. Compliance considerations ------------------------- @@ -112,27 +134,33 @@ def set_replica_identity(target: Any, mode: str = "full", dry_run: bool = True) Databricks: `Lakehouse Sync `_. """ - if mode not in ("default", "full"): + mode_normalized = mode.lower() if isinstance(mode, str) else mode + if mode_normalized not in ("default", "full"): raise DataJointError(f"mode must be 'default' or 'full'; got {mode!r}") + mode = mode_normalized # type: ignore[assignment] from .schemas import _Schema from .table import Table if isinstance(target, _Schema): connection = target.connection - assert connection is not None, "Schema has no active connection" + if connection is None: + raise DataJointError("Schema has no active connection.") adapter = connection.adapter - assert target.database is not None, "Schema is not activated" + if target.database is None: + raise DataJointError("Schema is not activated. Call schema.activate(...)
before set_replica_identity().") tables = [adapter.make_full_table_name(target.database, t) for t in target.list_tables()] elif isinstance(target, type) and issubclass(target, Table): instance = target() connection = instance.connection - assert connection is not None, "Table has no active connection" + if connection is None: + raise DataJointError(f"Table {target.__name__} has no active connection.") adapter = connection.adapter tables = [instance.full_table_name] elif isinstance(target, Table): connection = target.connection - assert connection is not None, "Table has no active connection" + if connection is None: + raise DataJointError(f"Table {type(target).__name__} has no active connection.") adapter = connection.adapter tables = [target.full_table_name] else: @@ -140,7 +168,7 @@ def set_replica_identity(target: Any, mode: str = "full", dry_run: bool = True) if not hasattr(adapter, "replica_identity_ddl"): raise DataJointError( - f"set_replica_identity is PostgreSQL-only; the {adapter.backend} adapter " "does not support REPLICA IDENTITY." + f"set_replica_identity is PostgreSQL-only; the {adapter.backend} adapter does not support REPLICA IDENTITY." ) result: dict[str, Any] = { diff --git a/tests/unit/test_deploy.py b/tests/unit/test_deploy.py index 18c54519d..3cca663a7 100644 --- a/tests/unit/test_deploy.py +++ b/tests/unit/test_deploy.py @@ -107,3 +107,66 @@ def test_set_replica_identity_empty_schema(): schema = _FakeSchema("ms", [], _FakeAdapter()) result = set_replica_identity(schema, mode="full", dry_run=False) assert result == {"tables_analyzed": 0, "tables_modified": 0, "ddl": []} + + +class _FakeTable(dj.Table): + """Table stub bypassing schema-decoration wiring.""" + + # Suppress dj.Table's class-construction checks + table_name = "fake_table" + + def __init__(self, full_table_name: str, adapter: object) -> None: + # Skip dj.Table.__init__ — fabricate the minimal attributes. 
+ self._full_table_name = full_table_name + self._connection = _FakeConnection(adapter) + + @property + def full_table_name(self) -> str: + return self._full_table_name + + @property + def connection(self): + return self._connection + + +def test_set_replica_identity_table_instance_target(): + """Table instance dispatch (deploy.py: isinstance(target, Table) branch).""" + table = _FakeTable('"ms"."the_table"', _FakeAdapter()) + result = set_replica_identity(table, mode="full", dry_run=False) + assert result == { + "tables_analyzed": 1, + "tables_modified": 1, + "ddl": ['ALTER TABLE "ms"."the_table" REPLICA IDENTITY FULL'], + } + assert table.connection.queries == result["ddl"] + + +def test_set_replica_identity_table_class_target(monkeypatch): + """Table-class dispatch (deploy.py: issubclass(target, Table) branch).""" + # Build a class that instantiates a _FakeTable when called like target() + fake_adapter = _FakeAdapter() + + class _TableClass(dj.Table): + def __new__(cls): + return _FakeTable('"ms"."class_table"', fake_adapter) + + # `isinstance(_TableClass, type) and issubclass(_TableClass, dj.Table)` is True. 
+ result = set_replica_identity(_TableClass, mode="full", dry_run=True) + assert result["tables_analyzed"] == 1 + assert result["tables_modified"] == 0 # dry_run + assert result["ddl"] == ['ALTER TABLE "ms"."class_table" REPLICA IDENTITY FULL'] + + +def test_set_replica_identity_case_insensitive_mode(): + """`mode='FULL'` (uppercase) should be accepted, matching adapter case-handling.""" + schema = _FakeSchema("ms", ["t1"], _FakeAdapter()) + result = set_replica_identity(schema, mode="FULL", dry_run=True) + assert result["ddl"] == ['ALTER TABLE "ms"."t1" REPLICA IDENTITY FULL'] + + +def test_set_replica_identity_unactivated_schema_raises(): + """Schema with database=None (never activated) must raise, not produce malformed DDL.""" + schema = _FakeSchema("ms", ["t1"], _FakeAdapter()) + schema.database = None + with pytest.raises(DataJointError, match="Schema is not activated"): + set_replica_identity(schema, mode="full", dry_run=True)