diff --git a/pyiceberg/catalog/__init__.py b/pyiceberg/catalog/__init__.py
index 95ceaa539f..463d4091ab 100644
--- a/pyiceberg/catalog/__init__.py
+++ b/pyiceberg/catalog/__init__.py
@@ -744,6 +744,32 @@ def create_view(
             ViewAlreadyExistsError: If a view with the name already exists.
         """
 
+    @abstractmethod
+    def replace_view(
+        self,
+        identifier: str | Identifier,
+        schema: Schema | pa.Schema,
+        view_version: ViewVersion,
+        location: str | None = None,
+        properties: Properties = EMPTY_DICT,
+    ) -> View:
+        """Replace an existing view.
+
+        Args:
+            identifier (str | Identifier): View identifier.
+            schema (Schema | pa.Schema): View's schema.
+            view_version (ViewVersion): The view version to set as the view's current version.
+            location (str | None): Location for the view. Optional Argument.
+            properties (Properties): View properties that can be a string based dictionary.
+
+        Returns:
+            View: the replaced view instance.
+
+        Raises:
+            TableAlreadyExistsError: If a table with the same name already exists.
+            NoSuchViewError: If a view with the name does not exist.
+        """
+
     @staticmethod
     def identifier_to_tuple(identifier: str | Identifier) -> Identifier:
         """Parse an identifier to a tuple.
@@ -980,6 +1006,17 @@ def create_view(
     ) -> View:
         raise NotImplementedError
 
+    @override
+    def replace_view(
+        self,
+        identifier: str | Identifier,
+        schema: Schema | pa.Schema,
+        view_version: ViewVersion,
+        location: str | None = None,
+        properties: Properties = EMPTY_DICT,
+    ) -> View:
+        raise NotImplementedError
+
     def _create_staged_table(
         self,
         identifier: str | Identifier,
diff --git a/pyiceberg/catalog/dynamodb.py b/pyiceberg/catalog/dynamodb.py
index 74c0be6c9a..1b1ad274fb 100644
--- a/pyiceberg/catalog/dynamodb.py
+++ b/pyiceberg/catalog/dynamodb.py
@@ -564,6 +564,17 @@ def create_view(
     ) -> View:
         raise NotImplementedError
 
+    @override
+    def replace_view(
+        self,
+        identifier: str | Identifier,
+        schema: Union[Schema, "pa.Schema"],
+        view_version: ViewVersion,
+        location: str | None = None,
+        properties: Properties = EMPTY_DICT,
+    ) -> View:
+        raise NotImplementedError
+
     @override
     def list_views(self, namespace: str | Identifier) -> list[Identifier]:
         raise NotImplementedError
diff --git a/pyiceberg/catalog/glue.py b/pyiceberg/catalog/glue.py
index 12b36efc5c..dd677db8d0 100644
--- a/pyiceberg/catalog/glue.py
+++ b/pyiceberg/catalog/glue.py
@@ -981,6 +981,17 @@ def create_view(
     ) -> View:
         raise NotImplementedError
 
+    @override
+    def replace_view(
+        self,
+        identifier: str | Identifier,
+        schema: Union[Schema, "pa.Schema"],
+        view_version: ViewVersion,
+        location: str | None = None,
+        properties: Properties = EMPTY_DICT,
+    ) -> View:
+        raise NotImplementedError
+
     @override
     def list_views(self, namespace: str | Identifier) -> list[Identifier]:
         raise NotImplementedError
diff --git a/pyiceberg/catalog/hive.py b/pyiceberg/catalog/hive.py
index 181f9d4661..0d7d941094 100644
--- a/pyiceberg/catalog/hive.py
+++ b/pyiceberg/catalog/hive.py
@@ -443,6 +443,17 @@ def create_view(
     ) -> View:
         raise NotImplementedError
 
+    @override
+    def replace_view(
+        self,
+        identifier: str | Identifier,
+        schema: Union[Schema, "pa.Schema"],
+        view_version: ViewVersion,
+        location: str | None = None,
+        properties: Properties = EMPTY_DICT,
+    ) -> View:
+        raise NotImplementedError
+
     @override
     def register_table(self, identifier: str | Identifier, metadata_location: str, overwrite: bool = False) -> Table:
         """Register a new table using existing metadata.
diff --git a/pyiceberg/catalog/noop.py b/pyiceberg/catalog/noop.py
index aeb3c72843..d719c6c0e8 100644
--- a/pyiceberg/catalog/noop.py
+++ b/pyiceberg/catalog/noop.py
@@ -172,6 +172,17 @@ def create_view(
     ) -> View:
         raise NotImplementedError
 
+    @override
+    def replace_view(
+        self,
+        identifier: str | Identifier,
+        schema: Schema | pa.Schema,
+        view_version: ViewVersion,
+        location: str | None = None,
+        properties: Properties = EMPTY_DICT,
+    ) -> View:
+        raise NotImplementedError
+
     @override
     def load_view(self, identifier: str | Identifier) -> View:
         raise NotImplementedError
diff --git a/pyiceberg/catalog/rest/__init__.py b/pyiceberg/catalog/rest/__init__.py
index d085c6fd87..fc03c548b7 100644
--- a/pyiceberg/catalog/rest/__init__.py
+++ b/pyiceberg/catalog/rest/__init__.py
@@ -83,8 +83,16 @@
 from pyiceberg.table.metadata import TableMetadata
 from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder, assign_fresh_sort_order_ids
 from pyiceberg.table.update import (
+    AddSchemaUpdate,
+    AddViewVersionUpdate,
+    AssertViewUUID,
+    SetCurrentViewVersionUpdate,
+    SetLocationUpdate,
+    SetPropertiesUpdate,
     TableRequirement,
     TableUpdate,
+    ViewRequirement,
+    ViewUpdate,
 )
 from pyiceberg.typedef import EMPTY_DICT, UTF8, IcebergBaseModel, Identifier, Properties
 from pyiceberg.types import transform_dict_value_to_str
@@ -155,6 +163,7 @@ class Endpoints:
     list_views: str = "namespaces/{namespace}/views"
     load_view: str = "namespaces/{namespace}/views/{view}"
     create_view: str = "namespaces/{namespace}/views"
+    update_view: str = "namespaces/{namespace}/views/{view}"
     register_view: str = "namespaces/{namespace}/register-view"
     drop_view: str = "namespaces/{namespace}/views/{view}"
     view_exists: str = "namespaces/{namespace}/views/{view}"
@@ -185,6 +194,7 @@ class Capability:
     V1_LIST_VIEWS = Endpoint(http_method=HttpMethod.GET, path=f"{API_PREFIX}/{Endpoints.list_views}")
     V1_LOAD_VIEW = Endpoint(http_method=HttpMethod.GET, path=f"{API_PREFIX}/{Endpoints.load_view}")
     V1_VIEW_EXISTS = Endpoint(http_method=HttpMethod.HEAD, path=f"{API_PREFIX}/{Endpoints.view_exists}")
+    V1_UPDATE_VIEW = Endpoint(http_method=HttpMethod.POST, path=f"{API_PREFIX}/{Endpoints.update_view}")
     V1_REGISTER_VIEW = Endpoint(http_method=HttpMethod.POST, path=f"{API_PREFIX}/{Endpoints.register_view}")
     V1_DELETE_VIEW = Endpoint(http_method=HttpMethod.DELETE, path=f"{API_PREFIX}/{Endpoints.drop_view}")
     V1_SUBMIT_TABLE_SCAN_PLAN = Endpoint(http_method=HttpMethod.POST, path=f"{API_PREFIX}/{Endpoints.plan_table_scan}")
@@ -215,6 +225,7 @@ class Capability:
     (
         Capability.V1_LIST_VIEWS,
         Capability.V1_LOAD_VIEW,
+        Capability.V1_UPDATE_VIEW,
         Capability.V1_DELETE_VIEW,
     )
 )
@@ -337,6 +348,12 @@ class RegisterViewRequest(IcebergBaseModel):
     metadata_location: str = Field(..., alias="metadata-location")
 
 
+class CommitViewRequest(IcebergBaseModel):
+    identifier: TableIdentifier = Field()
+    requirements: tuple[ViewRequirement, ...] = Field(default_factory=tuple)
+    updates: tuple[ViewUpdate, ...] = Field(default_factory=tuple)
+
+
 class ConfigResponse(IcebergBaseModel):
     defaults: Properties | None = Field(default_factory=dict)
     overrides: Properties | None = Field(default_factory=dict)
@@ -1000,6 +1017,77 @@ def create_view(
         view_response = ViewResponse.model_validate_json(response.text)
         return self._response_to_view(self.identifier_to_tuple(identifier), view_response)
 
+    @override
+    @retry(**_RETRY_ARGS)
+    def replace_view(
+        self,
+        identifier: str | Identifier,
+        schema: Schema | pa.Schema,
+        view_version: ViewVersion,
+        location: str | None = None,
+        properties: Properties = EMPTY_DICT,
+    ) -> View:
+        self._check_endpoint(Capability.V1_UPDATE_VIEW)
+        iceberg_schema = self._convert_schema_if_needed(schema)
+
+        namespace_and_view = self._split_identifier_for_path(identifier, IdentifierKind.VIEW)
+        # Fail fast when the identifier resolves to a table, or when there is no view to replace.
+        if self.table_exists(identifier):
+            raise TableAlreadyExistsError(f"Table with same name already exists: {identifier}")
+        if not self.view_exists(identifier):
+            raise NoSuchViewError(f"View does not exist: {identifier}")
+
+        current_view = self.load_view(identifier)
+
+        if location:
+            location = location.rstrip("/")
+
+        # Check if schema already exists in view metadata by comparing structure
+        schema_id = None
+        for existing_schema in current_view.metadata.schemas:
+            if existing_schema.as_struct() == iceberg_schema.as_struct():
+                schema_id = existing_schema.schema_id
+                break
+
+        updates: list[ViewUpdate] = []
+        if schema_id is None:
+            # Schema not found, add new schema with next schema_id
+            next_schema_id = max((s.schema_id for s in current_view.metadata.schemas), default=0) + 1
+            schema_to_add = iceberg_schema.model_copy(update={"schema_id": next_schema_id})
+            updates.append(AddSchemaUpdate(schema_=schema_to_add))
+            schema_id = next_schema_id
+
+        fresh_view_version = view_version.model_copy(update={"schema_id": schema_id})
+        updates.append(AddViewVersionUpdate(view_version=fresh_view_version))
+        # -1 asks the catalog to activate the version added by this commit; the catalog may assign it
+        # a different id than the caller-supplied `version_id`, which could already be taken.
+        updates.append(SetCurrentViewVersionUpdate(view_version_id=-1))
+        if location:
+            updates.append(SetLocationUpdate(location=location))
+        if properties:
+            updates.append(SetPropertiesUpdate(updates=properties))
+
+        # Pin the commit to the view that was just loaded.
+        requirements: tuple[ViewRequirement, ...] = (AssertViewUUID(uuid=current_view.metadata.view_uuid),)
+
+        identifier = current_view.name()
+        view_identifier = TableIdentifier(namespace=identifier[:-1], name=identifier[-1])
+        request = CommitViewRequest(identifier=view_identifier, requirements=requirements, updates=tuple(updates))
+
+        serialized_json = request.model_dump_json().encode(UTF8)
+        response = self._session.post(
+            self.url(Endpoints.update_view, **namespace_and_view),
+            data=serialized_json,
+        )
+
+        try:
+            response.raise_for_status()
+        except HTTPError as exc:
+            _handle_non_200_response(exc, {409: CommitFailedException, 404: NoSuchViewError})
+
+        view_response = ViewResponse.model_validate_json(response.text)
+        return self._response_to_view(self.identifier_to_tuple(identifier), view_response)
+
     @retry(**_RETRY_ARGS)
     @override
     def register_table(self, identifier: str | Identifier, metadata_location: str, overwrite: bool = False) -> Table:
diff --git a/pyiceberg/catalog/sql.py b/pyiceberg/catalog/sql.py
index 87446bd58b..ba68dacde0 100644
--- a/pyiceberg/catalog/sql.py
+++ b/pyiceberg/catalog/sql.py
@@ -754,6 +754,17 @@ def create_view(
     ) -> View:
         raise NotImplementedError
 
+    @override
+    def replace_view(
+        self,
+        identifier: str | Identifier,
+        schema: Schema | pa.Schema,
+        view_version: ViewVersion,
+        location: str | None = None,
+        properties: Properties = EMPTY_DICT,
+    ) -> View:
+        raise NotImplementedError
+
     @override
     def list_views(self, namespace: str | Identifier) -> list[Identifier]:
         raise NotImplementedError
diff --git a/pyiceberg/table/update/__init__.py b/pyiceberg/table/update/__init__.py
index 64838b0bd6..c851f67320 100644
--- a/pyiceberg/table/update/__init__.py
+++ b/pyiceberg/table/update/__init__.py
@@ -222,6 +222,20 @@ class RemovePartitionStatisticsUpdate(IcebergBaseModel):
     snapshot_id: int = Field(alias="snapshot-id")
 
 
+class AddViewVersionUpdate(IcebergBaseModel):
+    """Add a new version to a view."""
+
+    action: Literal["add-view-version"] = Field(default="add-view-version")
+    view_version: Any = Field(alias="view-version")
+
+
+class SetCurrentViewVersionUpdate(IcebergBaseModel):
+    """Make the version identified by `view-version-id` the view's current version."""
+
+    action: Literal["set-current-view-version"] = Field(default="set-current-view-version")
+    view_version_id: int = Field(alias="view-version-id")
+
+
 TableUpdate = Annotated[
     AssignUUIDUpdate
     | UpgradeFormatVersionUpdate
@@ -791,6 +805,20 @@ def validate(self, base_metadata: TableMetadata | None) -> None:
             raise CommitFailedException(f"Table UUID does not match: {self.uuid} != {base_metadata.table_uuid}")
 
 
+class AssertViewUUID(ValidatableTableRequirement):
+    """The view UUID must match the requirement's `uuid`."""
+
+    type: Literal["assert-view-uuid"] = Field(default="assert-view-uuid")
+    uuid: uuid.UUID
+
+    def validate(self, base_metadata: Any) -> None:
+        # The metadata checked here is view metadata, which exposes `view_uuid` rather than `table_uuid`.
+        if base_metadata is None:
+            raise CommitFailedException("Requirement failed: current view metadata is missing")
+        elif self.uuid != base_metadata.view_uuid:
+            raise CommitFailedException(f"View UUID does not match: {self.uuid} != {base_metadata.view_uuid}")
+
+
 class AssertRefSnapshotId(ValidatableTableRequirement):
     """The table branch or tag identified by the requirement's `ref` must reference the requirement's `snapshot-id`.
 
@@ -919,4 +947,21 @@ def validate(self, base_metadata: TableMetadata | None) -> None:
     Field(discriminator="type"),
 ]
 
+ViewUpdate = Annotated[
+    AssignUUIDUpdate
+    | UpgradeFormatVersionUpdate
+    | AddSchemaUpdate
+    | SetLocationUpdate
+    | SetPropertiesUpdate
+    | RemovePropertiesUpdate
+    | AddViewVersionUpdate
+    | SetCurrentViewVersionUpdate,
+    Field(discriminator="action"),
+]
+
+ViewRequirement = Annotated[
+    AssertViewUUID,
+    Field(discriminator="type"),
+]
+
 UpdatesAndRequirements = tuple[tuple[TableUpdate, ...], tuple[TableRequirement, ...]]
diff --git a/pyiceberg/view/__init__.py b/pyiceberg/view/__init__.py
index 7f9343218c..717bf71269 100644
--- a/pyiceberg/view/__init__.py
+++ b/pyiceberg/view/__init__.py
@@ -23,6 +23,14 @@
 from pyiceberg.typedef import Identifier
 from pyiceberg.view.metadata import SQLViewRepresentation, ViewHistoryEntry, ViewMetadata, ViewVersion
 
+__all__ = [
+    "View",
+    "ViewMetadata",
+    "ViewVersion",
+    "ViewHistoryEntry",
+    "SQLViewRepresentation",
+]
+
 
 class View:
     """An Iceberg view."""
diff --git a/tests/catalog/test_rest.py b/tests/catalog/test_rest.py
index 1eb9f26a56..70e65d6fdc 100644
--- a/tests/catalog/test_rest.py
+++ b/tests/catalog/test_rest.py
@@ -107,6 +107,7 @@
     Capability.V1_LIST_VIEWS,
     Capability.V1_LOAD_VIEW,
     Capability.V1_VIEW_EXISTS,
+    Capability.V1_UPDATE_VIEW,
     Capability.V1_REGISTER_VIEW,
     Capability.V1_DELETE_VIEW,
     Capability.V1_SUBMIT_TABLE_SCAN_PLAN,
diff --git a/tests/integration/test_catalog.py b/tests/integration/test_catalog.py
index 630cae4767..aab5181360 100644
--- a/tests/integration/test_catalog.py
+++ b/tests/integration/test_catalog.py
@@ -35,6 +35,7 @@
     NamespaceNotEmptyError,
     NoSuchNamespaceError,
     NoSuchTableError,
+    NoSuchViewError,
     TableAlreadyExistsError,
     ValidationError,
 )
@@ -653,6 +654,61 @@ def test_rest_create_view(
     assert rest_catalog.load_view(identifier).schema() == view.schema()
 
 
+@pytest.mark.integration
+def test_rest_replace_view(
+    rest_catalog: RestCatalog,
+    example_view_metadata_v1: dict[str, Any],
+    example_view_metadata_v1_multiple_versions: dict[str, Any],
+    database_name: str,
+    view_name: str,
+) -> None:
+    identifier = (database_name, view_name)
+
+    rest_catalog.create_namespace_if_not_exists(database_name)
+    new_view = View(identifier, ViewMetadata.model_validate(example_view_metadata_v1))
+
+    assert not rest_catalog.view_exists(identifier)
+
+    rest_catalog.create_view(identifier, new_view.schema(), new_view.current_version())
+    assert rest_catalog.view_exists(identifier)
+    assert rest_catalog.load_view(identifier).schema() == new_view.schema()
+
+    replaced_view = View(identifier, ViewMetadata.model_validate(example_view_metadata_v1_multiple_versions))
+    rest_catalog.replace_view(identifier, replaced_view.schema(), replaced_view.current_version())
+    assert rest_catalog.view_exists(identifier)
+    assert rest_catalog.load_view(identifier).schema() == replaced_view.schema()
+
+
+@pytest.mark.integration
+def test_rest_replace_nonexistent_view(
+    rest_catalog: RestCatalog, example_view_metadata_v1: dict[str, Any], database_name: str, view_name: str
+) -> None:
+    identifier = (database_name, view_name)
+
+    rest_catalog.create_namespace_if_not_exists(database_name)
+    view = View(identifier, ViewMetadata.model_validate(example_view_metadata_v1))
+
+    assert not rest_catalog.view_exists(identifier)
+
+    with pytest.raises(NoSuchViewError):
+        rest_catalog.replace_view(identifier, view.schema(), view.current_version())
+
+
+@pytest.mark.integration
+def test_rest_replace_view_with_table(
+    rest_catalog: RestCatalog, test_schema: Schema, example_view_metadata_v1: dict[str, Any], database_name: str, view_name: str
+) -> None:
+    identifier = (database_name, view_name)
+
+    rest_catalog.create_namespace_if_not_exists(database_name)
+    rest_catalog.create_table(identifier, test_schema)
+
+    view = View(identifier, ViewMetadata.model_validate(example_view_metadata_v1))
+
+    with pytest.raises(TableAlreadyExistsError):
+        rest_catalog.replace_view(identifier, view.schema(), view.current_version())
+
+
 @pytest.mark.integration
 def test_rest_drop_view(
     rest_catalog: RestCatalog, example_view_metadata_v1: dict[str, Any], database_name: str, view_name: str