Skip to content

Commit ff94713

Browse files
authored
Merge branch 'apache:master' into auron-1996
2 parents e1459f1 + c0c4a0b commit ff94713

58 files changed

Lines changed: 6299 additions & 55 deletions

File tree

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

.github/workflows/iceberg.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ jobs:
8484
8585
- name: Upload reports
8686
if: failure()
87-
uses: actions/upload-artifact@v6
87+
uses: actions/upload-artifact@v7
8888
with:
8989
name: auron-iceberg-${{ matrix.sparkver }}-jdk${{ matrix.javaver }}-test-report
9090
path: thirdparty/auron-iceberg/target/surefire-reports

Cargo.lock

Lines changed: 5 additions & 5 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,7 @@ log = "0.4.29"
174174
lz4_flex = "0.12.0"
175175
num = "0.4.2"
176176
object_store = "0.12.4"
177-
once_cell = "1.21.3"
177+
once_cell = "1.21.4"
178178
panic-message = "0.3.0"
179179
parking_lot = "0.12.5"
180180
paste = "1.0.15"
@@ -186,7 +186,7 @@ rand = "0.9.2"
186186
smallvec = "2.0.0-alpha.11"
187187
sonic-rs = "0.5.7"
188188
tempfile = "3"
189-
tokio = "1.49.0"
189+
tokio = "1.50.0"
190190
tonic-build = "0.13.1"
191191
transpose = "0.2.3"
192192
unchecked-index = "0.2.2"
@@ -204,7 +204,7 @@ datafusion-execution = { git = "https://github.com/auron-project/datafusion.git"
204204
datafusion-optimizer = { git = "https://github.com/auron-project/datafusion.git", rev = "9034aeffb"}
205205
datafusion-physical-expr = { git = "https://github.com/auron-project/datafusion.git", rev = "9034aeffb"}
206206
datafusion-spark = { git = "https://github.com/auron-project/datafusion.git", rev = "9034aeffb"}
207-
orc-rust = { git = "https://github.com/auron-project/datafusion-orc.git", rev = "17f7012"}
207+
orc-rust = { git = "https://github.com/auron-project/datafusion-orc.git", rev = "9beb12c"}
208208

209209
# arrow: branch=v55.2.0-blaze
210210
arrow = { git = "https://github.com/auron-project/arrow-rs.git", rev = "5de02520c"}

NOTICE

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11

22
Apache Auron (Incubating)
3-
Copyright 2025 The Apache Software Foundation.
3+
Copyright 2025-2026 The Apache Software Foundation.
44

55
This product includes software developed at
66
The Apache Software Foundation (https://www.apache.org/).

auron-flink-extension/auron-flink-runtime/pom.xml

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,19 @@
5858
<version>${flink.version}</version>
5959
<scope>provided</scope>
6060
</dependency>
61+
<dependency>
62+
<groupId>org.apache.flink</groupId>
63+
<artifactId>flink-streaming-java</artifactId>
64+
<version>${flink.version}</version>
65+
<scope>provided</scope>
66+
</dependency>
67+
<!-- Table API Java dependencies (not included in the uber) -->
68+
<dependency>
69+
<groupId>org.apache.flink</groupId>
70+
<artifactId>flink-table-api-java-bridge</artifactId>
71+
<version>${flink.version}</version>
72+
<scope>provided</scope>
73+
</dependency>
6174

6275
<!-- Test dependencies -->
6376
<dependency>
Lines changed: 300 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,300 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.auron.flink.arrow;
18+
19+
import java.util.List;
20+
import org.apache.arrow.vector.BigIntVector;
21+
import org.apache.arrow.vector.BitVector;
22+
import org.apache.arrow.vector.DateDayVector;
23+
import org.apache.arrow.vector.DecimalVector;
24+
import org.apache.arrow.vector.FieldVector;
25+
import org.apache.arrow.vector.Float4Vector;
26+
import org.apache.arrow.vector.Float8Vector;
27+
import org.apache.arrow.vector.IntVector;
28+
import org.apache.arrow.vector.SmallIntVector;
29+
import org.apache.arrow.vector.TimeMicroVector;
30+
import org.apache.arrow.vector.TimeStampVector;
31+
import org.apache.arrow.vector.TinyIntVector;
32+
import org.apache.arrow.vector.VarBinaryVector;
33+
import org.apache.arrow.vector.VarCharVector;
34+
import org.apache.arrow.vector.VectorSchemaRoot;
35+
import org.apache.arrow.vector.complex.ListVector;
36+
import org.apache.arrow.vector.complex.MapVector;
37+
import org.apache.arrow.vector.complex.StructVector;
38+
import org.apache.auron.flink.arrow.vectors.ArrowArrayColumnVector;
39+
import org.apache.auron.flink.arrow.vectors.ArrowBigIntColumnVector;
40+
import org.apache.auron.flink.arrow.vectors.ArrowBooleanColumnVector;
41+
import org.apache.auron.flink.arrow.vectors.ArrowDateColumnVector;
42+
import org.apache.auron.flink.arrow.vectors.ArrowDecimalColumnVector;
43+
import org.apache.auron.flink.arrow.vectors.ArrowDoubleColumnVector;
44+
import org.apache.auron.flink.arrow.vectors.ArrowFloatColumnVector;
45+
import org.apache.auron.flink.arrow.vectors.ArrowIntColumnVector;
46+
import org.apache.auron.flink.arrow.vectors.ArrowMapColumnVector;
47+
import org.apache.auron.flink.arrow.vectors.ArrowRowColumnVector;
48+
import org.apache.auron.flink.arrow.vectors.ArrowSmallIntColumnVector;
49+
import org.apache.auron.flink.arrow.vectors.ArrowTimeColumnVector;
50+
import org.apache.auron.flink.arrow.vectors.ArrowTimestampColumnVector;
51+
import org.apache.auron.flink.arrow.vectors.ArrowTinyIntColumnVector;
52+
import org.apache.auron.flink.arrow.vectors.ArrowVarBinaryColumnVector;
53+
import org.apache.auron.flink.arrow.vectors.ArrowVarCharColumnVector;
54+
import org.apache.auron.flink.table.data.AuronColumnarRowData;
55+
import org.apache.flink.table.data.RowData;
56+
import org.apache.flink.table.data.columnar.ColumnarRowData;
57+
import org.apache.flink.table.data.columnar.vector.ColumnVector;
58+
import org.apache.flink.table.data.columnar.vector.VectorizedColumnBatch;
59+
import org.apache.flink.table.types.logical.ArrayType;
60+
import org.apache.flink.table.types.logical.LogicalType;
61+
import org.apache.flink.table.types.logical.MapType;
62+
import org.apache.flink.table.types.logical.RowType;
63+
import org.apache.flink.util.Preconditions;
64+
65+
/**
66+
* Reads Arrow {@link VectorSchemaRoot} data as Flink {@link RowData}.
67+
*
68+
* <p>Uses Flink's columnar data structures ({@link ColumnarRowData} backed by {@link
69+
* VectorizedColumnBatch}) for zero-copy access to Arrow vectors. Each Arrow {@link FieldVector} is
70+
* wrapped in a Flink {@link ColumnVector} implementation that delegates reads directly to the
71+
* underlying Arrow vector.
72+
*
73+
* <p>Object reuse: {@link #read(int)} returns the same {@link ColumnarRowData} instance with a
74+
* different row ID. Callers must consume or copy the returned row before the next call. This is
75+
* standard Flink practice for columnar readers.
76+
*
77+
* <p>Implements {@link AutoCloseable} for use in resource management blocks. Note: this does NOT
78+
* close the underlying {@link VectorSchemaRoot}. The caller that created the root is responsible
79+
* for its lifecycle.
80+
*/
81+
public class FlinkArrowReader implements AutoCloseable {
82+
83+
private final RowType rowType;
84+
private ColumnVector[] columnVectors;
85+
private VectorizedColumnBatch batch;
86+
private AuronColumnarRowData reusableRow;
87+
private VectorSchemaRoot root;
88+
89+
private FlinkArrowReader(ColumnVector[] columnVectors, VectorSchemaRoot root, RowType rowType) {
90+
this(columnVectors, root, rowType, 0);
91+
}
92+
93+
private FlinkArrowReader(
94+
ColumnVector[] columnVectors, VectorSchemaRoot root, RowType rowType, int dataColStartIndex) {
95+
this.columnVectors = columnVectors;
96+
this.batch = new VectorizedColumnBatch(columnVectors);
97+
this.reusableRow = new AuronColumnarRowData(batch);
98+
if (dataColStartIndex > 0) {
99+
this.reusableRow.setDataColStartIndex(dataColStartIndex);
100+
}
101+
this.root = root;
102+
this.rowType = rowType;
103+
}
104+
105+
/**
106+
* Creates a {@link FlinkArrowReader} from a {@link VectorSchemaRoot} and {@link RowType}.
107+
*
108+
* <p>The RowType must match the schema of the VectorSchemaRoot (same number of fields, matching
109+
* types). Each Arrow field vector is wrapped in the appropriate Flink {@link ColumnVector}
110+
* implementation based on the corresponding Flink {@link LogicalType}.
111+
*
112+
* @param root the Arrow VectorSchemaRoot containing the data
113+
* @param rowType the Flink RowType describing the schema
114+
* @return a new FlinkArrowReader
115+
* @throws IllegalArgumentException if field counts do not match
116+
* @throws UnsupportedOperationException if a LogicalType is not supported
117+
*/
118+
public static FlinkArrowReader create(VectorSchemaRoot root, RowType rowType) {
119+
return create(root, rowType, 0);
120+
}
121+
122+
/**
123+
* Creates a {@link FlinkArrowReader} from a {@link VectorSchemaRoot} and {@link RowType}.
124+
*
125+
* <p>The RowType must match the schema of the VectorSchemaRoot (same number of fields, matching
126+
* types). Each Arrow field vector is wrapped in the appropriate Flink {@link ColumnVector}
127+
* implementation based on the corresponding Flink {@link LogicalType}.
128+
*
129+
* @param root the Arrow VectorSchemaRoot containing the data
130+
* @param rowType the Flink RowType describing the schema
131+
* @param dataColStartIndex the index of the first user data column
132+
* @return a new FlinkArrowReader
133+
* @throws IllegalArgumentException if field counts do not match
134+
* @throws UnsupportedOperationException if a LogicalType is not supported
135+
*/
136+
public static FlinkArrowReader create(VectorSchemaRoot root, RowType rowType, int dataColStartIndex) {
137+
Preconditions.checkNotNull(root, "root must not be null");
138+
Preconditions.checkNotNull(rowType, "rowType must not be null");
139+
List<FieldVector> fieldVectors = root.getFieldVectors();
140+
List<RowType.RowField> fields = rowType.getFields();
141+
if (fieldVectors.size() != fields.size()) {
142+
throw new IllegalArgumentException(
143+
"VectorSchemaRoot has " + fieldVectors.size() + " fields but RowType has " + fields.size());
144+
}
145+
ColumnVector[] columns = new ColumnVector[fieldVectors.size()];
146+
for (int i = 0; i < fieldVectors.size(); i++) {
147+
columns[i] = createColumnVector(fieldVectors.get(i), fields.get(i).getType());
148+
}
149+
return new FlinkArrowReader(columns, root, rowType, dataColStartIndex);
150+
}
151+
152+
/**
153+
* Reads a row at the given position. Returns a reused {@link RowData} object — callers must not
154+
* hold references across calls.
155+
*
156+
* @param rowId the row index within the current batch
157+
* @return the row data at the given position
158+
*/
159+
public RowData read(int rowId) {
160+
reusableRow.setRowId(rowId);
161+
return reusableRow;
162+
}
163+
164+
/**
165+
* Returns the number of rows in the current batch.
166+
*
167+
* @return the row count
168+
*/
169+
public int getRowCount() {
170+
return root.getRowCount();
171+
}
172+
173+
/**
174+
* Resets the reader to use a new {@link VectorSchemaRoot} with the same schema. Recreates
175+
* column vector wrappers for the new root's field vectors.
176+
*
177+
* <p>The new root must have the same schema (same number and types of fields) as the original.
178+
*
179+
* @param newRoot the new VectorSchemaRoot, must not be null
180+
*/
181+
public void reset(VectorSchemaRoot newRoot) {
182+
reset(newRoot, 0);
183+
}
184+
185+
/**
186+
* Resets the reader to use a new {@link VectorSchemaRoot} with the same schema. Recreates
187+
* column vector wrappers for the new root's field vectors.
188+
*
189+
* <p>The new root must have the same schema (same number and types of fields) as the original.
190+
*
191+
* @param newRoot the new VectorSchemaRoot, must not be null
192+
* @param dataColStartIndex the index of the first user data column
193+
*/
194+
public void reset(VectorSchemaRoot newRoot, int dataColStartIndex) {
195+
Preconditions.checkNotNull(newRoot, "newRoot must not be null");
196+
this.root = newRoot;
197+
List<FieldVector> newVectors = newRoot.getFieldVectors();
198+
Preconditions.checkArgument(
199+
newVectors.size() == columnVectors.length,
200+
"New root has %s fields but reader expects %s",
201+
newVectors.size(),
202+
columnVectors.length);
203+
List<RowType.RowField> fields = rowType.getFields();
204+
for (int i = 0; i < columnVectors.length; i++) {
205+
columnVectors[i] =
206+
createColumnVector(newVectors.get(i), fields.get(i).getType());
207+
}
208+
this.batch = new VectorizedColumnBatch(columnVectors);
209+
this.reusableRow = new AuronColumnarRowData(batch);
210+
if (dataColStartIndex > 0) {
211+
this.reusableRow.setDataColStartIndex(dataColStartIndex);
212+
}
213+
}
214+
215+
/**
216+
* Implements {@link AutoCloseable} for use in resource management blocks. Note: this does NOT
217+
* close the underlying {@link VectorSchemaRoot}. The caller that created the root is responsible
218+
* for its lifecycle.
219+
*/
220+
@Override
221+
public void close() {
222+
// Reader is a view; root lifecycle managed by caller.
223+
}
224+
225+
/**
226+
* Creates the appropriate Flink {@link ColumnVector} wrapper for the given Arrow {@link
227+
* FieldVector} and Flink {@link LogicalType}.
228+
*/
229+
static ColumnVector createColumnVector(FieldVector vector, LogicalType type) {
230+
switch (type.getTypeRoot()) {
231+
case BOOLEAN:
232+
return new ArrowBooleanColumnVector((BitVector) vector);
233+
case TINYINT:
234+
return new ArrowTinyIntColumnVector((TinyIntVector) vector);
235+
case SMALLINT:
236+
return new ArrowSmallIntColumnVector((SmallIntVector) vector);
237+
case INTEGER:
238+
return new ArrowIntColumnVector((IntVector) vector);
239+
case BIGINT:
240+
return new ArrowBigIntColumnVector((BigIntVector) vector);
241+
case FLOAT:
242+
return new ArrowFloatColumnVector((Float4Vector) vector);
243+
case DOUBLE:
244+
return new ArrowDoubleColumnVector((Float8Vector) vector);
245+
case VARCHAR:
246+
case CHAR:
247+
return new ArrowVarCharColumnVector((VarCharVector) vector);
248+
case VARBINARY:
249+
case BINARY:
250+
return new ArrowVarBinaryColumnVector((VarBinaryVector) vector);
251+
case DECIMAL:
252+
return new ArrowDecimalColumnVector((DecimalVector) vector);
253+
case DATE:
254+
return new ArrowDateColumnVector((DateDayVector) vector);
255+
case TIME_WITHOUT_TIME_ZONE:
256+
// The native engine (DataFusion) uses microsecond precision for all temporal
257+
// types, producing TimeMicroVector regardless of declared Flink TIME precision.
258+
return new ArrowTimeColumnVector((TimeMicroVector) vector);
259+
case TIMESTAMP_WITHOUT_TIME_ZONE:
260+
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
261+
// The native engine (DataFusion) uses microsecond precision for all temporal
262+
// types. TimeStampVector is the common parent of TimeStampMicroVector and
263+
// TimeStampMicroTZVector.
264+
return new ArrowTimestampColumnVector((TimeStampVector) vector);
265+
case ARRAY:
266+
return createArrayColumnVector((ListVector) vector, (ArrayType) type);
267+
case MAP:
268+
return createMapColumnVector((MapVector) vector, (MapType) type);
269+
case ROW:
270+
return createRowColumnVector((StructVector) vector, (RowType) type);
271+
default:
272+
throw new UnsupportedOperationException(
273+
"Unsupported Flink type for Arrow reader: " + type.asSummaryString());
274+
}
275+
}
276+
277+
private static ColumnVector createArrayColumnVector(ListVector vector, ArrayType arrayType) {
278+
ColumnVector elementVector = createColumnVector(vector.getDataVector(), arrayType.getElementType());
279+
return new ArrowArrayColumnVector(vector, elementVector);
280+
}
281+
282+
private static ColumnVector createMapColumnVector(MapVector vector, MapType mapType) {
283+
StructVector entriesVector = (StructVector) vector.getDataVector();
284+
ColumnVector keyVector = createColumnVector(entriesVector.getChild(MapVector.KEY_NAME), mapType.getKeyType());
285+
ColumnVector valueVector =
286+
createColumnVector(entriesVector.getChild(MapVector.VALUE_NAME), mapType.getValueType());
287+
return new ArrowMapColumnVector(vector, keyVector, valueVector);
288+
}
289+
290+
private static ColumnVector createRowColumnVector(StructVector vector, RowType rowType) {
291+
List<FieldVector> childVectors = vector.getChildrenFromFields();
292+
List<RowType.RowField> fields = rowType.getFields();
293+
ColumnVector[] childColumns = new ColumnVector[childVectors.size()];
294+
for (int i = 0; i < childVectors.size(); i++) {
295+
childColumns[i] =
296+
createColumnVector(childVectors.get(i), fields.get(i).getType());
297+
}
298+
return new ArrowRowColumnVector(vector, childColumns);
299+
}
300+
}

0 commit comments

Comments
 (0)