-
Notifications
You must be signed in to change notification settings - Fork 509
Load table credentials from credentials endpoint #3500
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -21,6 +21,7 @@ | |
| from typing import ( | ||
| TYPE_CHECKING, | ||
| Any, | ||
| cast, | ||
| ) | ||
| from urllib.parse import quote, unquote | ||
|
|
||
|
|
@@ -233,6 +234,7 @@ class ScanPlanningMode(Enum): | |
|
|
||
|
|
||
| ACCESS_DELEGATION_DEFAULT = "vended-credentials" | ||
| ACCESS_DELEGATION_HEADER = "X-Iceberg-Access-Delegation" | ||
| AUTHORIZATION_HEADER = "Authorization" | ||
| BEARER_PREFIX = "Bearer" | ||
| CATALOG_SCOPE = "catalog" | ||
|
|
@@ -486,6 +488,40 @@ def _resolve_storage_credentials(storage_credentials: list[StorageCredential], l | |
|
|
||
| return best_match.config if best_match else {} | ||
|
|
||
| def _should_load_credentials_from_endpoint(self) -> bool: | ||
| if Capability.V1_LOAD_CREDENTIALS not in self._supported_endpoints: | ||
| return False | ||
|
|
||
| access_delegation = cast(str, self._session.headers.get(ACCESS_DELEGATION_HEADER, "")) | ||
| # The spec encodes access delegation as a comma-separated list of mechanisms. | ||
| # Load credentials only when vended-credentials is requested. | ||
| return any(delegation.strip().lower() == ACCESS_DELEGATION_DEFAULT for delegation in access_delegation.split(",")) | ||
|
|
||
| def _resolve_table_credentials_from_response_or_endpoint( | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Compared this to Java We don't have that layer, today so this eager fallback makes sense to me. And just putting this here so it's clear. |
||
| self, identifier_tuple: tuple[str, ...], table_response: TableResponse | ||
| ) -> Properties: | ||
| """Resolve credentials from the table response or load them from the credentials endpoint. | ||
|
|
||
| Inline storage-credentials from the table response take precedence. When the response | ||
| does not include credentials, the catalog advertises loadCredentials, and vended | ||
| credentials are requested, load credentials from the table's /credentials endpoint. | ||
| """ | ||
| location = table_response.metadata_location or table_response.metadata.location | ||
| # Java keeps storage credentials with FileIO for path-level selection. PyIceberg resolves | ||
| # them to FileIO properties here, so it needs a location to choose a matching prefix. | ||
| if not location: | ||
| return {} | ||
|
|
||
| credential_config = self._resolve_storage_credentials(table_response.storage_credentials, location) | ||
| if table_response.storage_credentials: | ||
| return credential_config | ||
|
|
||
| if not self._should_load_credentials_from_endpoint(): | ||
| return {} | ||
|
|
||
| credentials_response = self._load_credentials(identifier_tuple) | ||
| return self._resolve_storage_credentials(credentials_response.storage_credentials, location) | ||
|
|
||
| def _load_file_io(self, properties: Properties = EMPTY_DICT, location: str | None = None) -> FileIO: | ||
| merged_properties = {**self.properties, **properties} | ||
| if self._auth_manager: | ||
|
|
@@ -826,10 +862,7 @@ def add_headers(self, request: PreparedRequest, **kwargs: Any) -> None: # pylin | |
| session.mount(self.uri, SigV4Adapter(**self.properties)) | ||
|
|
||
| def _response_to_table(self, identifier_tuple: tuple[str, ...], table_response: TableResponse) -> Table: | ||
| # Per Iceberg spec: storage-credentials take precedence over config | ||
| credential_config = self._resolve_storage_credentials( | ||
| table_response.storage_credentials, table_response.metadata_location | ||
| ) | ||
| credential_config = self._resolve_table_credentials_from_response_or_endpoint(identifier_tuple, table_response) | ||
| return Table( | ||
| identifier=identifier_tuple, | ||
| metadata_location=table_response.metadata_location, # type: ignore | ||
|
|
@@ -878,7 +911,7 @@ def _config_headers(self, session: Session) -> None: | |
| session.headers["Content-type"] = "application/json" | ||
| session.headers["User-Agent"] = f"PyIceberg/{__version__}" | ||
| session.headers["X-Client-Version"] = f"PyIceberg {__version__}" | ||
| session.headers.setdefault("X-Iceberg-Access-Delegation", ACCESS_DELEGATION_DEFAULT) | ||
| session.headers.setdefault(ACCESS_DELEGATION_HEADER, ACCESS_DELEGATION_DEFAULT) | ||
|
|
||
| def _create_table( | ||
| self, | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: do we need to cast or does it break linter? locally it shows
unnecessary cast; type is already 'str'