The output will have the format { {destination, partition}, data }
+ */
+class AssignDestinationsAndPartitions
+ extends PTransform<PCollection<Row>, PCollection<KV<Row, Row>>> {
+
+ private final DynamicDestinations dynamicDestinations;
+ private final IcebergCatalogConfig catalogConfig;
+ private final DistributionMode distributionMode;
+ private final @Nullable SerializableFunction<Row, Integer> distributionFunction;
+
+ static final String DESTINATION = "destination";
+ static final String PARTITION = "partition";
+ static final String SHARD = "shard";
+ static final org.apache.beam.sdk.schemas.Schema OUTPUT_SCHEMA =
+ org.apache.beam.sdk.schemas.Schema.builder()
+ .addStringField(DESTINATION)
+ .addStringField(PARTITION)
+ .addNullableField(SHARD, org.apache.beam.sdk.schemas.Schema.FieldType.INT32)
+ .build();
+
+ public AssignDestinationsAndPartitions(
+ DynamicDestinations dynamicDestinations, IcebergCatalogConfig catalogConfig) {
+ this(dynamicDestinations, catalogConfig, DistributionMode.HASH, null);
+ }
+
+ public AssignDestinationsAndPartitions(
+ DynamicDestinations dynamicDestinations,
+ IcebergCatalogConfig catalogConfig,
+ DistributionMode distributionMode,
+ @Nullable SerializableFunction<Row, Integer> distributionFunction) {
+ this.dynamicDestinations = dynamicDestinations;
+ this.catalogConfig = catalogConfig;
+ this.distributionMode = distributionMode;
+ this.distributionFunction = distributionFunction;
+ }
+
+ @Override
+ public PCollection<KV<Row, Row>> expand(PCollection<Row> input) {
+ return input
+ .apply(
+ ParDo.of(
+ new AssignDoFn(
+ dynamicDestinations, catalogConfig, distributionMode, distributionFunction)))
+ .setCoder(
+ KvCoder.of(
+ RowCoder.of(OUTPUT_SCHEMA), RowCoder.of(dynamicDestinations.getDataSchema())));
+ }
+
+ @SuppressWarnings("nullness")
+ static class AssignDoFn extends DoFn<Row, KV<Row, Row>> {
+ private transient @MonotonicNonNull Map<String, PartitionKey> partitionKeys;
+ private transient @MonotonicNonNull Map<String, BeamRowWrapper> wrappers;
+ private final DynamicDestinations dynamicDestinations;
+ private final IcebergCatalogConfig catalogConfig;
+ private final DistributionMode distributionMode;
+ private final @Nullable SerializableFunction<Row, Integer> distributionFunction;
+
+ AssignDoFn(
+ DynamicDestinations dynamicDestinations,
+ IcebergCatalogConfig catalogConfig,
+ DistributionMode distributionMode,
+ @Nullable SerializableFunction<Row, Integer> distributionFunction) {
+ this.dynamicDestinations = dynamicDestinations;
+ this.catalogConfig = catalogConfig;
+ this.distributionMode = distributionMode;
+ this.distributionFunction = distributionFunction;
+ }
+
+ @Setup
+ public void setup() {
+ this.wrappers = new HashMap<>();
+ this.partitionKeys = new HashMap<>();
+ }
+
+ @ProcessElement
+ public void processElement(
+ @Element Row element,
+ BoundedWindow window,
+ PaneInfo paneInfo,
+ @Timestamp Instant timestamp,
+ OutputReceiver<KV<Row, Row>> out) {
+ String tableIdentifier =
+ dynamicDestinations.getTableStringIdentifier(
+ ValueInSingleWindow.of(element, timestamp, window, paneInfo));
+ Row data = dynamicDestinations.getData(element);
+
+ @Nullable PartitionKey partitionKey = checkStateNotNull(partitionKeys).get(tableIdentifier);
+ @Nullable BeamRowWrapper wrapper = checkStateNotNull(wrappers).get(tableIdentifier);
+ if (partitionKey == null || wrapper == null) {
+ PartitionSpec spec = PartitionSpec.unpartitioned();
+ Schema schema = IcebergUtils.beamSchemaToIcebergSchema(data.getSchema());
+ @Nullable
+ IcebergTableCreateConfig createConfig =
+ dynamicDestinations.instantiateDestination(tableIdentifier).getTableCreateConfig();
+ if (createConfig != null && createConfig.getPartitionFields() != null) {
+ spec =
+ PartitionUtils.toPartitionSpec(createConfig.getPartitionFields(), data.getSchema());
+ } else {
+ try {
+ // see if table already exists with a spec
+ // TODO(https://github.com/apache/beam/issues/38337): improve this by periodically
+ // refreshing the table to fetch updated specs
+ spec = catalogConfig.catalog().loadTable(TableIdentifier.parse(tableIdentifier)).spec();
+ } catch (NoSuchTableException ignored) {
+ // no partition to apply
+ }
+ }
+ partitionKey = new PartitionKey(spec, schema);
+ wrapper = new BeamRowWrapper(data.getSchema(), schema.asStruct());
+ checkStateNotNull(partitionKeys).put(tableIdentifier, partitionKey);
+ checkStateNotNull(wrappers).put(tableIdentifier, wrapper);
+ }
+ partitionKey.partition(wrapper.wrap(data));
+ String partitionPath = partitionKey.toPath();
+
+ Integer shardId = null;
+ if (distributionMode == DistributionMode.RANGE && distributionFunction != null) {
+ shardId = distributionFunction.apply(data);
+ }
+
+ Row destAndPartition =
+ Row.withSchema(OUTPUT_SCHEMA)
+ .withFieldValue(DESTINATION, tableIdentifier)
+ .withFieldValue(PARTITION, partitionPath)
+ .withFieldValue(SHARD, shardId)
+ .build();
+ out.output(KV.of(destAndPartition, data));
+ }
+ }
+}
diff --git a/scratch/iceberg-scale-test/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java b/scratch/iceberg-scale-test/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java
new file mode 100644
index 000000000000..4661c1aca209
--- /dev/null
+++ b/scratch/iceberg-scale-test/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java
@@ -0,0 +1,887 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.iceberg;
+
+import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
+
+import com.google.auto.value.AutoValue;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Predicates;
+import org.apache.iceberg.DistributionMode;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.DateTimeUtil;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Duration;
+
+/**
+ * A connector that reads and writes to Apache Iceberg
+ * tables.
+ *
+ * <p>{@link IcebergIO} is offered as a Managed transform. This class is subject to change and
+ * should not be used directly. Instead, use it like so:
+ *
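+ * <p>(Sketch: the table name and catalog properties below are placeholders.)
+ *
+ * <pre>{@code
+ * Map<String, Object> config = Map.of(
+ *     "table", "my_namespace.my_table",
+ *     "catalog_properties", Map.of(...));
+ *
+ * pipeline
+ *     .apply(Create.of(BEAM_ROWS))
+ *     .apply(Managed.write(ICEBERG).withConfig(config));
+ * }</pre>
+ *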
+ * <p>Being a Managed transform, this IO exclusively writes and reads using Beam {@link Row}s.
+ * Conversion takes place between Beam {@link Row}s and Iceberg {@link Record}s using helper methods
+ * in {@link IcebergUtils}. Below is the mapping between Beam and Iceberg types:
+ *
+ * <ul>
+ *   <li>{@code BOOLEAN} --> {@code boolean}
+ *   <li>{@code INT32} --> {@code integer}
+ *   <li>{@code INT64} --> {@code long}
+ *   <li>{@code FLOAT} --> {@code float}
+ *   <li>{@code DOUBLE} --> {@code double}
+ *   <li>{@code STRING} --> {@code string}
+ *   <li>{@code BYTES} --> {@code binary}
+ *   <li>{@code DATETIME} --> {@code timestamptz}
+ *   <li>{@code SqlTypes.DATETIME} --> {@code timestamp}
+ *   <li>{@code SqlTypes.DATE} --> {@code date}
+ *   <li>{@code SqlTypes.TIME} --> {@code time}
+ *   <li>{@code SqlTypes.UUID} --> {@code uuid}
+ *   <li>{@code FixedPrecisionNumeric} --> {@code decimal}
+ *   <li>{@code ROW} --> {@code struct}
+ *   <li>{@code ARRAY} / {@code ITERABLE} --> {@code list}
+ *   <li>{@code MAP} --> {@code map}
+ * </ul>
+ *
+ * <p>For an existing table, the following Beam types are supported for both {@code timestamp} and
+ * {@code timestamptz}:
+ *
+ * <ul>
+ *   <li>{@code SqlTypes.DATETIME} --> Using a {@link java.time.LocalDateTime} object
+ *   <li>{@code DATETIME} --> Using a {@link org.joda.time.DateTime} object
+ *   <li>{@code INT64} --> Using a {@link Long} representing micros since EPOCH
+ *   <li>{@code STRING} --> Using a timestamp {@link String} representation (e.g. {@code
+ *       "2024-10-08T13:18:20.053+03:27"})
+ * </ul>
+ *
+ * <p>Note: If you expect Beam to create the Iceberg table at runtime, please provide {@code
+ * SqlTypes.DATETIME} for a {@code timestamp} column and {@code DATETIME} for a {@code timestamptz}
+ * column. If the table does not exist, Beam will treat {@code STRING} and {@code INT64} at
+ * face-value and create equivalent column types.
+ *
+ * <p>For Iceberg reads, the connector will produce Beam {@code SqlTypes.DATETIME} types for
+ * Iceberg's {@code timestamp} and {@code DATETIME} types for {@code timestamptz}.
+ *
+ * <h3>Writing to Tables</h3>
+ *
+ * <h3>Creating Tables</h3>
+ *
+ * <p>If an Iceberg table does not exist at the time of writing, this connector will automatically
+ * create one with the data's schema.
+ *
+ * <p>Note that this is a best-effort operation that depends on the {@link Catalog} implementation.
+ * Some implementations may not support creating a table using the Iceberg API.
+ *
+ * <h3>Dynamic Destinations</h3>
+ *
+ * <p>Managed Iceberg supports writing to dynamic destinations. To do so, please provide an
+ * identifier template for the {@code table} parameter. A template should have placeholders
+ * represented as curly braces containing a record field name, e.g.: {@code
+ * "my_namespace.my_{foo}_table"}.
+ *
+ * <p>The sink uses simple String interpolation to determine a record's table destination. The
+ * placeholder is replaced with the record's field value. Nested fields can be specified using
+ * dot-notation (e.g. {@code "{top.middle.nested}"}).
+ *
+ * <h3>Pre-filtering Options</h3>
+ *
+ * <p>Some use cases may benefit from filtering record fields right before the write operation. For
+ * example, you may want to provide meta-data to guide records to the right destination, but not
+ * necessarily write that meta-data to your table. Some light-weight filtering options are provided
+ * to accommodate such cases, allowing you to control what actually gets written (see {@code
+ * drop}, {@code keep}, {@code only}).
+ *
+ * <p>Example write to dynamic destinations (pseudocode):
+ *
+ * <pre>{@code
+ * Map<String, Object> config = Map.of(
+ * "table", "flights.{country}.{airport}",
+ * "catalog_properties", Map.of(...),
+ * "drop", ["country", "airport"]);
+ *
+ * JSON_ROWS = [
+ * // first record is written to table "flights.usa.RDU"
+ * "{\"country\": \"usa\"," +
+ * "\"airport\": \"RDU\"," +
+ * "\"flight_id\": \"AA356\"," +
+ * "\"destination\": \"SFO\"," +
+ * "\"meal\": \"chicken alfredo\"}",
+ * // second record is written to table "flights.qatar.HIA"
+ * "{\"country\": \"qatar\"," +
+ * "\"airport\": \"HIA\"," +
+ * "\"flight_id\": \"QR 875\"," +
+ * "\"destination\": \"DEL\"," +
+ * "\"meal\": \"shawarma\"}",
+ * ...
+ * ];
+ *
+ * // fields "country" and "airport" are dropped before
+ * // records are written to tables
+ * pipeline
+ * .apply(Create.of(JSON_ROWS))
+ * .apply(JsonToRow.withSchema(...))
+ * .apply(Managed.write(ICEBERG).withConfig(config));
+ *
+ * }</pre>
+ *
+ * <h3>Output Snapshots</h3>
+ *
+ * <p>When records are written and committed to a table, a snapshot is produced. A batch pipeline
+ * will perform a single commit and create a single snapshot per table. A streaming pipeline will
+ * produce a snapshot roughly according to the configured {@code
+ * triggering_frequency_seconds}.
+ *
+ * <p>You can access these snapshots and perform downstream processing by fetching the {@code
+ * "snapshots"} output PCollection:
+ *
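+ * <p>For example (pseudocode sketch):
+ *
+ * <pre>{@code
+ * // Sketch: assumes the write's output exposes the "snapshots" PCollection by name.
+ * pipeline
+ *     .apply(Create.of(BEAM_ROWS))
+ *     .apply(Managed.write(ICEBERG).withConfig(config))
+ *     .get("snapshots")
+ *     .apply( ... );
+ * }</pre>
+ *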
+ * <p>Note: This reads append-only snapshots. Full CDC is not supported yet.
+ *
+ * <p>The CDC streaming source (enabled with {@code streaming=true}) continuously polls the
+ * table for new snapshots, with a default interval of 60 seconds. This can be overridden with
+ * {@code poll_interval_seconds}:
+ *
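+ * <p>For example (pseudocode sketch of the relevant configuration keys; values are placeholders):
+ *
+ * <pre>{@code
+ * Map<String, Object> config = Map.of(
+ *     "table", "my_namespace.my_table",
+ *     "catalog_properties", Map.of(...),
+ *     "streaming", true,
+ *     "poll_interval_seconds", 30);
+ * }</pre>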
+ *
+ * <p>By default, a batch read will start reading from the earliest (oldest) table snapshot. A
+ * streaming read will start reading from the latest (most recent) snapshot. This behavior can be
+ * overridden in a few mutually exclusive ways:
+ *
+ * <ul>
+ *   <li>Manually setting a starting strategy with {@code starting_strategy} to be {@code
+ *       "earliest"} or {@code "latest"}.
+ *   <li>Setting a starting snapshot id with {@code from_snapshot}.
+ *   <li>Setting a starting timestamp (milliseconds) with {@code from_timestamp}.
+ * </ul>
+ *
+ * <p>By default, a batch read will go up until the most recent table snapshot. A streaming read will
+ * continue monitoring the table for new snapshots forever. This can be overridden with one of the
+ * following options:
+ *
+ * <ul>
+ *   <li>Setting an ending snapshot id with {@code to_snapshot}.
+ *   <li>Setting an ending timestamp (milliseconds) with {@code to_timestamp}.
+ * </ul>
+ *
+ * <p>Note: If {@code streaming=true} and an end point is set, the pipeline will run in
+ * streaming mode and shut down automatically after processing the final snapshot.
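+ *
+ * <p>For example (pseudocode sketch of the relevant configuration keys; values are placeholders):
+ *
+ * <pre>{@code
+ * Map<String, Object> config = Map.of(
+ *     "table", "my_namespace.my_table",
+ *     "catalog_properties", Map.of(...),
+ *     "starting_strategy", "earliest",
+ *     "to_timestamp", 1735689600000L);
+ * }</pre>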
+ */
+@Internal
+public class IcebergIO {
+
+ public static WriteRows writeRows(IcebergCatalogConfig catalog) {
+ return new AutoValue_IcebergIO_WriteRows.Builder()
+ .setCatalogConfig(catalog)
+ .setDistributionMode(DistributionMode.HASH)
+ .setAutoSharding(false)
+ .build();
+ }
+
+ @AutoValue
+ public abstract static class WriteRows extends PTransform<PCollection<Row>, IcebergWriteResult> {
+
+ abstract IcebergCatalogConfig getCatalogConfig();
+
+ abstract @Nullable TableIdentifier getTableIdentifier();
+
+ abstract @Nullable DynamicDestinations getDynamicDestinations();
+
+ abstract @Nullable Duration getTriggeringFrequency();
+
+ abstract @Nullable Integer getDirectWriteByteLimit();
+
+ abstract DistributionMode getDistributionMode();
+
+ abstract @Nullable SerializableFunction<Row, Integer> getDistributionFunction();
+
+ abstract boolean getAutoSharding();
+
+ abstract Builder toBuilder();
+
+ @AutoValue.Builder
+ abstract static class Builder {
+ abstract Builder setCatalogConfig(IcebergCatalogConfig config);
+
+ abstract Builder setTableIdentifier(TableIdentifier identifier);
+
+ abstract Builder setDynamicDestinations(DynamicDestinations destinations);
+
+ abstract Builder setTriggeringFrequency(Duration triggeringFrequency);
+
+ abstract Builder setDirectWriteByteLimit(Integer directWriteByteLimit);
+
+ abstract Builder setDistributionMode(DistributionMode mode);
+
+ abstract Builder setDistributionFunction(SerializableFunction<Row, Integer> shardFn);
+
+ abstract Builder setAutoSharding(boolean autoSharding);
+
+ abstract WriteRows build();
+ }
+
+ public WriteRows to(TableIdentifier identifier) {
+ return toBuilder().setTableIdentifier(identifier).build();
+ }
+
+ public WriteRows to(DynamicDestinations destinations) {
+ return toBuilder().setDynamicDestinations(destinations).build();
+ }
+
+ /**
+ * Sets the frequency at which data is written to files and a new {@link
+ * org.apache.iceberg.Snapshot} is produced.
+ *
+ * <p>Roughly every triggeringFrequency duration, records are written to data files and appended
+ * to the respective table. Each append operation creates a new table snapshot.
+ *
+ * <p>Generally speaking, increasing this duration will result in fewer, larger data files and
+ * fewer snapshots.
+ *
+ * <p>This is only applicable when writing an unbounded {@link PCollection} (i.e. a streaming
+ * pipeline).
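+ *
+ * <p>For example (sketch; {@code catalogConfig} and {@code tableId} are assumed to be defined
+ * elsewhere):
+ *
+ * <pre>{@code
+ * rows.apply(
+ *     IcebergIO.writeRows(catalogConfig)
+ *         .to(tableId)
+ *         .withTriggeringFrequency(Duration.standardMinutes(5)));
+ * }</pre>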
+ */
+ public WriteRows withTriggeringFrequency(Duration triggeringFrequency) {
+ return toBuilder().setTriggeringFrequency(triggeringFrequency).build();
+ }
+
+ public WriteRows withDirectWriteByteLimit(Integer directWriteByteLimit) {
+ return toBuilder().setDirectWriteByteLimit(directWriteByteLimit).build();
+ }
+
+ /**
+ * Defines the distribution mode of write data prior to writing.
+ *
+ * <p>The default distribution mode is {@link DistributionMode#HASH}.
+ *
+ * <p>Comparison of Distribution Modes:
+ *
+ * <table border="1">
+ *   <caption>Comparison of Distribution Modes</caption>
+ *   <tr>
+ *     <th>Mode</th>
+ *     <th>Description</th>
+ *     <th>Pros</th>
+ *     <th>Cons</th>
+ *   </tr>
+ *   <tr>
+ *     <td>{@link DistributionMode#NONE}</td>
+ *     <td>No network shuffle is performed. Records are sorted locally on workers prior to writing.</td>
+ *     <td>Highly lightweight with zero shuffle/network overhead. Best for smaller data volumes.</td>
+ *     <td>Writers on different workers can write to overlapping min/max key ranges across multiple
+ *         files. Relies heavily on post-fact compaction or query time merges.</td>
+ *   </tr>
+ *   <tr>
+ *     <td>{@link DistributionMode#HASH}</td>
+ *     <td>Data is shuffled and consolidated by partition key. All records for a partition are routed
+ *         to a single worker.</td>
+ *     <td>Consolidates each partition's records into fewer data files.</td>
+ *     <td>Can suffer from severe data skew if a single partition contains significantly more data
+ *         than others (hot partitions).</td>
+ *   </tr>
+ *   <tr>
+ *     <td>{@link DistributionMode#RANGE}</td>
+ *     <td>Data is shuffled based on a user-provided shard/bucket function (e.g., hashing/binning
+ *         continuous keys).</td>
+ *     <td>Distributes writes for hot partitions across multiple workers. Eliminates skew while
+ *         keeping file min/max key ranges tight and non-overlapping.</td>
+ *     <td>Requires providing a custom {@link SerializableFunction} mapping rows to integer
+ *         shard/bucket IDs.</td>
+ *   </tr>
+ * </table>
+ *
+ * <p>Recommendation Matrix (Sorting & Partitioning vs. Scale):
+ *
+ * <table border="1">
+ *   <caption>Recommendation Matrix</caption>
+ *   <tr>
+ *     <th>Partitioning</th>
+ *     <th>Sorting</th>
+ *     <th>Scale / Volume</th>
+ *     <th>Latency Priority</th>
+ *     <th>Recommended Mode</th>
+ *     <th>Operational Impact</th>
+ *   </tr>
+ *   <tr>
+ *     <td>Partitioned</td>
+ *     <td>Sorted</td>
+ *     <td>Small</td>
+ *     <td>Any</td>
+ *     <td>{@link DistributionMode#HASH}</td>
+ *     <td>Consolidates partition files and sorts them locally. Avoids file overlaps for small
+ *         volumes.</td>
+ *   </tr>
+ *   <tr>
+ *     <td>Partitioned</td>
+ *     <td>Sorted</td>
+ *     <td>Medium / Large</td>
+ *     <td>Low Write Latency</td>
+ *     <td>{@link DistributionMode#NONE}</td>
+ *     <td>Eliminates shuffle overhead for maximum write speed. Results in overlapping key ranges
+ *         across files, which requires downstream compaction.</td>
+ *   </tr>
+ *   <tr>
+ *     <td>Partitioned</td>
+ *     <td>Sorted</td>
+ *     <td>Medium / Large</td>
+ *     <td>Low Read Latency</td>
+ *     <td>{@link DistributionMode#HASH} with auto-sharding OR {@link DistributionMode#RANGE}</td>
+ *     <td>HASH with auto-sharding scales writes for hot partitions but can result in overlapping
+ *         file ranges requiring query-time sort merges. RANGE sharding distributes hot partitions
+ *         into sequential, non-overlapping files to optimize reads.</td>
+ *   </tr>
+ *   <tr>
+ *     <td>Partitioned</td>
+ *     <td>Unsorted</td>
+ *     <td>Small</td>
+ *     <td>Any</td>
+ *     <td>{@link DistributionMode#HASH}</td>
+ *     <td>Consolidates data files into single partition directories to prevent file fragmentation.</td>
+ *   </tr>
+ *   <tr>
+ *     <td>Partitioned</td>
+ *     <td>Unsorted</td>
+ *     <td>Medium / Large</td>
+ *     <td>Any</td>
+ *     <td>{@link DistributionMode#HASH} with auto-sharding</td>
+ *     <td>Consolidates partition files while dynamically balancing hot partition writes across
+ *         parallel workers.</td>
+ *   </tr>
+ *   <tr>
+ *     <td>Unpartitioned</td>
+ *     <td>Sorted</td>
+ *     <td>Small</td>
+ *     <td>Any</td>
+ *     <td>{@link DistributionMode#NONE}</td>
+ *     <td>Bypasses network shuffle for fast, low-volume local sorting.</td>
+ *   </tr>
+ *   <tr>
+ *     <td>Unpartitioned</td>
+ *     <td>Sorted</td>
+ *     <td>Medium / Large</td>
+ *     <td>Low Write Latency</td>
+ *     <td>{@link DistributionMode#NONE}</td>
+ *     <td>Bypasses network shuffle for parallel worker writes. Requires downstream compaction to
+ *         resolve overlapping file ranges.</td>
+ *   </tr>
+ *   <tr>
+ *     <td>Unpartitioned</td>
+ *     <td>Sorted</td>
+ *     <td>Medium / Large</td>
+ *     <td>Low Read Latency</td>
+ *     <td>{@link DistributionMode#RANGE}</td>
+ *     <td>Shards continuous keys into non-overlapping worker ranges. Eliminates single-worker
+ *         bottlenecks and guarantees zero file overlap for fast queries.</td>
+ *   </tr>
+ *   <tr>
+ *     <td>Unpartitioned</td>
+ *     <td>Unsorted</td>
+ *     <td>Any</td>
+ *     <td>Any</td>
+ *     <td>{@link DistributionMode#NONE}</td>
+ *     <td>Direct, parallel worker writes with maximum throughput and zero network shuffle overhead.</td>
+ *   </tr>
+ * </table>
+ *
+ * <p>Code Samples:
+ *
+ * <pre>{@code
+ * // 1. Using default HASH distribution mode (Consolidates by partition key)
+ * pipeline
+ * .apply(Create.of(BEAM_ROWS))
+ * .apply(IcebergIO.writeRows(catalogConfig)
+ * .to(tableId));
+ *
+ * // 2. Using NONE distribution mode (No shuffle, local sorting only)
+ * pipeline
+ * .apply(Create.of(BEAM_ROWS))
+ * .apply(IcebergIO.writeRows(catalogConfig)
+ * .to(tableId)
+ * .withDistributionMode(DistributionMode.NONE));
+ *
+ * // 3. Using RANGE distribution mode with a custom shard/bucket function to avoid data skew
+ * pipeline
+ * .apply(Create.of(BEAM_ROWS))
+ * .apply(IcebergIO.writeRows(catalogConfig)
+ * .to(tableId)
+ * .withDistributionMode(DistributionMode.RANGE)
+ * .withDistributionFunction(row -> {
+ * // Group continuous IDs into parallel, non-overlapping shards of 10,000 IDs each
+ * long id = row.getInt64("id");
+ * return (int) (id / 10000);
+ * }));
+ * }</pre>
+ *
+ * @param mode The distribution mode.
+ */
+ public WriteRows withDistributionMode(DistributionMode mode) {
+ return toBuilder().setDistributionMode(mode).build();
+ }
+
+ /**
+ * Sets the custom range-distribution function.
+ *
+ * <p>Only applicable when the distribution mode is set to {@link DistributionMode#RANGE}. The
+ * function maps a Beam {@link Row} to an Integer representing a shard/bucket ID.
+ */
+ public WriteRows withDistributionFunction(SerializableFunction<Row, Integer> shardFn) {
+ return toBuilder().setDistributionFunction(shardFn).build();
+ }
+
+ /**
+ * Enables Beam's dynamic auto-sharding when using {@link DistributionMode#HASH}.
+ *
+ * <p>When enabled, the pipeline uses {@link
+ * org.apache.beam.sdk.transforms.GroupIntoBatches#withShardedKey()} under the hood. The runner
+ * (such as Dataflow) dynamically monitors throughput per partition key. If a partition is
+ * extremely hot, the runner automatically splits it into parallel sub-shards distributed across
+ * multiple workers to prevent single-worker bottlenecks and out-of-memory (OOM) errors, while
+ * keeping the number of data files for cold partitions minimal.
+ *
+ * <p>Note that because auto-sharding distributes hot-partition data randomly across worker
+ * shards, the written data files cannot guarantee non-overlapping key ranges. Downstream
+ * queries may require read-time sort merges for overlapping file segments until an Iceberg
+ * compaction job (e.g., `rewriteDataFiles`) is executed.
+ *
+ * <p>Only applicable when using {@link DistributionMode#HASH}.
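+ *
+ * <p>For example (sketch; {@code catalogConfig} and {@code tableId} are assumed to be defined
+ * elsewhere):
+ *
+ * <pre>{@code
+ * rows.apply(
+ *     IcebergIO.writeRows(catalogConfig)
+ *         .to(tableId)
+ *         .withDistributionMode(DistributionMode.HASH)
+ *         .withAutosharding());
+ * }</pre>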
+ */
+ public WriteRows withAutosharding() {
+ return toBuilder().setAutoSharding(true).build();
+ }
+
+ @Override
+ public IcebergWriteResult expand(PCollection<Row> input) {
+ List<?> allToArgs = Arrays.asList(getTableIdentifier(), getDynamicDestinations());
+ Preconditions.checkArgument(
+ 1 == allToArgs.stream().filter(Predicates.notNull()).count(),
+ "Must set exactly one of table identifier or dynamic destinations object.");
+
+ DynamicDestinations destinations = getDynamicDestinations();
+ if (destinations == null) {
+ destinations =
+ DynamicDestinations.singleTable(
+ Preconditions.checkNotNull(getTableIdentifier()), input.getSchema());
+ }
+
+ // Assign destinations before re-windowing to global in WriteToDestinations because
+ // user's dynamic destination may depend on windowing properties
+ if (IcebergUtils.validDirectWriteLimit(getDirectWriteByteLimit())) {
+ Preconditions.checkArgument(
+ IcebergUtils.isUnbounded(input),
+ "Must only provide direct write limit for unbounded pipelines.");
+ }
+
+ switch (getDistributionMode()) {
+ case NONE:
+ Preconditions.checkArgument(
+ !getAutoSharding(),
+ "Autosharding option is only available with " + "'hash' distribution mode.");
+ return input
+ .apply("Assign Table Destinations", new AssignDestinations(destinations))
+ .apply(
+ "Write Rows to Destinations",
+ new WriteToDestinations(
+ getCatalogConfig(),
+ destinations,
+ getTriggeringFrequency(),
+ getDirectWriteByteLimit()));
+ case HASH:
+ return input
+ .apply(
+ "AssignDestinationAndPartition",
+ new AssignDestinationsAndPartitions(
+ destinations,
+ getCatalogConfig(),
+ getDistributionMode(),
+ getDistributionFunction()))
+ .apply(
+ "Write Rows to Partitions",
+ new WriteToPartitions(
+ getCatalogConfig(),
+ destinations,
+ getTriggeringFrequency(),
+ getAutoSharding()));
+ case RANGE:
+ Preconditions.checkArgument(
+ getDistributionFunction() != null,
+ "Must provide a distribution function when using RANGE distribution mode.");
+ return input
+ .apply(
+ "AssignDestinationAndPartitionWithRange",
+ new AssignDestinationsAndPartitions(
+ destinations,
+ getCatalogConfig(),
+ getDistributionMode(),
+ getDistributionFunction()))
+ .apply(
+ "Write Rows to Partitions",
+ new WriteToPartitions(
+ getCatalogConfig(),
+ destinations,
+ getTriggeringFrequency(),
+ getAutoSharding()));
+ default:
+ throw new UnsupportedOperationException(
+ "Unsupported distribution mode: " + getDistributionMode());
+ }
+ }
+ }
+
+ public static ReadRows readRows(IcebergCatalogConfig catalogConfig) {
+ return new AutoValue_IcebergIO_ReadRows.Builder()
+ .setCatalogConfig(catalogConfig)
+ .setUseCdc(false)
+ .build();
+ }
+
+ @AutoValue
+ public abstract static class ReadRows extends PTransform<PBegin, PCollection<Row>> {
+ public enum StartingStrategy {
+ EARLIEST,
+ LATEST
+ }
+
+ abstract IcebergCatalogConfig getCatalogConfig();
+
+ abstract @Nullable TableIdentifier getTableIdentifier();
+
+ abstract boolean getUseCdc();
+
+ abstract @Nullable Long getFromSnapshot();
+
+ abstract @Nullable Long getToSnapshot();
+
+ abstract @Nullable Long getFromTimestamp();
+
+ abstract @Nullable Long getToTimestamp();
+
+ abstract @Nullable StartingStrategy getStartingStrategy();
+
+ abstract @Nullable Boolean getStreaming();
+
+ abstract @Nullable Duration getPollInterval();
+
+ abstract @Nullable List<String> getKeep();
+
+ abstract @Nullable List<String> getDrop();
+
+ abstract @Nullable String getFilter();
+
+ abstract Builder toBuilder();
+
+ @AutoValue.Builder
+ abstract static class Builder {
+ abstract Builder setCatalogConfig(IcebergCatalogConfig config);
+
+ abstract Builder setTableIdentifier(TableIdentifier identifier);
+
+ abstract Builder setUseCdc(boolean useCdc);
+
+ abstract Builder setFromSnapshot(@Nullable Long fromSnapshot);
+
+ abstract Builder setToSnapshot(@Nullable Long toSnapshot);
+
+ abstract Builder setFromTimestamp(@Nullable Long fromTimestamp);
+
+ abstract Builder setToTimestamp(@Nullable Long toTimestamp);
+
+ abstract Builder setStartingStrategy(@Nullable StartingStrategy strategy);
+
+ abstract Builder setStreaming(@Nullable Boolean streaming);
+
+ abstract Builder setPollInterval(@Nullable Duration pollInterval);
+
+ abstract Builder setKeep(@Nullable List<String> fields);
+
+ abstract Builder setDrop(@Nullable List<String> fields);
+
+ abstract Builder setFilter(@Nullable String filter);
+
+ abstract ReadRows build();
+ }
+
+ public ReadRows withCdc() {
+ return toBuilder().setUseCdc(true).build();
+ }
+
+ public ReadRows from(TableIdentifier tableIdentifier) {
+ return toBuilder().setTableIdentifier(tableIdentifier).build();
+ }
+
+ public ReadRows fromSnapshot(@Nullable Long fromSnapshot) {
+ return toBuilder().setFromSnapshot(fromSnapshot).build();
+ }
+
+ public ReadRows toSnapshot(@Nullable Long toSnapshot) {
+ return toBuilder().setToSnapshot(toSnapshot).build();
+ }
+
+ public ReadRows fromTimestamp(@Nullable Long fromTimestamp) {
+ return toBuilder().setFromTimestamp(fromTimestamp).build();
+ }
+
+ public ReadRows toTimestamp(@Nullable Long toTimestamp) {
+ return toBuilder().setToTimestamp(toTimestamp).build();
+ }
+
+ public ReadRows withPollInterval(Duration pollInterval) {
+ return toBuilder().setPollInterval(pollInterval).build();
+ }
+
+ public ReadRows streaming(@Nullable Boolean streaming) {
+ return toBuilder().setStreaming(streaming).build();
+ }
+
+ public ReadRows withStartingStrategy(@Nullable StartingStrategy strategy) {
+ return toBuilder().setStartingStrategy(strategy).build();
+ }
+
+ public ReadRows keeping(@Nullable List<String> keep) {
+ return toBuilder().setKeep(keep).build();
+ }
+
+ public ReadRows dropping(@Nullable List<String> drop) {
+ return toBuilder().setDrop(drop).build();
+ }
+
+ public ReadRows withFilter(@Nullable String filter) {
+ return toBuilder().setFilter(filter).build();
+ }
+
+ @Override
+ public PCollection<Row> expand(PBegin input) {
+ TableIdentifier tableId =
+ checkStateNotNull(getTableIdentifier(), "Must set a table to read from.");
+
+ Table table = getCatalogConfig().catalog().loadTable(tableId);
+
+ IcebergScanConfig scanConfig =
+ IcebergScanConfig.builder()
+ .setCatalogConfig(getCatalogConfig())
+ .setScanType(IcebergScanConfig.ScanType.TABLE)
+ .setTableIdentifier(tableId)
+ .setSchema(IcebergUtils.icebergSchemaToBeamSchema(table.schema()))
+ .setFromSnapshotInclusive(getFromSnapshot())
+ .setToSnapshot(getToSnapshot())
+ .setFromTimestamp(getFromTimestamp())
+ .setToTimestamp(getToTimestamp())
+ .setStartingStrategy(getStartingStrategy())
+ .setStreaming(getStreaming())
+ .setPollInterval(getPollInterval())
+ .setUseCdc(getUseCdc())
+ .setKeepFields(getKeep())
+ .setDropFields(getDrop())
+ .setFilterString(getFilter())
+ .build();
+ scanConfig.validate(table);
+
+ PTransform<PBegin, PCollection<Row>> source =
+ getUseCdc()
+ ? new IncrementalScanSource(scanConfig)
+ : Read.from(new ScanSource(scanConfig));
+
+ return input.apply(source);
+ }
+ }
+}
diff --git a/scratch/iceberg-scale-test/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergRowSorter.java b/scratch/iceberg-scale-test/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergRowSorter.java
new file mode 100644
index 000000000000..6efc1bbe2eec
--- /dev/null
+++ b/scratch/iceberg-scale-test/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergRowSorter.java
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.iceberg;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.time.Instant;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.beam.sdk.coders.RowCoder;
+import org.apache.beam.sdk.extensions.sorter.BufferedExternalSorter;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.Row;
+import org.apache.iceberg.NullOrder;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SortDirection;
+import org.apache.iceberg.SortField;
+import org.apache.iceberg.SortOrder;
+import org.joda.time.ReadableInstant;
+
+/**
+ * A utility class to sort Beam {@link Row}s based on an Iceberg {@link SortOrder}. Leverages {@link
+ * BufferedExternalSorter} to spill to local disk when elements exceed memory limit.
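 *
 * <p>Example usage (sketch; assumes a loaded Iceberg {@link org.apache.iceberg.Table} and a
 * matching Beam schema):
 *
 * <pre>{@code
 * Iterable<Row> sorted =
 *     IcebergRowSorter.sortRows(rows, table.sortOrder(), table.schema(), beamSchema);
 * }</pre>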
+ */
+class IcebergRowSorter implements Serializable {
+
+ public static Iterable<Row> sortRows(
+ Iterable<Row> rows,
+ SortOrder sortOrder,
+ Schema icebergSchema,
+ org.apache.beam.sdk.schemas.Schema beamSchema) {
+
+ if (sortOrder == null || !sortOrder.isSorted()) {
+ return rows;
+ }
+
+ BufferedExternalSorter.Options sorterOptions = BufferedExternalSorter.options();
+ BufferedExternalSorter sorter = BufferedExternalSorter.create(sorterOptions);
+ RowCoder rowCoder = RowCoder.of(beamSchema);
+
+ List<SortField> fields = sortOrder.fields();
+ String[] columnNames = new String[fields.size()];
+ for (int i = 0; i < fields.size(); i++) {
+ columnNames[i] = icebergSchema.findColumnName(fields.get(i).sourceId());
+ }
+
+ // Create reusable ByteArrayOutputStreams for key and value encoding
+ ByteArrayOutputStream keyBaos = new ByteArrayOutputStream();
+ ByteArrayOutputStream valBaos = new ByteArrayOutputStream();
+
+ try {
+ for (Row row : rows) {
+ keyBaos.reset();
+ valBaos.reset();
+ encodeSortKey(row, sortOrder, columnNames, keyBaos, icebergSchema, beamSchema);
+ byte[] keyBytes = keyBaos.toByteArray();
+
+ rowCoder.encode(row, valBaos);
+ byte[] valBytes = valBaos.toByteArray();
+ sorter.add(KV.of(keyBytes, valBytes));
+ }
+
+ Iterable<KV<byte[], byte[]>> sortedKVs = sorter.sort();
+ return new Iterable<Row>() {
+ @Override
+ public Iterator<Row> iterator() {
+ final Iterator<KV<byte[], byte[]>> it = sortedKVs.iterator();
+ return new Iterator<Row>() {
+ @Override
+ public boolean hasNext() {
+ return it.hasNext();
+ }
+
+ @Override
+ public Row next() {
+ KV<byte[], byte[]> next = it.next();
+ try {
+ ByteArrayInputStream bais = new ByteArrayInputStream(next.getValue());
+ return rowCoder.decode(bais);
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to decode Row during sorting", e);
+ }
+ }
+ };
+ }
+ };
+
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to sort rows with external sorter", e);
+ }
+ }
+
+ @SuppressWarnings("nullness")
+ public static void encodeSortKey(
+ Row row,
+ SortOrder sortOrder,
+ String[] columnNames,
+ ByteArrayOutputStream baos,
+ Schema icebergSchema,
+ org.apache.beam.sdk.schemas.Schema beamSchema)
+ throws IOException {
+
+ List<SortField> fields = sortOrder.fields();
+
+ for (int i = 0; i < fields.size(); i++) {
+ SortField field = fields.get(i);
+ String colName = columnNames[i];
+ Object val = row.getValue(colName);
+
+ if (!field.transform().isIdentity()) {
+ Object icebergVal =
+ IcebergUtils.beamValueToIcebergValue(icebergSchema.findType(field.sourceId()), val);
+ if (icebergVal != null) {
+ val = field.transform().apply(icebergVal);
+ } else {
+ val = null;
+ }
+ }
+
+ boolean isNull = (val == null);
+ boolean isDesc = (field.direction() == SortDirection.DESC);
+ boolean nullsFirst = (field.nullOrder() == NullOrder.NULLS_FIRST);
+
+ // Determine correct header prefix to fulfill the NullOrder contracts
+ byte prefixByte;
+ if (isNull) {
+ prefixByte = nullsFirst ? (byte) 0x00 : (byte) 0xFF;
+ } else {
+ prefixByte = nullsFirst ? (byte) 0x01 : (byte) 0x00;
+ }
+
+ baos.write(prefixByte);
+
+ if (!isNull) {
+ writeValue(val, baos, isDesc);
+ }
+ }
+ }
+
+ private static void writeInt(int v, ByteArrayOutputStream baos, boolean invert) {
+ byte b3 = (byte) (v >>> 24);
+ byte b2 = (byte) (v >>> 16);
+ byte b1 = (byte) (v >>> 8);
+ byte b0 = (byte) v;
+ if (invert) {
+ baos.write(~b3);
+ baos.write(~b2);
+ baos.write(~b1);
+ baos.write(~b0);
+ } else {
+ baos.write(b3);
+ baos.write(b2);
+ baos.write(b1);
+ baos.write(b0);
+ }
+ }
+
+ private static void writeLong(long v, ByteArrayOutputStream baos, boolean invert) {
+ byte b7 = (byte) (v >>> 56);
+ byte b6 = (byte) (v >>> 48);
+ byte b5 = (byte) (v >>> 40);
+ byte b4 = (byte) (v >>> 32);
+ byte b3 = (byte) (v >>> 24);
+ byte b2 = (byte) (v >>> 16);
+ byte b1 = (byte) (v >>> 8);
+ byte b0 = (byte) v;
+ if (invert) {
+ baos.write(~b7);
+ baos.write(~b6);
+ baos.write(~b5);
+ baos.write(~b4);
+ baos.write(~b3);
+ baos.write(~b2);
+ baos.write(~b1);
+ baos.write(~b0);
+ } else {
+ baos.write(b7);
+ baos.write(b6);
+ baos.write(b5);
+ baos.write(b4);
+ baos.write(b3);
+ baos.write(b2);
+ baos.write(b1);
+ baos.write(b0);
+ }
+ }
+
+ @SuppressWarnings("JavaUtilDate")
+ private static void writeValue(Object val, ByteArrayOutputStream baos, boolean invert)
+ throws IOException {
+ if (val instanceof String) {
+ writeString((String) val, baos, invert);
+ } else if (val instanceof Integer) {
+ int v = (Integer) val;
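+ // XOR with MIN_VALUE flips the sign bit, so unsigned lexicographic byte order matches signed numeric order.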
+ writeInt(v ^ Integer.MIN_VALUE, baos, invert);
+ } else if (val instanceof Long) {
+ long v = (Long) val;
+ writeLong(v ^ Long.MIN_VALUE, baos, invert);
+ } else if (val instanceof Float) {
+ int bits = Float.floatToIntBits((Float) val);
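+ // IEEE 754 trick: flip only the sign bit for non-negative values and all bits for negatives,
+ // so the resulting bytes sort in numeric order.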
+ bits = (bits >= 0) ? (bits ^ Integer.MIN_VALUE) : ~bits;
+ writeInt(bits, baos, invert);
+ } else if (val instanceof Double) {
+ long bits = Double.doubleToLongBits((Double) val);
+ bits = (bits >= 0) ? (bits ^ Long.MIN_VALUE) : ~bits;
+ writeLong(bits, baos, invert);
+ } else if (val instanceof Boolean) {
+ byte b = ((Boolean) val) ? (byte) 0x01 : (byte) 0x00;
+ baos.write(invert ? ~b : b);
+ } else if (val instanceof byte[]) {
+ writeByteArray((byte[]) val, baos, invert);
+ } else if (val instanceof ByteBuffer) {
+ writeByteArray(((ByteBuffer) val).array(), baos, invert);
+ } else if (val instanceof ReadableInstant) {
+ long enc = ((ReadableInstant) val).getMillis() ^ Long.MIN_VALUE;
+ writeLong(enc, baos, invert);
+ } else if (val instanceof Instant) {
+ long enc = ((Instant) val).toEpochMilli() ^ Long.MIN_VALUE;
+ writeLong(enc, baos, invert);
+ } else if (val instanceof Date) {
+ long enc = ((Date) val).getTime() ^ Long.MIN_VALUE;
+ writeLong(enc, baos, invert);
+ } else {
+ throw new UnsupportedOperationException(
+ "Unsupported type for sorting: " + val.getClass().getName());
+ }
+ }
+
+ private static void writeString(String s, ByteArrayOutputStream baos, boolean invert)
+ throws IOException {
+ byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
+ writeByteArray(bytes, baos, invert);
+ }
+
+ private static void writeByteArray(byte[] bytes, ByteArrayOutputStream baos, boolean invert) {
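+ // Escape 0x00 -> 0x01 0x01 and 0x01 -> 0x01 0x02 so the trailing 0x00 terminator is unambiguous;
+ // this keeps variable-length values order-preserving (shorter prefixes sort first).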
+ for (byte b : bytes) {
+ if (b == 0x00) {
+ baos.write(invert ? ~(byte) 0x01 : (byte) 0x01);
+ baos.write(invert ? ~(byte) 0x01 : (byte) 0x01);
+ } else if (b == 0x01) {
+ baos.write(invert ? ~(byte) 0x01 : (byte) 0x01);
+ baos.write(invert ? ~(byte) 0x02 : (byte) 0x02);
+ } else {
+ baos.write(invert ? ~b : b);
+ }
+ }
+ baos.write(invert ? ~(byte) 0x00 : (byte) 0x00);
+ }
+}
diff --git a/scratch/iceberg-scale-test/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java b/scratch/iceberg-scale-test/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java
new file mode 100644
index 000000000000..7f48a0d0128c
--- /dev/null
+++ b/scratch/iceberg-scale-test/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java
@@ -0,0 +1,683 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.iceberg;
+
+import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
+import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
+
+import java.nio.ByteBuffer;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.time.temporal.ChronoUnit;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.logicaltypes.FixedPrecisionNumeric;
+import org.apache.beam.sdk.schemas.logicaltypes.MicrosInstant;
+import org.apache.beam.sdk.schemas.logicaltypes.PassThroughLogicalType;
+import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
+import org.apache.beam.sdk.util.Preconditions;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.DateTimeUtil;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.DateTime;
+import org.joda.time.Instant;
+
+/** Utilities for converting between Beam and Iceberg types, made public for user's convenience. */
+public class IcebergUtils {
+ private IcebergUtils() {}
+
+ private static final Map<Schema.TypeName, Type> BEAM_TYPES_TO_ICEBERG_TYPES =
+ ImmutableMap.<Schema.TypeName, Type>builder()
+ .put(Schema.TypeName.BOOLEAN, Types.BooleanType.get())
+ .put(Schema.TypeName.INT32, Types.IntegerType.get())
+ .put(Schema.TypeName.INT64, Types.LongType.get())
+ .put(Schema.TypeName.FLOAT, Types.FloatType.get())
+ .put(Schema.TypeName.DOUBLE, Types.DoubleType.get())
+ .put(Schema.TypeName.STRING, Types.StringType.get())
+ .put(Schema.TypeName.BYTES, Types.BinaryType.get())
+ .put(Schema.TypeName.DATETIME, Types.TimestampType.withZone())
+ .build();
+
+ private static final Map<String, Type> BEAM_LOGICAL_TYPES_TO_ICEBERG_TYPES =
+ ImmutableMap.<String, Type>builder()
+ .put(SqlTypes.DATE.getIdentifier(), Types.DateType.get())
+ .put(SqlTypes.TIME.getIdentifier(), Types.TimeType.get())
+ .put(SqlTypes.DATETIME.getIdentifier(), Types.TimestampType.withoutZone())
+ .put(SqlTypes.UUID.getIdentifier(), Types.UUIDType.get())
+ .put(MicrosInstant.IDENTIFIER, Types.TimestampType.withZone())
+ .build();
+
+ private static Schema.FieldType icebergTypeToBeamFieldType(final Type type) {
+ switch (type.typeId()) {
+ case BOOLEAN:
+ return Schema.FieldType.BOOLEAN;
+ case INTEGER:
+ return Schema.FieldType.INT32;
+ case LONG:
+ return Schema.FieldType.INT64;
+ case FLOAT:
+ return Schema.FieldType.FLOAT;
+ case DOUBLE:
+ return Schema.FieldType.DOUBLE;
+ case DATE:
+ return Schema.FieldType.logicalType(SqlTypes.DATE);
+ case TIME:
+ return Schema.FieldType.logicalType(SqlTypes.TIME);
+ case TIMESTAMP:
+ Types.TimestampType ts = (Types.TimestampType) type.asPrimitiveType();
+ if (ts.shouldAdjustToUTC()) {
+ return Schema.FieldType.DATETIME;
+ }
+ return Schema.FieldType.logicalType(SqlTypes.DATETIME);
+ case STRING:
+ return Schema.FieldType.STRING;
+ case UUID:
+ case BINARY:
+ return Schema.FieldType.BYTES;
+ case FIXED:
+ case DECIMAL:
+ return Schema.FieldType.DECIMAL;
+ case STRUCT:
+ return Schema.FieldType.row(icebergStructTypeToBeamSchema(type.asStructType()));
+ case LIST:
+ return Schema.FieldType.array(icebergTypeToBeamFieldType(type.asListType().elementType()));
+ case MAP:
+ return Schema.FieldType.map(
+ icebergTypeToBeamFieldType(type.asMapType().keyType()),
+ icebergTypeToBeamFieldType(type.asMapType().valueType()));
+ default:
+ throw new RuntimeException("Unrecognized Iceberg Type: " + type.typeId());
+ }
+ }
+
+ private static Schema.Field icebergFieldToBeamField(final Types.NestedField field) {
+ return Schema.Field.of(field.name(), icebergTypeToBeamFieldType(field.type()))
+ .withNullable(field.isOptional());
+ }
+
+ /** Converts an Iceberg {@link org.apache.iceberg.Schema} to a Beam {@link Schema}. */
+ public static Schema icebergSchemaToBeamSchema(final org.apache.iceberg.Schema schema) {
+ Schema.Builder builder = Schema.builder();
+ for (Types.NestedField f : schema.columns()) {
+ builder.addField(icebergFieldToBeamField(f));
+ }
+ return builder.build();
+ }
+
+ private static Schema icebergStructTypeToBeamSchema(final Types.StructType struct) {
+ Schema.Builder builder = Schema.builder();
+ for (Types.NestedField f : struct.fields()) {
+ builder.addField(icebergFieldToBeamField(f));
+ }
+ return builder.build();
+ }
+
+ /**
+ * Represents a {@link Type} and the most recent field ID used to build it.
+ *
+ * <p>Iceberg Schema fields are required to have unique IDs. This includes unique IDs for a {@link
+ * org.apache.iceberg.types.Type.NestedType}'s components (e.g. {@link Types.ListType}'s
+ * collection type, {@link Types.MapType}'s key type and value type, and {@link
+ * Types.StructType}'s nested fields). The {@code maxId} in this object represents the most recent
+ * ID used after building this type. This helps signal that the next {@link
+ * org.apache.iceberg.types.Type.NestedType} we construct should have an ID greater than this one.
+ */
+ @VisibleForTesting
+ static class TypeAndMaxId {
+ int maxId;
+ Type type;
+
+ TypeAndMaxId(int id, Type object) {
+ this.maxId = id;
+ this.type = object;
+ }
+ }
+
+ /**
+ * Takes a Beam {@link Schema.FieldType} and an index intended as a starting point for Iceberg
+ * {@link org.apache.iceberg.types.Type.NestedType}s. Returns an Iceberg {@link Type} and the
+ * maximum index after building that type.
+ *
+ * <p>Returns this information in a {@link TypeAndMaxId} object.
+ */
+ @VisibleForTesting
+ static TypeAndMaxId beamFieldTypeToIcebergFieldType(
+ Schema.FieldType beamType, int nestedFieldId) {
+ if (BEAM_TYPES_TO_ICEBERG_TYPES.containsKey(beamType.getTypeName())) {
+ // we don't use nested field ID for primitive types. decrement it so the caller can use it for
+ // other types.
+ return new TypeAndMaxId(
+ --nestedFieldId, BEAM_TYPES_TO_ICEBERG_TYPES.get(beamType.getTypeName()));
+ } else if (beamType.getTypeName().isLogicalType()) {
+ Schema.LogicalType<?, ?> logicalType = checkArgumentNotNull(beamType.getLogicalType());
+ if (logicalType instanceof FixedPrecisionNumeric) {
+ Row args = Preconditions.checkArgumentNotNull(logicalType.getArgument());
+ Integer precision = Preconditions.checkArgumentNotNull(args.getInt32("precision"));
+ Integer scale = Preconditions.checkArgumentNotNull(args.getInt32("scale"));
+ return new TypeAndMaxId(--nestedFieldId, Types.DecimalType.of(precision, scale));
+ }
+ if (logicalType instanceof PassThroughLogicalType) {
+ return beamFieldTypeToIcebergFieldType(logicalType.getBaseType(), nestedFieldId);
+ }
+ String logicalTypeIdentifier = logicalType.getIdentifier();
+ @Nullable Type type = BEAM_LOGICAL_TYPES_TO_ICEBERG_TYPES.get(logicalTypeIdentifier);
+ if (type == null) {
+ throw new RuntimeException("Unsupported Beam logical type " + logicalTypeIdentifier);
+ }
+ return new TypeAndMaxId(--nestedFieldId, type);
+ } else if (beamType.getTypeName().isCollectionType()) { // ARRAY or ITERABLE
+ Schema.FieldType beamCollectionType =
+ Preconditions.checkArgumentNotNull(beamType.getCollectionElementType());
+
+ // nestedFieldId is reserved for the list's collection type.
+ // we increment here because further nested fields should use unique ID's
+ TypeAndMaxId listInfo =
+ beamFieldTypeToIcebergFieldType(beamCollectionType, nestedFieldId + 1);
+ Type icebergCollectionType = listInfo.type;
+
+ boolean elementTypeIsNullable =
+ Preconditions.checkArgumentNotNull(beamType.getCollectionElementType()).getNullable();
+
+ Type listType =
+ elementTypeIsNullable
+ ? Types.ListType.ofOptional(nestedFieldId, icebergCollectionType)
+ : Types.ListType.ofRequired(nestedFieldId, icebergCollectionType);
+
+ return new TypeAndMaxId(listInfo.maxId, listType);
+ } else if (beamType.getTypeName().isMapType()) { // MAP
+ // key and value IDs need to be unique
+ int keyId = nestedFieldId;
+ int valueId = keyId + 1;
+
+ // nested field IDs should be unique
+ nestedFieldId = valueId + 1;
+ Schema.FieldType beamKeyType = Preconditions.checkArgumentNotNull(beamType.getMapKeyType());
+ TypeAndMaxId keyInfo = beamFieldTypeToIcebergFieldType(beamKeyType, nestedFieldId);
+ Type icebergKeyType = keyInfo.type;
+
+ nestedFieldId = keyInfo.maxId + 1;
+ Schema.FieldType beamValueType =
+ Preconditions.checkArgumentNotNull(beamType.getMapValueType());
+ TypeAndMaxId valueInfo = beamFieldTypeToIcebergFieldType(beamValueType, nestedFieldId);
+ Type icebergValueType = valueInfo.type;
+
+ Type mapType =
+ beamValueType.getNullable()
+ ? Types.MapType.ofOptional(keyId, valueId, icebergKeyType, icebergValueType)
+ : Types.MapType.ofRequired(keyId, valueId, icebergKeyType, icebergValueType);
+
+ return new TypeAndMaxId(valueInfo.maxId, mapType);
+ } else if (beamType.getTypeName().isCompositeType()) { // ROW
+ // Nested field IDs need to be unique from the field that contains this StructType
+ Schema nestedSchema = Preconditions.checkArgumentNotNull(beamType.getRowSchema());
+ List<Types.NestedField> nestedFields = new ArrayList<>(nestedSchema.getFieldCount());
+
+ int icebergFieldId = nestedFieldId;
+ nestedFieldId = icebergFieldId + nestedSchema.getFieldCount();
+ for (Schema.Field beamField : nestedSchema.getFields()) {
+ TypeAndMaxId typeAndMaxId =
+ beamFieldTypeToIcebergFieldType(beamField.getType(), nestedFieldId);
+ Types.NestedField icebergField =
+ Types.NestedField.of(
+ icebergFieldId++,
+ beamField.getType().getNullable(),
+ beamField.getName(),
+ typeAndMaxId.type);
+
+ nestedFields.add(icebergField);
+ nestedFieldId = typeAndMaxId.maxId + 1;
+ }
+
+ Type structType = Types.StructType.of(nestedFields);
+
+ return new TypeAndMaxId(nestedFieldId - 1, structType);
+ }
+
+ return new TypeAndMaxId(nestedFieldId, Types.StringType.get());
+ }
+
+ /**
+ * Converts a Beam {@link Schema} to an Iceberg {@link org.apache.iceberg.Schema}.
+ *
+ * <p>The following unsupported Beam types will be defaulted to {@link Types.StringType}:
+ *
+ * <ul>
+ *   <li>{@link Schema.TypeName#DECIMAL}
+ * </ul>
+ */
+ public static org.apache.iceberg.Schema beamSchemaToIcebergSchema(final Schema schema) {
+ List<Types.NestedField> fields = new ArrayList<>(schema.getFieldCount());
+ int nestedFieldId = schema.getFieldCount() + 1;
+ int icebergFieldId = 1;
+ for (Schema.Field beamField : schema.getFields()) {
+ TypeAndMaxId typeAndMaxId =
+ beamFieldTypeToIcebergFieldType(beamField.getType(), nestedFieldId);
+ Types.NestedField icebergField =
+ Types.NestedField.of(
+ icebergFieldId++,
+ beamField.getType().getNullable(),
+ beamField.getName(),
+ typeAndMaxId.type);
+
+ fields.add(icebergField);
+ nestedFieldId = typeAndMaxId.maxId + 1;
+ }
+ return new org.apache.iceberg.Schema(fields.toArray(new Types.NestedField[fields.size()]));
+ }
+
+ /**
+ * Converts a Beam field value to its Iceberg-compatible equivalent based on the Iceberg {@link
+ * Type}.
+ */
+ public static @Nullable Object beamValueToIcebergValue(Type type, @Nullable Object value) {
+ if (value == null) {
+ return null;
+ }
+ switch (type.typeId()) {
+ case BOOLEAN:
+ case INTEGER:
+ case LONG:
+ case FLOAT:
+ case DOUBLE:
+ case DATE:
+ case TIME:
+ case DECIMAL:
+ case STRING:
+ return value;
+ case TIMESTAMP:
+ Types.TimestampType ts = (Types.TimestampType) type.asPrimitiveType();
+ return getIcebergTimestampValue(value, ts.shouldAdjustToUTC());
+ case UUID:
+ if (value instanceof byte[]) {
+ return UUID.nameUUIDFromBytes((byte[]) value);
+ }
+ return value;
+ case BINARY:
+ if (value instanceof byte[]) {
+ return ByteBuffer.wrap((byte[]) value);
+ }
+ return value;
+ case FIXED:
+ throw new UnsupportedOperationException("Fixed-precision fields are not yet supported.");
+ default:
+ return value;
+ }
+ }
+
+ /** Converts a Beam {@link Row} to an Iceberg {@link Record}. */
+ public static Record beamRowToIcebergRecord(org.apache.iceberg.Schema schema, Row row) {
+ if (row.getSchema().getFieldCount() != schema.columns().size()) {
+ throw new IllegalStateException(
+ String.format(
+ "Beam Row schema and Iceberg schema have different sizes.%n\tBeam Row columns: %s%n\tIceberg schema columns: %s",
+ row.getSchema().getFieldNames(),
+ schema.columns().stream().map(Types.NestedField::name).collect(Collectors.toList())));
+ }
+ return copyRowIntoRecord(GenericRecord.create(schema), row);
+ }
+
+ private static Record copyRowIntoRecord(Record baseRecord, Row value) {
+ Record rec = baseRecord.copy();
+ for (Types.NestedField f : rec.struct().fields()) {
+ copyFieldIntoRecord(rec, f, value);
+ }
+ return rec;
+ }
+
+ private static void copyFieldIntoRecord(Record rec, Types.NestedField field, Row value) {
+ String name = field.name();
+ switch (field.type().typeId()) {
+ case BOOLEAN:
+ Optional.ofNullable(value.getBoolean(name)).ifPresent(v -> rec.setField(name, v));
+ break;
+ case INTEGER:
+ Optional.ofNullable(value.getInt32(name)).ifPresent(v -> rec.setField(name, v));
+ break;
+ case LONG:
+ Optional.ofNullable(value.getInt64(name)).ifPresent(v -> rec.setField(name, v));
+ break;
+ case FLOAT:
+ Optional.ofNullable(value.getFloat(name)).ifPresent(v -> rec.setField(name, v));
+ break;
+ case DOUBLE:
+ Optional.ofNullable(value.getDouble(name)).ifPresent(v -> rec.setField(name, v));
+ break;
+ case DATE:
+ Optional.ofNullable(value.getLogicalTypeValue(name, LocalDate.class))
+ .ifPresent(v -> rec.setField(name, v));
+ break;
+ case TIME:
+ Optional.ofNullable(value.getLogicalTypeValue(name, LocalTime.class))
+ .ifPresent(v -> rec.setField(name, v));
+ break;
+ case TIMESTAMP:
+ Object val = value.getValue(name);
+ if (val == null) {
+ break;
+ }
+ Types.TimestampType ts = (Types.TimestampType) field.type().asPrimitiveType();
+ rec.setField(name, getIcebergTimestampValue(val, ts.shouldAdjustToUTC()));
+ break;
+ case STRING:
+ Object strVal = value.getValue(name);
+ if (strVal != null) {
+ rec.setField(name, strVal.toString());
+ }
+ break;
+ case UUID:
+ Optional.ofNullable(value.getBytes(name))
+ .ifPresent(v -> rec.setField(name, UUID.nameUUIDFromBytes(v)));
+ break;
+ case FIXED:
+ throw new UnsupportedOperationException("Fixed-precision fields are not yet supported.");
+ case BINARY:
+ Optional.ofNullable(value.getBytes(name))
+ .ifPresent(v -> rec.setField(name, ByteBuffer.wrap(v)));
+ break;
+ case DECIMAL:
+ Optional.ofNullable(value.getDecimal(name)).ifPresent(v -> rec.setField(name, v));
+ break;
+ case STRUCT:
+ Optional.ofNullable(value.getRow(name))
+ .ifPresent(
+ row ->
+ rec.setField(
+ name,
+ copyRowIntoRecord(GenericRecord.create(field.type().asStructType()), row)));
+ break;
+ case LIST:
+ Iterable<@NonNull ?> icebergList = value.getIterable(name);
+ Type collectionType = ((Types.ListType) field.type()).elementType();
+
+ if (collectionType.isStructType() && icebergList != null) {
+ org.apache.iceberg.Schema innerSchema = collectionType.asStructType().asSchema();
+ ImmutableList.Builder<Record> builder = ImmutableList.builder();
+ for (Row v : (Iterable<Row>) icebergList) {
+ builder.add(beamRowToIcebergRecord(innerSchema, v));
+ }
+ icebergList = builder.build();
+ }
+ Optional.ofNullable(icebergList).ifPresent(list -> rec.setField(name, list));
+ break;
+ case MAP:
+ Map<?, ?> icebergMap = value.getMap(name);
+ Type valueType = ((Types.MapType) field.type()).valueType();
+ // recurse on struct types
+ if (valueType.isStructType() && icebergMap != null) {
+ org.apache.iceberg.Schema innerSchema = valueType.asStructType().asSchema();
+
+ ImmutableMap.Builder