Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions pyiceberg/catalog/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -744,6 +744,32 @@ def create_view(
ViewAlreadyExistsError: If a view with the name already exists.
"""

@abstractmethod
def replace_view(
    self,
    identifier: str | Identifier,
    schema: Schema | pa.Schema,
    view_version: ViewVersion,
    location: str | None = None,
    properties: Properties = EMPTY_DICT,
) -> View:
    """Replace an existing view.

    Args:
        identifier (str | Identifier): View identifier.
        schema (Schema | pa.Schema): Schema for the replacement view.
        view_version (ViewVersion): The view version to apply to the view.
        location (str | None): Location for the view. Optional Argument.
        properties (Properties): View properties that can be a string based dictionary.

    Returns:
        View: the replaced view instance.

    Raises:
        TableAlreadyExistsError: If a table with the same name already exists.
        NoSuchViewError: If a view with the name does not exist.
    """

@staticmethod
def identifier_to_tuple(identifier: str | Identifier) -> Identifier:
"""Parse an identifier to a tuple.
Expand Down Expand Up @@ -980,6 +1006,17 @@ def create_view(
) -> View:
raise NotImplementedError

@override
def replace_view(
    self,
    identifier: str | Identifier,
    schema: Schema | pa.Schema,
    view_version: ViewVersion,
    location: str | None = None,
    properties: Properties = EMPTY_DICT,
) -> View:
    """Replace a view; not implemented by this base implementation.

    Raises:
        NotImplementedError: Always.
    """
    raise NotImplementedError

def _create_staged_table(
self,
identifier: str | Identifier,
Expand Down
11 changes: 11 additions & 0 deletions pyiceberg/catalog/dynamodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -564,6 +564,17 @@ def create_view(
) -> View:
raise NotImplementedError

@override
def replace_view(
    self,
    identifier: str | Identifier,
    schema: Union[Schema, "pa.Schema"],
    view_version: ViewVersion,
    location: str | None = None,
    properties: Properties = EMPTY_DICT,
) -> View:
    """Replace a view; not implemented by this catalog.

    Raises:
        NotImplementedError: Always.
    """
    raise NotImplementedError

@override
def list_views(self, namespace: str | Identifier) -> list[Identifier]:
    """List views in a namespace; not implemented by this catalog.

    Raises:
        NotImplementedError: Always.
    """
    raise NotImplementedError
Expand Down
11 changes: 11 additions & 0 deletions pyiceberg/catalog/glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -981,6 +981,17 @@ def create_view(
) -> View:
raise NotImplementedError

@override
def replace_view(
    self,
    identifier: str | Identifier,
    schema: Union[Schema, "pa.Schema"],
    view_version: ViewVersion,
    location: str | None = None,
    properties: Properties = EMPTY_DICT,
) -> View:
    """Replace a view; not implemented by this catalog.

    Raises:
        NotImplementedError: Always.
    """
    raise NotImplementedError

@override
def list_views(self, namespace: str | Identifier) -> list[Identifier]:
    """List views in a namespace; not implemented by this catalog.

    Raises:
        NotImplementedError: Always.
    """
    raise NotImplementedError
Expand Down
11 changes: 11 additions & 0 deletions pyiceberg/catalog/hive.py
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,17 @@ def create_view(
) -> View:
raise NotImplementedError

@override
def replace_view(
    self,
    identifier: str | Identifier,
    schema: Union[Schema, "pa.Schema"],
    view_version: ViewVersion,
    location: str | None = None,
    properties: Properties = EMPTY_DICT,
) -> View:
    """Replace a view; not implemented by this catalog.

    Raises:
        NotImplementedError: Always.
    """
    raise NotImplementedError

@override
def register_table(self, identifier: str | Identifier, metadata_location: str, overwrite: bool = False) -> Table:
"""Register a new table using existing metadata.
Expand Down
11 changes: 11 additions & 0 deletions pyiceberg/catalog/noop.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,17 @@ def create_view(
) -> View:
raise NotImplementedError

@override
def replace_view(
    self,
    identifier: str | Identifier,
    schema: Schema | pa.Schema,
    view_version: ViewVersion,
    location: str | None = None,
    properties: Properties = EMPTY_DICT,
) -> View:
    """Replace a view; not implemented by this catalog.

    Raises:
        NotImplementedError: Always.
    """
    raise NotImplementedError

@override
def load_view(self, identifier: str | Identifier) -> View:
    """Load a view; not implemented by this catalog.

    Raises:
        NotImplementedError: Always.
    """
    raise NotImplementedError
86 changes: 86 additions & 0 deletions pyiceberg/catalog/rest/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,16 @@
from pyiceberg.table.metadata import TableMetadata
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder, assign_fresh_sort_order_ids
from pyiceberg.table.update import (
AddSchemaUpdate,
AddViewVersionUpdate,
AssertViewUUID,
SetCurrentViewVersionUpdate,
SetLocationUpdate,
SetPropertiesUpdate,
TableRequirement,
TableUpdate,
ViewRequirement,
ViewUpdate,
)
from pyiceberg.typedef import EMPTY_DICT, UTF8, IcebergBaseModel, Identifier, Properties
from pyiceberg.types import transform_dict_value_to_str
Expand Down Expand Up @@ -155,6 +163,7 @@ class Endpoints:
list_views: str = "namespaces/{namespace}/views"
load_view: str = "namespaces/{namespace}/views/{view}"
create_view: str = "namespaces/{namespace}/views"
update_view: str = "namespaces/{namespace}/views/{view}"
register_view: str = "namespaces/{namespace}/register-view"
drop_view: str = "namespaces/{namespace}/views/{view}"
view_exists: str = "namespaces/{namespace}/views/{view}"
Expand Down Expand Up @@ -185,6 +194,7 @@ class Capability:
V1_LIST_VIEWS = Endpoint(http_method=HttpMethod.GET, path=f"{API_PREFIX}/{Endpoints.list_views}")
V1_LOAD_VIEW = Endpoint(http_method=HttpMethod.GET, path=f"{API_PREFIX}/{Endpoints.load_view}")
V1_VIEW_EXISTS = Endpoint(http_method=HttpMethod.HEAD, path=f"{API_PREFIX}/{Endpoints.view_exists}")
V1_UPDATE_VIEW = Endpoint(http_method=HttpMethod.POST, path=f"{API_PREFIX}/{Endpoints.update_view}")
V1_REGISTER_VIEW = Endpoint(http_method=HttpMethod.POST, path=f"{API_PREFIX}/{Endpoints.register_view}")
V1_DELETE_VIEW = Endpoint(http_method=HttpMethod.DELETE, path=f"{API_PREFIX}/{Endpoints.drop_view}")
V1_SUBMIT_TABLE_SCAN_PLAN = Endpoint(http_method=HttpMethod.POST, path=f"{API_PREFIX}/{Endpoints.plan_table_scan}")
Expand Down Expand Up @@ -215,6 +225,7 @@ class Capability:
(
Capability.V1_LIST_VIEWS,
Capability.V1_LOAD_VIEW,
Capability.V1_UPDATE_VIEW,
Capability.V1_DELETE_VIEW,
)
)
Expand Down Expand Up @@ -337,6 +348,12 @@ class RegisterViewRequest(IcebergBaseModel):
metadata_location: str = Field(..., alias="metadata-location")


class CommitViewRequest(IcebergBaseModel):
    """Request body for committing changes to a view: its identifier, requirements, and updates."""

    # Identifier of the view the commit applies to (required; no default).
    identifier: TableIdentifier = Field()
    # Requirements sent along with the commit; defaults to an empty tuple.
    requirements: tuple[ViewRequirement, ...] = Field(default_factory=tuple)
    # Updates to apply to the view; defaults to an empty tuple.
    updates: tuple[ViewUpdate, ...] = Field(default_factory=tuple)


class ConfigResponse(IcebergBaseModel):
defaults: Properties | None = Field(default_factory=dict)
overrides: Properties | None = Field(default_factory=dict)
Expand Down Expand Up @@ -1000,6 +1017,75 @@ def create_view(
view_response = ViewResponse.model_validate_json(response.text)
return self._response_to_view(self.identifier_to_tuple(identifier), view_response)

@override
@retry(**_RETRY_ARGS)
def replace_view(
    self,
    identifier: str | Identifier,
    schema: Schema | pa.Schema,
    view_version: ViewVersion,
    location: str | None = None,
    properties: Properties = EMPTY_DICT,
) -> View:
    """Replace an existing view by posting a view commit to the update-view endpoint.

    The commit adds the schema (unless a structurally equal one already exists in
    the view metadata), adds the given view version bound to that schema, makes it
    the current version, and optionally sets the location and properties.

    Args:
        identifier (str | Identifier): View identifier.
        schema (Schema | pa.Schema): Schema for the replacement view.
        view_version (ViewVersion): View version to add and make current.
        location (str | None): New location for the view; trailing slashes are stripped.
        properties (Properties): Properties to set on the view.

    Returns:
        View: the view built from the catalog's response.

    Raises:
        TableAlreadyExistsError: If a table with the same name already exists.
        NoSuchViewError: If the view does not exist (pre-check, or HTTP 404 on commit).
        CommitFailedException: If the catalog answers the commit with HTTP 409.
    """
    self._check_endpoint(Capability.V1_UPDATE_VIEW)
    iceberg_schema = self._convert_schema_if_needed(schema)
    path_params = self._split_identifier_for_path(identifier, IdentifierKind.VIEW)

    # Guard clauses: the name must belong to an existing view, not a table.
    if self.table_exists(identifier):
        raise TableAlreadyExistsError(f"Table with same name already exists: {identifier}")
    if not self.view_exists(identifier):
        raise NoSuchViewError(f"View does not exist: {identifier}")

    existing_view = self.load_view(identifier)
    normalized_location = location.rstrip("/") if location else location

    # Reuse the id of a structurally identical schema when the view already has one.
    schema_id = next(
        (
            known.schema_id
            for known in existing_view.metadata.schemas
            if known.as_struct() == iceberg_schema.as_struct()
        ),
        None,
    )

    pending: list[ViewUpdate] = []
    if schema_id is None:
        # No match: register the schema under the next free schema id.
        schema_id = max((known.schema_id for known in existing_view.metadata.schemas), default=0) + 1
        pending.append(AddSchemaUpdate(schema_=iceberg_schema.model_copy(update={"schema_id": schema_id})))

    new_version = view_version.model_copy(update={"schema_id": schema_id})
    pending.append(AddViewVersionUpdate(view_version=new_version))
    pending.append(SetCurrentViewVersionUpdate(view_version_id=new_version.version_id))
    if normalized_location:
        pending.append(SetLocationUpdate(location=normalized_location))
    if properties:
        pending.append(SetPropertiesUpdate(updates=properties))

    # The commit only applies if the view still carries the UUID that was just loaded.
    requirements: tuple[ViewRequirement, ...] = (AssertViewUUID(uuid=existing_view.metadata.view_uuid),)

    view_name = existing_view.name()
    commit_request = CommitViewRequest(
        identifier=TableIdentifier(namespace=view_name[:-1], name=view_name[-1]),
        requirements=requirements,
        updates=tuple(pending),
    )

    payload = commit_request.model_dump_json().encode(UTF8)
    response = self._session.post(self.url(Endpoints.update_view, **path_params), data=payload)

    try:
        response.raise_for_status()
    except HTTPError as exc:
        _handle_non_200_response(exc, {409: CommitFailedException, 404: NoSuchViewError})

    parsed_response = ViewResponse.model_validate_json(response.text)
    return self._response_to_view(self.identifier_to_tuple(view_name), parsed_response)

@retry(**_RETRY_ARGS)
@override
def register_table(self, identifier: str | Identifier, metadata_location: str, overwrite: bool = False) -> Table:
Expand Down
11 changes: 11 additions & 0 deletions pyiceberg/catalog/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -754,6 +754,17 @@ def create_view(
) -> View:
raise NotImplementedError

@override
def replace_view(
    self,
    identifier: str | Identifier,
    schema: Schema | pa.Schema,
    view_version: ViewVersion,
    location: str | None = None,
    properties: Properties = EMPTY_DICT,
) -> View:
    """Replace a view; not implemented by this catalog.

    Raises:
        NotImplementedError: Always.
    """
    raise NotImplementedError

@override
def list_views(self, namespace: str | Identifier) -> list[Identifier]:
    """List views in a namespace; not implemented by this catalog.

    Raises:
        NotImplementedError: Always.
    """
    raise NotImplementedError
Expand Down
40 changes: 40 additions & 0 deletions pyiceberg/table/update/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,16 @@ class RemovePartitionStatisticsUpdate(IcebergBaseModel):
snapshot_id: int = Field(alias="snapshot-id")


class AddViewVersionUpdate(IcebergBaseModel):
    """Update with action `add-view-version`, carrying the view version to add."""

    # Fixed tag identifying this update kind.
    action: Literal["add-view-version"] = Field(default="add-view-version")
    # Aliased as "view-version"; typed as Any, so the payload is not checked against a view-version model.
    # NOTE(review): the REST catalog passes a ViewVersion here — consider narrowing the annotation.
    view_version: Any = Field(alias="view-version")


class SetCurrentViewVersionUpdate(IcebergBaseModel):
    """Update with action `set-current-view-version`, naming the version to make current."""

    # Fixed tag identifying this update kind.
    action: Literal["set-current-view-version"] = Field(default="set-current-view-version")
    # Id of the view version to make current; aliased as "view-version-id".
    view_version_id: int = Field(alias="view-version-id")


TableUpdate = Annotated[
AssignUUIDUpdate
| UpgradeFormatVersionUpdate
Expand Down Expand Up @@ -791,6 +801,19 @@ def validate(self, base_metadata: TableMetadata | None) -> None:
raise CommitFailedException(f"Table UUID does not match: {self.uuid} != {base_metadata.table_uuid}")


class AssertViewUUID(ValidatableTableRequirement):
    """The view UUID must match the requirement's `uuid`."""

    type: Literal["assert-view-uuid"] = Field(default="assert-view-uuid")
    uuid: uuid.UUID

    def validate(self, base_metadata: TableMetadata | None) -> None:
        """Check that the current metadata carries the expected view UUID.

        Args:
            base_metadata: Current metadata to validate against. The annotation is
                inherited from the base-class signature; this requirement targets
                view metadata, whose UUID is exposed as `view_uuid`.

        Raises:
            CommitFailedException: If the metadata is missing or its view UUID
                differs from the requirement's `uuid`.
        """
        if base_metadata is None:
            raise CommitFailedException("Requirement failed: current view metadata is missing")

        # Compare against `view_uuid`, not `table_uuid`: this requirement is built
        # from a view's `metadata.view_uuid`. getattr keeps the call safe when the
        # object has no such attribute — the requirement then simply fails.
        current_uuid = getattr(base_metadata, "view_uuid", None)
        if self.uuid != current_uuid:
            raise CommitFailedException(f"View UUID does not match: {self.uuid} != {current_uuid}")


class AssertRefSnapshotId(ValidatableTableRequirement):
"""The table branch or tag identified by the requirement's `ref` must reference the requirement's `snapshot-id`.

Expand Down Expand Up @@ -919,4 +942,21 @@ def validate(self, base_metadata: TableMetadata | None) -> None:
Field(discriminator="type"),
]

# Union of the update models accepted in a view commit, discriminated on the "action" field.
ViewUpdate = Annotated[
    AssignUUIDUpdate
    | UpgradeFormatVersionUpdate
    | AddSchemaUpdate
    | SetLocationUpdate
    | SetPropertiesUpdate
    | RemovePropertiesUpdate
    | AddViewVersionUpdate
    | SetCurrentViewVersionUpdate,
    Field(discriminator="action"),
]

# Requirement models accepted in a view commit, discriminated on the "type" field.
# AssertViewUUID is currently the only member.
ViewRequirement = Annotated[
    AssertViewUUID,
    Field(discriminator="type"),
]

UpdatesAndRequirements = tuple[tuple[TableUpdate, ...], tuple[TableRequirement, ...]]
8 changes: 8 additions & 0 deletions pyiceberg/view/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,14 @@
from pyiceberg.typedef import Identifier
from pyiceberg.view.metadata import SQLViewRepresentation, ViewHistoryEntry, ViewMetadata, ViewVersion

# Public names of this package: the View class defined below plus the metadata
# models re-exported from pyiceberg.view.metadata.
__all__ = [
    "View",
    "ViewMetadata",
    "ViewVersion",
    "ViewHistoryEntry",
    "SQLViewRepresentation",
]


class View:
"""An Iceberg view."""
Expand Down
1 change: 1 addition & 0 deletions tests/catalog/test_rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@
Capability.V1_LIST_VIEWS,
Capability.V1_LOAD_VIEW,
Capability.V1_VIEW_EXISTS,
Capability.V1_UPDATE_VIEW,
Capability.V1_REGISTER_VIEW,
Capability.V1_DELETE_VIEW,
Capability.V1_SUBMIT_TABLE_SCAN_PLAN,
Expand Down
Loading