Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions src/datajoint/adapters/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -850,6 +850,29 @@ def find_downstream_schemas_sql(self, schemas_list: str) -> str:
raise NotImplementedError
...

def find_upstream_schemas_sql(self, schemas_list: str) -> str:
    """
    Generate query to find schemas that the given schemas reference via FK.

    Used to discover unloaded schemas that the loaded ones depend on
    (the upstream / ancestor direction). Symmetric to
    :meth:`find_downstream_schemas_sql`.

    This base implementation only defines the contract; backend adapters
    supply the actual SQL by overriding it.

    Parameters
    ----------
    schemas_list : str
        Comma-separated, quoted schema names for an IN clause.

    Returns
    -------
    str
        SQL query returning rows with a single column ``schema_name``
        containing distinct schema names that are referenced by the
        given schemas.

    Raises
    ------
    NotImplementedError
        Always, when the adapter in use has not overridden this method.
    """
    # Name the concrete adapter class so a missing override is easy to trace.
    raise NotImplementedError(
        f"{type(self).__name__} does not implement find_upstream_schemas_sql"
    )

@abstractmethod
def get_constraint_info_sql(self, constraint_name: str, schema_name: str, table_name: str) -> str:
"""
Expand Down
10 changes: 10 additions & 0 deletions src/datajoint/adapters/mysql.py
Original file line number Diff line number Diff line change
Expand Up @@ -696,6 +696,16 @@ def find_downstream_schemas_sql(self, schemas_list: str) -> str:
f"AND table_schema NOT IN ({schemas_list})"
)

def find_upstream_schemas_sql(self, schemas_list: str) -> str:
    """
    Build the MySQL query listing schemas that the given schemas reference via FK.

    Parameters
    ----------
    schemas_list : str
        Comma-separated, already-quoted schema names. The text is
        interpolated verbatim into both ``IN (...)`` clauses, so it must be
        non-empty and quoted by the caller.

    Returns
    -------
    str
        SQL selecting a single ``schema_name`` column with the distinct
        referenced schemas that are not themselves in ``schemas_list``.
    """
    # Only the fragments that interpolate ``schemas_list`` are f-strings.
    return (
        "SELECT DISTINCT referenced_table_schema as schema_name "
        "FROM information_schema.key_column_usage "
        f"WHERE table_schema IN ({schemas_list}) "
        # Skip key-column rows that do not point at another table.
        "AND referenced_table_schema IS NOT NULL "
        # Report only schemas outside the supplied list.
        f"AND referenced_table_schema NOT IN ({schemas_list})"
    )

def get_constraint_info_sql(self, constraint_name: str, schema_name: str, table_name: str) -> str:
"""Query to get FK constraint details from information_schema."""
return (
Expand Down
14 changes: 14 additions & 0 deletions src/datajoint/adapters/postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -861,6 +861,20 @@ def find_downstream_schemas_sql(self, schemas_list: str) -> str:
f"AND ns1.nspname NOT IN ({schemas_list})"
)

def find_upstream_schemas_sql(self, schemas_list: str) -> str:
    """
    Build the PostgreSQL query listing schemas that the given schemas reference via FK.

    The query joins ``pg_constraint`` to the namespace of the constrained
    table (``ns1``, through ``conrelid``) and to the namespace of the other
    table (``ns2``, through ``confrelid``), then keeps constraints of type
    ``'f'`` whose ``ns1`` is in the list and whose ``ns2`` is not.

    Parameters
    ----------
    schemas_list : str
        Comma-separated, already-quoted schema names. The text is
        interpolated verbatim into both ``IN (...)`` clauses, so it must be
        non-empty and quoted by the caller.

    Returns
    -------
    str
        SQL selecting a single ``schema_name`` column with the distinct
        ``ns2`` schema names.
    """
    # Only the fragments that interpolate ``schemas_list`` are f-strings.
    return (
        "SELECT DISTINCT ns2.nspname as schema_name "
        "FROM pg_constraint c "
        "JOIN pg_class cl1 ON c.conrelid = cl1.oid "
        "JOIN pg_namespace ns1 ON cl1.relnamespace = ns1.oid "
        "JOIN pg_class cl2 ON c.confrelid = cl2.oid "
        "JOIN pg_namespace ns2 ON cl2.relnamespace = ns2.oid "
        "WHERE c.contype = 'f' "
        f"AND ns1.nspname IN ({schemas_list}) "
        f"AND ns2.nspname NOT IN ({schemas_list})"
    )

def get_constraint_info_sql(self, constraint_name: str, schema_name: str, table_name: str) -> str:
"""
Query to get FK constraint details from information_schema.
Expand Down
29 changes: 29 additions & 0 deletions src/datajoint/dependencies.py
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,35 @@ def load_all_downstream(self) -> None:

self.load(force=True, schema_names=known_schemas)

def load_all_upstream(self) -> None:
    """
    Load dependencies including all upstream schemas referenced via FK chains.

    Starting from the schemas registered on the connection, repeatedly asks
    the adapter which other schemas they reference and adds them to the
    working set, until a round discovers nothing new. The dependency graph
    is then force-reloaded for the full set, so that upstream restriction
    propagation (``Diagram.trace()``) reaches ancestor tables in schemas
    the user has not explicitly activated.

    Called automatically by ``Diagram.trace()``. Symmetric to
    :meth:`load_all_downstream`.

    Notes
    -----
    When the connection has no registered schemas there is nothing to
    expand from, so this falls back to a plain ``self.load()``.
    """
    adapter = self._conn.adapter
    known_schemas = set(self._conn.schemas)
    if not known_schemas:
        self.load()
        return

    while True:
        # Sort so the generated SQL text does not depend on set iteration order.
        schemas_list = ", ".join(adapter.quote_string(name) for name in sorted(known_schemas))
        rows = self._conn.query(adapter.find_upstream_schemas_sql(schemas_list))
        # The query already excludes the listed schemas; subtracting again
        # ensures termination whatever the adapter returns.
        new_schemas = {row[0] for row in rows} - known_schemas
        if not new_schemas:
            break
        known_schemas |= new_schemas

    self.load(force=True, schema_names=known_schemas)

def topo_sort(self) -> list[str]:
"""
Return table names in topological order.
Expand Down
Loading
Loading