diff --git a/scratch/iceberg-scale-test/build.gradle b/scratch/iceberg-scale-test/build.gradle new file mode 100644 index 000000000000..0ab76e165b64 --- /dev/null +++ b/scratch/iceberg-scale-test/build.gradle @@ -0,0 +1,43 @@ +plugins { + id 'java' + id 'application' +} + +repositories { + mavenLocal() + mavenCentral() + maven { + url "https://repository.apache.org/snapshots/" + } +} + +dependencies { + implementation "org.apache.beam:beam-sdks-java-core:2.74.0-SNAPSHOT" + implementation "org.apache.beam:beam-sdks-java-io-google-cloud-platform:2.74.0-SNAPSHOT" + implementation "org.apache.beam:beam-sdks-java-io-iceberg:2.74.0-SNAPSHOT" + implementation "org.apache.beam:beam-sdks-java-extensions-sorter:2.74.0-SNAPSHOT" + implementation "org.apache.beam:beam-runners-google-cloud-dataflow-java:2.74.0-SNAPSHOT" + + implementation "org.apache.iceberg:iceberg-core:1.4.3" + implementation "org.apache.iceberg:iceberg-api:1.4.3" + implementation "org.apache.iceberg:iceberg-data:1.4.3" + implementation "org.apache.iceberg:iceberg-gcp:1.4.3" + implementation "com.google.cloud.bigdataoss:gcs-connector:hadoop2-2.2.26" + implementation "org.apache.hadoop:hadoop-client:3.4.2" + implementation "org.apache.hadoop:hadoop-common:3.4.2" + + implementation "org.slf4j:slf4j-api:1.7.30" + implementation "org.slf4j:slf4j-jdk14:1.7.30" + + annotationProcessor "com.google.auto.value:auto-value:1.9" + compileOnly "com.google.auto.value:auto-value-annotations:1.9" +} + +application { + mainClass = 'org.apache.beam.sdk.io.iceberg.test.IcebergBigQueryScaleTest' +} + +java { + sourceCompatibility = JavaVersion.VERSION_11 + targetCompatibility = JavaVersion.VERSION_11 +} diff --git a/scratch/iceberg-scale-test/gradle/wrapper/gradle-wrapper.jar b/scratch/iceberg-scale-test/gradle/wrapper/gradle-wrapper.jar new file mode 100644 index 000000000000..7f93135c49b7 Binary files /dev/null and b/scratch/iceberg-scale-test/gradle/wrapper/gradle-wrapper.jar differ diff --git a/scratch/iceberg-scale-test/gradle/wrapper/gradle-wrapper.properties b/scratch/iceberg-scale-test/gradle/wrapper/gradle-wrapper.properties new file mode 100644 index 000000000000..d4081da476bb --- /dev/null +++ b/scratch/iceberg-scale-test/gradle/wrapper/gradle-wrapper.properties @@ -0,0 +1,7 @@ +distributionBase=GRADLE_USER_HOME +distributionPath=wrapper/dists +distributionUrl=https\://services.gradle.org/distributions/gradle-8.14.3-bin.zip +networkTimeout=10000 +validateDistributionUrl=true +zipStoreBase=GRADLE_USER_HOME +zipStorePath=wrapper/dists diff --git a/scratch/iceberg-scale-test/gradlew b/scratch/iceberg-scale-test/gradlew new file mode 100755 index 000000000000..1aa94a426907 --- /dev/null +++ b/scratch/iceberg-scale-test/gradlew @@ -0,0 +1,249 @@ +#!/bin/sh + +# +# Copyright © 2015-2021 the original authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +############################################################################## +# +# Gradle start up script for POSIX generated by Gradle. 
+# +# Important for running: +# +# (1) You need a POSIX-compliant shell to run this script. If your /bin/sh is +# noncompliant, but you have some other compliant shell such as ksh or +# bash, then to run this script, type that shell name before the whole +# command line, like: +# +# ksh Gradle +# +# Busybox and similar reduced shells will NOT work, because this script +# requires all of these POSIX shell features: +# * functions; +# * expansions «$var», «${var}», «${var:-default}», «${var+SET}», +# «${var#prefix}», «${var%suffix}», and «$( cmd )»; +# * compound commands having a testable exit status, especially «case»; +# * various built-in commands including «command», «set», and «ulimit». +# +# Important for patching: +# +# (2) This script targets any POSIX shell, so it avoids extensions provided +# by Bash, Ksh, etc; in particular arrays are avoided. +# +# The "traditional" practice of packing multiple parameters into a +# space-separated string is a well documented source of bugs and security +# problems, so this is (mostly) avoided, by progressively accumulating +# options in "$@", and eventually passing that to Java. +# +# Where the inherited environment variables (DEFAULT_JVM_OPTS, JAVA_OPTS, +# and GRADLE_OPTS) rely on word-splitting, this is performed explicitly; +# see the in-line comments for details. +# +# There are tweaks for specific operating systems such as AIX, CygWin, +# Darwin, MinGW, and NonStop. +# +# (3) This script is generated from the Groovy template +# https://github.com/gradle/gradle/blob/HEAD/subprojects/plugins/src/main/resources/org/gradle/api/internal/plugins/unixStartScript.txt +# within the Gradle project. +# +# You can find Gradle at https://github.com/gradle/gradle/. +# +############################################################################## + +# Attempt to set APP_HOME + +# Resolve links: $0 may be a link +app_path=$0 + +# Need this for daisy-chained symlinks. +while + APP_HOME=${app_path%"${app_path##*/}"} # leaves a trailing /; empty if no leading path + [ -h "$app_path" ] +do + ls=$( ls -ld "$app_path" ) + link=${ls#*' -> '} + case $link in #( + /*) app_path=$link ;; #( + *) app_path=$APP_HOME$link ;; + esac +done + +# This is normally unused +# shellcheck disable=SC2034 +APP_BASE_NAME=${0##*/} +# Discard cd standard output in case $CDPATH is set (https://github.com/gradle/gradle/issues/25036) +APP_HOME=$( cd "${APP_HOME:-./}" > /dev/null && pwd -P ) || exit + +# Use the maximum available, or set MAX_FD != -1 to use that value. +MAX_FD=maximum + +warn () { + echo "$*" +} >&2 + +die () { + echo + echo "$*" + echo + exit 1 +} >&2 + +# OS specific support (must be 'true' or 'false'). +cygwin=false +msys=false +darwin=false +nonstop=false +case "$( uname )" in #( + CYGWIN* ) cygwin=true ;; #( + Darwin* ) darwin=true ;; #( + MSYS* | MINGW* ) msys=true ;; #( + NONSTOP* ) nonstop=true ;; +esac + +CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar + + +# Determine the Java command to use to start the JVM. +if [ -n "$JAVA_HOME" ] ; then + if [ -x "$JAVA_HOME/jre/sh/java" ] ; then + # IBM's JDK on AIX uses strange locations for the executables + JAVACMD=$JAVA_HOME/jre/sh/java + else + JAVACMD=$JAVA_HOME/bin/java + fi + if [ ! -x "$JAVACMD" ] ; then + die "ERROR: JAVA_HOME is set to an invalid directory: $JAVA_HOME + +Please set the JAVA_HOME variable in your environment to match the +location of your Java installation." + fi +else + JAVACMD=java + if ! 
command -v java >/dev/null 2>&1 + then + die "ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. + +Please set the JAVA_HOME variable in your environment to match the +location of your Java installation." + fi +fi + +# Increase the maximum file descriptors if we can. +if ! "$cygwin" && ! "$darwin" && ! "$nonstop" ; then + case $MAX_FD in #( + max*) + # In POSIX sh, ulimit -H is undefined. That's why the result is checked to see if it worked. + # shellcheck disable=SC2039,SC3045 + MAX_FD=$( ulimit -H -n ) || + warn "Could not query maximum file descriptor limit" + esac + case $MAX_FD in #( + '' | soft) :;; #( + *) + # In POSIX sh, ulimit -n is undefined. That's why the result is checked to see if it worked. + # shellcheck disable=SC2039,SC3045 + ulimit -n "$MAX_FD" || + warn "Could not set maximum file descriptor limit to $MAX_FD" + esac +fi + +# Collect all arguments for the java command, stacking in reverse order: +# * args from the command line +# * the main class name +# * -classpath +# * -D...appname settings +# * --module-path (only if needed) +# * DEFAULT_JVM_OPTS, JAVA_OPTS, and GRADLE_OPTS environment variables. + +# For Cygwin or MSYS, switch paths to Windows format before running java +if "$cygwin" || "$msys" ; then + APP_HOME=$( cygpath --path --mixed "$APP_HOME" ) + CLASSPATH=$( cygpath --path --mixed "$CLASSPATH" ) + + JAVACMD=$( cygpath --unix "$JAVACMD" ) + + # Now convert the arguments - kludge to limit ourselves to /bin/sh + for arg do + if + case $arg in #( + -*) false ;; # don't mess with options #( + /?*) t=${arg#/} t=/${t%%/*} # looks like a POSIX filepath + [ -e "$t" ] ;; #( + *) false ;; + esac + then + arg=$( cygpath --path --ignore --mixed "$arg" ) + fi + # Roll the args list around exactly as many times as the number of + # args, so each arg winds up back in the position where it started, but + # possibly modified. + # + # NB: a `for` loop captures its iteration list before it begins, so + # changing the positional parameters here affects neither the number of + # iterations, nor the values presented in `arg`. + shift # remove old arg + set -- "$@" "$arg" # push replacement arg + done +fi + + +# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. +DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"' + +# Collect all arguments for the java command: +# * DEFAULT_JVM_OPTS, JAVA_OPTS, JAVA_OPTS, and optsEnvironmentVar are not allowed to contain shell fragments, +# and any embedded shellness will be escaped. +# * For example: A user cannot expect ${Hostname} to be expanded, as it is an environment variable and will be +# treated as '${Hostname}' itself on the command line. + +set -- \ + "-Dorg.gradle.appname=$APP_BASE_NAME" \ + -classpath "$CLASSPATH" \ + org.gradle.wrapper.GradleWrapperMain \ + "$@" + +# Stop when "xargs" is not available. +if ! command -v xargs >/dev/null 2>&1 +then + die "xargs is not available" +fi + +# Use "xargs" to parse quoted args. +# +# With -n1 it outputs one arg per line, with the quotes and backslashes removed. 
+# +# In Bash we could simply go: +# +# readarray ARGS < <( xargs -n1 <<<"$var" ) && +# set -- "${ARGS[@]}" "$@" +# +# but POSIX shell has neither arrays nor command substitution, so instead we +# post-process each arg (as a line of input to sed) to backslash-escape any +# character that might be a shell metacharacter, then use eval to reverse +# that process (while maintaining the separation between arguments), and wrap +# the whole thing up as a single "set" statement. +# +# This will of course break if any of these variables contains a newline or +# an unmatched quote. +# + +eval "set -- $( + printf '%s\n' "$DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS" | + xargs -n1 | + sed ' s~[^-[:alnum:]+,./:=@_]~\\&~g; ' | + tr '\n' ' ' + )" '"$@"' + +exec "$JAVACMD" "$@" diff --git a/scratch/iceberg-scale-test/settings.gradle b/scratch/iceberg-scale-test/settings.gradle new file mode 100644 index 000000000000..fdd018dc9055 --- /dev/null +++ b/scratch/iceberg-scale-test/settings.gradle @@ -0,0 +1 @@ +rootProject.name = 'iceberg-scale-test' diff --git a/scratch/iceberg-scale-test/src/main/java/org/apache/beam/sdk/io/iceberg/AssignDestinationsAndPartitions.java b/scratch/iceberg-scale-test/src/main/java/org/apache/beam/sdk/io/iceberg/AssignDestinationsAndPartitions.java new file mode 100644 index 000000000000..4cea9312f6fb --- /dev/null +++ b/scratch/iceberg-scale-test/src/main/java/org/apache/beam/sdk/io/iceberg/AssignDestinationsAndPartitions.java @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.iceberg; + +import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; + +import java.util.HashMap; +import java.util.Map; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.RowCoder; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.sdk.values.ValueInSingleWindow; +import org.apache.iceberg.DistributionMode; +import org.apache.iceberg.PartitionKey; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.NoSuchTableException; +import org.checkerframework.checker.nullness.qual.MonotonicNonNull; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.joda.time.Instant; + +/** + * Assigns destination metadata for each input record. + * + *

The output will have the format { {destination, partition}, data } + */ +class AssignDestinationsAndPartitions + extends PTransform, PCollection>> { + + private final DynamicDestinations dynamicDestinations; + private final IcebergCatalogConfig catalogConfig; + private final DistributionMode distributionMode; + private final @Nullable SerializableFunction distributionFunction; + + static final String DESTINATION = "destination"; + static final String PARTITION = "partition"; + static final String SHARD = "shard"; + static final org.apache.beam.sdk.schemas.Schema OUTPUT_SCHEMA = + org.apache.beam.sdk.schemas.Schema.builder() + .addStringField(DESTINATION) + .addStringField(PARTITION) + .addNullableField(SHARD, org.apache.beam.sdk.schemas.Schema.FieldType.INT32) + .build(); + + public AssignDestinationsAndPartitions( + DynamicDestinations dynamicDestinations, IcebergCatalogConfig catalogConfig) { + this(dynamicDestinations, catalogConfig, DistributionMode.HASH, null); + } + + public AssignDestinationsAndPartitions( + DynamicDestinations dynamicDestinations, + IcebergCatalogConfig catalogConfig, + DistributionMode distributionMode, + @Nullable SerializableFunction distributionFunction) { + this.dynamicDestinations = dynamicDestinations; + this.catalogConfig = catalogConfig; + this.distributionMode = distributionMode; + this.distributionFunction = distributionFunction; + } + + @Override + public PCollection> expand(PCollection input) { + return input + .apply( + ParDo.of( + new AssignDoFn( + dynamicDestinations, catalogConfig, distributionMode, distributionFunction))) + .setCoder( + KvCoder.of( + RowCoder.of(OUTPUT_SCHEMA), RowCoder.of(dynamicDestinations.getDataSchema()))); + } + + @SuppressWarnings("nullness") + static class AssignDoFn extends DoFn> { + private transient @MonotonicNonNull Map partitionKeys; + private transient @MonotonicNonNull Map wrappers; + private final DynamicDestinations dynamicDestinations; + private final IcebergCatalogConfig catalogConfig; + private final DistributionMode distributionMode; + private final @Nullable SerializableFunction distributionFunction; + + AssignDoFn( + DynamicDestinations dynamicDestinations, + IcebergCatalogConfig catalogConfig, + DistributionMode distributionMode, + @Nullable SerializableFunction distributionFunction) { + this.dynamicDestinations = dynamicDestinations; + this.catalogConfig = catalogConfig; + this.distributionMode = distributionMode; + this.distributionFunction = distributionFunction; + } + + @Setup + public void setup() { + this.wrappers = new HashMap<>(); + this.partitionKeys = new HashMap<>(); + } + + @ProcessElement + public void processElement( + @Element Row element, + BoundedWindow window, + PaneInfo paneInfo, + @Timestamp Instant timestamp, + OutputReceiver> out) { + String tableIdentifier = + dynamicDestinations.getTableStringIdentifier( + ValueInSingleWindow.of(element, timestamp, window, paneInfo)); + Row data = dynamicDestinations.getData(element); + + @Nullable PartitionKey partitionKey = checkStateNotNull(partitionKeys).get(tableIdentifier); + @Nullable BeamRowWrapper wrapper = checkStateNotNull(wrappers).get(tableIdentifier); + if (partitionKey == null || wrapper == null) { + PartitionSpec spec = PartitionSpec.unpartitioned(); + Schema schema = IcebergUtils.beamSchemaToIcebergSchema(data.getSchema()); + @Nullable + IcebergTableCreateConfig createConfig = + dynamicDestinations.instantiateDestination(tableIdentifier).getTableCreateConfig(); + if (createConfig != null && createConfig.getPartitionFields() != null) 
{ + spec = + PartitionUtils.toPartitionSpec(createConfig.getPartitionFields(), data.getSchema()); + } else { + try { + // see if table already exists with a spec + // TODO(https://github.com/apache/beam/issues/38337): improve this by periodically + // refreshing the table to fetch updated specs + spec = catalogConfig.catalog().loadTable(TableIdentifier.parse(tableIdentifier)).spec(); + } catch (NoSuchTableException ignored) { + // no partition to apply + } + } + partitionKey = new PartitionKey(spec, schema); + wrapper = new BeamRowWrapper(data.getSchema(), schema.asStruct()); + checkStateNotNull(partitionKeys).put(tableIdentifier, partitionKey); + checkStateNotNull(wrappers).put(tableIdentifier, wrapper); + } + partitionKey.partition(wrapper.wrap(data)); + String partitionPath = partitionKey.toPath(); + + Integer shardId = null; + if (distributionMode == DistributionMode.RANGE && distributionFunction != null) { + shardId = distributionFunction.apply(data); + } + + Row destAndPartition = + Row.withSchema(OUTPUT_SCHEMA) + .withFieldValue(DESTINATION, tableIdentifier) + .withFieldValue(PARTITION, partitionPath) + .withFieldValue(SHARD, shardId) + .build(); + out.output(KV.of(destAndPartition, data)); + } + } +} diff --git a/scratch/iceberg-scale-test/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java b/scratch/iceberg-scale-test/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java new file mode 100644 index 000000000000..4661c1aca209 --- /dev/null +++ b/scratch/iceberg-scale-test/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java @@ -0,0 +1,887 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.iceberg; + +import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; + +import com.google.auto.value.AutoValue; +import java.util.Arrays; +import java.util.List; +import org.apache.beam.sdk.annotations.Internal; +import org.apache.beam.sdk.io.Read; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.values.PBegin; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Predicates; +import org.apache.iceberg.DistributionMode; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.DateTimeUtil; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.joda.time.Duration; + +/** + * A connector that reads and writes to Apache Iceberg + * tables. + * + *

{@link IcebergIO} is offered as a Managed transform. This class is subject to change and + * should not be used directly. Instead, use it like so: + * + *

{@code
+ * Map config = Map.of(
+ *         "table", table,
+ *         "catalog_name", name,
+ *         "catalog_properties", Map.of(
+ *                 "warehouse", warehouse_path,
+ *                 "catalog-impl", "org.apache.iceberg.hive.HiveCatalog"),
+ *         "config_properties", Map.of(
+ *                 "hive.metastore.uris", metastore_uri));
+ *
+ *
+ * ====== WRITE ======
+ * pipeline
+ *     .apply(Create.of(BEAM_ROWS))
+ *     .apply(Managed.write(ICEBERG).withConfig(config));
+ *
+ *
+ * ====== READ ======
+ * pipeline
+ *     .apply(Managed.read(ICEBERG).withConfig(config))
+ *     .getSinglePCollection()
+ *     .apply(ParDo.of(...));
+ *
+ *
+ * ====== READ CDC ======
+ * pipeline
+ *     .apply(Managed.read(ICEBERG_CDC).withConfig(config))
+ *     .getSinglePCollection()
+ *     .apply(ParDo.of(...));
+ * }
+ * + * Look for more detailed examples below. + * + *

Configuration Options

+ * + * Please check the Managed IO + * configuration page + * + *

Beam Rows

+ * + *

Being a Managed transform, this IO exclusively writes and reads using Beam {@link Row}s. + * Conversion takes place between Beam {@link Row}s and Iceberg {@link Record}s using helper methods + * in {@link IcebergUtils}. Below is the mapping between Beam and Iceberg types: + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + *
Beam {@link Schema.FieldType} Iceberg {@link Type} + *
BYTES BINARY
BOOLEAN BOOLEAN
STRING STRING
INT32 INTEGER
INT64 LONG
DECIMAL STRING
FLOAT FLOAT
DOUBLE DOUBLE
SqlTypes.DATETIME TIMESTAMP
DATETIME TIMESTAMPTZ
SqlTypes.DATE DATE
SqlTypes.TIME TIME
ITERABLE LIST
ARRAY LIST
MAP MAP
ROW STRUCT
+ * + *

Note: {@code SqlTypes} are Beam logical types. + * + *
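+ * For example, you can convert between the two schema representations directly with
+ * {@link IcebergUtils} (a sketch; {@code beamSchema} is assumed to be an existing Beam schema):
+ *
+ * {@code
+ * org.apache.iceberg.Schema icebergSchema = IcebergUtils.beamSchemaToIcebergSchema(beamSchema);
+ * Schema beamSchemaAgain = IcebergUtils.icebergSchemaToBeamSchema(icebergSchema);
+ * }
+ *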

Note on timestamps

+ * + *

For an existing table, the following Beam types are supported for both {@code timestamp} and + * {@code timestamptz}: + * + *

+ * + *

Note: If you expect Beam to create the Iceberg table at runtime, please provide {@code + * SqlTypes.DATETIME} for a {@code timestamp} column and {@code DATETIME} for a {@code timestamptz} + * column. If the table does not exist, Beam will treat {@code STRING} and {@code INT64} at + * face value and create equivalent column types. + * + *

For Iceberg reads, the connector will produce Beam {@code SqlTypes.DATETIME} types for + * Iceberg's {@code timestamp} and {@code DATETIME} types for {@code timestamptz}. + * + *
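+ * For example, a Beam schema that lets the connector create a table with one {@code timestamp}
+ * and one {@code timestamptz} column could look like this (a sketch; field names are illustrative):
+ *
+ * {@code
+ * Schema schema = Schema.builder()
+ *     .addLogicalTypeField("event_time", SqlTypes.DATETIME)  // created as Iceberg timestamp
+ *     .addDateTimeField("ingested_at")                       // created as Iceberg timestamptz
+ *     .build();
+ * }
+ *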

Writing to Tables

+ * + *

Creating Tables

+ * + *

If an Iceberg table does not exist at the time of writing, this connector will automatically + * create one with the data's schema. + * + *

Note that this is a best-effort operation that depends on the {@link Catalog} implementation. + * Some implementations may not support creating a table using the Iceberg API. + * + *
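+ * If you need full control over table properties (for example a partition spec), you can create
+ * the table up front with the Iceberg API before running the pipeline. A minimal sketch, assuming
+ * a configured {@link Catalog} and an existing Beam schema:
+ *
+ * {@code
+ * org.apache.iceberg.Schema icebergSchema = IcebergUtils.beamSchemaToIcebergSchema(beamSchema);
+ * // "event_time" is an illustrative field name
+ * PartitionSpec spec = PartitionSpec.builderFor(icebergSchema).day("event_time").build();
+ * catalog.createTable(TableIdentifier.parse("my_namespace.my_table"), icebergSchema, spec);
+ * }
+ *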

Dynamic Destinations

+ * + *

Managed Iceberg supports writing to dynamic destinations. To do so, please provide an + * identifier template for the {@code table} parameter. A template should have placeholders + * represented as curly braces containing a record field name, e.g.: {@code + * "my_namespace.my_{foo}_table"}. + * + *

The sink uses simple String interpolation to determine a record's table destination. The + * placeholder is replaced with the record's field value. Nested fields can be specified using + * dot-notation (e.g. {@code "{top.middle.nested}"}). + * + *

Pre-filtering Options

+ * + *

Some use cases may benefit from filtering record fields right before the write operation. For + * example, you may want to provide metadata to guide records to the right destination, but not + * necessarily write that metadata to your table. Some lightweight filtering options are provided + * to accommodate such cases, allowing you to control what actually gets written (see {@code + * drop}, {@code keep}, {@code only}). + * + *

Example write to dynamic destinations (pseudocode): + * + *

{@code
+ * Map config = Map.of(
+ *         "table", "flights.{country}.{airport}",
+ *         "catalog_properties", Map.of(...),
+ *         "drop", ["country", "airport"]);
+ *
+ * JSON_ROWS = [
+ *       // first record is written to table "flights.usa.RDU"
+ *         "{\"country\": \"usa\"," +
+ *          "\"airport\": \"RDU\"," +
+ *          "\"flight_id\": \"AA356\"," +
+ *          "\"destination\": \"SFO\"," +
+ *          "\"meal\": \"chicken alfredo\"}",
+ *       // second record is written to table "flights.qatar.HIA"
+ *         "{\"country\": \"qatar\"," +
+ *          "\"airport\": \"HIA\"," +
+ *          "\"flight_id\": \"QR 875\"," +
+ *          "\"destination\": \"DEL\"," +
+ *          "\"meal\": \"shawarma\"}",
+ *          ...
+ *          ];
+ *
+ * // fields "country" and "airport" are dropped before
+ * // records are written to tables
+ * pipeline
+ *     .apply(Create.of(JSON_ROWS))
+ *     .apply(JsonToRow.withSchema(...))
+ *     .apply(Managed.write(ICEBERG).withConfig(config));
+ *
+ * }
+ * + *

Output Snapshots

+ * + *

When records are written and committed to a table, a snapshot is produced. A batch pipeline + * will perform a single commit and create a single snapshot per table. A streaming pipeline will + * produce a snapshot roughly according to the configured {@code + * triggering_frequency_seconds}. + * + *

You can access these snapshots and perform downstream processing by fetching the {@code + * "snapshots"} output PCollection: + * + *

{@code
+ * pipeline
+ *     .apply(Create.of(BEAM_ROWS))
+ *     .apply(Managed.write(ICEBERG).withConfig(config))
+ *     .get("snapshots")
+ *     .apply(ParDo.of(new DoFn {...}));
+ * }
+ * + * Each Snapshot is represented as a Beam Row, with the following Schema: + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + *
Field Type Description
{@code table} {@code str} Table identifier.
{@code manifest_list_location} {@code str} Location of the snapshot's manifest list.
{@code operation} {@code str} Name of the operation that produced the snapshot.
{@code parent_id} {@code long} The snapshot's parent ID.
{@code schema_id} {@code int} The id of the schema used when the snapshot was created.
{@code summary} {@code map} A string map of summary data.
{@code timestamp_millis} {@code long} The snapshot's timestamp in milliseconds.
+ * + *
+ *
+ * + *
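+ * For example, a downstream step could inspect each commit using the schema above (a sketch;
+ * {@code writeResult} stands for the result of the Managed write):
+ *
+ * {@code
+ * writeResult
+ *     .get("snapshots")
+ *     .apply(ParDo.of(new DoFn<Row, Void>() {
+ *       @ProcessElement
+ *       public void process(@Element Row snapshot) {
+ *         String table = snapshot.getString("table");
+ *         Long commitMillis = snapshot.getInt64("timestamp_millis");
+ *         // e.g. log the commit or trigger downstream table maintenance
+ *       }
+ *     }));
+ * }
+ *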

Reading from Tables

+ * + * With the following configuration, + * + *
{@code
+ * Map config = Map.of(
+ *         "table", table,
+ *         "catalog_name", name,
+ *         "catalog_properties", Map.of(...),
+ *         "config_properties", Map.of(...));
+ * }
+ * + * Example of a simple batch read: + * + *
{@code
+ * PCollection rows = pipeline
+ *     .apply(Managed.read(ICEBERG).withConfig(config))
+ *     .getSinglePCollection();
+ * }
+ * + * Example of a simple CDC streaming read: + * + *
{@code
+ * PCollection rows = pipeline
+ *     .apply(Managed.read(ICEBERG_CDC).withConfig(config))
+ *     .getSinglePCollection();
+ * }
+ * + *

Note: This reads append-only snapshots. Full CDC is not supported yet. + * + *

The CDC streaming source (enabled with {@code streaming=true}) continuously polls the + * table for new snapshots, with a default interval of 60 seconds. This can be overridden with + * {@code poll_interval_seconds}: + * + *

{@code
+ * config.put("streaming", true);
+ * config.put("poll_interval_seconds", 10);
+ * }
+ * + *

Choosing a Starting Point (ICEBERG_CDC only)

+ * + * By default, a batch read will start reading from the earliest (oldest) table snapshot. A + * streaming read will start reading from the latest (most recent) snapshot. This behavior can be + * overridden in a few mutually exclusive ways: + * + * + * + *

For example: + * + *

{@code
+ * Map config = Map.of(
+ *         "table", table,
+ *         "catalog_name", name,
+ *         "catalog_properties", Map.of(...),
+ *         "config_properties", Map.of(...),
+ *         "streaming", true,
+ *         "from_snapshot", 123456789L);
+ *
+ * PCollection rows = pipeline
+ *     .apply(Managed.read(ICEBERG_CDC).withConfig(config))
+ *     .getSinglePCollection();
+ * }
+ * + *

Choosing an End Point (ICEBERG_CDC only)

+ * + * By default, a batch read will go up until the most recent table snapshot. A streaming read will + * continue monitoring the table for new snapshots forever. This can be overridden with one of the + * following options: + * + * + * + *

For example: + * + *

{@code
+ * Map config = Map.of(
+ *         "table", table,
+ *         "catalog_name", name,
+ *         "catalog_properties", Map.of(...),
+ *         "config_properties", Map.of(...),
+ *         "from_snapshot", 123456789L,
+ *         "to_timestamp", 987654321L);
+ *
+ * PCollection rows = pipeline
+ *     .apply(Managed.read(ICEBERG_CDC).withConfig(config))
+ *     .getSinglePCollection();
+ * }
+ * + * Note: If {@code streaming=true} and an end point is set, the pipeline will run in + * streaming mode and shut down automatically after processing the final snapshot. + */ +@Internal +public class IcebergIO { + + public static WriteRows writeRows(IcebergCatalogConfig catalog) { + return new AutoValue_IcebergIO_WriteRows.Builder() + .setCatalogConfig(catalog) + .setDistributionMode(DistributionMode.HASH) + .setAutoSharding(false) + .build(); + } + + @AutoValue + public abstract static class WriteRows extends PTransform, IcebergWriteResult> { + + abstract IcebergCatalogConfig getCatalogConfig(); + + abstract @Nullable TableIdentifier getTableIdentifier(); + + abstract @Nullable DynamicDestinations getDynamicDestinations(); + + abstract @Nullable Duration getTriggeringFrequency(); + + abstract @Nullable Integer getDirectWriteByteLimit(); + + abstract DistributionMode getDistributionMode(); + + abstract @Nullable SerializableFunction getDistributionFunction(); + + abstract boolean getAutoSharding(); + + abstract Builder toBuilder(); + + @AutoValue.Builder + abstract static class Builder { + abstract Builder setCatalogConfig(IcebergCatalogConfig config); + + abstract Builder setTableIdentifier(TableIdentifier identifier); + + abstract Builder setDynamicDestinations(DynamicDestinations destinations); + + abstract Builder setTriggeringFrequency(Duration triggeringFrequency); + + abstract Builder setDirectWriteByteLimit(Integer directWriteByteLimit); + + abstract Builder setDistributionMode(DistributionMode mode); + + abstract Builder setDistributionFunction(SerializableFunction shardFn); + + abstract Builder setAutoSharding(boolean autoSharding); + + abstract WriteRows build(); + } + + public WriteRows to(TableIdentifier identifier) { + return toBuilder().setTableIdentifier(identifier).build(); + } + + public WriteRows to(DynamicDestinations destinations) { + return toBuilder().setDynamicDestinations(destinations).build(); + } + + /** + * Sets the frequency at which data is written to files and a new {@link + * org.apache.iceberg.Snapshot} is produced. + * + *

Roughly every triggeringFrequency duration, records are written to data files and appended + * to the respective table. Each append operation creates a new table snapshot. + * + *

Generally speaking, increasing this duration will result in fewer, larger data files and + * fewer snapshots. + * + *
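+     * For example, to commit roughly every five minutes on a streaming pipeline (a sketch;
+     * {@code catalogConfig} and {@code tableId} are placeholders):
+     *
+     * {@code
+     * IcebergIO.writeRows(catalogConfig)
+     *     .to(tableId)
+     *     .withTriggeringFrequency(Duration.standardMinutes(5))
+     * }
+     *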

This is only applicable when writing an unbounded {@link PCollection} (i.e. a streaming + * pipeline). + */ + public WriteRows withTriggeringFrequency(Duration triggeringFrequency) { + return toBuilder().setTriggeringFrequency(triggeringFrequency).build(); + } + + public WriteRows withDirectWriteByteLimit(Integer directWriteByteLimit) { + return toBuilder().setDirectWriteByteLimit(directWriteByteLimit).build(); + } + + /** + * Defines the distribution mode of write data prior to writing. + * + *

The default distribution mode is {@link DistributionMode#HASH}. + * + *

Comparison of Distribution Modes:

+ * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + *
Comparison of Distribution Modes
Mode | Description | Pros | Cons
{@link DistributionMode#NONE} | No network shuffle is performed. Records are sorted locally on workers prior to writing. | Highly lightweight with zero shuffle/network overhead. Best for smaller data volumes. | Writers on different workers can write to overlapping min/max key ranges across multiple files. Relies heavily on after-the-fact compaction or query-time merges.
{@link DistributionMode#HASH} | Data is shuffled and consolidated by partition key. All records for a partition are routed to a single worker. | Consolidates partition files, eliminating cross-worker file overlap for partition keys. Excellent worker stability. | Can suffer from severe data skew if a single partition contains significantly more data than others (hot partitions).
{@link DistributionMode#RANGE} | Data is shuffled based on a user-provided shard/bucket function (e.g., hashing/binning continuous keys). | Distributes writes for hot partitions across multiple workers. Eliminates skew while keeping file min/max key ranges tight and non-overlapping. | Requires providing a custom {@link SerializableFunction} mapping rows to integer shard/bucket IDs.
+ * + *

Recommendation Matrix (Sorting & Partitioning vs. Scale):

+ * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + *
Recommendation Matrix
Partitioning | Sorting | Scale / Volume | Latency Priority | Recommended Mode | Operational Impact
Partitioned | Sorted | Small | Any | {@link DistributionMode#HASH} | Consolidates partition files and sorts them locally. Avoids file overlaps for small volumes.
Partitioned | Sorted | Medium / Large | Low Write Latency | {@link DistributionMode#NONE} | Eliminates shuffle overhead for maximum write speed. Results in overlapping key ranges across files, which requires downstream compaction.
Partitioned | Sorted | Medium / Large | Low Read Latency | {@link DistributionMode#HASH} with auto-sharding OR {@link DistributionMode#RANGE} | HASH with auto-sharding scales writes for hot partitions but can result in overlapping file ranges requiring query-time sort merges. RANGE sharding distributes hot partitions into sequential, non-overlapping files to optimize reads.
Partitioned | Unsorted | Small | Any | {@link DistributionMode#HASH} | Consolidates data files into single partition directories to prevent file fragmentation.
Partitioned | Unsorted | Medium / Large | Any | {@link DistributionMode#HASH} with auto-sharding | Consolidates partition files while dynamically balancing hot partition writes across parallel workers.
Unpartitioned | Sorted | Small | Any | {@link DistributionMode#NONE} | Bypasses network shuffle for fast, low-volume local sorting.
Unpartitioned | Sorted | Medium / Large | Low Write Latency | {@link DistributionMode#NONE} | Bypasses network shuffle for parallel worker writes. Requires downstream compaction to resolve overlapping file ranges.
Unpartitioned | Sorted | Medium / Large | Low Read Latency | {@link DistributionMode#RANGE} (with custom sharding function) | Shards continuous keys into non-overlapping worker ranges. Eliminates single-worker bottlenecks and guarantees zero file overlap for fast queries.
Unpartitioned | Unsorted | Any | Any | {@link DistributionMode#NONE} | Direct, parallel worker writes with maximum throughput and zero network shuffle overhead.
+ * + *

Code Samples:

+ * + *
{@code
+     * // 1. Using default HASH distribution mode (Consolidates by partition key)
+     * pipeline
+     *     .apply(Create.of(BEAM_ROWS))
+     *     .apply(IcebergIO.writeRows(catalogConfig)
+     *         .to(tableId));
+     *
+     * // 2. Using NONE distribution mode (No shuffle, local sorting only)
+     * pipeline
+     *     .apply(Create.of(BEAM_ROWS))
+     *     .apply(IcebergIO.writeRows(catalogConfig)
+     *         .to(tableId)
+     *         .withDistributionMode(DistributionMode.NONE));
+     *
+     * // 3. Using RANGE distribution mode with a custom shard/bucket function to avoid data skew
+     * pipeline
+     *     .apply(Create.of(BEAM_ROWS))
+     *     .apply(IcebergIO.writeRows(catalogConfig)
+     *         .to(tableId)
+     *         .withDistributionMode(DistributionMode.RANGE)
+     *         .withDistributionFunction(row -> {
+     *             // Bucket contiguous IDs into non-overlapping shards of 10,000 each
+     *             long id = row.getInt64("id");
+     *             return (int) (id / 10000);
+     *         }));
+     * }
+ * + * @param mode The distribution mode. + */ + public WriteRows withDistributionMode(DistributionMode mode) { + return toBuilder().setDistributionMode(mode).build(); + } + + /** + * Sets the custom range-distribution function. + * + *

Only applicable when the distribution mode is set to {@link DistributionMode#RANGE}. The + * function maps a Beam {@link Row} to an Integer representing a shard/bucket ID. + */ + public WriteRows withDistributionFunction(SerializableFunction shardFn) { + return toBuilder().setDistributionFunction(shardFn).build(); + } + + /** + * Enables Beam's dynamic auto-sharding when using {@link DistributionMode#HASH}. + * + *

When enabled, the pipeline uses {@link + * org.apache.beam.sdk.transforms.GroupIntoBatches#withShardedKey()} under the hood. The runner + * (such as Dataflow) dynamically monitors throughput per partition key. If a partition is + * extremely hot, the runner automatically splits it into parallel sub-shards distributed across + * multiple workers to prevent single-worker bottlenecks and out-of-memory (OOM) errors, while + * keeping the number of data files for cold partitions minimal. + * + *

Note that because auto-sharding distributes hot-partition data randomly across worker + * shards, the written data files cannot guarantee non-overlapping key ranges. Downstream + * queries may require read-time sort merges for overlapping file segments until an Iceberg + * compaction job (e.g., {@code rewriteDataFiles}) is executed. + * + *
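+     * A minimal usage sketch ({@code catalogConfig} and {@code tableId} are placeholders):
+     *
+     * {@code
+     * IcebergIO.writeRows(catalogConfig)
+     *     .to(tableId)
+     *     .withAutosharding()
+     * }
+     *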

Only applicable when using {@link DistributionMode#HASH}. + */ + public WriteRows withAutosharding() { + return toBuilder().setAutoSharding(true).build(); + } + + @Override + public IcebergWriteResult expand(PCollection input) { + List allToArgs = Arrays.asList(getTableIdentifier(), getDynamicDestinations()); + Preconditions.checkArgument( + 1 == allToArgs.stream().filter(Predicates.notNull()).count(), + "Must set exactly one of table identifier or dynamic destinations object."); + + DynamicDestinations destinations = getDynamicDestinations(); + if (destinations == null) { + destinations = + DynamicDestinations.singleTable( + Preconditions.checkNotNull(getTableIdentifier()), input.getSchema()); + } + + // Assign destinations before re-windowing to global in WriteToDestinations because + // user's dynamic destination may depend on windowing properties + if (IcebergUtils.validDirectWriteLimit(getDirectWriteByteLimit())) { + Preconditions.checkArgument( + IcebergUtils.isUnbounded(input), + "Must only provide direct write limit for unbounded pipelines."); + } + + switch (getDistributionMode()) { + case NONE: + Preconditions.checkArgument( + !getAutoSharding(), + "Autosharding option is only available with " + "'hash' distribution mode."); + return input + .apply("Assign Table Destinations", new AssignDestinations(destinations)) + .apply( + "Write Rows to Destinations", + new WriteToDestinations( + getCatalogConfig(), + destinations, + getTriggeringFrequency(), + getDirectWriteByteLimit())); + case HASH: + return input + .apply( + "AssignDestinationAndPartition", + new AssignDestinationsAndPartitions( + destinations, + getCatalogConfig(), + getDistributionMode(), + getDistributionFunction())) + .apply( + "Write Rows to Partitions", + new WriteToPartitions( + getCatalogConfig(), + destinations, + getTriggeringFrequency(), + getAutoSharding())); + case RANGE: + Preconditions.checkArgument( + getDistributionFunction() != null, + "Must provide a distribution function when using RANGE distribution mode."); + return input + .apply( + "AssignDestinationAndPartitionWithRange", + new AssignDestinationsAndPartitions( + destinations, + getCatalogConfig(), + getDistributionMode(), + getDistributionFunction())) + .apply( + "Write Rows to Partitions", + new WriteToPartitions( + getCatalogConfig(), + destinations, + getTriggeringFrequency(), + getAutoSharding())); + default: + throw new UnsupportedOperationException( + "Unsupported distribution mode: " + getDistributionMode()); + } + } + } + + public static ReadRows readRows(IcebergCatalogConfig catalogConfig) { + return new AutoValue_IcebergIO_ReadRows.Builder() + .setCatalogConfig(catalogConfig) + .setUseCdc(false) + .build(); + } + + @AutoValue + public abstract static class ReadRows extends PTransform> { + public enum StartingStrategy { + EARLIEST, + LATEST + } + + abstract IcebergCatalogConfig getCatalogConfig(); + + abstract @Nullable TableIdentifier getTableIdentifier(); + + abstract boolean getUseCdc(); + + abstract @Nullable Long getFromSnapshot(); + + abstract @Nullable Long getToSnapshot(); + + abstract @Nullable Long getFromTimestamp(); + + abstract @Nullable Long getToTimestamp(); + + abstract @Nullable StartingStrategy getStartingStrategy(); + + abstract @Nullable Boolean getStreaming(); + + abstract @Nullable Duration getPollInterval(); + + abstract @Nullable List getKeep(); + + abstract @Nullable List getDrop(); + + abstract @Nullable String getFilter(); + + abstract Builder toBuilder(); + + @AutoValue.Builder + abstract static class Builder { 
+ abstract Builder setCatalogConfig(IcebergCatalogConfig config); + + abstract Builder setTableIdentifier(TableIdentifier identifier); + + abstract Builder setUseCdc(boolean useCdc); + + abstract Builder setFromSnapshot(@Nullable Long fromSnapshot); + + abstract Builder setToSnapshot(@Nullable Long toSnapshot); + + abstract Builder setFromTimestamp(@Nullable Long fromTimestamp); + + abstract Builder setToTimestamp(@Nullable Long toTimestamp); + + abstract Builder setStartingStrategy(@Nullable StartingStrategy strategy); + + abstract Builder setStreaming(@Nullable Boolean streaming); + + abstract Builder setPollInterval(@Nullable Duration triggeringFrequency); + + abstract Builder setKeep(@Nullable List fields); + + abstract Builder setDrop(@Nullable List fields); + + abstract Builder setFilter(@Nullable String filter); + + abstract ReadRows build(); + } + + public ReadRows withCdc() { + return toBuilder().setUseCdc(true).build(); + } + + public ReadRows from(TableIdentifier tableIdentifier) { + return toBuilder().setTableIdentifier(tableIdentifier).build(); + } + + public ReadRows fromSnapshot(@Nullable Long fromSnapshot) { + return toBuilder().setFromSnapshot(fromSnapshot).build(); + } + + public ReadRows toSnapshot(@Nullable Long toSnapshot) { + return toBuilder().setToSnapshot(toSnapshot).build(); + } + + public ReadRows fromTimestamp(@Nullable Long fromTimestamp) { + return toBuilder().setFromTimestamp(fromTimestamp).build(); + } + + public ReadRows toTimestamp(@Nullable Long toTimestamp) { + return toBuilder().setToTimestamp(toTimestamp).build(); + } + + public ReadRows withPollInterval(Duration pollInterval) { + return toBuilder().setPollInterval(pollInterval).build(); + } + + public ReadRows streaming(@Nullable Boolean streaming) { + return toBuilder().setStreaming(streaming).build(); + } + + public ReadRows withStartingStrategy(@Nullable StartingStrategy strategy) { + return toBuilder().setStartingStrategy(strategy).build(); + } + + public ReadRows keeping(@Nullable List keep) { + return toBuilder().setKeep(keep).build(); + } + + public ReadRows dropping(@Nullable List drop) { + return toBuilder().setDrop(drop).build(); + } + + public ReadRows withFilter(@Nullable String filter) { + return toBuilder().setFilter(filter).build(); + } + + @Override + public PCollection expand(PBegin input) { + TableIdentifier tableId = + checkStateNotNull(getTableIdentifier(), "Must set a table to read from."); + + Table table = getCatalogConfig().catalog().loadTable(tableId); + + IcebergScanConfig scanConfig = + IcebergScanConfig.builder() + .setCatalogConfig(getCatalogConfig()) + .setScanType(IcebergScanConfig.ScanType.TABLE) + .setTableIdentifier(tableId) + .setSchema(IcebergUtils.icebergSchemaToBeamSchema(table.schema())) + .setFromSnapshotInclusive(getFromSnapshot()) + .setToSnapshot(getToSnapshot()) + .setFromTimestamp(getFromTimestamp()) + .setToTimestamp(getToTimestamp()) + .setStartingStrategy(getStartingStrategy()) + .setStreaming(getStreaming()) + .setPollInterval(getPollInterval()) + .setUseCdc(getUseCdc()) + .setKeepFields(getKeep()) + .setDropFields(getDrop()) + .setFilterString(getFilter()) + .build(); + scanConfig.validate(table); + + PTransform> source = + getUseCdc() + ? 
new IncrementalScanSource(scanConfig) + : Read.from(new ScanSource(scanConfig)); + + return input.apply(source); + } + } +} diff --git a/scratch/iceberg-scale-test/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergRowSorter.java b/scratch/iceberg-scale-test/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergRowSorter.java new file mode 100644 index 000000000000..6efc1bbe2eec --- /dev/null +++ b/scratch/iceberg-scale-test/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergRowSorter.java @@ -0,0 +1,269 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.iceberg; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.Serializable; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.time.Instant; +import java.util.Date; +import java.util.Iterator; +import java.util.List; +import org.apache.beam.sdk.coders.RowCoder; +import org.apache.beam.sdk.extensions.sorter.BufferedExternalSorter; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.Row; +import org.apache.iceberg.NullOrder; +import org.apache.iceberg.Schema; +import org.apache.iceberg.SortDirection; +import org.apache.iceberg.SortField; +import org.apache.iceberg.SortOrder; +import org.joda.time.ReadableInstant; + +/** + * A utility class to sort Beam {@link Row}s based on an Iceberg {@link SortOrder}. Leverages {@link + * BufferedExternalSorter} to spill to local disk when elements exceed memory limit. 
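+ *
+ * A usage sketch (assumes the Iceberg {@code table}'s sort order and schema describe the same
+ * data as {@code beamSchema}):
+ *
+ * {@code
+ * Iterable<Row> sorted =
+ *     IcebergRowSorter.sortRows(rows, table.sortOrder(), table.schema(), beamSchema);
+ * }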
+ */ +class IcebergRowSorter implements Serializable { + + public static Iterable sortRows( + Iterable rows, + SortOrder sortOrder, + Schema icebergSchema, + org.apache.beam.sdk.schemas.Schema beamSchema) { + + if (sortOrder == null || !sortOrder.isSorted()) { + return rows; + } + + BufferedExternalSorter.Options sorterOptions = BufferedExternalSorter.options(); + BufferedExternalSorter sorter = BufferedExternalSorter.create(sorterOptions); + RowCoder rowCoder = RowCoder.of(beamSchema); + + List fields = sortOrder.fields(); + String[] columnNames = new String[fields.size()]; + for (int i = 0; i < fields.size(); i++) { + columnNames[i] = icebergSchema.findColumnName(fields.get(i).sourceId()); + } + + // Create reusable ByteArrayOutputStreams for key and value encoding + ByteArrayOutputStream keyBaos = new ByteArrayOutputStream(); + ByteArrayOutputStream valBaos = new ByteArrayOutputStream(); + + try { + for (Row row : rows) { + keyBaos.reset(); + valBaos.reset(); + encodeSortKey(row, sortOrder, columnNames, keyBaos, icebergSchema, beamSchema); + byte[] keyBytes = keyBaos.toByteArray(); + + rowCoder.encode(row, valBaos); + byte[] valBytes = valBaos.toByteArray(); + sorter.add(KV.of(keyBytes, valBytes)); + } + + Iterable> sortedKVs = sorter.sort(); + return new Iterable() { + @Override + public Iterator iterator() { + final Iterator> it = sortedKVs.iterator(); + return new Iterator() { + @Override + public boolean hasNext() { + return it.hasNext(); + } + + @Override + public Row next() { + KV next = it.next(); + try { + ByteArrayInputStream bais = new ByteArrayInputStream(next.getValue()); + return rowCoder.decode(bais); + } catch (IOException e) { + throw new RuntimeException("Failed to decode Row during sorting", e); + } + } + }; + } + }; + + } catch (IOException e) { + throw new RuntimeException("Failed to sort rows with external sorter", e); + } + } + + @SuppressWarnings("nullness") + public static void encodeSortKey( + Row row, + SortOrder sortOrder, + String[] columnNames, + ByteArrayOutputStream baos, + Schema icebergSchema, + org.apache.beam.sdk.schemas.Schema beamSchema) + throws IOException { + + List fields = sortOrder.fields(); + + for (int i = 0; i < fields.size(); i++) { + SortField field = fields.get(i); + String colName = columnNames[i]; + Object val = row.getValue(colName); + + if (!field.transform().isIdentity()) { + Object icebergVal = + IcebergUtils.beamValueToIcebergValue(icebergSchema.findType(field.sourceId()), val); + if (icebergVal != null) { + val = field.transform().apply(icebergVal); + } else { + val = null; + } + } + + boolean isNull = (val == null); + boolean isDesc = (field.direction() == SortDirection.DESC); + boolean nullsFirst = (field.nullOrder() == NullOrder.NULLS_FIRST); + + // Determine correct header prefix to fulfill the NullOrder contracts + byte prefixByte; + if (isNull) { + prefixByte = nullsFirst ? (byte) 0x00 : (byte) 0xFF; + } else { + prefixByte = nullsFirst ? 
(byte) 0x01 : (byte) 0x00; + } + + baos.write(prefixByte); + + if (!isNull) { + writeValue(val, baos, isDesc); + } + } + } + + private static void writeInt(int v, ByteArrayOutputStream baos, boolean invert) { + byte b3 = (byte) (v >>> 24); + byte b2 = (byte) (v >>> 16); + byte b1 = (byte) (v >>> 8); + byte b0 = (byte) v; + if (invert) { + baos.write(~b3); + baos.write(~b2); + baos.write(~b1); + baos.write(~b0); + } else { + baos.write(b3); + baos.write(b2); + baos.write(b1); + baos.write(b0); + } + } + + private static void writeLong(long v, ByteArrayOutputStream baos, boolean invert) { + byte b7 = (byte) (v >>> 56); + byte b6 = (byte) (v >>> 48); + byte b5 = (byte) (v >>> 40); + byte b4 = (byte) (v >>> 32); + byte b3 = (byte) (v >>> 24); + byte b2 = (byte) (v >>> 16); + byte b1 = (byte) (v >>> 8); + byte b0 = (byte) v; + if (invert) { + baos.write(~b7); + baos.write(~b6); + baos.write(~b5); + baos.write(~b4); + baos.write(~b3); + baos.write(~b2); + baos.write(~b1); + baos.write(~b0); + } else { + baos.write(b7); + baos.write(b6); + baos.write(b5); + baos.write(b4); + baos.write(b3); + baos.write(b2); + baos.write(b1); + baos.write(b0); + } + } + + @SuppressWarnings("JavaUtilDate") + private static void writeValue(Object val, ByteArrayOutputStream baos, boolean invert) + throws IOException { + if (val instanceof String) { + writeString((String) val, baos, invert); + } else if (val instanceof Integer) { + int v = (Integer) val; + writeInt(v ^ Integer.MIN_VALUE, baos, invert); + } else if (val instanceof Long) { + long v = (Long) val; + writeLong(v ^ Long.MIN_VALUE, baos, invert); + } else if (val instanceof Float) { + int bits = Float.floatToIntBits((Float) val); + bits = (bits >= 0) ? (bits ^ Integer.MIN_VALUE) : ~bits; + writeInt(bits, baos, invert); + } else if (val instanceof Double) { + long bits = Double.doubleToLongBits((Double) val); + bits = (bits >= 0) ? (bits ^ Long.MIN_VALUE) : ~bits; + writeLong(bits, baos, invert); + } else if (val instanceof Boolean) { + byte b = ((Boolean) val) ? (byte) 0x01 : (byte) 0x00; + baos.write(invert ? ~b : b); + } else if (val instanceof byte[]) { + writeByteArray((byte[]) val, baos, invert); + } else if (val instanceof ByteBuffer) { + writeByteArray(((ByteBuffer) val).array(), baos, invert); + } else if (val instanceof ReadableInstant) { + long enc = ((ReadableInstant) val).getMillis() ^ Long.MIN_VALUE; + writeLong(enc, baos, invert); + } else if (val instanceof Instant) { + long enc = ((Instant) val).toEpochMilli() ^ Long.MIN_VALUE; + writeLong(enc, baos, invert); + } else if (val instanceof Date) { + long enc = ((Date) val).getTime() ^ Long.MIN_VALUE; + writeLong(enc, baos, invert); + } else { + throw new UnsupportedOperationException( + "Unsupported type for sorting: " + val.getClass().getName()); + } + } + + private static void writeString(String s, ByteArrayOutputStream baos, boolean invert) + throws IOException { + byte[] bytes = s.getBytes(StandardCharsets.UTF_8); + writeByteArray(bytes, baos, invert); + } + + private static void writeByteArray(byte[] bytes, ByteArrayOutputStream baos, boolean invert) { + for (byte b : bytes) { + if (b == 0x00) { + baos.write(invert ? ~(byte) 0x01 : (byte) 0x01); + baos.write(invert ? ~(byte) 0x01 : (byte) 0x01); + } else if (b == 0x01) { + baos.write(invert ? ~(byte) 0x01 : (byte) 0x01); + baos.write(invert ? ~(byte) 0x02 : (byte) 0x02); + } else { + baos.write(invert ? ~b : b); + } + } + baos.write(invert ? 
~(byte) 0x00 : (byte) 0x00); + } +} diff --git a/scratch/iceberg-scale-test/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java b/scratch/iceberg-scale-test/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java new file mode 100644 index 000000000000..7f48a0d0128c --- /dev/null +++ b/scratch/iceberg-scale-test/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java @@ -0,0 +1,683 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.iceberg; + +import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull; +import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState; + +import java.nio.ByteBuffer; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.time.OffsetDateTime; +import java.time.ZoneOffset; +import java.time.temporal.ChronoUnit; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.logicaltypes.FixedPrecisionNumeric; +import org.apache.beam.sdk.schemas.logicaltypes.MicrosInstant; +import org.apache.beam.sdk.schemas.logicaltypes.PassThroughLogicalType; +import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes; +import org.apache.beam.sdk.util.Preconditions; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.DateTimeUtil; +import org.checkerframework.checker.nullness.qual.NonNull; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.joda.time.DateTime; +import org.joda.time.Instant; + +/** Utilities for converting between Beam and Iceberg types, made public for user's convenience. 
*/ +public class IcebergUtils { + private IcebergUtils() {} + + private static final Map BEAM_TYPES_TO_ICEBERG_TYPES = + ImmutableMap.builder() + .put(Schema.TypeName.BOOLEAN, Types.BooleanType.get()) + .put(Schema.TypeName.INT32, Types.IntegerType.get()) + .put(Schema.TypeName.INT64, Types.LongType.get()) + .put(Schema.TypeName.FLOAT, Types.FloatType.get()) + .put(Schema.TypeName.DOUBLE, Types.DoubleType.get()) + .put(Schema.TypeName.STRING, Types.StringType.get()) + .put(Schema.TypeName.BYTES, Types.BinaryType.get()) + .put(Schema.TypeName.DATETIME, Types.TimestampType.withZone()) + .build(); + + private static final Map BEAM_LOGICAL_TYPES_TO_ICEBERG_TYPES = + ImmutableMap.builder() + .put(SqlTypes.DATE.getIdentifier(), Types.DateType.get()) + .put(SqlTypes.TIME.getIdentifier(), Types.TimeType.get()) + .put(SqlTypes.DATETIME.getIdentifier(), Types.TimestampType.withoutZone()) + .put(SqlTypes.UUID.getIdentifier(), Types.UUIDType.get()) + .put(MicrosInstant.IDENTIFIER, Types.TimestampType.withZone()) + .build(); + + private static Schema.FieldType icebergTypeToBeamFieldType(final Type type) { + switch (type.typeId()) { + case BOOLEAN: + return Schema.FieldType.BOOLEAN; + case INTEGER: + return Schema.FieldType.INT32; + case LONG: + return Schema.FieldType.INT64; + case FLOAT: + return Schema.FieldType.FLOAT; + case DOUBLE: + return Schema.FieldType.DOUBLE; + case DATE: + return Schema.FieldType.logicalType(SqlTypes.DATE); + case TIME: + return Schema.FieldType.logicalType(SqlTypes.TIME); + case TIMESTAMP: + Types.TimestampType ts = (Types.TimestampType) type.asPrimitiveType(); + if (ts.shouldAdjustToUTC()) { + return Schema.FieldType.DATETIME; + } + return Schema.FieldType.logicalType(SqlTypes.DATETIME); + case STRING: + return Schema.FieldType.STRING; + case UUID: + case BINARY: + return Schema.FieldType.BYTES; + case FIXED: + case DECIMAL: + return Schema.FieldType.DECIMAL; + case STRUCT: + return Schema.FieldType.row(icebergStructTypeToBeamSchema(type.asStructType())); + case LIST: + return Schema.FieldType.array(icebergTypeToBeamFieldType(type.asListType().elementType())); + case MAP: + return Schema.FieldType.map( + icebergTypeToBeamFieldType(type.asMapType().keyType()), + icebergTypeToBeamFieldType(type.asMapType().valueType())); + default: + throw new RuntimeException("Unrecognized Iceberg Type: " + type.typeId()); + } + } + + private static Schema.Field icebergFieldToBeamField(final Types.NestedField field) { + return Schema.Field.of(field.name(), icebergTypeToBeamFieldType(field.type())) + .withNullable(field.isOptional()); + } + + /** Converts an Iceberg {@link org.apache.iceberg.Schema} to a Beam {@link Schema}. */ + public static Schema icebergSchemaToBeamSchema(final org.apache.iceberg.Schema schema) { + Schema.Builder builder = Schema.builder(); + for (Types.NestedField f : schema.columns()) { + builder.addField(icebergFieldToBeamField(f)); + } + return builder.build(); + } + + private static Schema icebergStructTypeToBeamSchema(final Types.StructType struct) { + Schema.Builder builder = Schema.builder(); + for (Types.NestedField f : struct.fields()) { + builder.addField(icebergFieldToBeamField(f)); + } + return builder.build(); + } + + /** + * Represents a {@link Type} and the most recent field ID used to build it. + * + *

Iceberg Schema fields are required to have unique IDs. This includes unique IDs for a {@link + * org.apache.iceberg.types.Type.NestedType}'s components (e.g. {@link Types.ListType}'s + * collection type, {@link Types.MapType}'s key type and value type, and {@link + * Types.StructType}'s nested fields). The {@code maxId} in this object represents the most recent + * ID used after building this type. This helps signal that the next {@link + * org.apache.iceberg.types.Type.NestedType} we construct should have an ID greater than this one. + */ + @VisibleForTesting + static class TypeAndMaxId { + int maxId; + Type type; + + TypeAndMaxId(int id, Type object) { + this.maxId = id; + this.type = object; + } + } + + /** + * Takes a Beam {@link Schema.FieldType} and an index intended as a starting point for Iceberg + * {@link org.apache.iceberg.types.Type.NestedType}s. Returns an Iceberg {@link Type} and the + * maximum index after building that type. + * + *

Returns this information in an {@link TypeAndMaxId} object. + */ + @VisibleForTesting + static TypeAndMaxId beamFieldTypeToIcebergFieldType( + Schema.FieldType beamType, int nestedFieldId) { + if (BEAM_TYPES_TO_ICEBERG_TYPES.containsKey(beamType.getTypeName())) { + // we don't use nested field ID for primitive types. decrement it so the caller can use it for + // other types. + return new TypeAndMaxId( + --nestedFieldId, BEAM_TYPES_TO_ICEBERG_TYPES.get(beamType.getTypeName())); + } else if (beamType.getTypeName().isLogicalType()) { + Schema.LogicalType logicalType = checkArgumentNotNull(beamType.getLogicalType()); + if (logicalType instanceof FixedPrecisionNumeric) { + Row args = Preconditions.checkArgumentNotNull(logicalType.getArgument()); + Integer precision = Preconditions.checkArgumentNotNull(args.getInt32("precision")); + Integer scale = Preconditions.checkArgumentNotNull(args.getInt32("scale")); + return new TypeAndMaxId(--nestedFieldId, Types.DecimalType.of(precision, scale)); + } + if (logicalType instanceof PassThroughLogicalType) { + return beamFieldTypeToIcebergFieldType(logicalType.getBaseType(), nestedFieldId); + } + String logicalTypeIdentifier = logicalType.getIdentifier(); + @Nullable Type type = BEAM_LOGICAL_TYPES_TO_ICEBERG_TYPES.get(logicalTypeIdentifier); + if (type == null) { + throw new RuntimeException("Unsupported Beam logical type " + logicalTypeIdentifier); + } + return new TypeAndMaxId(--nestedFieldId, type); + } else if (beamType.getTypeName().isCollectionType()) { // ARRAY or ITERABLE + Schema.FieldType beamCollectionType = + Preconditions.checkArgumentNotNull(beamType.getCollectionElementType()); + + // nestedFieldId is reserved for the list's collection type. + // we increment here because further nested fields should use unique ID's + TypeAndMaxId listInfo = + beamFieldTypeToIcebergFieldType(beamCollectionType, nestedFieldId + 1); + Type icebergCollectionType = listInfo.type; + + boolean elementTypeIsNullable = + Preconditions.checkArgumentNotNull(beamType.getCollectionElementType()).getNullable(); + + Type listType = + elementTypeIsNullable + ? Types.ListType.ofOptional(nestedFieldId, icebergCollectionType) + : Types.ListType.ofRequired(nestedFieldId, icebergCollectionType); + + return new TypeAndMaxId(listInfo.maxId, listType); + } else if (beamType.getTypeName().isMapType()) { // MAP + // key and value IDs need to be unique + int keyId = nestedFieldId; + int valueId = keyId + 1; + + // nested field IDs should be unique + nestedFieldId = valueId + 1; + Schema.FieldType beamKeyType = Preconditions.checkArgumentNotNull(beamType.getMapKeyType()); + TypeAndMaxId keyInfo = beamFieldTypeToIcebergFieldType(beamKeyType, nestedFieldId); + Type icebergKeyType = keyInfo.type; + + nestedFieldId = keyInfo.maxId + 1; + Schema.FieldType beamValueType = + Preconditions.checkArgumentNotNull(beamType.getMapValueType()); + TypeAndMaxId valueInfo = beamFieldTypeToIcebergFieldType(beamValueType, nestedFieldId); + Type icebergValueType = valueInfo.type; + + Type mapType = + beamValueType.getNullable() + ? 
Types.MapType.ofOptional(keyId, valueId, icebergKeyType, icebergValueType) + : Types.MapType.ofRequired(keyId, valueId, icebergKeyType, icebergValueType); + + return new TypeAndMaxId(valueInfo.maxId, mapType); + } else if (beamType.getTypeName().isCompositeType()) { // ROW + // Nested field IDs need to be unique from the field that contains this StructType + Schema nestedSchema = Preconditions.checkArgumentNotNull(beamType.getRowSchema()); + List nestedFields = new ArrayList<>(nestedSchema.getFieldCount()); + + int icebergFieldId = nestedFieldId; + nestedFieldId = icebergFieldId + nestedSchema.getFieldCount(); + for (Schema.Field beamField : nestedSchema.getFields()) { + TypeAndMaxId typeAndMaxId = + beamFieldTypeToIcebergFieldType(beamField.getType(), nestedFieldId); + Types.NestedField icebergField = + Types.NestedField.of( + icebergFieldId++, + beamField.getType().getNullable(), + beamField.getName(), + typeAndMaxId.type); + + nestedFields.add(icebergField); + nestedFieldId = typeAndMaxId.maxId + 1; + } + + Type structType = Types.StructType.of(nestedFields); + + return new TypeAndMaxId(nestedFieldId - 1, structType); + } + + return new TypeAndMaxId(nestedFieldId, Types.StringType.get()); + } + + /** + * Converts a Beam {@link Schema} to an Iceberg {@link org.apache.iceberg.Schema}. + * + *

The following unsupported Beam types will be defaulted to {@link Types.StringType}: + *

  • {@link Schema.TypeName.DECIMAL} + */ + public static org.apache.iceberg.Schema beamSchemaToIcebergSchema(final Schema schema) { + List fields = new ArrayList<>(schema.getFieldCount()); + int nestedFieldId = schema.getFieldCount() + 1; + int icebergFieldId = 1; + for (Schema.Field beamField : schema.getFields()) { + TypeAndMaxId typeAndMaxId = + beamFieldTypeToIcebergFieldType(beamField.getType(), nestedFieldId); + Types.NestedField icebergField = + Types.NestedField.of( + icebergFieldId++, + beamField.getType().getNullable(), + beamField.getName(), + typeAndMaxId.type); + + fields.add(icebergField); + nestedFieldId = typeAndMaxId.maxId + 1; + } + return new org.apache.iceberg.Schema(fields.toArray(new Types.NestedField[fields.size()])); + } + + /** + * Converts a Beam field value to its Iceberg-compatible equivalent based on the Iceberg {@link + * Type}. + */ + public static @Nullable Object beamValueToIcebergValue(Type type, @Nullable Object value) { + if (value == null) { + return null; + } + switch (type.typeId()) { + case BOOLEAN: + case INTEGER: + case LONG: + case FLOAT: + case DOUBLE: + case DATE: + case TIME: + case DECIMAL: + case STRING: + return value; + case TIMESTAMP: + Types.TimestampType ts = (Types.TimestampType) type.asPrimitiveType(); + return getIcebergTimestampValue(value, ts.shouldAdjustToUTC()); + case UUID: + if (value instanceof byte[]) { + return UUID.nameUUIDFromBytes((byte[]) value); + } + return value; + case BINARY: + if (value instanceof byte[]) { + return ByteBuffer.wrap((byte[]) value); + } + return value; + case FIXED: + throw new UnsupportedOperationException("Fixed-precision fields are not yet supported."); + default: + return value; + } + } + + /** Converts a Beam {@link Row} to an Iceberg {@link Record}. */ + public static Record beamRowToIcebergRecord(org.apache.iceberg.Schema schema, Row row) { + if (row.getSchema().getFieldCount() != schema.columns().size()) { + throw new IllegalStateException( + String.format( + "Beam Row schema and Iceberg schema have different sizes.%n\tBeam Row columns: %s%n\tIceberg schema columns: %s", + row.getSchema().getFieldNames(), + schema.columns().stream().map(Types.NestedField::name).collect(Collectors.toList()))); + } + return copyRowIntoRecord(GenericRecord.create(schema), row); + } + + private static Record copyRowIntoRecord(Record baseRecord, Row value) { + Record rec = baseRecord.copy(); + for (Types.NestedField f : rec.struct().fields()) { + copyFieldIntoRecord(rec, f, value); + } + return rec; + } + + private static void copyFieldIntoRecord(Record rec, Types.NestedField field, Row value) { + String name = field.name(); + switch (field.type().typeId()) { + case BOOLEAN: + Optional.ofNullable(value.getBoolean(name)).ifPresent(v -> rec.setField(name, v)); + break; + case INTEGER: + Optional.ofNullable(value.getInt32(name)).ifPresent(v -> rec.setField(name, v)); + break; + case LONG: + Optional.ofNullable(value.getInt64(name)).ifPresent(v -> rec.setField(name, v)); + break; + case FLOAT: + Optional.ofNullable(value.getFloat(name)).ifPresent(v -> rec.setField(name, v)); + break; + case DOUBLE: + Optional.ofNullable(value.getDouble(name)).ifPresent(v -> rec.setField(name, v)); + break; + case DATE: + Optional.ofNullable(value.getLogicalTypeValue(name, LocalDate.class)) + .ifPresent(v -> rec.setField(name, v)); + break; + case TIME: + Optional.ofNullable(value.getLogicalTypeValue(name, LocalTime.class)) + .ifPresent(v -> rec.setField(name, v)); + break; + case TIMESTAMP: + Object val = value.getValue(name); + if 
(val == null) { + break; + } + Types.TimestampType ts = (Types.TimestampType) field.type().asPrimitiveType(); + rec.setField(name, getIcebergTimestampValue(val, ts.shouldAdjustToUTC())); + break; + case STRING: + Object strVal = value.getValue(name); + if (strVal != null) { + rec.setField(name, strVal.toString()); + } + break; + case UUID: + Optional.ofNullable(value.getBytes(name)) + .ifPresent(v -> rec.setField(name, UUID.nameUUIDFromBytes(v))); + break; + case FIXED: + throw new UnsupportedOperationException("Fixed-precision fields are not yet supported."); + case BINARY: + Optional.ofNullable(value.getBytes(name)) + .ifPresent(v -> rec.setField(name, ByteBuffer.wrap(v))); + break; + case DECIMAL: + Optional.ofNullable(value.getDecimal(name)).ifPresent(v -> rec.setField(name, v)); + break; + case STRUCT: + Optional.ofNullable(value.getRow(name)) + .ifPresent( + row -> + rec.setField( + name, + copyRowIntoRecord(GenericRecord.create(field.type().asStructType()), row))); + break; + case LIST: + Iterable<@NonNull ?> icebergList = value.getIterable(name); + Type collectionType = ((Types.ListType) field.type()).elementType(); + + if (collectionType.isStructType() && icebergList != null) { + org.apache.iceberg.Schema innerSchema = collectionType.asStructType().asSchema(); + ImmutableList.Builder builder = ImmutableList.builder(); + for (Row v : (Iterable) icebergList) { + builder.add(beamRowToIcebergRecord(innerSchema, v)); + } + icebergList = builder.build(); + } + Optional.ofNullable(icebergList).ifPresent(list -> rec.setField(name, list)); + break; + case MAP: + Map icebergMap = value.getMap(name); + Type valueType = ((Types.MapType) field.type()).valueType(); + // recurse on struct types + if (valueType.isStructType() && icebergMap != null) { + org.apache.iceberg.Schema innerSchema = valueType.asStructType().asSchema(); + + ImmutableMap.Builder newMap = ImmutableMap.builder(); + for (Map.Entry entry : icebergMap.entrySet()) { + Row row = checkStateNotNull(((Row) entry.getValue())); + newMap.put(checkStateNotNull(entry.getKey()), beamRowToIcebergRecord(innerSchema, row)); + } + icebergMap = newMap.build(); + } + Optional.ofNullable(icebergMap).ifPresent(v -> rec.setField(name, v)); + break; + default: + // Do nothing for unsupported types + break; + } + } + + /** + * Returns the appropriate value for an Iceberg timestamp field + * + *

    If `timestamp`, we resolve incoming values to a {@link LocalDateTime}. + * + *

    If `timestamptz`, we resolve to a UTC {@link OffsetDateTime}. Iceberg already resolves all + * incoming timestamps to UTC, so there is no harm in doing it from our side. + * + *

    Valid types are: + * + *

      + *
    • {@link SqlTypes.DATETIME} --> {@link LocalDateTime} + *
    • {@link Schema.FieldType.DATETIME} --> {@link Instant} + *
    • {@link Schema.FieldType.INT64} --> {@link Long} + *
    • {@link Schema.FieldType.STRING} --> {@link String} + *
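+ *
+ * Illustrative note (inferred from the implementation below): a {@code Long} is interpreted
+ * as microseconds since the epoch and a {@code String} is parsed as an ISO-8601 timestamp, so
+ * with {@code shouldAdjustToUtc=false}, {@code 1_700_000_000_000_000L} and
+ * {@code "2024-01-01T00:00:00"} each resolve to a {@link LocalDateTime}.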
    + */ + private static Object getIcebergTimestampValue(Object beamValue, boolean shouldAdjustToUtc) { + // timestamptz + if (shouldAdjustToUtc) { + if (beamValue instanceof java.time.Instant) { // MicrosInstant + OffsetDateTime epoch = java.time.Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC); + java.time.Instant instant = (java.time.Instant) beamValue; + long nanosFromEpoch = + TimeUnit.SECONDS.toNanos(instant.getEpochSecond()) + instant.getNano(); + return ChronoUnit.NANOS.addTo(epoch, nanosFromEpoch); + } else if (beamValue instanceof LocalDateTime) { // SqlTypes.DATETIME + return OffsetDateTime.of((LocalDateTime) beamValue, ZoneOffset.UTC); + } else if (beamValue instanceof Instant) { // FieldType.DATETIME + return DateTimeUtil.timestamptzFromMicros(((Instant) beamValue).getMillis() * 1000L); + } else if (beamValue instanceof Long) { // FieldType.INT64 + return DateTimeUtil.timestamptzFromMicros((Long) beamValue); + } else if (beamValue instanceof String) { // FieldType.STRING + return OffsetDateTime.parse((String) beamValue).withOffsetSameInstant(ZoneOffset.UTC); + } else { + throw new UnsupportedOperationException( + "Unsupported Beam type for Iceberg timestamp with timezone: " + beamValue.getClass()); + } + } + + // timestamp + if (beamValue instanceof java.time.Instant) { // MicrosInstant + java.time.Instant instant = (java.time.Instant) beamValue; + return DateTimeUtil.timestampFromNanos( + TimeUnit.SECONDS.toNanos(instant.getEpochSecond()) + instant.getNano()); + } else if (beamValue instanceof LocalDateTime) { // SqlType.DATETIME + return beamValue; + } else if (beamValue instanceof Instant) { // FieldType.DATETIME + return DateTimeUtil.timestampFromMicros(((Instant) beamValue).getMillis() * 1000L); + } else if (beamValue instanceof Long) { // FieldType.INT64 + return DateTimeUtil.timestampFromMicros((Long) beamValue); + } else if (beamValue instanceof String) { // FieldType.STRING + return LocalDateTime.parse((String) beamValue); + } else { + throw new UnsupportedOperationException( + "Unsupported Beam type for Iceberg timestamp with timezone: " + beamValue.getClass()); + } + } + + /** Converts an Iceberg {@link Record} to a Beam {@link Row}. 
*/ + public static Row icebergRecordToBeamRow(Schema schema, Record record) { + Row.Builder rowBuilder = Row.withSchema(schema); + for (Schema.Field field : schema.getFields()) { + boolean isNullable = field.getType().getNullable(); + @Nullable Object icebergValue = record.getField(field.getName()); + if (icebergValue == null) { + if (isNullable) { + rowBuilder.addValue(null); + continue; + } + throw new RuntimeException( + String.format("Received null value for required field '%s'.", field.getName())); + } + switch (field.getType().getTypeName()) { + case BYTE: + case INT16: + case INT32: + case INT64: + case DECIMAL: // Iceberg and Beam both use BigDecimal + case FLOAT: // Iceberg and Beam both use float + case DOUBLE: // Iceberg and Beam both use double + case STRING: // Iceberg and Beam both use String + case BOOLEAN: // Iceberg and Beam both use boolean + rowBuilder.addValue(icebergValue); + break; + case ARRAY: + checkState( + icebergValue instanceof List, + "Expected List type for field '%s' but received %s", + field.getName(), + icebergValue.getClass()); + List<@NonNull ?> beamList = (List<@NonNull ?>) icebergValue; + Schema.FieldType collectionType = + checkStateNotNull(field.getType().getCollectionElementType()); + // recurse on struct types + if (collectionType.getTypeName().isCompositeType()) { + Schema innerSchema = checkStateNotNull(collectionType.getRowSchema()); + beamList = + beamList.stream() + .map(v -> icebergRecordToBeamRow(innerSchema, (Record) v)) + .collect(Collectors.toList()); + } + rowBuilder.addValue(beamList); + break; + case ITERABLE: + checkState( + icebergValue instanceof Iterable, + "Expected Iterable type for field '%s' but received %s", + field.getName(), + icebergValue.getClass()); + Iterable<@NonNull ?> beamIterable = (Iterable<@NonNull ?>) icebergValue; + Schema.FieldType iterableCollectionType = + checkStateNotNull(field.getType().getCollectionElementType()); + // recurse on struct types + if (iterableCollectionType.getTypeName().isCompositeType()) { + Schema innerSchema = checkStateNotNull(iterableCollectionType.getRowSchema()); + ImmutableList.Builder builder = ImmutableList.builder(); + for (Record v : (Iterable<@NonNull Record>) icebergValue) { + builder.add(icebergRecordToBeamRow(innerSchema, v)); + } + beamIterable = builder.build(); + } + rowBuilder.addValue(beamIterable); + break; + case MAP: + checkState( + icebergValue instanceof Map, + "Expected Map type for field '%s' but received %s", + field.getName(), + icebergValue.getClass()); + Map beamMap = (Map) icebergValue; + Schema.FieldType valueType = checkStateNotNull(field.getType().getMapValueType()); + // recurse on struct types + if (valueType.getTypeName().isCompositeType()) { + Schema innerSchema = checkStateNotNull(valueType.getRowSchema()); + ImmutableMap.Builder newMap = ImmutableMap.builder(); + for (Map.Entry entry : ((Map) icebergValue).entrySet()) { + Record rec = ((Record) entry.getValue()); + newMap.put( + checkStateNotNull(entry.getKey()), + icebergRecordToBeamRow(innerSchema, checkStateNotNull(rec))); + } + beamMap = newMap.build(); + } + rowBuilder.addValue(beamMap); + break; + case DATETIME: + // Iceberg uses a long for micros. 
+ // Beam DATETIME uses joda's DateTime, which only supports millis, + // so we do lose some precision here + rowBuilder.addValue(getBeamDateTimeValue(icebergValue)); + break; + case BYTES: + // Iceberg uses ByteBuffer; Beam uses byte[] + rowBuilder.addValue(((ByteBuffer) icebergValue).array()); + break; + case ROW: + Record nestedRecord = (Record) icebergValue; + Schema nestedSchema = + checkArgumentNotNull( + field.getType().getRowSchema(), + "Corrupted schema: Row type did not have associated nested schema."); + rowBuilder.addValue(icebergRecordToBeamRow(nestedSchema, nestedRecord)); + break; + case LOGICAL_TYPE: + rowBuilder.addValue(getLogicalTypeValue(icebergValue, field.getType())); + break; + default: + throw new UnsupportedOperationException( + "Unsupported Beam type: " + field.getType().getTypeName()); + } + } + return rowBuilder.build(); + } + + private static DateTime getBeamDateTimeValue(Object icebergValue) { + long micros; + if (icebergValue instanceof OffsetDateTime) { + micros = DateTimeUtil.microsFromTimestamptz((OffsetDateTime) icebergValue); + } else if (icebergValue instanceof LocalDateTime) { + micros = DateTimeUtil.microsFromTimestamp((LocalDateTime) icebergValue); + } else if (icebergValue instanceof Long) { + micros = (long) icebergValue; + } else if (icebergValue instanceof String) { + return DateTime.parse((String) icebergValue); + } else { + throw new UnsupportedOperationException( + "Unsupported Iceberg type for Beam type DATETIME: " + icebergValue.getClass()); + } + return new DateTime(micros / 1000L); + } + + private static Object getLogicalTypeValue(Object icebergValue, Schema.FieldType type) { + if (icebergValue instanceof String) { + String strValue = (String) icebergValue; + if (type.isLogicalType(SqlTypes.DATE.getIdentifier())) { + return LocalDate.parse(strValue); + } else if (type.isLogicalType(SqlTypes.TIME.getIdentifier())) { + return LocalTime.parse(strValue); + } else if (type.isLogicalType(SqlTypes.DATETIME.getIdentifier())) { + return LocalDateTime.parse(strValue); + } + } else if (icebergValue instanceof Long) { + if (type.isLogicalType(SqlTypes.TIME.getIdentifier())) { + return DateTimeUtil.timeFromMicros((Long) icebergValue); + } else if (type.isLogicalType(SqlTypes.DATETIME.getIdentifier())) { + return DateTimeUtil.timestampFromMicros((Long) icebergValue); + } + } else if (icebergValue instanceof Integer + && type.isLogicalType(SqlTypes.DATE.getIdentifier())) { + return DateTimeUtil.dateFromDays((Integer) icebergValue); + } else if (icebergValue instanceof OffsetDateTime + && type.isLogicalType(SqlTypes.DATETIME.getIdentifier())) { + return ((OffsetDateTime) icebergValue) + .withOffsetSameInstant(ZoneOffset.UTC) + .toLocalDateTime(); + } + // LocalDateTime, LocalDate, LocalTime + return icebergValue; + } + + static boolean isUnbounded(PCollection input) { + return input.isBounded().equals(PCollection.IsBounded.UNBOUNDED); + } + + static boolean validDirectWriteLimit(@Nullable Integer directWriteByteLimit) { + return directWriteByteLimit != null && directWriteByteLimit >= 0; + } +} diff --git a/scratch/iceberg-scale-test/src/main/java/org/apache/beam/sdk/io/iceberg/WritePartitionedRowsToFiles.java b/scratch/iceberg-scale-test/src/main/java/org/apache/beam/sdk/io/iceberg/WritePartitionedRowsToFiles.java new file mode 100644 index 000000000000..0da0d4c5968c --- /dev/null +++ b/scratch/iceberg-scale-test/src/main/java/org/apache/beam/sdk/io/iceberg/WritePartitionedRowsToFiles.java @@ -0,0 +1,276 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.iceberg; + +import static org.apache.beam.sdk.io.iceberg.AssignDestinationsAndPartitions.DESTINATION; +import static org.apache.beam.sdk.io.iceberg.AssignDestinationsAndPartitions.PARTITION; +import static org.apache.beam.sdk.io.iceberg.RecordWriterManager.getPartitionDataPath; +import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; + +import java.time.Duration; +import java.time.Instant; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import org.apache.beam.sdk.coders.IterableCoder; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.RowCoder; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.Cache; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps; +import org.apache.iceberg.DataFiles; +import org.apache.iceberg.PartitionField; +import org.apache.iceberg.PartitionKey; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.SortOrder; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.SupportsNamespaces; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.exceptions.AlreadyExistsException; +import org.apache.iceberg.exceptions.NoSuchTableException; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class WritePartitionedRowsToFiles + extends PTransform>>, PCollection> { + private static final Logger LOG = LoggerFactory.getLogger(WritePartitionedRowsToFiles.class); + private final DynamicDestinations dynamicDestinations; + private final IcebergCatalogConfig catalogConfig; + private final String filePrefix; + + WritePartitionedRowsToFiles( + IcebergCatalogConfig catalogConfig, + DynamicDestinations dynamicDestinations, + String filePrefix) { + this.catalogConfig = catalogConfig; + this.dynamicDestinations = dynamicDestinations; + this.filePrefix = filePrefix; + } + + @Override + public PCollection expand(PCollection>> input) { + Schema dataSchema = + ((RowCoder) + ((IterableCoder) + ((KvCoder>) input.getCoder()).getValueCoder()) + .getElemCoder()) + .getSchema(); + return 
input.apply( + ParDo.of(new WriteDoFn(catalogConfig, dynamicDestinations, filePrefix, dataSchema))); + } + + private static class WriteDoFn extends DoFn>, FileWriteResult> { + + private final DynamicDestinations dynamicDestinations; + private final IcebergCatalogConfig catalogConfig; + private final String filePrefix; + private final Schema dataSchema; + static final Cache LAST_REFRESHED_TABLE_CACHE = + CacheBuilder.newBuilder().expireAfterAccess(10, TimeUnit.MINUTES).build(); + + WriteDoFn( + IcebergCatalogConfig catalogConfig, + DynamicDestinations dynamicDestinations, + String filePrefix, + Schema dataSchema) { + this.catalogConfig = catalogConfig; + this.dynamicDestinations = dynamicDestinations; + this.filePrefix = filePrefix; + this.dataSchema = dataSchema; + } + + @ProcessElement + public void processElement( + @Element KV> element, OutputReceiver out) + throws Exception { + String tableIdentifier = checkStateNotNull(element.getKey().getString(DESTINATION)); + String partitionPath = checkStateNotNull(element.getKey().getString(PARTITION)); + + IcebergDestination destination = dynamicDestinations.instantiateDestination(tableIdentifier); + LastRefreshedTable lastRefreshedTable = getOrCreateTable(destination, dataSchema); + Table table = lastRefreshedTable.table; + partitionPath = getPartitionDataPath(partitionPath, lastRefreshedTable.partitionFieldMap); + + StructLike partitionData = + table.spec().isPartitioned() + ? DataFiles.data(table.spec(), partitionPath) + : new PartitionKey(table.spec(), table.schema()); + + String fileName = + destination + .getFileFormat() + .addExtension(String.format("%s-%s", filePrefix, UUID.randomUUID())); + + RecordWriter writer = + new RecordWriter(table, destination.getFileFormat(), fileName, partitionData); + try { + Iterable sortedOrUnsortedRows = + IcebergRowSorter.sortRows( + element.getValue(), table.sortOrder(), table.schema(), dataSchema); + for (Row row : sortedOrUnsortedRows) { + Record record = IcebergUtils.beamRowToIcebergRecord(table.schema(), row); + writer.write(record); + } + } finally { + writer.close(); + } + + SerializableDataFile sdf = SerializableDataFile.from(writer.getDataFile(), partitionPath); + out.output( + FileWriteResult.builder() + .setTableIdentifier(destination.getTableIdentifier()) + .setSerializableDataFile(sdf) + .build()); + } + + static final class LastRefreshedTable { + final Table table; + volatile Instant lastRefreshTime; + static final Duration STALENESS_THRESHOLD = Duration.ofMinutes(2); + private int specId; + volatile Map partitionFieldMap = Maps.newHashMap(); + + LastRefreshedTable(Table table, Instant lastRefreshTime) { + this.table = table; + this.specId = table.spec().specId(); + this.lastRefreshTime = lastRefreshTime; + for (PartitionField partitionField : table.spec().fields()) { + partitionFieldMap.put(partitionField.name(), partitionField); + } + } + + /** + * Refreshes the table metadata if it is considered stale (older than 2 minutes). + * + *

    This method first performs a non-synchronized check on the table's freshness. This + * provides a lock-free fast path that avoids synchronization overhead in the common case + * where the table does not need to be refreshed. If the table might be stale, it then enters + * a synchronized block to ensure that only one thread performs the refresh operation. + */ + void refreshIfStale() { + // Fast path: Avoid entering the synchronized block if the table is not stale. + if (lastRefreshTime.isAfter(Instant.now().minus(STALENESS_THRESHOLD))) { + return; + } + synchronized (this) { + if (lastRefreshTime.isBefore(Instant.now().minus(STALENESS_THRESHOLD))) { + table.refresh(); + lastRefreshTime = Instant.now(); + if (table.spec().specId() != this.specId) { + partitionFieldMap = Maps.newHashMap(); + for (PartitionField partitionField : table.spec().fields()) { + partitionFieldMap.put(partitionField.name(), partitionField); + } + this.specId = table.spec().specId(); + } + } + } + } + } + + LastRefreshedTable getOrCreateTable(IcebergDestination destination, Schema dataSchema) { + TableIdentifier identifier = destination.getTableIdentifier(); + @Nullable + LastRefreshedTable lastRefreshedTable = LAST_REFRESHED_TABLE_CACHE.getIfPresent(identifier); + if (lastRefreshedTable != null) { + lastRefreshedTable.refreshIfStale(); + return lastRefreshedTable; + } + + Namespace namespace = identifier.namespace(); + @Nullable IcebergTableCreateConfig createConfig = destination.getTableCreateConfig(); + PartitionSpec partitionSpec = + createConfig != null ? createConfig.getPartitionSpec() : PartitionSpec.unpartitioned(); + Map tableProperties = + createConfig != null && createConfig.getTableProperties() != null + ? createConfig.getTableProperties() + : Maps.newHashMap(); + + @Nullable Table table = null; + synchronized (LAST_REFRESHED_TABLE_CACHE) { + lastRefreshedTable = LAST_REFRESHED_TABLE_CACHE.getIfPresent(identifier); + if (lastRefreshedTable != null) { + lastRefreshedTable.refreshIfStale(); + return lastRefreshedTable; + } + + Catalog catalog = catalogConfig.catalog(); + // Create namespace if it does not exist yet + if (!namespace.isEmpty() && catalog instanceof SupportsNamespaces) { + SupportsNamespaces supportsNamespaces = (SupportsNamespaces) catalog; + if (!supportsNamespaces.namespaceExists(namespace)) { + try { + supportsNamespaces.createNamespace(namespace); + LOG.info("Created new namespace '{}'.", namespace); + } catch (AlreadyExistsException ignored) { + // race condition: another worker already created this namespace + LOG.info("Namespace `{}` already exists.", namespace); + } + } + } + + // If table exists, just load it + // Note: the implementation of catalog.tableExists() will load the table to check its + // existence. We don't use it here to avoid double loadTable() calls. + try { + table = catalog.loadTable(identifier); + } catch (NoSuchTableException e) { // Otherwise, create the table + org.apache.iceberg.Schema tableSchema = + IcebergUtils.beamSchemaToIcebergSchema(dataSchema); + SortOrder sortOrder = + createConfig != null ? 
createConfig.getSortOrder() : SortOrder.unsorted(); + try { + table = + catalog + .buildTable(identifier, tableSchema) + .withPartitionSpec(partitionSpec) + .withSortOrder(sortOrder) + .withProperties(tableProperties) + .create(); + LOG.info( + "Created Iceberg table '{}' with schema: {}\n" + + ", partition spec: {}, sort order: {}, table properties: {}", + identifier, + tableSchema, + partitionSpec, + sortOrder, + tableProperties); + } catch (AlreadyExistsException ignored) { + // race condition: another worker already created this table + table = catalog.loadTable(identifier); + } + } + } + lastRefreshedTable = new LastRefreshedTable(table, Instant.now()); + LAST_REFRESHED_TABLE_CACHE.put(identifier, lastRefreshedTable); + return lastRefreshedTable; + } + } +} diff --git a/scratch/iceberg-scale-test/src/main/java/org/apache/beam/sdk/io/iceberg/test/IcebergBigQueryScaleTest.java b/scratch/iceberg-scale-test/src/main/java/org/apache/beam/sdk/io/iceberg/test/IcebergBigQueryScaleTest.java new file mode 100644 index 000000000000..a479b8f08059 --- /dev/null +++ b/scratch/iceberg-scale-test/src/main/java/org/apache/beam/sdk/io/iceberg/test/IcebergBigQueryScaleTest.java @@ -0,0 +1,116 @@ +package org.apache.beam.sdk.io.iceberg.test; + +import java.io.Serializable; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.Method; +import org.apache.beam.sdk.io.iceberg.DynamicDestinations; +import org.apache.beam.sdk.io.iceberg.IcebergDestination; +import org.apache.beam.sdk.io.iceberg.IcebergIO; +import org.apache.beam.sdk.io.iceberg.IcebergCatalogConfig; +import org.apache.beam.sdk.io.iceberg.IcebergTableCreateConfig; +import org.apache.iceberg.DistributionMode; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.schemas.transforms.Convert; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.sdk.values.ValueInSingleWindow; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.catalog.TableIdentifier; + +public class IcebergBigQueryScaleTest implements Serializable { + + public static void main(String[] args) { + PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create(); + Pipeline pipeline = Pipeline.create(options); + + // 1. Define Hadoop Catalog on GCS Biglake bucket + Map catalogProps = new HashMap<>(); + catalogProps.put("type", "hadoop"); + catalogProps.put("warehouse", "gs://at-euw4-biglake-bucket/iceberg-warehouse"); + catalogProps.put("io-impl", "org.apache.iceberg.gcp.gcs.GCSFileIO"); + + Map configProps = new HashMap<>(); + configProps.put("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem"); + configProps.put("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS"); + + IcebergCatalogConfig catalogConfig = + IcebergCatalogConfig.builder() + .setCatalogName("hadoop_catalog") + .setCatalogProperties(catalogProps) + .setConfigProperties(configProps) + .build(); + + // 2. 
Read crypto_ethereum.blocks partitioned table from BigQuery using High-Performance Direct Read API + PCollection bqRows = + pipeline.apply( + "Read Ethereum Blocks from BigQuery", + BigQueryIO.readTableRowsWithSchema() + .from("bigquery-public-data.crypto_ethereum.blocks") + .withMethod(Method.DIRECT_READ)) + .apply("Convert to Beam Rows", Convert.toRows()); + + final Schema dataSchema = bqRows.getSchema(); + final String salt = UUID.randomUUID().toString().substring(0, 8); + + // 3. Configure dynamic destinations to create partitioned and sorted Iceberg tables + DynamicDestinations dynamicDestinations = + new DynamicDestinations() { + @Override + public Schema getDataSchema() { + return dataSchema; + } + + @Override + public Row getData(Row element) { + return element; + } + + @Override + public String getTableStringIdentifier(ValueInSingleWindow element) { + // Write all blocks to a single Ethereum blocks table + return "hadoop_catalog.default.ethereum_blocks_" + salt; + } + + @Override + public IcebergDestination instantiateDestination(String dest) { + java.util.Map properties = new java.util.HashMap<>(); + properties.put("write.format.default", "parquet"); + properties.put("write.target-file-size-bytes", "10485760"); // 10MB target + properties.put("write.parquet.page-size-bytes", "1048576"); // 1MB page size + + return IcebergDestination.builder() + .setTableIdentifier(TableIdentifier.parse(dest)) + .setFileFormat(FileFormat.PARQUET) + .setTableCreateConfig( + IcebergTableCreateConfig.builder() + .setSchema(getDataSchema()) + // Partition on timestamp column (day-based) and sort by block hash + .setPartitionFields(Arrays.asList("month(timestamp)")) + .setSortFields(Arrays.asList("hash asc")) + .setTableProperties(properties) + .build()) + .build(); + } + }; + + bqRows.apply( + "Write Sorted Partitioned Blocks to GCS Iceberg Warehouse", + IcebergIO.writeRows(catalogConfig) + .to(dynamicDestinations) + .withDistributionMode(DistributionMode.HASH) + .withAutosharding()); + + System.out.println("Staging Dataflow pipeline graph..."); + pipeline.run(); + System.out.println("Dataflow pipeline launched successfully!"); + } +} diff --git a/sdks/java/io/iceberg/build.gradle b/sdks/java/io/iceberg/build.gradle index bbd55fee2fc8..141de355813c 100644 --- a/sdks/java/io/iceberg/build.gradle +++ b/sdks/java/io/iceberg/build.gradle @@ -46,6 +46,7 @@ dependencies { implementation library.java.vendored_guava_32_1_2_jre implementation project(path: ":sdks:java:core", configuration: "shadow") implementation project(path: ":model:pipeline", configuration: "shadow") + implementation project(path: ":sdks:java:extensions:sorter") implementation library.java.avro implementation library.java.slf4j_api implementation library.java.joda_time diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AssignDestinationsAndPartitions.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AssignDestinationsAndPartitions.java index 475786d3a4f6..df2d2c5433ec 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AssignDestinationsAndPartitions.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AssignDestinationsAndPartitions.java @@ -26,12 +26,14 @@ import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import 
org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.Row; import org.apache.beam.sdk.values.ValueInSingleWindow; +import org.apache.iceberg.DistributionMode; import org.apache.iceberg.PartitionKey; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; @@ -51,38 +53,65 @@ class AssignDestinationsAndPartitions private final DynamicDestinations dynamicDestinations; private final IcebergCatalogConfig catalogConfig; + private final DistributionMode distributionMode; + private final @Nullable SerializableFunction distributionFunction; + static final String DESTINATION = "destination"; static final String PARTITION = "partition"; + static final String SHARD = "shard"; static final org.apache.beam.sdk.schemas.Schema OUTPUT_SCHEMA = org.apache.beam.sdk.schemas.Schema.builder() .addStringField(DESTINATION) .addStringField(PARTITION) + .addNullableField(SHARD, org.apache.beam.sdk.schemas.Schema.FieldType.INT32) .build(); public AssignDestinationsAndPartitions( DynamicDestinations dynamicDestinations, IcebergCatalogConfig catalogConfig) { + this(dynamicDestinations, catalogConfig, DistributionMode.HASH, null); + } + + public AssignDestinationsAndPartitions( + DynamicDestinations dynamicDestinations, + IcebergCatalogConfig catalogConfig, + DistributionMode distributionMode, + @Nullable SerializableFunction distributionFunction) { this.dynamicDestinations = dynamicDestinations; this.catalogConfig = catalogConfig; + this.distributionMode = distributionMode; + this.distributionFunction = distributionFunction; } @Override public PCollection> expand(PCollection input) { return input - .apply(ParDo.of(new AssignDoFn(dynamicDestinations, catalogConfig))) + .apply( + ParDo.of( + new AssignDoFn( + dynamicDestinations, catalogConfig, distributionMode, distributionFunction))) .setCoder( KvCoder.of( RowCoder.of(OUTPUT_SCHEMA), RowCoder.of(dynamicDestinations.getDataSchema()))); } + @SuppressWarnings("nullness") static class AssignDoFn extends DoFn> { private transient @MonotonicNonNull Map partitionKeys; private transient @MonotonicNonNull Map wrappers; private final DynamicDestinations dynamicDestinations; private final IcebergCatalogConfig catalogConfig; + private final DistributionMode distributionMode; + private final @Nullable SerializableFunction distributionFunction; - AssignDoFn(DynamicDestinations dynamicDestinations, IcebergCatalogConfig catalogConfig) { + AssignDoFn( + DynamicDestinations dynamicDestinations, + IcebergCatalogConfig catalogConfig, + DistributionMode distributionMode, + @Nullable SerializableFunction distributionFunction) { this.dynamicDestinations = dynamicDestinations; this.catalogConfig = catalogConfig; + this.distributionMode = distributionMode; + this.distributionFunction = distributionFunction; } @Setup @@ -132,8 +161,17 @@ public void processElement( partitionKey.partition(wrapper.wrap(data)); String partitionPath = partitionKey.toPath(); + Integer shardId = null; + if (distributionMode == DistributionMode.RANGE && distributionFunction != null) { + shardId = distributionFunction.apply(data); + } + Row destAndPartition = - Row.withSchema(OUTPUT_SCHEMA).addValues(tableIdentifier, partitionPath).build(); + Row.withSchema(OUTPUT_SCHEMA) + .withFieldValue(DESTINATION, tableIdentifier) + .withFieldValue(PARTITION, partitionPath) + .withFieldValue(SHARD, shardId) + .build(); out.output(KV.of(destAndPartition, data)); } } diff --git 
a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java index a5a3beef8f51..9b18a77326e0 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java @@ -26,6 +26,7 @@ import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.Row; @@ -384,7 +385,7 @@ public class IcebergIO { public static WriteRows writeRows(IcebergCatalogConfig catalog) { return new AutoValue_IcebergIO_WriteRows.Builder() .setCatalogConfig(catalog) - .setDistributionMode(DistributionMode.NONE) + .setDistributionMode(DistributionMode.HASH) .setAutoSharding(false) .build(); } @@ -404,6 +405,8 @@ public abstract static class WriteRows extends PTransform, Iceb abstract DistributionMode getDistributionMode(); + abstract @Nullable SerializableFunction getDistributionFunction(); + abstract boolean getAutoSharding(); abstract Builder toBuilder(); @@ -422,6 +425,8 @@ abstract static class Builder { abstract Builder setDistributionMode(DistributionMode mode); + abstract Builder setDistributionFunction(SerializableFunction shardFn); + abstract Builder setAutoSharding(boolean autoSharding); abstract WriteRows build(); @@ -457,19 +462,193 @@ public WriteRows withDirectWriteByteLimit(Integer directWriteByteLimit) { } /** - * Defines distribution of write data. Supported distributions: + * The default distribution mode is {@link DistributionMode#HASH}. + * + *

Warning on HASH mode: {@code HASH} distribution mode (with or without + * auto-sharding) can struggle with large unpartitioned writes or with skewed key spaces: when keys + * are not uniformly distributed, hot keys can bottleneck individual workers and produce a + * fragmented file layout. + * + *

Note on RANGE mode: When using {@code RANGE} distribution mode, the custom + * distribution function should be designed to produce adequately sized, strictly non-overlapping + * ranges of the sorting column in order to optimize downstream read performance. + * + *
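+ *
+ * For instance, a sketch (not from the original javadoc; the long sort column {@code
+ * "event_time_micros"} is an assumed example) of a distribution function that buckets an
+ * event-time sort column into fixed one-day, non-overlapping ranges:
+ *
+ * {@code
+ * SerializableFunction<Row, Integer> dailyShards =
+ *     row -> (int) (row.getInt64("event_time_micros") / (24L * 60 * 60 * 1_000_000));
+ * }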

    Comparison of Distribution Modes:

+ *
+ * <table border="1">
+ *   <caption>Comparison of Distribution Modes</caption>
+ *   <tr>
+ *     <th>Mode</th>
+ *     <th>Description</th>
+ *     <th>Pros</th>
+ *     <th>Cons</th>
+ *   </tr>
+ *   <tr>
+ *     <td>{@link DistributionMode#NONE}</td>
+ *     <td>No network shuffle is performed. Records are sorted locally on workers prior to writing.</td>
+ *     <td>Highly lightweight with zero shuffle/network overhead. Best for smaller data volumes.</td>
+ *     <td>Writers on different workers can write to overlapping min/max key ranges across multiple files. Relies heavily on post-fact compaction or query-time merges.</td>
+ *   </tr>
+ *   <tr>
+ *     <td>{@link DistributionMode#HASH}</td>
+ *     <td>Data is shuffled and consolidated by partition key. All records for a partition are routed to a single worker.</td>
+ *     <td>Consolidates partition files, eliminating cross-worker file overlapping for partition keys. Excellent worker stability.</td>
+ *     <td>Can suffer from severe data skew if a single partition contains significantly more data than others (hot partitions).</td>
+ *   </tr>
+ *   <tr>
+ *     <td>{@link DistributionMode#RANGE}</td>
+ *     <td>Data is shuffled based on a user-provided shard/bucket function (e.g., hashing/binning continuous keys).</td>
+ *     <td>Distributes writes for hot partitions across multiple workers. Eliminates skew while keeping file min/max key ranges tight and non-overlapping.</td>
+ *     <td>Requires providing a custom {@link SerializableFunction} mapping rows to integer shard/bucket IDs.</td>
+ *   </tr>
+ * </table>
+ *

    Recommendation Matrix (Sorting & Partitioning vs. Scale):

+ *
+ * <table border="1">
+ *   <caption>Recommendation Matrix</caption>
+ *   <tr>
+ *     <th>Partitioning</th>
+ *     <th>Sorting</th>
+ *     <th>Scale / Volume</th>
+ *     <th>Latency Priority</th>
+ *     <th>Recommended Mode</th>
+ *     <th>Operational Impact</th>
+ *   </tr>
+ *   <tr>
+ *     <td>Partitioned</td><td>Sorted</td><td>Small</td><td>Any</td>
+ *     <td>{@link DistributionMode#HASH}</td>
+ *     <td>Consolidates partition files and sorts them locally. Avoids file overlaps for small volumes.</td>
+ *   </tr>
+ *   <tr>
+ *     <td>Partitioned</td><td>Sorted</td><td>Medium / Large</td><td>Low Write Latency</td>
+ *     <td>{@link DistributionMode#NONE}</td>
+ *     <td>Eliminates shuffle overhead for maximum write speed. Results in overlapping key ranges across files, which requires downstream compaction.</td>
+ *   </tr>
+ *   <tr>
+ *     <td>Partitioned</td><td>Sorted</td><td>Medium / Large</td><td>Low Read Latency</td>
+ *     <td>{@link DistributionMode#HASH} with auto-sharding OR {@link DistributionMode#RANGE}</td>
+ *     <td>HASH with auto-sharding scales writes for hot partitions but can result in overlapping file ranges requiring query-time sort merges. RANGE sharding distributes hot partitions into sequential, non-overlapping files to optimize reads.</td>
+ *   </tr>
+ *   <tr>
+ *     <td>Partitioned</td><td>Unsorted</td><td>Small</td><td>Any</td>
+ *     <td>{@link DistributionMode#HASH}</td>
+ *     <td>Consolidates data files into single partition directories to prevent file fragmentation.</td>
+ *   </tr>
+ *   <tr>
+ *     <td>Partitioned</td><td>Unsorted</td><td>Medium / Large</td><td>Any</td>
+ *     <td>{@link DistributionMode#HASH} with auto-sharding</td>
+ *     <td>Consolidates partition files while dynamically balancing hot partition writes across parallel workers.</td>
+ *   </tr>
+ *   <tr>
+ *     <td>Unpartitioned</td><td>Sorted</td><td>Small</td><td>Any</td>
+ *     <td>{@link DistributionMode#NONE}</td>
+ *     <td>Bypasses network shuffle for fast, low-volume local sorting.</td>
+ *   </tr>
+ *   <tr>
+ *     <td>Unpartitioned</td><td>Sorted</td><td>Medium / Large</td><td>Low Write Latency</td>
+ *     <td>{@link DistributionMode#NONE}</td>
+ *     <td>Bypasses network shuffle for parallel worker writes. Requires downstream compaction to resolve overlapping file ranges.</td>
+ *   </tr>
+ *   <tr>
+ *     <td>Unpartitioned</td><td>Sorted</td><td>Medium / Large</td><td>Low Read Latency</td>
+ *     <td>{@link DistributionMode#RANGE} (with custom sharding function)</td>
+ *     <td>Shards continuous keys into non-overlapping worker ranges. Eliminates single-worker bottlenecks and guarantees zero file overlap for fast queries.</td>
+ *   </tr>
+ *   <tr>
+ *     <td>Unpartitioned</td><td>Unsorted</td><td>Any</td><td>Any</td>
+ *     <td>{@link DistributionMode#NONE}</td>
+ *     <td>Direct, parallel worker writes with maximum throughput and zero network shuffle overhead.</td>
+ *   </tr>
+ * </table>
+ *

    Code Samples:

    * - *
      - *
    1. {@link DistributionMode.NONE}: don't shuffle rows (default) - *
    2. {@link DistributionMode.HASH}: shuffle rows by partition key before writing data - *
    + *
    {@code
    +     * // 1. Using default HASH distribution mode (Consolidates by partition key)
    +     * pipeline
    +     *     .apply(Create.of(BEAM_ROWS))
    +     *     .apply(IcebergIO.writeRows(catalogConfig)
    +     *         .to(tableId));
          *
    -     * {@link DistributionMode.RANGE} is not supported yet
    +     * // 2. Using NONE distribution mode (No shuffle, local sorting only)
    +     * pipeline
    +     *     .apply(Create.of(BEAM_ROWS))
    +     *     .apply(IcebergIO.writeRows(catalogConfig)
    +     *         .to(tableId)
    +     *         .withDistributionMode(DistributionMode.NONE));
    +     *
    +     * // 3. Using RANGE distribution mode with a custom shard/bucket function to avoid data skew
    +     * pipeline
    +     *     .apply(Create.of(BEAM_ROWS))
    +     *     .apply(IcebergIO.writeRows(catalogConfig)
    +     *         .to(tableId)
    +     *         .withDistributionMode(DistributionMode.RANGE)
    +     *         .withDistributionFunction(row -> {
    +     *             // Group continuous IDs into 16 parallel, non-overlapping shards
    +     *             long id = row.getInt64("id");
    +     *             return (int) (id / 10000);
    +     *         }));
    +     * }
    */ public WriteRows withDistributionMode(DistributionMode mode) { return toBuilder().setDistributionMode(mode).build(); } + /** + * Sets the custom range-distribution function. + * + *

    Only applicable when the distribution mode is set to {@link DistributionMode#RANGE}. The + * function maps a Beam {@link Row} to an Integer representing a shard/bucket ID. + */ + public WriteRows withDistributionFunction(SerializableFunction shardFn) { + return toBuilder().setDistributionFunction(shardFn).build(); + } + + /** + * Enables Beam's dynamic auto-sharding when using {@link DistributionMode#HASH}. + * + *

    When enabled, the pipeline uses {@link + * org.apache.beam.sdk.transforms.GroupIntoBatches#withShardedKey()} under the hood. The runner + * (such as Dataflow) dynamically monitors throughput per partition key. If a partition is + * extremely hot, the runner automatically splits it into parallel sub-shards distributed across + * multiple workers to prevent single-worker bottlenecks and out-of-memory (OOM) errors, while + * keeping the number of data files for cold partitions minimal. + * + *

    Note that because auto-sharding distributes hot-partition data randomly across worker + * shards, the written data files cannot guarantee non-overlapping key ranges. Downstream + * queries may require read-time sort merges for overlapping file segments until an Iceberg + * compaction job (e.g., `rewriteDataFiles`) is executed. + * + *
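+ *
+ * Illustrative usage (a sketch following the code samples in {@link #withDistributionMode(DistributionMode)}):
+ *
+ * {@code
+ * pipeline
+ *     .apply(Create.of(BEAM_ROWS))
+ *     .apply(IcebergIO.writeRows(catalogConfig)
+ *         .to(tableId)
+ *         .withDistributionMode(DistributionMode.HASH)
+ *         .withAutosharding());
+ * }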

    Only applicable when using {@link DistributionMode#HASH}. + */ public WriteRows withAutosharding() { return toBuilder().setAutoSharding(true).build(); } @@ -514,7 +693,30 @@ public IcebergWriteResult expand(PCollection input) { return input .apply( "AssignDestinationAndPartition", - new AssignDestinationsAndPartitions(destinations, getCatalogConfig())) + new AssignDestinationsAndPartitions( + destinations, + getCatalogConfig(), + getDistributionMode(), + getDistributionFunction())) + .apply( + "Write Rows to Partitions", + new WriteToPartitions( + getCatalogConfig(), + destinations, + getTriggeringFrequency(), + getAutoSharding())); + case RANGE: + Preconditions.checkArgument( + getDistributionFunction() != null, + "Must provide a distribution function when using RANGE distribution mode."); + return input + .apply( + "AssignDestinationAndPartitionWithRange", + new AssignDestinationsAndPartitions( + destinations, + getCatalogConfig(), + getDistributionMode(), + getDistributionFunction())) .apply( "Write Rows to Partitions", new WriteToPartitions( diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergRowSorter.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergRowSorter.java new file mode 100644 index 000000000000..6efc1bbe2eec --- /dev/null +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergRowSorter.java @@ -0,0 +1,269 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.iceberg; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.Serializable; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.time.Instant; +import java.util.Date; +import java.util.Iterator; +import java.util.List; +import org.apache.beam.sdk.coders.RowCoder; +import org.apache.beam.sdk.extensions.sorter.BufferedExternalSorter; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.Row; +import org.apache.iceberg.NullOrder; +import org.apache.iceberg.Schema; +import org.apache.iceberg.SortDirection; +import org.apache.iceberg.SortField; +import org.apache.iceberg.SortOrder; +import org.joda.time.ReadableInstant; + +/** + * A utility class to sort Beam {@link Row}s based on an Iceberg {@link SortOrder}. Leverages {@link + * BufferedExternalSorter} to spill to local disk when elements exceed memory limit. 
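+ *
+ * Illustrative note on the key encoding used below (a property of this implementation, not an
+ * Iceberg requirement): signed integral values are XOR'ed with their type's {@code MIN_VALUE}
+ * so that an unsigned, lexicographic comparison of the encoded key bytes matches the natural
+ * ordering of the original values. For example, {@code -5 ^ Integer.MIN_VALUE == 0x7FFFFFFB}
+ * sorts before {@code 3 ^ Integer.MIN_VALUE == 0x80000003} when compared byte-by-byte as
+ * unsigned values. Descending sort fields invert every encoded byte, and null ordering is
+ * handled by a per-field prefix byte.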
+ */ +class IcebergRowSorter implements Serializable { + + public static Iterable sortRows( + Iterable rows, + SortOrder sortOrder, + Schema icebergSchema, + org.apache.beam.sdk.schemas.Schema beamSchema) { + + if (sortOrder == null || !sortOrder.isSorted()) { + return rows; + } + + BufferedExternalSorter.Options sorterOptions = BufferedExternalSorter.options(); + BufferedExternalSorter sorter = BufferedExternalSorter.create(sorterOptions); + RowCoder rowCoder = RowCoder.of(beamSchema); + + List fields = sortOrder.fields(); + String[] columnNames = new String[fields.size()]; + for (int i = 0; i < fields.size(); i++) { + columnNames[i] = icebergSchema.findColumnName(fields.get(i).sourceId()); + } + + // Create reusable ByteArrayOutputStreams for key and value encoding + ByteArrayOutputStream keyBaos = new ByteArrayOutputStream(); + ByteArrayOutputStream valBaos = new ByteArrayOutputStream(); + + try { + for (Row row : rows) { + keyBaos.reset(); + valBaos.reset(); + encodeSortKey(row, sortOrder, columnNames, keyBaos, icebergSchema, beamSchema); + byte[] keyBytes = keyBaos.toByteArray(); + + rowCoder.encode(row, valBaos); + byte[] valBytes = valBaos.toByteArray(); + sorter.add(KV.of(keyBytes, valBytes)); + } + + Iterable> sortedKVs = sorter.sort(); + return new Iterable() { + @Override + public Iterator iterator() { + final Iterator> it = sortedKVs.iterator(); + return new Iterator() { + @Override + public boolean hasNext() { + return it.hasNext(); + } + + @Override + public Row next() { + KV next = it.next(); + try { + ByteArrayInputStream bais = new ByteArrayInputStream(next.getValue()); + return rowCoder.decode(bais); + } catch (IOException e) { + throw new RuntimeException("Failed to decode Row during sorting", e); + } + } + }; + } + }; + + } catch (IOException e) { + throw new RuntimeException("Failed to sort rows with external sorter", e); + } + } + + @SuppressWarnings("nullness") + public static void encodeSortKey( + Row row, + SortOrder sortOrder, + String[] columnNames, + ByteArrayOutputStream baos, + Schema icebergSchema, + org.apache.beam.sdk.schemas.Schema beamSchema) + throws IOException { + + List fields = sortOrder.fields(); + + for (int i = 0; i < fields.size(); i++) { + SortField field = fields.get(i); + String colName = columnNames[i]; + Object val = row.getValue(colName); + + if (!field.transform().isIdentity()) { + Object icebergVal = + IcebergUtils.beamValueToIcebergValue(icebergSchema.findType(field.sourceId()), val); + if (icebergVal != null) { + val = field.transform().apply(icebergVal); + } else { + val = null; + } + } + + boolean isNull = (val == null); + boolean isDesc = (field.direction() == SortDirection.DESC); + boolean nullsFirst = (field.nullOrder() == NullOrder.NULLS_FIRST); + + // Determine correct header prefix to fulfill the NullOrder contracts + byte prefixByte; + if (isNull) { + prefixByte = nullsFirst ? (byte) 0x00 : (byte) 0xFF; + } else { + prefixByte = nullsFirst ? 
(byte) 0x01 : (byte) 0x00; + } + + baos.write(prefixByte); + + if (!isNull) { + writeValue(val, baos, isDesc); + } + } + } + + private static void writeInt(int v, ByteArrayOutputStream baos, boolean invert) { + byte b3 = (byte) (v >>> 24); + byte b2 = (byte) (v >>> 16); + byte b1 = (byte) (v >>> 8); + byte b0 = (byte) v; + if (invert) { + baos.write(~b3); + baos.write(~b2); + baos.write(~b1); + baos.write(~b0); + } else { + baos.write(b3); + baos.write(b2); + baos.write(b1); + baos.write(b0); + } + } + + private static void writeLong(long v, ByteArrayOutputStream baos, boolean invert) { + byte b7 = (byte) (v >>> 56); + byte b6 = (byte) (v >>> 48); + byte b5 = (byte) (v >>> 40); + byte b4 = (byte) (v >>> 32); + byte b3 = (byte) (v >>> 24); + byte b2 = (byte) (v >>> 16); + byte b1 = (byte) (v >>> 8); + byte b0 = (byte) v; + if (invert) { + baos.write(~b7); + baos.write(~b6); + baos.write(~b5); + baos.write(~b4); + baos.write(~b3); + baos.write(~b2); + baos.write(~b1); + baos.write(~b0); + } else { + baos.write(b7); + baos.write(b6); + baos.write(b5); + baos.write(b4); + baos.write(b3); + baos.write(b2); + baos.write(b1); + baos.write(b0); + } + } + + @SuppressWarnings("JavaUtilDate") + private static void writeValue(Object val, ByteArrayOutputStream baos, boolean invert) + throws IOException { + if (val instanceof String) { + writeString((String) val, baos, invert); + } else if (val instanceof Integer) { + int v = (Integer) val; + writeInt(v ^ Integer.MIN_VALUE, baos, invert); + } else if (val instanceof Long) { + long v = (Long) val; + writeLong(v ^ Long.MIN_VALUE, baos, invert); + } else if (val instanceof Float) { + int bits = Float.floatToIntBits((Float) val); + bits = (bits >= 0) ? (bits ^ Integer.MIN_VALUE) : ~bits; + writeInt(bits, baos, invert); + } else if (val instanceof Double) { + long bits = Double.doubleToLongBits((Double) val); + bits = (bits >= 0) ? (bits ^ Long.MIN_VALUE) : ~bits; + writeLong(bits, baos, invert); + } else if (val instanceof Boolean) { + byte b = ((Boolean) val) ? (byte) 0x01 : (byte) 0x00; + baos.write(invert ? ~b : b); + } else if (val instanceof byte[]) { + writeByteArray((byte[]) val, baos, invert); + } else if (val instanceof ByteBuffer) { + writeByteArray(((ByteBuffer) val).array(), baos, invert); + } else if (val instanceof ReadableInstant) { + long enc = ((ReadableInstant) val).getMillis() ^ Long.MIN_VALUE; + writeLong(enc, baos, invert); + } else if (val instanceof Instant) { + long enc = ((Instant) val).toEpochMilli() ^ Long.MIN_VALUE; + writeLong(enc, baos, invert); + } else if (val instanceof Date) { + long enc = ((Date) val).getTime() ^ Long.MIN_VALUE; + writeLong(enc, baos, invert); + } else { + throw new UnsupportedOperationException( + "Unsupported type for sorting: " + val.getClass().getName()); + } + } + + private static void writeString(String s, ByteArrayOutputStream baos, boolean invert) + throws IOException { + byte[] bytes = s.getBytes(StandardCharsets.UTF_8); + writeByteArray(bytes, baos, invert); + } + + private static void writeByteArray(byte[] bytes, ByteArrayOutputStream baos, boolean invert) { + for (byte b : bytes) { + if (b == 0x00) { + baos.write(invert ? ~(byte) 0x01 : (byte) 0x01); + baos.write(invert ? ~(byte) 0x01 : (byte) 0x01); + } else if (b == 0x01) { + baos.write(invert ? ~(byte) 0x01 : (byte) 0x01); + baos.write(invert ? ~(byte) 0x02 : (byte) 0x02); + } else { + baos.write(invert ? ~b : b); + } + } + baos.write(invert ? 
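+        // Terminator byte (0x00, or its complement when inverted). Payload bytes of 0x00/0x01 are
+        // escaped above, so the terminator is strictly smaller than any payload byte and a key
+        // that is a prefix of another key sorts first.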
~(byte) 0x00 : (byte) 0x00); + } +} diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java index d0d24532ff39..7f48a0d0128c 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java @@ -296,6 +296,45 @@ public static org.apache.iceberg.Schema beamSchemaToIcebergSchema(final Schema s return new org.apache.iceberg.Schema(fields.toArray(new Types.NestedField[fields.size()])); } + /** + * Converts a Beam field value to its Iceberg-compatible equivalent based on the Iceberg {@link + * Type}. + */ + public static @Nullable Object beamValueToIcebergValue(Type type, @Nullable Object value) { + if (value == null) { + return null; + } + switch (type.typeId()) { + case BOOLEAN: + case INTEGER: + case LONG: + case FLOAT: + case DOUBLE: + case DATE: + case TIME: + case DECIMAL: + case STRING: + return value; + case TIMESTAMP: + Types.TimestampType ts = (Types.TimestampType) type.asPrimitiveType(); + return getIcebergTimestampValue(value, ts.shouldAdjustToUTC()); + case UUID: + if (value instanceof byte[]) { + return UUID.nameUUIDFromBytes((byte[]) value); + } + return value; + case BINARY: + if (value instanceof byte[]) { + return ByteBuffer.wrap((byte[]) value); + } + return value; + case FIXED: + throw new UnsupportedOperationException("Fixed-precision fields are not yet supported."); + default: + return value; + } + } + /** Converts a Beam {@link Row} to an Iceberg {@link Record}. */ public static Record beamRowToIcebergRecord(org.apache.iceberg.Schema schema, Row row) { if (row.getSchema().getFieldCount() != schema.columns().size()) { @@ -351,7 +390,10 @@ private static void copyFieldIntoRecord(Record rec, Types.NestedField field, Row rec.setField(name, getIcebergTimestampValue(val, ts.shouldAdjustToUTC())); break; case STRING: - Optional.ofNullable(value.getString(name)).ifPresent(v -> rec.setField(name, v)); + Object strVal = value.getValue(name); + if (strVal != null) { + rec.setField(name, strVal.toString()); + } break; case UUID: Optional.ofNullable(value.getBytes(name)) diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteGroupedRowsToFiles.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteGroupedRowsToFiles.java index 12d9570d4a38..95384dea1887 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteGroupedRowsToFiles.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteGroupedRowsToFiles.java @@ -18,6 +18,10 @@ package org.apache.beam.sdk.io.iceberg; import java.util.List; +import org.apache.beam.sdk.coders.IterableCoder; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.RowCoder; +import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; @@ -30,6 +34,7 @@ import org.apache.beam.sdk.values.WindowedValue; import org.apache.beam.sdk.values.WindowedValues; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; +import org.apache.iceberg.Table; import org.apache.iceberg.catalog.Catalog; import org.checkerframework.checker.nullness.qual.MonotonicNonNull; @@ -56,10 +61,17 @@ class WriteGroupedRowsToFiles @Override public PCollection expand( 
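+      // (The input coder is expected to be a KvCoder<ShardedKey<Row>, IterableCoder<Row>> whose
+      // element coder is a RowCoder; the destination's row schema is recovered from it below.)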
PCollection, Iterable>> input) { + Schema dataSchema = + ((RowCoder) + ((IterableCoder) + ((KvCoder, Iterable>) input.getCoder()) + .getValueCoder()) + .getElemCoder()) + .getSchema(); return input.apply( ParDo.of( new WriteGroupedRowsToFilesDoFn( - catalogConfig, dynamicDestinations, maxBytesPerFile, filePrefix))); + catalogConfig, dynamicDestinations, maxBytesPerFile, filePrefix, dataSchema))); } private static class WriteGroupedRowsToFilesDoFn @@ -70,16 +82,19 @@ private static class WriteGroupedRowsToFilesDoFn private transient @MonotonicNonNull Catalog catalog; private final String filePrefix; private final long maxFileSize; + private final Schema dataSchema; WriteGroupedRowsToFilesDoFn( IcebergCatalogConfig catalogConfig, DynamicDestinations dynamicDestinations, long maxFileSize, - String filePrefix) { + String filePrefix, + Schema dataSchema) { this.catalogConfig = catalogConfig; this.dynamicDestinations = dynamicDestinations; this.filePrefix = filePrefix; this.maxFileSize = maxFileSize; + this.dataSchema = dataSchema; } private org.apache.iceberg.catalog.Catalog getCatalog() { @@ -101,11 +116,16 @@ public void processElement( IcebergDestination destination = dynamicDestinations.instantiateDestination(tableIdentifier); WindowedValue windowedDestination = WindowedValues.of(destination, window.maxTimestamp(), window, paneInfo); + RecordWriterManager writer; try (RecordWriterManager openWriter = new RecordWriterManager(getCatalog(), filePrefix, maxFileSize, Integer.MAX_VALUE)) { writer = openWriter; - for (Row e : element.getValue()) { + Table table = openWriter.getOrCreateTable(destination, dataSchema); + Iterable sortedOrUnsortedRows = + IcebergRowSorter.sortRows( + element.getValue(), table.sortOrder(), table.schema(), dataSchema); + for (Row e : sortedOrUnsortedRows) { writer.write(windowedDestination, e); } } diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WritePartitionedRowsToFiles.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WritePartitionedRowsToFiles.java index 54ad120f1aca..0da0d4c5968c 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WritePartitionedRowsToFiles.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WritePartitionedRowsToFiles.java @@ -44,6 +44,7 @@ import org.apache.iceberg.PartitionField; import org.apache.iceberg.PartitionKey; import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.SortOrder; import org.apache.iceberg.StructLike; import org.apache.iceberg.Table; import org.apache.iceberg.catalog.Catalog; @@ -130,7 +131,10 @@ public void processElement( RecordWriter writer = new RecordWriter(table, destination.getFileFormat(), fileName, partitionData); try { - for (Row row : element.getValue()) { + Iterable sortedOrUnsortedRows = + IcebergRowSorter.sortRows( + element.getValue(), table.sortOrder(), table.schema(), dataSchema); + for (Row row : sortedOrUnsortedRows) { Record record = IcebergUtils.beamRowToIcebergRecord(table.schema(), row); writer.write(record); } @@ -240,14 +244,23 @@ LastRefreshedTable getOrCreateTable(IcebergDestination destination, Schema dataS } catch (NoSuchTableException e) { // Otherwise, create the table org.apache.iceberg.Schema tableSchema = IcebergUtils.beamSchemaToIcebergSchema(dataSchema); + SortOrder sortOrder = + createConfig != null ? 
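+              // Propagate the user-configured sort order when the create config provides one;
+              // otherwise create the table unsorted (IcebergRowSorter then passes rows through).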
createConfig.getSortOrder() : SortOrder.unsorted(); try { - table = catalog.createTable(identifier, tableSchema, partitionSpec, tableProperties); + table = + catalog + .buildTable(identifier, tableSchema) + .withPartitionSpec(partitionSpec) + .withSortOrder(sortOrder) + .withProperties(tableProperties) + .create(); LOG.info( "Created Iceberg table '{}' with schema: {}\n" - + ", partition spec: {}, table properties: {}", + + ", partition spec: {}, sort order: {}, table properties: {}", identifier, tableSchema, partitionSpec, + sortOrder, tableProperties); } catch (AlreadyExistsException ignored) { // race condition: another worker already created this table diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteUngroupedRowsToFiles.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteUngroupedRowsToFiles.java index 1db6ede30165..4d2dd1fe3d0d 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteUngroupedRowsToFiles.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteUngroupedRowsToFiles.java @@ -47,6 +47,7 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; +import org.apache.iceberg.Table; import org.apache.iceberg.catalog.Catalog; import org.checkerframework.checker.nullness.qual.MonotonicNonNull; import org.checkerframework.checker.nullness.qual.Nullable; @@ -238,11 +239,16 @@ public void processElement( WindowedValues.of(destination, window.maxTimestamp(), window, paneInfo); // Attempt to write record. If the writer is saturated and cannot accept - // the record, spill it over to WriteGroupedRowsToFiles - boolean writeSuccess; + // the record, or if the target table is sorted, spill it over to WriteGroupedRowsToFiles + boolean writeSuccess = false; try { - writeSuccess = - Preconditions.checkNotNull(recordWriterManager).write(windowedDestination, data); + Table table = + Preconditions.checkNotNull(recordWriterManager) + .getOrCreateTable(destination, data.getSchema()); + if (!table.sortOrder().isSorted()) { + writeSuccess = + Preconditions.checkNotNull(recordWriterManager).write(windowedDestination, data); + } } catch (Exception e) { try { Preconditions.checkNotNull(recordWriterManager).close(); diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOWriteTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOWriteTest.java index 52d92911f4e4..294e66be6956 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOWriteTest.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOWriteTest.java @@ -77,6 +77,7 @@ import org.apache.iceberg.DistributionMode; import org.apache.iceberg.FileFormat; import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.SortOrder; import org.apache.iceberg.Table; import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.SupportsNamespaces; @@ -774,4 +775,136 @@ public void process(@Element KV>> sums) { .getCommitted(); assertEquals(5L, numWaves); } + + @Test + public void testRangeDistribution() { + assumeTrue(distributionMode.equals(HASH_WITH_AUTOSHARDING)); + + Schema schema = Schema.builder().addInt64Field("id").addStringField("name").build(); + + TableIdentifier tableId = + 
TableIdentifier.of("default", "range_" + Long.toString(UUID.randomUUID().hashCode(), 16)); + Map catalogProps = + ImmutableMap.builder() + .put("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP) + .put("warehouse", warehouse.location) + .build(); + IcebergCatalogConfig catalog = + IcebergCatalogConfig.builder() + .setCatalogName("name") + .setCatalogProperties(catalogProps) + .build(); + + org.apache.iceberg.Schema icebergSchema = IcebergUtils.beamSchemaToIcebergSchema(schema); + catalog.catalog().createTable(tableId, icebergSchema, PartitionSpec.unpartitioned()); + + PCollection rows = + testPipeline + .apply(GenerateSequence.from(0).to(100)) + .apply( + "Make rows", + MapElements.into(TypeDescriptors.rows()) + .via(i -> Row.withSchema(schema).addValues(i, "name_" + i).build())) + .setRowSchema(schema); + + rows.apply( + "range distribution write", + IcebergIO.writeRows(catalog) + .to(tableId) + .withDistributionMode(DistributionMode.RANGE) + .withDistributionFunction(row -> (int) (row.getInt64("id") % 5))); + + testPipeline.run().waitUntilFinish(); + + Table table = warehouse.loadTable(tableId); + List writtenRecords = ImmutableList.copyOf(IcebergGenerics.read(table).build()); + assertEquals(100, writtenRecords.size()); + } + + @Test + public void testSortedWrite() { + TableIdentifier tableId = + TableIdentifier.of("default", "sorted_" + Long.toString(UUID.randomUUID().hashCode(), 16)); + + Map catalogProps = + ImmutableMap.builder() + .put("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP) + .put("warehouse", warehouse.location) + .build(); + + IcebergCatalogConfig catalog = + IcebergCatalogConfig.builder() + .setCatalogName("name") + .setCatalogProperties(catalogProps) + .build(); + + Schema schema = Schema.builder().addInt64Field("id").addStringField("name").build(); + org.apache.iceberg.Schema icebergSchema = IcebergUtils.beamSchemaToIcebergSchema(schema); + + catalog + .catalog() + .buildTable(tableId, icebergSchema) + .withPartitionSpec(PartitionSpec.unpartitioned()) + .withSortOrder(SortOrder.builderFor(icebergSchema).asc("name").desc("id").build()) + .create(); + + List inputRows = + Arrays.asList( + Row.withSchema(schema).addValues(2L, "banana").build(), + Row.withSchema(schema).addValues(1L, "banana").build(), + Row.withSchema(schema).addValues(5L, "apple").build(), + Row.withSchema(schema).addValues(10L, "cherry").build()); + + testPipeline + .apply("Scrambled Input", Create.of(inputRows)) + .setRowSchema(schema) + .apply("Append Sorted To Table", writeTransform(catalog, tableId)); + + testPipeline.run().waitUntilFinish(); + + Table table = warehouse.loadTable(tableId); + List writtenRecords = ImmutableList.copyOf(IcebergGenerics.read(table).build()); + + assertEquals(4, writtenRecords.size()); + + try { + assertFilesAreInternallySorted(table); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + private void assertFilesAreInternallySorted(Table table) throws Exception { + for (org.apache.iceberg.FileScanTask task : table.newScan().planFiles()) { + String path = task.file().path().toString(); + try (org.apache.iceberg.io.CloseableIterable reader = + org.apache.iceberg.parquet.Parquet.read(table.io().newInputFile(path)) + .project(table.schema()) + .createReaderFunc(org.apache.iceberg.data.parquet.GenericParquetReaders::buildReader) + .build()) { + List records = + org.apache.commons.compress.utils.Lists.newArrayList(reader.iterator()); + assertTrue("File must have at least one record", records.size() > 0); + + for (int i = 1; i < records.size(); i++) { + Record 
prev = records.get(i - 1); + Record curr = records.get(i); + + String prevName = (String) prev.getField("name"); + String currName = (String) curr.getField("name"); + + int cmpName = prevName.compareTo(currName); + if (cmpName > 0) { + throw new AssertionError("File not sorted by name ASC: " + prevName + " > " + currName); + } else if (cmpName == 0) { + long prevId = (Long) prev.getField("id"); + long currId = (Long) curr.getField("id"); + if (prevId < currId) { + throw new AssertionError("File not sorted by id DESC: " + prevId + " < " + currId); + } + } + } + } + } + } } diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergRowSorterTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergRowSorterTest.java new file mode 100644 index 000000000000..6eb26e6de4b5 --- /dev/null +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergRowSorterTest.java @@ -0,0 +1,236 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.iceberg; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; + +import java.io.ByteArrayOutputStream; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Comparator; +import java.util.List; +import java.util.Random; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.values.Row; +import org.apache.iceberg.NullOrder; +import org.apache.iceberg.SortOrder; +import org.apache.iceberg.types.Types; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class IcebergRowSorterTest { + + private static final Schema BEAM_SCHEMA = + Schema.builder() + .addInt32Field("id") + .addNullableField("name", Schema.FieldType.STRING) + .addNullableField("value", Schema.FieldType.DOUBLE) + .addNullableField("active", Schema.FieldType.BOOLEAN) + .build(); + + private static final org.apache.iceberg.Schema ICEBERG_SCHEMA = + new org.apache.iceberg.Schema( + Types.NestedField.required(1, "id", Types.IntegerType.get()), + Types.NestedField.optional(2, "name", Types.StringType.get()), + Types.NestedField.optional(3, "value", Types.DoubleType.get()), + Types.NestedField.optional(4, "active", Types.BooleanType.get())); + + private static final Comparator BYTE_ARR_COMPARATOR = + org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.primitives.UnsignedBytes + .lexicographicalComparator(); + + private static byte[] encodeSortKeyHelper(Row row, SortOrder sortOrder) throws Exception { + java.util.List fields = sortOrder.fields(); + String[] columnNames = new 
String[fields.size()]; + for (int i = 0; i < fields.size(); i++) { + columnNames[i] = ICEBERG_SCHEMA.findColumnName(fields.get(i).sourceId()); + } + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + IcebergRowSorter.encodeSortKey(row, sortOrder, columnNames, baos, ICEBERG_SCHEMA, BEAM_SCHEMA); + return baos.toByteArray(); + } + + @Test + public void testStringKeyEncodingOrder() throws Exception { + SortOrder sortOrder = SortOrder.builderFor(ICEBERG_SCHEMA).asc("name").build(); + + Row r1 = Row.withSchema(BEAM_SCHEMA).addValues(1, "apple", 1.5, true).build(); + Row r2 = Row.withSchema(BEAM_SCHEMA).addValues(2, "banana", 2.0, true).build(); + Row r3 = Row.withSchema(BEAM_SCHEMA).addValues(3, "apricot", 3.0, false).build(); + + byte[] k1 = encodeSortKeyHelper(r1, sortOrder); + byte[] k2 = encodeSortKeyHelper(r2, sortOrder); + byte[] k3 = encodeSortKeyHelper(r3, sortOrder); + + assertTrue(BYTE_ARR_COMPARATOR.compare(k1, k2) < 0); // apple < banana + assertTrue(BYTE_ARR_COMPARATOR.compare(k1, k3) < 0); // apple < apricot + assertTrue(BYTE_ARR_COMPARATOR.compare(k3, k2) < 0); // apricot < banana + } + + @Test + public void testStringCollisionProofing() throws Exception { + SortOrder sortOrder = SortOrder.builderFor(ICEBERG_SCHEMA).asc("name").asc("value").build(); + + Row r1 = Row.withSchema(BEAM_SCHEMA).addValues(1, "abc", 1.0, true).build(); + Row r2 = Row.withSchema(BEAM_SCHEMA).addValues(2, "abcdef", null, true).build(); + + byte[] k1 = encodeSortKeyHelper(r1, sortOrder); + byte[] k2 = encodeSortKeyHelper(r2, sortOrder); + + assertTrue(BYTE_ARR_COMPARATOR.compare(k1, k2) < 0); + } + + @Test + public void testDescInversion() throws Exception { + SortOrder sortOrderAsc = SortOrder.builderFor(ICEBERG_SCHEMA).asc("id").build(); + SortOrder sortOrderDesc = SortOrder.builderFor(ICEBERG_SCHEMA).desc("id").build(); + + Row r1 = Row.withSchema(BEAM_SCHEMA).addValues(10, "test", 1.5, true).build(); + Row r2 = Row.withSchema(BEAM_SCHEMA).addValues(20, "test", 2.0, true).build(); + + byte[] k1Asc = encodeSortKeyHelper(r1, sortOrderAsc); + byte[] k2Asc = encodeSortKeyHelper(r2, sortOrderAsc); + + byte[] k1Desc = encodeSortKeyHelper(r1, sortOrderDesc); + byte[] k2Desc = encodeSortKeyHelper(r2, sortOrderDesc); + + assertTrue(BYTE_ARR_COMPARATOR.compare(k1Asc, k2Asc) < 0); + assertTrue(BYTE_ARR_COMPARATOR.compare(k1Desc, k2Desc) > 0); + } + + @Test + public void testNullOrderingMatrix() throws Exception { + Row rNonNull = Row.withSchema(BEAM_SCHEMA).addValues(1, "apple", 1.5, true).build(); + Row rNull = Row.withSchema(BEAM_SCHEMA).addValues(2, null, 2.0, true).build(); + + // 1. ASC, NULLS_FIRST + SortOrder ascFirst = + SortOrder.builderFor(ICEBERG_SCHEMA).asc("name", NullOrder.NULLS_FIRST).build(); + byte[] kNonNullAscFirst = encodeSortKeyHelper(rNonNull, ascFirst); + byte[] kNullAscFirst = encodeSortKeyHelper(rNull, ascFirst); + assertTrue( + "ASC NULLS_FIRST failed: null should sort before non-null", + BYTE_ARR_COMPARATOR.compare(kNullAscFirst, kNonNullAscFirst) < 0); + + // 2. ASC, NULLS_LAST + SortOrder ascLast = + SortOrder.builderFor(ICEBERG_SCHEMA).asc("name", NullOrder.NULLS_LAST).build(); + byte[] kNonNullAscLast = encodeSortKeyHelper(rNonNull, ascLast); + byte[] kNullAscLast = encodeSortKeyHelper(rNull, ascLast); + assertTrue( + "ASC NULLS_LAST failed: null should sort after non-null", + BYTE_ARR_COMPARATOR.compare(kNullAscLast, kNonNullAscLast) > 0); + + // 3. 
DESC, NULLS_FIRST + SortOrder descFirst = + SortOrder.builderFor(ICEBERG_SCHEMA).desc("name", NullOrder.NULLS_FIRST).build(); + byte[] kNonNullDescFirst = encodeSortKeyHelper(rNonNull, descFirst); + byte[] kNullDescFirst = encodeSortKeyHelper(rNull, descFirst); + assertTrue( + "DESC NULLS_FIRST failed: null should sort before non-null", + BYTE_ARR_COMPARATOR.compare(kNullDescFirst, kNonNullDescFirst) < 0); + + // 4. DESC, NULLS_LAST + SortOrder descLast = + SortOrder.builderFor(ICEBERG_SCHEMA).desc("name", NullOrder.NULLS_LAST).build(); + byte[] kNonNullDescLast = encodeSortKeyHelper(rNonNull, descLast); + byte[] kNullDescLast = encodeSortKeyHelper(rNull, descLast); + assertTrue( + "DESC NULLS_LAST failed: null should sort after non-null", + BYTE_ARR_COMPARATOR.compare(kNullDescLast, kNonNullDescLast) > 0); + } + + @Test + public void testEndToEndSorting() { + SortOrder sortOrder = SortOrder.builderFor(ICEBERG_SCHEMA).asc("name").desc("id").build(); + + List input = + Arrays.asList( + Row.withSchema(BEAM_SCHEMA).addValues(2, "banana", 2.0, true).build(), + Row.withSchema(BEAM_SCHEMA).addValues(1, "banana", 1.0, true).build(), + Row.withSchema(BEAM_SCHEMA).addValues(5, "apple", 1.5, true).build(), + Row.withSchema(BEAM_SCHEMA).addValues(10, "cherry", 3.0, false).build()); + + Iterable sorted = IcebergRowSorter.sortRows(input, sortOrder, ICEBERG_SCHEMA, BEAM_SCHEMA); + List sortedList = + StreamSupport.stream(sorted.spliterator(), false).collect(Collectors.toList()); + + assertEquals(4, sortedList.size()); + + assertEquals("apple", sortedList.get(0).getString("name")); + assertEquals(Integer.valueOf(5), sortedList.get(0).getInt32("id")); + + assertEquals("banana", sortedList.get(1).getString("name")); + assertEquals(Integer.valueOf(2), sortedList.get(1).getInt32("id")); + + assertEquals("banana", sortedList.get(2).getString("name")); + assertEquals(Integer.valueOf(1), sortedList.get(2).getInt32("id")); + + assertEquals("cherry", sortedList.get(3).getString("name")); + assertEquals(Integer.valueOf(10), sortedList.get(3).getInt32("id")); + } + + @Test + public void testScaleAndExternalDiskSpill() { + SortOrder sortOrder = SortOrder.builderFor(ICEBERG_SCHEMA).asc("id").build(); + + int count = 5000; + List input = new ArrayList<>(count); + Random rand = new Random(42); + + for (int i = 0; i < count; i++) { + int randomId = rand.nextInt(100_000); + input.add(Row.withSchema(BEAM_SCHEMA).addValues(randomId, "item" + i, 1.0, true).build()); + } + + Iterable sorted = IcebergRowSorter.sortRows(input, sortOrder, ICEBERG_SCHEMA, BEAM_SCHEMA); + List sortedList = + StreamSupport.stream(sorted.spliterator(), false).collect(Collectors.toList()); + + assertEquals(count, sortedList.size()); + + for (int i = 0; i < sortedList.size() - 1; i++) { + int idCurrent = sortedList.get(i).getInt32("id"); + int idNext = sortedList.get(i + 1).getInt32("id"); + assertTrue( + String.format("Sort violation at index %d: %d > %d", i, idCurrent, idNext), + idCurrent <= idNext); + } + } + + @Test + public void testUnsupportedComplexTypeSorting() { + org.apache.iceberg.Schema mapSchema = + new org.apache.iceberg.Schema( + Types.NestedField.required(1, "id", Types.IntegerType.get()), + Types.NestedField.optional( + 2, + "attributes", + Types.MapType.ofOptional(3, 4, Types.StringType.get(), Types.StringType.get()))); + + assertThrows( + org.apache.iceberg.exceptions.ValidationException.class, + () -> SortOrder.builderFor(mapSchema).asc("attributes").build()); + } +} diff --git 
a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergUtilsTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergUtilsTest.java index c9026522dba3..d6b2bf11370e 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergUtilsTest.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergUtilsTest.java @@ -269,6 +269,42 @@ public void testMapOfRecords() { IcebergUtils.beamRowToIcebergRecord(RECORD_MAP_ICEBERG_SCHEMA, ROW_MAP_OF_ROWS); assertEquals(RECORD_MAP_OF_RECORDS, actual); } + + @Test + public void testBigDecimalToStringConversion() { + BigDecimal num = new BigDecimal("987654321.123456789"); + checkRowValueToRecordValue( + Schema.FieldType.DECIMAL, num, Types.StringType.get(), "987654321.123456789"); + } + + @Test + public void testIntegerToStringConversion() { + checkRowValueToRecordValue(Schema.FieldType.INT32, 42, Types.StringType.get(), "42"); + } + + @Test + public void testDoubleToStringConversion() { + checkRowValueToRecordValue( + Schema.FieldType.DOUBLE, 3.14159, Types.StringType.get(), "3.14159"); + } + + @Test + public void testBooleanToStringConversion() { + checkRowValueToRecordValue(Schema.FieldType.BOOLEAN, true, Types.StringType.get(), "true"); + } + + @Test + public void testNullStringConversion() { + Schema beamSchema = + Schema.of(Schema.Field.of("v", Schema.FieldType.STRING).withNullable(true)); + Row row = Row.withSchema(beamSchema).addValue(null).build(); + + org.apache.iceberg.Schema icebergSchema = + new org.apache.iceberg.Schema(optional(0, "v", Types.StringType.get())); + Record record = IcebergUtils.beamRowToIcebergRecord(icebergSchema, row); + + assertEquals(null, record.getField("v")); + } } @RunWith(JUnit4.class)