Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions mkdocs/docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,31 @@ catalog:
| snapshot-loading-mode | refs | The snapshots to return in the body of the metadata. Setting the value to `all` would return the full set of snapshots currently valid for the table. Setting the value to `refs` would load all snapshots referenced by branches or tags. |
| `header.X-Iceberg-Access-Delegation` | `vended-credentials` | Signal to the server that the client supports delegated access via a comma-separated list of access mechanisms. The server may choose to supply access via any or none of the requested mechanisms. When using `vended-credentials`, the server provides temporary credentials to the client. When using `remote-signing`, the server signs requests on behalf of the client. (default: `vended-credentials`) |

#### Retry and timeout

The REST Catalog uses `requests` with no retries and no timeout by default, so transient
5xx/network failures bubble up immediately and slow servers can hang the client indefinitely.
Set a `connection:` block on the catalog to opt in to a per-request timeout and a retry policy.
Both keys are optional; when neither is set, the default `requests` behavior is preserved.

```yaml
catalog:
  default:
    uri: http://rest-catalog/ws/
    connection:
      timeout: 60 # seconds, applied to every HTTP call
      retry:
        total: 5
        backoff_factor: 1.0
        status_forcelist: [429, 500, 502, 503, 504]
        allowed_methods: [GET, HEAD, OPTIONS]
```

