Skip to content

Commit ef4e7b5

Browse files
committed
Deprecate Asset.extra
1 parent 95d1a38 commit ef4e7b5

6 files changed

Lines changed: 36 additions & 63 deletions

File tree

airflow-core/docs/authoring-and-scheduling/assets.rst

Lines changed: 6 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -90,59 +90,6 @@ The identifier does not have to be absolute; it can be a scheme-less, relative U
9090
9191
Non-absolute identifiers are considered plain strings that do not carry any semantic meanings to Airflow.
9292

93-
Extra information on assets
94-
----------------------------
95-
96-
If needed, you can include an additional dictionary in an asset using the ``extra`` parameter:
97-
98-
.. code-block:: python
99-
100-
example_asset = Asset(
101-
"s3://asset/example.csv",
102-
extra={"team": "trainees"},
103-
)
104-
105-
This allows you to provide custom metadata about the asset, such as ownership information or the purpose of the file. The ``extra`` field does **NOT** affect the identity of an asset.
106-
Thus, maintaining the uniqueness of the ``extra`` value is the user responsibility. It suggested to have only one single set of ``extra`` value per asset.
107-
108-
For example, in the following snippet, only one of the ``extra`` dictionaries will ultimately be stored, but it does guaranteed which one will be stored.
109-
110-
.. code-block:: python
111-
112-
Asset("s3://asset/example.csv", extra={"d": "e"})
113-
Asset("s3://asset/example.csv", extra={"f": "g"})
114-
115-
This behavior also applies to dynamically generated assets created through ``AssetAlias``.
116-
In the example below, the final stored ``extra`` value is not guaranteed and it might vary based on Dag processor settings.
117-
118-
.. code-block:: python
119-
120-
from airflow.sdk import AssetAlias
121-
122-
123-
@dag(schedule=None)
124-
def my_dag_1():
125-
126-
@task(outlets=[AssetAlias("my-task-outputs")])
127-
def my_task_with_outlet_events(*, outlet_events):
128-
outlet_events[AssetAlias("my-task-outputs")].add(
129-
# Asset extra set as {"from": "asset alias"}
130-
Asset("s3://bucket/my-task", extra={"from": "asset alias"})
131-
)
132-
133-
my_task_with_outlet_events()
134-
135-
136-
# Asset extra set as {"key": "value"}
137-
@dag(schedule=Asset("s3://bucket/my-task", extra={"key": "value"}))
138-
def my_dag_2(): ...
139-
140-
141-
my_dag_1()
142-
my_dag_2()
143-
144-
# It's not guaranteed which extra will be the one stored
145-
14693
Security Warnings
14794
----------------------------
14895

@@ -193,8 +140,7 @@ Attaching extra information to an emitting asset event
193140

194141
.. versionadded:: 2.10.0
195142

196-
A task with an asset outlet can optionally attach extra information before it emits an asset event. This is different
197-
from `Extra information on assets`_. Extra information on an asset statically describes the entity pointed to by the asset URI; extra information on the *asset event* instead should be used to annotate the triggering data change, such as how many rows in the database are changed by the update, or the date range covered by it.
143+
A task with an asset outlet can optionally attach extra information before it emits an asset event.
198144

199145
The easiest way to attach extra information to the asset event is by ``yield``-ing a ``Metadata`` object from a task:
200146

@@ -225,6 +171,11 @@ There's minimal magic here---Airflow simply writes the yielded values to the exa
225171

226172
.. note:: Asset event extra information can only contain JSON-serializable values (list and dict nesting is possible). This is due to the value being stored in the database.
227173

174+
.. versionchanged:: 3.2.0
175+
176+
Assets may also contain an extra dict, which is static information distinct from event extras.
177+
This was considered confusing, has been deprecated, and will be removed in Airflow 4.
178+
228179
.. _fetching_information_from_previously_emitted_asset_events:
229180

230181
Fetching information from previously emitted asset events

airflow-core/src/airflow/models/taskinstance.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1418,7 +1418,6 @@ def register_asset_changes_in_db(
14181418
outlet_events: list[dict[str, Any]],
14191419
session: Session = NEW_SESSION,
14201420
) -> None:
1421-
print(task_outlets, outlet_events)
14221421
from airflow.serialization.definitions.assets import (
14231422
SerializedAsset,
14241423
SerializedAssetNameRef,

airflow-core/src/airflow/serialization/decoders.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,6 @@ def _decode_asset(var: dict[str, Any]):
9595
name=var["name"],
9696
uri=var["uri"],
9797
group=var["group"],
98-
extra=var["extra"],
9998
watchers=[
10099
SerializedAssetWatcher(
101100
name=watcher["name"],
@@ -106,6 +105,8 @@ def _decode_asset(var: dict[str, Any]):
106105
)
107106
for watcher in watchers
108107
],
108+
# TODO: Deprecated in SDK. Remove in Airflow 4.0.
109+
extra=var.get("extra", {}),
109110
)
110111

