
Implement high-performance sorted writing support in IcebergIO #38406

Draft
atognolag wants to merge 8 commits into apache:master from atognolag:feature/sorted-icebergIO


Conversation

@atognolag
Contributor

Pull Request: High-Performance Sorted Writing Support in Native IcebergIO

Description

This pull request implements robust, high-performance sorted writing support in native IcebergIO (sdks/java/io/iceberg).

Writing unsorted data to sorted Iceberg tables causes severe performance degradation, high memory overhead, and "file thrashing" because too many concurrent file writers are kept open on workers. By dynamically pre-sorting incoming PCollection&lt;Row&gt; elements according to the table's active Iceberg SortOrder inside the write transform, we produce fully ordered Parquet files, make better use of worker resources, and reduce the number of concurrent file handles.


Core Technical Implementation

1. Memory-Safe Spill-to-Disk Sorter (IcebergRowSorter.java)

  • Integrates Beam's native :sdks:java:extensions:sorter (via BufferedExternalSorter).
  • Processes row sorting per-partition dynamically. If memory buffers are exceeded, the sorter automatically spills elements to the worker's local disk, ensuring strict memory guardrails against out-of-memory (OOM) crashes.

2. Dynamic Unsigned Lexicographical Byte Encoding

  • Generates a sorting key dynamically by converting composite sort columns into a lexicographically comparable byte array (byte[]):
    • Direction (ASC vs DESC): ASC columns are encoded in their natural comparable format. DESC columns are inverted bitwise (~byte) to reverse the unsigned byte comparison order naturally.
    • Null Constraints (NULLS_FIRST & NULLS_LAST): Static prefix bytes are chosen so that a standard unsigned byte comparison yields the requested null ordering. Because the prefix byte is compared directly, NULLS_FIRST maps to 0x00 and NULLS_LAST maps to 0xFF regardless of column direction:
      • ASC NULLS_FIRST -> 0x00 / ASC NULLS_LAST -> 0xFF
      • DESC NULLS_FIRST -> 0x00 / DESC NULLS_LAST -> 0xFF
    • Escape Boundary Protocol: Strings and byte arrays are mapped to a deterministic, collision-free escaping sequence (0x00 -> [0x01, 0x01], 0x01 -> [0x01, 0x02]) terminated by a safe 0x00 byte. This prevents column boundary bleeding on composite keys (e.g. "abc"+"def" vs "abcdef"+null).
    • Algebraic Numbers: Transforms signed integers, longs, doubles, and floats into big-endian byte structures with flipped sign bits to preserve algebraic scale order.
    • Temporal Types: Supports ReadableInstant, java.time.Instant, and java.util.Date conversions, preventing runner-specific casting failures.
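The sign-flip and bitwise-inversion steps above can be sketched in self-contained Java. This is an illustration only; the method names are invented here and are not the PR's actual API:

```java
import java.nio.ByteBuffer;

public class SortKeyEncodingSketch {
  // Encode a signed long as 8 big-endian bytes with the sign bit flipped,
  // so that unsigned lexicographic byte comparison matches numeric order.
  static byte[] encodeLongAsc(long v) {
    return ByteBuffer.allocate(8).putLong(v ^ Long.MIN_VALUE).array();
  }

  // DESC columns: invert every byte (~b) to reverse the unsigned order.
  static byte[] invert(byte[] bytes) {
    byte[] out = new byte[bytes.length];
    for (int i = 0; i < bytes.length; i++) {
      out[i] = (byte) ~bytes[i];
    }
    return out;
  }

  // Unsigned lexicographic comparison of two byte arrays.
  static int compareUnsigned(byte[] a, byte[] b) {
    for (int i = 0; i < Math.min(a.length, b.length); i++) {
      int cmp = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
      if (cmp != 0) return cmp;
    }
    return Integer.compare(a.length, b.length);
  }

  public static void main(String[] args) {
    // ASC: -5 sorts before 3 once the sign bit is flipped.
    System.out.println(compareUnsigned(encodeLongAsc(-5), encodeLongAsc(3)) < 0);
    // DESC: inversion reverses the order, so 3 now sorts before -5.
    System.out.println(compareUnsigned(invert(encodeLongAsc(-5)), invert(encodeLongAsc(3))) > 0);
  }
}
```

The same trick generalizes to ints, floats, and doubles once they are mapped to an order-preserving bit pattern.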

3. Shard-Routing and Write-Path Bypasses

  • Dynamic Schema Extraction: Extracts schemas dynamically from active row elements at runtime, avoiding null schema references caused by transient serialization on worker nodes.
  • Direct-Write Bypass: Dynamically inspects target table metadata inside WriteUngroupedRowsToFiles. If the table has an active SortOrder, it skips direct ungrouped writing and spills elements to the grouped, shuffled path where they are properly partitioned and pre-sorted before writing.

Verification and Test Coverage

1. Expanded Unit Tests (IcebergRowSorterTest.java)

  • String Collision Proofing: Asserts that boundary-safe byte escaping correctly distinguishes and orders complex composite strings.
  • Null Quadrant Matrix: Validates all 4 null-ordering combinations under unsigned comparison.
  • Scale and Disk Spill: Asserts sorting safety and data integrity with 5,000 randomized records under extremely tight memory bounds (1MB buffer), forcing disk spilling.
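The null-quadrant contract can be checked with a few lines of plain Java. Assuming the prefix byte participates directly in the unsigned key comparison, NULLS_FIRST needs the smallest prefix regardless of column direction (a sketch; the byte values and names here are illustrative):

```java
public class NullQuadrantSketch {
  // Unsigned lexicographic comparison of two byte arrays.
  static int cmp(byte[] a, byte[] b) {
    int n = Math.min(a.length, b.length);
    for (int i = 0; i < n; i++) {
      int c = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
      if (c != 0) return c;
    }
    return Integer.compare(a.length, b.length);
  }

  public static void main(String[] args) {
    byte[] nullFirst = {0x00};        // null key under NULLS_FIRST
    byte[] nullLast = {(byte) 0xFF};  // null key under NULLS_LAST
    byte[] value = {0x01, 0x42};      // a non-null key: marker byte + payload

    // NULLS_FIRST: the null key precedes every value key.
    System.out.println(cmp(nullFirst, value) < 0);
    // NULLS_LAST: the null key follows every value key.
    System.out.println(cmp(nullLast, value) > 0);
  }
}
```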

2. End-to-End Pipeline Integration Tests (IcebergIOWriteTest.java)

  • Appends scrambled datasets to a sorted Iceberg table in a Beam pipeline.
  • Implements a sharding-friendly assertFilesAreInternallySorted verification helper which parses individual Parquet files committed to the table directly using Iceberg scan APIs, ensuring that each written file is perfectly sorted internally, regardless of the runner's sharding factor.
  • Tested and validated successfully across NONE, HASH, and HASH_WITH_AUTOSHARDING distribution modes.

@gemini-code-assist
Contributor

Summary of Changes

Hello, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request introduces native support for sorted writes in IcebergIO. By integrating a memory-safe, disk-spilling sorter, the implementation ensures that data written to sorted Iceberg tables is perfectly ordered, which significantly improves performance and reduces resource contention by minimizing concurrent file writers. The changes include a comprehensive byte-encoding protocol for lexicographical sorting and updated write-path logic to handle sorted tables efficiently.

Highlights

  • Sorted Writing Support: Implemented high-performance sorted writing in IcebergIO by dynamically pre-sorting incoming rows based on the target table's SortOrder.
  • Memory-Safe Sorter: Introduced IcebergRowSorter, which leverages Beam's BufferedExternalSorter to spill data to disk when memory limits are exceeded, preventing OOM errors.
  • Lexicographical Encoding: Added a robust byte-encoding mechanism that supports complex composite keys, null-ordering contracts, and type-specific sorting (including algebraic number handling).
  • Write Path Optimization: Updated WriteUngroupedRowsToFiles and WriteGroupedRowsToFiles to automatically detect table sort requirements and route data through the sorting path.

@atognolag atognolag marked this pull request as draft May 7, 2026 15:33
Contributor

@gemini-code-assist gemini-code-assist Bot left a comment


Code Review

This pull request implements row sorting for Iceberg writes by introducing the IcebergRowSorter utility, which leverages BufferedExternalSorter to handle large datasets. The implementation supports complex Iceberg SortOrder configurations by encoding sort keys into byte arrays. Review feedback identified critical logic errors in the null-ordering byte prefix calculation for descending columns, which were also propagated into incorrect test assertions. Additionally, several performance optimizations were suggested to avoid expensive row conversions and redundant lookups within loops, and to reduce GC pressure from frequent buffer allocations. A potential reliability issue was also noted regarding the consumption of non-re-iterable collections when extracting schemas.

@atognolag
Contributor Author

@ahmedabu98 mind taking a look? Thanks!

Contributor

@ahmedabu98 ahmedabu98 left a comment


Left some comments.

I think more thought needs to be put into the architecture of sorted writes. This is a primitive approach that might work well for small writes, but will quickly degrade for medium-to-large writes.

Maybe take a look at how Flink or Spark do it (look for "range" distribution mode). IIRC they do a global sort before passing data to writers. This way, input iterables are already sorted so writers can break them up into files with tight min/max ranges.

Comment on lines +125 to +130
if (icebergRecord == null) {
  icebergRecord = IcebergUtils.beamRowToIcebergRecord(icebergSchema, row);
}
Object icebergVal = icebergRecord.getField(colName);
if (icebergVal != null) {
  val = field.transform().apply(icebergVal);
Contributor


This is a pretty heavy way to extract the transformed value. It will make writes pretty slow.

Can we apply the transform to the Beam field object directly? If not, maybe we need a "beam object to iceberg object" conversion. IcebergUtils.copyFieldIntoRecord does something similar, but we may need to refactor it a little to fit this use case.

try {
  for (Row row : rows) {
    byte[] keyBytes = encodeSortKey(row, sortOrder, columnNames, icebergSchema, beamSchema);
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
Contributor


Let's clear and reuse the output stream object instead of creating a new one each iteration.

org.apache.beam.sdk.schemas.Schema beamSchema)
throws IOException {

ByteArrayOutputStream baos = new ByteArrayOutputStream();
Contributor


Same thing here: create the ByteArrayOutputStream in the caller function and pass it in. Each call should reset and reuse the same one for efficiency.
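The reuse pattern being suggested can be sketched in plain Java. The encoder below is a hypothetical stand-in, not the PR's actual encodeSortKey:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class BaosReuseSketch {
  // Hypothetical encoder: writes one row's key into the provided stream.
  static byte[] encodeKey(String row, ByteArrayOutputStream baos) throws IOException {
    baos.reset(); // clear previous contents instead of allocating a new stream
    baos.write(row.getBytes(StandardCharsets.UTF_8));
    return baos.toByteArray(); // toByteArray() copies, so reuse is safe
  }

  public static void main(String[] args) throws IOException {
    ByteArrayOutputStream baos = new ByteArrayOutputStream(); // allocated once in the caller
    List<byte[]> keys = new ArrayList<>();
    for (String row : new String[] {"b", "a"}) {
      keys.add(encodeKey(row, baos)); // same stream reused across iterations
    }
    System.out.println(keys.size() + " " + new String(keys.get(1), StandardCharsets.UTF_8));
  }
}
```

Since toByteArray() returns a fresh copy each call, resetting the shared stream cannot corrupt previously returned keys.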

Comment on lines +241 to +243
} else {
  writeString(val.toString(), baos, invert);
}
Contributor


Does Iceberg support sorting on complex types (lists, maps, structs) ?

We should either 1) add support for that or 2) throw an UnsupportedOperationException early on.

I'd rather throw an error than fall back on String because it may lead to unexpected sorting behavior for some types.

Comment on lines +133 to +136
Iterable<Row> sortedOrUnsortedRows =
    IcebergRowSorter.sortRows(
        element.getValue(), table.sortOrder(), table.schema(), dataSchema);
for (Row row : sortedOrUnsortedRows) {
Contributor


We should only be sorting if the user asked us to.