| Key | Example | Description |
| ---------------------------- | ------------------------------------ | ------------------------------------------------------------------------------------------------------ |
| connection.timeout | 60 | Per-request timeout in seconds. Must be a positive number. |
| connection.retry | `{total: 5, backoff_factor: 1.0}` | Mapping passed verbatim as kwargs to [`urllib3.util.retry.Retry`](https://urllib3.readthedocs.io/en/latest/reference/urllib3.util.html#urllib3.util.Retry). |
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel like having a dictionary here is a bit tricky; this couples the configuration tightly with urllib3. When urllib3 changes something, or we want to use another library, then we have to map this. How about adding the options explicitly?


#### Headers in REST Catalog

To configure custom headers in REST Catalog, include them in the catalog properties with `header.<Header-Name>`. This
Expand Down
72 changes: 69 additions & 3 deletions pyiceberg/catalog/rest/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,11 @@
from urllib.parse import quote, unquote

from pydantic import ConfigDict, Field, TypeAdapter, field_validator
from requests import HTTPError, Session
from requests import HTTPError, PreparedRequest, Response, Session
from requests.adapters import HTTPAdapter
from tenacity import RetryCallState, retry, retry_if_exception_type, stop_after_attempt
from typing_extensions import override
from urllib3.util.retry import Retry

from pyiceberg import __version__
from pyiceberg.catalog import BOTOCORE_SESSION, TOKEN, URI, WAREHOUSE_LOCATION, Catalog, PropertiesUpdateSummary
Expand Down Expand Up @@ -255,6 +257,9 @@ class ScanPlanningMode(Enum):
SIGV4_SERVICE = "rest.signing-name"
SIGV4_MAX_RETRIES = "rest.sigv4.max-retries"
SIGV4_MAX_RETRIES_DEFAULT = 10
# Catalog property holding the optional nested HTTP connection settings (a mapping).
CONNECTION = "connection"
# Key inside the `connection` mapping: default per-request timeout (parsed as a float).
CONNECTION_TIMEOUT = "timeout"
# Key inside the `connection` mapping: mapping of kwargs used to build a urllib3 `Retry`.
CONNECTION_RETRY = "retry"
EMPTY_BODY_SHA256: str = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"
OAUTH2_SERVER_URI = "oauth2-server-uri"
SNAPSHOT_LOADING_MODE = "snapshot-loading-mode"
Expand Down Expand Up @@ -392,6 +397,63 @@ class ListViewsResponse(IcebergBaseModel):
_PLANNING_RESPONSE_ADAPTER = TypeAdapter(PlanningResponse)


class _RetryTimeoutHTTPAdapter(HTTPAdapter):
    """HTTPAdapter that applies a default per-request timeout.

    requests does not provide a way to set a default timeout on a Session;
    without this adapter, every call would have to thread `timeout=` through.
    The adapter applies `self._timeout` whenever a per-call timeout is not set.

    Args:
        timeout: Default timeout applied to requests that do not carry their
            own; None leaves requests without a timeout untouched.
        max_retries: Retry policy forwarded to `HTTPAdapter`; None keeps the
            `HTTPAdapter` default.
    """

    # HTTPAdapter only pickles the attributes listed in `__attrs__`. Register
    # `_timeout` so that an adapter restored from a pickled Session still has
    # the attribute that `send` reads, instead of raising AttributeError.
    __attrs__ = [*HTTPAdapter.__attrs__, "_timeout"]

    def __init__(self, timeout: float | None = None, max_retries: Retry | int | None = None) -> None:
        self._timeout = timeout
        if max_retries is not None:
            super().__init__(max_retries=max_retries)
        else:
            # Do not forward None: let HTTPAdapter pick its own default retry policy.
            super().__init__()

    def send(self, request: PreparedRequest, **kwargs: Any) -> Response:
        """Send the request, filling in the default timeout when the caller set none.

        An explicit per-call `timeout` always takes precedence over the default.
        """
        if kwargs.get("timeout") is None and self._timeout is not None:
            kwargs["timeout"] = self._timeout
        return super().send(request, **kwargs)


def _create_connection_adapter(properties: Properties) -> _RetryTimeoutHTTPAdapter | None:
    """Build a connection adapter from the optional `connection.*` properties.

    Args:
        properties: Catalog properties; the `connection` entry, when present,
            must be a mapping with optional `timeout` and `retry` keys.

    Returns:
        An adapter carrying the configured timeout and/or retry policy, or
        None when no `connection` block is supplied (or it sets neither key),
        leaving the default Session behavior unchanged.

    Raises:
        ValueError: If `connection` or `connection.retry` is not a mapping, if
            `connection.timeout` is not a positive, finite number, or if the
            retry mapping is rejected by `Retry`.
    """
    # Function-scope import: keeps this block self-contained without touching
    # the module-level import section.
    import math

    connection_config = properties.get(CONNECTION)
    if not connection_config:
        return None
    if not isinstance(connection_config, dict):
        raise ValueError(f"`{CONNECTION}` must be a mapping, got: {type(connection_config).__name__}")

    timeout: float | None = None
    if (raw_timeout := connection_config.get(CONNECTION_TIMEOUT)) is not None:
        try:
            timeout = float(raw_timeout)
        except (TypeError, ValueError, OverflowError) as e:
            # OverflowError: an int too large to be represented as a float.
            raise ValueError(f"`{CONNECTION}.{CONNECTION_TIMEOUT}` must be a number, got: {raw_timeout!r}") from e
        # NaN compares False against everything and inf is > 0, so both would
        # slip through the positivity check and only fail later, at request
        # time, with an unrelated error. Reject them here instead.
        if not math.isfinite(timeout):
            raise ValueError(f"`{CONNECTION}.{CONNECTION_TIMEOUT}` must be a finite number, got: {raw_timeout!r}")
        if timeout <= 0:
            raise ValueError(f"`{CONNECTION}.{CONNECTION_TIMEOUT}` must be a positive number, got: {timeout}")

    retry: Retry | None = None
    if (retry_config := connection_config.get(CONNECTION_RETRY)) is not None:
        if not isinstance(retry_config, dict):
            raise ValueError(f"`{CONNECTION}.{CONNECTION_RETRY}` must be a mapping, got: {type(retry_config).__name__}")
        try:
            retry = Retry(**retry_config)
        except (TypeError, ValueError) as e:
            # TypeError covers unknown or non-string kwargs; ValueError covers
            # values Retry itself rejects. Both surface with the same message.
            raise ValueError(f"Invalid `{CONNECTION}.{CONNECTION_RETRY}` configuration: {e}") from e

    if timeout is None and retry is None:
        # A `connection` block that sets neither key changes nothing.
        return None

    return _RetryTimeoutHTTPAdapter(timeout=timeout, max_retries=retry)


class RestCatalog(Catalog):
uri: str
_session: Session
Expand All @@ -418,6 +480,12 @@ def _create_session(self) -> Session:
"""Create a request session with provided catalog configuration."""
session = Session()

# Mount the retry/timeout adapter when `connection.*` properties are set.
# SigV4's adapter mounted below at `self.uri` is a longer prefix and still wins for that host.
if (connection_adapter := _create_connection_adapter(self.properties)) is not None:
session.mount("http://", connection_adapter)
session.mount("https://", connection_adapter)

# Set HTTP headers
self._config_headers(session)

Expand Down Expand Up @@ -763,8 +831,6 @@ def _init_sigv4(self, session: Session) -> None:
import boto3
from botocore.auth import SigV4Auth
from botocore.awsrequest import AWSRequest
from requests import PreparedRequest
from requests.adapters import HTTPAdapter

class SigV4Adapter(HTTPAdapter):
def __init__(self, **properties: str):
Expand Down
69 changes: 69 additions & 0 deletions tests/catalog/test_rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@
import pyiceberg
from pyiceberg.catalog import PropertiesUpdateSummary, load_catalog
from pyiceberg.catalog.rest import (
CONNECTION,
CONNECTION_RETRY,
CONNECTION_TIMEOUT,
DEFAULT_ENDPOINTS,
EMPTY_BODY_SHA256,
OAUTH2_SERVER_URI,
Expand All @@ -43,6 +46,7 @@
HttpMethod,
RestCatalog,
ScanPlanningMode,
_RetryTimeoutHTTPAdapter,
)
from pyiceberg.exceptions import (
AuthorizationExpiredError,
Expand Down Expand Up @@ -2019,6 +2023,71 @@ def test_request_session_with_ssl_client_cert() -> None:
assert "Could not find the TLS certificate file, invalid path: path_to_client_cert" in str(e.value)


def test_session_without_connection_config_uses_default_adapter(rest_mock: Mocker) -> None:
    """Without a `connection` block, no retry/timeout adapter is mounted on the session."""
    catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN)
    mounted_adapters = catalog._session.adapters.values()
    assert not any(isinstance(mounted, _RetryTimeoutHTTPAdapter) for mounted in mounted_adapters)


def test_session_with_connection_timeout_and_retry(rest_mock: Mocker) -> None:
    """A full `connection` block mounts one shared adapter carrying the timeout and retry policy."""
    retry_settings = {
        "total": 5,
        "backoff_factor": 1.0,
        "status_forcelist": [429, 500, 502, 503, 504],
        "allowed_methods": ["GET", "HEAD", "OPTIONS"],
    }
    catalog_properties = {
        "uri": TEST_URI,
        "token": TEST_TOKEN,
        CONNECTION: {CONNECTION_TIMEOUT: 60, CONNECTION_RETRY: retry_settings},
    }
    catalog = RestCatalog("rest", **catalog_properties)  # type: ignore

    mounted = catalog._session.adapters
    https_adapter = mounted["https://"]
    assert isinstance(https_adapter, _RetryTimeoutHTTPAdapter)
    # Both schemes must share the very same adapter instance.
    assert https_adapter is mounted["http://"]
    assert https_adapter._timeout == 60.0

    retry_policy = https_adapter.max_retries
    assert retry_policy.total == 5
    assert retry_policy.backoff_factor == 1.0
    assert retry_policy.status_forcelist == [429, 500, 502, 503, 504]
    assert set(retry_policy.allowed_methods) == {"GET", "HEAD", "OPTIONS"}


def test_session_with_connection_timeout_only(rest_mock: Mocker) -> None:
    """A string timeout is coerced to float and mounted even when no retry policy is given."""
    timeout_only = {CONNECTION_TIMEOUT: "30"}
    catalog = RestCatalog(
        "rest",
        **{"uri": TEST_URI, "token": TEST_TOKEN, CONNECTION: timeout_only},  # type: ignore
    )
    https_adapter = catalog._session.adapters["https://"]
    assert isinstance(https_adapter, _RetryTimeoutHTTPAdapter)
    assert https_adapter._timeout == 30.0


def test_session_with_invalid_connection_timeout_raises(rest_mock: Mocker) -> None:
    """A negative timeout is rejected while the catalog is being constructed."""
    negative_timeout = {CONNECTION_TIMEOUT: -1}
    catalog_properties = {"uri": TEST_URI, "token": TEST_TOKEN, CONNECTION: negative_timeout}
    expected_message = "`connection.timeout` must be a positive number"
    with pytest.raises(ValueError, match=expected_message):
        RestCatalog("rest", **catalog_properties)  # type: ignore


def test_session_with_invalid_connection_retry_kwarg_raises(rest_mock: Mocker) -> None:
    """An unknown retry kwarg is reported as a configuration error at construction time."""
    unknown_retry_kwarg = {CONNECTION_RETRY: {"bogus_kwarg": 1}}
    catalog_properties = {"uri": TEST_URI, "token": TEST_TOKEN, CONNECTION: unknown_retry_kwarg}
    expected_message = "Invalid `connection.retry` configuration"
    with pytest.raises(ValueError, match=expected_message):
        RestCatalog("rest", **catalog_properties)  # type: ignore


def test_rest_catalog_with_basic_auth_type(rest_mock: Mocker) -> None:
# Given
rest_mock.get(
Expand Down
Loading