111112

airflow-core/src/airflow/serialization/definitions/assets.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
if TYPE_CHECKING:
3030
from collections.abc import Callable, Iterable, Iterator, MutableSequence
3131

32+
from pydantic import JsonValue
3233
from typing_extensions import Self
3334

3435
from airflow.models.asset import AssetModel
@@ -118,9 +119,11 @@ class SerializedAsset(SerializedAssetBase):
118119
name: str
119120
uri: str
120121
group: str
121-
extra: dict[str, Any]
122122
watchers: MutableSequence[SerializedAssetWatcher]
123123

124+
# TODO: Deprecated in SDK. Remove in Airflow 4.0.
125+
extra: dict[str, JsonValue]
126+
124127
def as_expression(self) -> Any:
125128
"""
126129
Serialize the asset into its scheduling expression.

airflow-core/src/airflow/serialization/encoders.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,9 @@ def encode_asset_like(a: BaseAsset | SerializedAssetBase) -> dict[str, Any]:
179179
d: dict[str, Any]
180180
match a:
181181
case Asset() | SerializedAsset():
182-
d = {"__type": DAT.ASSET, "name": a.name, "uri": a.uri, "group": a.group, "extra": a.extra}
182+
d = {"__type": DAT.ASSET, "name": a.name, "uri": a.uri, "group": a.group}
183+
if a.extra:
184+
d["extra"] = a.extra
183185
if a.watchers:
184186
d["watchers"] = [{"name": w.name, "trigger": encode_trigger(w.trigger)} for w in a.watchers]
185187
return d

task-sdk/src/airflow/sdk/definitions/asset/__init__.py

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828

2929
import attrs
3030

31+
from airflow.sdk.exceptions import RemovedInAirflow4Warning
3132
from airflow.sdk.providers_manager_runtime import ProvidersManagerTaskRuntime
3233

3334
if TYPE_CHECKING:
@@ -271,7 +272,7 @@ class Asset(os.PathLike, BaseAsset):
271272
default=attrs.Factory(operator.attrgetter("asset_type"), takes_self=True),
272273
validator=[_validate_identifier],
273274
)
274-
extra: dict[str, JsonValue] = attrs.field(
275+
_extra: dict[str, JsonValue] = attrs.field(
275276
factory=dict,
276277
converter=_set_extra_default,
277278
)
@@ -289,7 +290,6 @@ def __init__(
289290
uri: str | ObjectStoragePath,
290291
*,
291292
group: str = ...,
292-
extra: dict[str, JsonValue] | None = None,
293293
watchers: list[AssetWatcher] = ...,
294294
) -> None:
295295
"""Canonical; both name and uri are provided."""
@@ -300,7 +300,6 @@ def __init__(
300300
name: str,
301301
*,
302302
group: str = ...,
303-
extra: dict[str, JsonValue] | None = None,
304303
watchers: list[AssetWatcher] = ...,
305304
) -> None:
306305
"""It's possible to only provide the name, either by keyword or as the only positional argument."""
@@ -311,7 +310,6 @@ def __init__(
311310
*,
312311
uri: str | ObjectStoragePath,
313312
group: str = ...,
314-
extra: dict[str, JsonValue] | None = None,
315313
watchers: list[AssetWatcher] = ...,
316314
) -> None:
317315
"""It's possible to only provide the URI as a keyword argument."""
@@ -342,6 +340,11 @@ def __init__(
342340
if group is not None:
343341
kwargs["group"] = group
344342
if extra is not None:
343+
warnings.warn(
344+
"Asset.extra is deprecated and will be removed in the future.",
345+
RemovedInAirflow4Warning,
346+
stacklevel=2,
347+
)
345348
kwargs["extra"] = extra
346349
if watchers is not None:
347350
kwargs["watchers"] = watchers
@@ -404,6 +407,20 @@ def normalized_uri(self) -> str | None:
404407
except ValueError:
405408
return None
406409

410+
@property
411+
def extra(self) -> dict[str, JsonValue]:
412+
# No warning here; a warning is shown when the value is set.
413+
return self._extra
414+
415+
@extra.setter
416+
def extra(self, value: dict[str, JsonValue]) -> None:
417+
warnings.warn(
418+
"Asset.extra is deprecated and will be removed in the future.",
419+
RemovedInAirflow4Warning,
420+
stacklevel=2,
421+
)
422+
self._extra = value
423+
407424

408425
class AssetRef(BaseAsset, AttrsInstance):
409426
"""

0 commit comments

Comments
 (0)