From 0c3acc0ce6a0009d4c50872b49809d9e91ba93e2 Mon Sep 17 00:00:00 2001 From: Gayathri Srividya Rajavarapu Date: Mon, 1 Jun 2026 11:02:06 +0530 Subject: [PATCH 1/4] Fix duplicate filtering path in Arrow task batches --- pyiceberg/io/pyarrow.py | 13 ++++------ tests/io/test_pyarrow.py | 53 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 58 insertions(+), 8 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 4ec7a73afe..e527682d14 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -1681,17 +1681,14 @@ def _task_to_record_batches( if current_batch.num_rows == 0: continue - # Apply the user filter - if pyarrow_filter is not None: - # Temporary fix until PyArrow 21 is released ( https://github.com/apache/arrow/pull/46057 ) - table = pa.Table.from_batches([current_batch]) - table = table.filter(pyarrow_filter) + # Apply the user filter only when positional deletes are present. + # In the default case, the filter is already pushed down via Scanner.from_fragment. 
+ if pyarrow_filter is not None and positional_deletes: + current_batch = current_batch.filter(pyarrow_filter) # skip empty batches - if table.num_rows == 0: + if current_batch.num_rows == 0: continue - current_batch = table.combine_chunks().to_batches()[0] - yield _to_requested_schema( projected_schema, file_project_schema, diff --git a/tests/io/test_pyarrow.py b/tests/io/test_pyarrow.py index 2f36661a1f..715424263f 100644 --- a/tests/io/test_pyarrow.py +++ b/tests/io/test_pyarrow.py @@ -3267,6 +3267,59 @@ def _expected_batch(unit: str) -> pa.RecordBatch: assert _expected_batch("ns" if format_version > 2 else "us").equals(actual_result) +def test_task_to_record_batches_filter_without_positional_deletes_avoids_table_refilter(tmpdir: str) -> None: + arrow_schema = pa.schema((pa.field("id", pa.int32(), nullable=True, metadata={PYARROW_PARQUET_FIELD_ID_KEY: "1"}),)) + arrow_table = pa.table([pa.array([1, 2, 3], type=pa.int32())], schema=arrow_schema) + data_file = _write_table_to_data_file( + f"{tmpdir}/test_task_to_record_batches_filter_no_positional.parquet", arrow_schema, arrow_table + ) + + table_schema = Schema(NestedField(1, "id", IntegerType(), required=False)) + from pyiceberg.expressions.visitors import bind + + result_batches = list( + _task_to_record_batches( + PyArrowFileIO(), + FileScanTask(data_file), + bound_row_filter=bind(table_schema, GreaterThan("id", 1), case_sensitive=True), + projected_schema=table_schema, + table_schema=table_schema, + projected_field_ids={1}, + positional_deletes=None, + case_sensitive=True, + ) + ) + assert len(result_batches) == 1 + assert result_batches[0].column(0).to_pylist() == [2, 3] + + +def test_task_to_record_batches_filter_with_positional_deletes_handles_empty_batch(tmpdir: str) -> None: + arrow_schema = pa.schema((pa.field("id", pa.int32(), nullable=True, metadata={PYARROW_PARQUET_FIELD_ID_KEY: "1"}),)) + arrow_table = pa.table([pa.array([1, 2, 3], type=pa.int32())], schema=arrow_schema) + data_file = 
_write_table_to_data_file( + f"{tmpdir}/test_task_to_record_batches_filter_with_positional.parquet", arrow_schema, arrow_table + ) + + table_schema = Schema(NestedField(1, "id", IntegerType(), required=False)) + from pyiceberg.expressions.visitors import bind + + positional_deletes = [pa.chunked_array([pa.array([], type=pa.int64())])] + result_batches = list( + _task_to_record_batches( + PyArrowFileIO(), + FileScanTask(data_file), + bound_row_filter=bind(table_schema, GreaterThan("id", 100), case_sensitive=True), + projected_schema=table_schema, + table_schema=table_schema, + projected_field_ids={1}, + positional_deletes=positional_deletes, + case_sensitive=True, + ) + ) + + assert result_batches == [] + + def test_parse_location_defaults() -> None: """Test that parse_location uses defaults.""" From f10845b197db0441175d43477547adf9e18baba6 Mon Sep 17 00:00:00 2001 From: Gayathri Srividya Rajavarapu Date: Mon, 1 Jun 2026 23:09:34 +0530 Subject: [PATCH 2/4] tests: add regression tests for #3272 SIGSEGV fix Replace the two weak tests that passed on the unfixed code with proper regression coverage: 1. test_task_to_record_batches_does_not_use_table_filter_without_positional_deletes Replaces pyarrow.Table with a sentinel class whose from_batches raises AssertionError. A custom metaclass preserves isinstance checks so the rest of the code-path is unaffected. With the fix the sentinel is never reached; without the fix it fires immediately, detecting the old pa.Table.from_batches / filter / combine_chunks / to_batches[0] path that caused the SIGSEGV on Apple Silicon. 2. test_task_to_record_batches_filter_applied_after_positional_deletes Uses data [1,2,3,4,5] with positional deletes on positions 1 and 3 (removing values 2 and 4), then applies filter id > 2. The expected result [3, 5] is produced only when both the deletes and the subsequent filter are applied; skipping either step yields a different answer. 
--- tests/io/test_pyarrow.py | 92 +++++++++++++++++++++++++++++++--------- 1 file changed, 72 insertions(+), 20 deletions(-) diff --git a/tests/io/test_pyarrow.py b/tests/io/test_pyarrow.py index 715424263f..fcf30ab14f 100644 --- a/tests/io/test_pyarrow.py +++ b/tests/io/test_pyarrow.py @@ -3267,48 +3267,99 @@ def _expected_batch(unit: str) -> pa.RecordBatch: assert _expected_batch("ns" if format_version > 2 else "us").equals(actual_result) -def test_task_to_record_batches_filter_without_positional_deletes_avoids_table_refilter(tmpdir: str) -> None: +def test_task_to_record_batches_does_not_use_table_filter_without_positional_deletes(tmpdir: str) -> None: + """Regression test for https://github.com/apache/iceberg-python/issues/3272. + + The old code always created a ``pa.Table`` via ``pa.Table.from_batches`` and called + ``filter`` / ``combine_chunks`` / ``to_batches()[0]`` — even when no positional deletes + existed and the scanner had already applied the predicate push-down. On Apple Silicon + with affected PyArrow versions this triggered a SIGSEGV. The fix removes that + Table-based code path for the no-positional-delete case. + + We detect a regression by replacing ``pyarrow.Table`` with a sentinel class whose + ``from_batches`` raises immediately. ``isinstance`` checks against the original + ``pa.Table`` are preserved via a custom metaclass so the rest of the code-path is + unaffected. With the fix the sentinel is never reached; without the fix it is called + and the test fails. 
+ """ + from pyiceberg.expressions.visitors import bind + arrow_schema = pa.schema((pa.field("id", pa.int32(), nullable=True, metadata={PYARROW_PARQUET_FIELD_ID_KEY: "1"}),)) arrow_table = pa.table([pa.array([1, 2, 3], type=pa.int32())], schema=arrow_schema) data_file = _write_table_to_data_file( - f"{tmpdir}/test_task_to_record_batches_filter_no_positional.parquet", arrow_schema, arrow_table + f"{tmpdir}/test_task_to_record_batches_no_table_refilter.parquet", arrow_schema, arrow_table ) table_schema = Schema(NestedField(1, "id", IntegerType(), required=False)) - from pyiceberg.expressions.visitors import bind - result_batches = list( - _task_to_record_batches( - PyArrowFileIO(), - FileScanTask(data_file), - bound_row_filter=bind(table_schema, GreaterThan("id", 1), case_sensitive=True), - projected_schema=table_schema, - table_schema=table_schema, - projected_field_ids={1}, - positional_deletes=None, - case_sensitive=True, + _real_Table = pa.Table + + class _TableFilterSentinelMeta(type): + """Metaclass that delegates isinstance/issubclass to the real pa.Table.""" + + def __instancecheck__(cls, instance: object) -> bool: + return isinstance(instance, _real_Table) + + def __subclasscheck__(cls, subclass: type) -> bool: + return issubclass(subclass, _real_Table) + + class _TableFilterSentinel(metaclass=_TableFilterSentinelMeta): + @staticmethod + def from_batches(*_: object, **__: object) -> None: + raise AssertionError( + "pa.Table.from_batches must not be called when positional_deletes is None: " + "the SIGSEGV-prone workaround path (from_batches/filter/combine_chunks/to_batches) was taken" + ) + + with patch("pyarrow.Table", new=_TableFilterSentinel): + result_batches = list( + _task_to_record_batches( + PyArrowFileIO(), + FileScanTask(data_file), + bound_row_filter=bind(table_schema, GreaterThan("id", 1), case_sensitive=True), + projected_schema=table_schema, + table_schema=table_schema, + projected_field_ids={1}, + positional_deletes=None, + case_sensitive=True, 
+ ) ) - ) assert len(result_batches) == 1 assert result_batches[0].column(0).to_pylist() == [2, 3] -def test_task_to_record_batches_filter_with_positional_deletes_handles_empty_batch(tmpdir: str) -> None: +def test_task_to_record_batches_filter_applied_after_positional_deletes(tmpdir: str) -> None: + """Regression test: the row filter must be applied *after* positional deletes are applied. + + When positional deletes are present the scanner does not push down the predicate, so + ``_task_to_record_batches`` must apply ``pyarrow_filter`` explicitly after ``take``. + This test uses data where the expected result differs from both + "filter only" and "deletes only" projections, ensuring that skipping either step + would produce the wrong answer. + """ + from pyiceberg.expressions.visitors import bind + arrow_schema = pa.schema((pa.field("id", pa.int32(), nullable=True, metadata={PYARROW_PARQUET_FIELD_ID_KEY: "1"}),)) - arrow_table = pa.table([pa.array([1, 2, 3], type=pa.int32())], schema=arrow_schema) + # File rows (0-indexed positions): 0→1, 1→2, 2→3, 3→4, 4→5 + arrow_table = pa.table([pa.array([1, 2, 3, 4, 5], type=pa.int32())], schema=arrow_schema) data_file = _write_table_to_data_file( f"{tmpdir}/test_task_to_record_batches_filter_with_positional.parquet", arrow_schema, arrow_table ) table_schema = Schema(NestedField(1, "id", IntegerType(), required=False)) - from pyiceberg.expressions.visitors import bind - positional_deletes = [pa.chunked_array([pa.array([], type=pa.int64())])] + # Delete file-positions 1 and 3 (values 2 and 4); survivors: [1, 3, 5] + # Then apply filter id > 2; expected result: [3, 5] + # + # Wrong results that would indicate a bug: + # [1, 3, 5] — filter not applied after deletes + # [3, 4, 5] — positional deletes not applied (only the row filter took effect) + positional_deletes = [pa.chunked_array([pa.array([1, 3], type=pa.int64())])] result_batches = list( _task_to_record_batches( PyArrowFileIO(), FileScanTask(data_file), - 
bound_row_filter=bind(table_schema, GreaterThan("id", 100), case_sensitive=True), + bound_row_filter=bind(table_schema, GreaterThan("id", 2), case_sensitive=True), projected_schema=table_schema, table_schema=table_schema, projected_field_ids={1}, @@ -3317,7 +3368,8 @@ def test_task_to_record_batches_filter_with_positional_deletes_handles_empty_bat ) ) - assert result_batches == [] + assert len(result_batches) == 1 + assert result_batches[0].column(0).to_pylist() == [3, 5] def test_parse_location_defaults() -> None: From 49faf85a8315c59d8683465949655e4a70f94d76 Mon Sep 17 00:00:00 2001 From: Gayathri Srividya Rajavarapu Date: Tue, 2 Jun 2026 20:20:13 +0530 Subject: [PATCH 3/4] Fix duplicate filtering path in Arrow task batches - Combine the two positional-delete handling blocks into one: apply take(indices) and the row filter together inside the single 'if positional_deletes' block, removing the redundant mid-loop empty-batch check (filtering an empty batch is free). - Replace the sentinel-based regression test (which passed even when the fix was reverted) with a behavioral test that picks a scenario where the old bug produces a distinct wrong answer: data=[1,2,3,4], pos_delete=2 (value 3), filter=id>2 correct: [4] old bug (scanner pre-filters): [3,4] The assertion on [4] will fail against any regression. Addresses review feedback from @ebyhr and @Fokko. 
--- pyiceberg/io/pyarrow.py | 10 ++---- tests/io/test_pyarrow.py | 76 ++++++++++++++++------------------------ 2 files changed, 32 insertions(+), 54 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index e527682d14..ada65b4f58 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -1676,19 +1676,13 @@ def _task_to_record_batches( # Create the mask of indices that we're interested in indices = _combine_positional_deletes(positional_deletes, current_index, current_index + len(batch)) current_batch = current_batch.take(indices) + if pyarrow_filter is not None: + current_batch = current_batch.filter(pyarrow_filter) # skip empty batches if current_batch.num_rows == 0: continue - # Apply the user filter only when positional deletes are present. - # In the default case, the filter is already pushed down via Scanner.from_fragment. - if pyarrow_filter is not None and positional_deletes: - current_batch = current_batch.filter(pyarrow_filter) - # skip empty batches - if current_batch.num_rows == 0: - continue - yield _to_requested_schema( projected_schema, file_project_schema, diff --git a/tests/io/test_pyarrow.py b/tests/io/test_pyarrow.py index fcf30ab14f..fbb3fdc097 100644 --- a/tests/io/test_pyarrow.py +++ b/tests/io/test_pyarrow.py @@ -3267,65 +3267,49 @@ def _expected_batch(unit: str) -> pa.RecordBatch: assert _expected_batch("ns" if format_version > 2 else "us").equals(actual_result) -def test_task_to_record_batches_does_not_use_table_filter_without_positional_deletes(tmpdir: str) -> None: +def test_task_to_record_batches_scanner_filter_not_set_with_positional_deletes(tmpdir: str) -> None: """Regression test for https://github.com/apache/iceberg-python/issues/3272. - The old code always created a ``pa.Table`` via ``pa.Table.from_batches`` and called - ``filter`` / ``combine_chunks`` / ``to_batches()[0]`` — even when no positional deletes - existed and the scanner had already applied the predicate push-down. 
On Apple Silicon - with affected PyArrow versions this triggered a SIGSEGV. The fix removes that - Table-based code path for the no-positional-delete case. - - We detect a regression by replacing ``pyarrow.Table`` with a sentinel class whose - ``from_batches`` raises immediately. ``isinstance`` checks against the original - ``pa.Table`` are preserved via a custom metaclass so the rest of the code-path is - unaffected. With the fix the sentinel is never reached; without the fix it is called - and the test fails. + When positional deletes are present the scanner must NOT receive the row filter as a + push-down predicate. Positional-delete indices reference absolute row positions in the + original file; if the scanner filters rows first the surviving rows shift and the + indices no longer map correctly, producing silently wrong results. + + The test chooses data where the old (buggy) code path gives a distinct wrong answer: + - File rows (positions 0-3): [1, 2, 3, 4] + - Positional delete: position 2 → removes value 3 → survivors [1, 2, 4] + - Row filter: id > 2 → expected result [4] + + Old bug (scanner pre-filters id > 2 → [3, 4], then _combine_positional_deletes sees only + 2 rows so absolute position 2 is outside the batch range and nothing is deleted → [3, 4]). 
""" from pyiceberg.expressions.visitors import bind arrow_schema = pa.schema((pa.field("id", pa.int32(), nullable=True, metadata={PYARROW_PARQUET_FIELD_ID_KEY: "1"}),)) - arrow_table = pa.table([pa.array([1, 2, 3], type=pa.int32())], schema=arrow_schema) + # File row positions: 0→1, 1→2, 2→3, 3→4 + arrow_table = pa.table([pa.array([1, 2, 3, 4], type=pa.int32())], schema=arrow_schema) data_file = _write_table_to_data_file( - f"{tmpdir}/test_task_to_record_batches_no_table_refilter.parquet", arrow_schema, arrow_table + f"{tmpdir}/test_scanner_filter_not_set_with_pos_deletes.parquet", arrow_schema, arrow_table ) table_schema = Schema(NestedField(1, "id", IntegerType(), required=False)) - _real_Table = pa.Table - - class _TableFilterSentinelMeta(type): - """Metaclass that delegates isinstance/issubclass to the real pa.Table.""" - - def __instancecheck__(cls, instance: object) -> bool: - return isinstance(instance, _real_Table) - - def __subclasscheck__(cls, subclass: type) -> bool: - return issubclass(subclass, _real_Table) - - class _TableFilterSentinel(metaclass=_TableFilterSentinelMeta): - @staticmethod - def from_batches(*_: object, **__: object) -> None: - raise AssertionError( - "pa.Table.from_batches must not be called when positional_deletes is None: " - "the SIGSEGV-prone workaround path (from_batches/filter/combine_chunks/to_batches) was taken" - ) - - with patch("pyarrow.Table", new=_TableFilterSentinel): - result_batches = list( - _task_to_record_batches( - PyArrowFileIO(), - FileScanTask(data_file), - bound_row_filter=bind(table_schema, GreaterThan("id", 1), case_sensitive=True), - projected_schema=table_schema, - table_schema=table_schema, - projected_field_ids={1}, - positional_deletes=None, - case_sensitive=True, - ) + positional_deletes = [pa.chunked_array([pa.array([2], type=pa.int64())])] + result_batches = list( + _task_to_record_batches( + PyArrowFileIO(), + FileScanTask(data_file), + bound_row_filter=bind(table_schema, GreaterThan("id", 2), 
case_sensitive=True), + projected_schema=table_schema, + table_schema=table_schema, + projected_field_ids={1}, + positional_deletes=positional_deletes, + case_sensitive=True, ) + ) + assert len(result_batches) == 1 - assert result_batches[0].column(0).to_pylist() == [2, 3] + assert result_batches[0].column(0).to_pylist() == [4] def test_task_to_record_batches_filter_applied_after_positional_deletes(tmpdir: str) -> None: From 27eb31bd54148b71d622a3d1b0e0bcfc6e9fe7ea Mon Sep 17 00:00:00 2001 From: Gayathri Srividya Rajavarapu Date: Fri, 5 Jun 2026 11:36:28 +0530 Subject: [PATCH 4/4] fix: handle empty result from RecordBatch.filter on pyarrow < 21 PyArrow < 21 raises IndexError when RecordBatch.filter(Expression) produces zero rows. Wrap the call in try/except and fall back to an empty slice, which is already handled by the num_rows == 0 guard below. Add regression test covering the positional-delete path where the post-delete filter eliminates all remaining rows. Co-Authored-By: Claude Sonnet 4.6 --- pyiceberg/io/pyarrow.py | 7 ++++++- tests/io/test_pyarrow.py | 35 +++++++++++++++++++++++++++++++++++ 2 files changed, 41 insertions(+), 1 deletion(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index ada65b4f58..992b3c1db0 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -1677,7 +1677,12 @@ def _task_to_record_batches( indices = _combine_positional_deletes(positional_deletes, current_index, current_index + len(batch)) current_batch = current_batch.take(indices) if pyarrow_filter is not None: - current_batch = current_batch.filter(pyarrow_filter) + try: + current_batch = current_batch.filter(pyarrow_filter) + except IndexError: + # PyArrow < 21 raises IndexError when filter produces zero rows + # (fixed in https://github.com/apache/arrow/pull/46057) + current_batch = current_batch.slice(0, 0) # skip empty batches if current_batch.num_rows == 0: diff --git a/tests/io/test_pyarrow.py b/tests/io/test_pyarrow.py index 
fbb3fdc097..b1bd622883 100644 --- a/tests/io/test_pyarrow.py +++ b/tests/io/test_pyarrow.py @@ -3356,6 +3356,41 @@ def test_task_to_record_batches_filter_applied_after_positional_deletes(tmpdir: assert result_batches[0].column(0).to_pylist() == [3, 5] +def test_task_to_record_batches_filter_after_positional_deletes_empty_result(tmpdir: str) -> None: + """Regression: filter after positional deletes must not raise even when the result is empty. + + PyArrow < 21 raises IndexError from RecordBatch.filter(Expression) when the result has + zero rows (fixed in https://github.com/apache/arrow/pull/46057). This test ensures the + positional-delete path handles that case gracefully and yields no batches. + """ + from pyiceberg.expressions.visitors import bind + + arrow_schema = pa.schema((pa.field("id", pa.int32(), nullable=True, metadata={PYARROW_PARQUET_FIELD_ID_KEY: "1"}),)) + arrow_table = pa.table([pa.array([1, 2, 3], type=pa.int32())], schema=arrow_schema) + data_file = _write_table_to_data_file( + f"{tmpdir}/test_filter_after_positional_deletes_empty_result.parquet", arrow_schema, arrow_table + ) + + table_schema = Schema(NestedField(1, "id", IntegerType(), required=False)) + + # No rows deleted, but filter (id > 10) eliminates all rows → must return empty + positional_deletes = [pa.chunked_array([pa.array([], type=pa.int64())])] + result_batches = list( + _task_to_record_batches( + PyArrowFileIO(), + FileScanTask(data_file), + bound_row_filter=bind(table_schema, GreaterThan("id", 10), case_sensitive=True), + projected_schema=table_schema, + table_schema=table_schema, + projected_field_ids={1}, + positional_deletes=positional_deletes, + case_sensitive=True, + ) + ) + + assert result_batches == [] + + def test_parse_location_defaults() -> None: """Test that parse_location uses defaults."""