Skip to content

Commit d510e4e

Browse files
raminqaftwalthr
authored andcommitted
[FLINK-39495][table] Fix FROM_CHANGELOG silently dropping rows with unmapped operation codes
This closes #27973. (cherry picked from commit b12302d)
1 parent 8766296 commit d510e4e

5 files changed

Lines changed: 65 additions & 45 deletions

File tree

docs/content/docs/sql/reference/queries/changelog.md

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,8 +61,8 @@ SELECT * FROM FROM_CHANGELOG(
6161
| Parameter | Required | Description |
6262
|:-------------|:---------|:------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
6363
| `input` | Yes | The input table. Must be append-only. |
64-
| `op` | No | A `DESCRIPTOR` with a single column name for the operation code column. Defaults to `op`. The column must exist in the input table and be of type STRING. |
65-
| `op_mapping` | No | A `MAP<STRING, STRING>` mapping user-defined codes to Flink change operation names. Keys are user-defined codes (e.g., `'c'`, `'u'`, `'d'`), values are Flink change operation names (`INSERT`, `UPDATE_BEFORE`, `UPDATE_AFTER`, `DELETE`). Keys can contain comma-separated codes to map multiple codes to the same operation (e.g., `'c, r'`). When provided, only mapped codes are forwarded - unmapped codes are dropped. Each change operation may appear at most once across all entries. |
64+
| `op` | No | A `DESCRIPTOR` with a single column name for the operation code column. Defaults to `op`. The column must exist in the input table and be of type STRING. The column may be declared nullable, but a NULL value at runtime fails the job with a `TableRuntimeException` — every changelog row must carry an operation code. |
65+
| `op_mapping` | No | A `MAP<STRING, STRING>` mapping user-defined codes to Flink change operation names. Keys are user-defined codes (e.g., `'c'`, `'u'`, `'d'`), values are Flink change operation names (`INSERT`, `UPDATE_BEFORE`, `UPDATE_AFTER`, `DELETE`). Keys can contain comma-separated codes to map multiple codes to the same operation (e.g., `'c, r'`). Receiving an op code not present in the mapping fails the job at runtime with a `TableRuntimeException`. Each change operation may appear at most once across all entries. |
6666

6767
#### Default op_mapping
6868

@@ -75,6 +75,8 @@ When `op_mapping` is omitted, the following standard names are used. They allow
7575
| `'UPDATE_AFTER'` | UPDATE_AFTER |
7676
| `'DELETE'` | DELETE |
7777

78+
Any input row whose op code is not present in the active mapping (default or user-defined) fails the job at runtime with a `TableRuntimeException`.
79+
7880
### Output Schema
7981

8082
The output contains all input columns except the operation code (e.g., op) column, which is interpreted by Flink's SQL engine and removed. Each output row carries the appropriate change operation (INSERT, UPDATE_BEFORE, UPDATE_AFTER, or DELETE).
@@ -90,6 +92,7 @@ The output contains all input columns except the operation code (e.g., op) colum
9092
```sql
9193
-- Input (append-only):
9294
-- +I[id:1, op:'INSERT', name:'Alice']
95+
-- +I[id:2, op:'INSERT', name:'Bob']
9396
-- +I[id:1, op:'UPDATE_BEFORE', name:'Alice']
9497
-- +I[id:1, op:'UPDATE_AFTER', name:'Alice2']
9598
-- +I[id:2, op:'DELETE', name:'Bob']
@@ -100,6 +103,7 @@ SELECT * FROM FROM_CHANGELOG(
100103

101104
-- Output (updating table):
102105
-- +I[id:1, name:'Alice']
106+
-- +I[id:2, name:'Bob']
103107
-- -U[id:1, name:'Alice']
104108
-- +U[id:1, name:'Alice2']
105109
-- -D[id:2, name:'Bob']

flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/FromChangelogInputTypeStrategyTest.java

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -61,20 +61,6 @@ protected Stream<TestSpec> testData() {
6161
"d", "DELETE"))
6262
.expectArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE),
6363

64-
// Valid: retract-style mapping with UPDATE_BEFORE
65-
TestSpec.forStrategy("Valid with UPDATE_BEFORE", FROM_CHANGELOG_INPUT_TYPE_STRATEGY)
66-
.calledWithArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE)
67-
.calledWithTableSemanticsAt(0, new TableSemanticsMock(TABLE_TYPE))
68-
.calledWithLiteralAt(1, ColumnList.of(List.of("op")))
69-
.calledWithLiteralAt(
70-
2,
71-
Map.of(
72-
"c", "INSERT",
73-
"ub", "UPDATE_BEFORE",
74-
"ua", "UPDATE_AFTER",
75-
"d", "DELETE"))
76-
.expectArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE),
77-
7864
// Error: op column not found
7965
TestSpec.forStrategy(
8066
"Op column not found in schema", FROM_CHANGELOG_INPUT_TYPE_STRATEGY)

flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogSemanticTests.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,10 @@ public List<TableTestProgram> programs() {
4141
return List.of(
4242
FromChangelogTestPrograms.DEFAULT_OP_MAPPING,
4343
FromChangelogTestPrograms.CUSTOM_OP_MAPPING,
44-
FromChangelogTestPrograms.UNMAPPED_CODES_DROPPED,
4544
FromChangelogTestPrograms.CUSTOM_OP_NAME,
4645
FromChangelogTestPrograms.TABLE_API_DEFAULT,
47-
FromChangelogTestPrograms.ROUND_TRIP);
46+
FromChangelogTestPrograms.ROUND_TRIP,
47+
FromChangelogTestPrograms.INVALID_OP_CODE,
48+
FromChangelogTestPrograms.NULL_OP_CODE);
4849
}
4950
}

flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogTestPrograms.java

Lines changed: 47 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package org.apache.flink.table.planner.plan.nodes.exec.stream;
2020

21+
import org.apache.flink.table.api.TableRuntimeException;
2122
import org.apache.flink.table.connector.ChangelogMode;
2223
import org.apache.flink.table.test.program.SinkTestStep;
2324
import org.apache.flink.table.test.program.SourceTestStep;
@@ -93,32 +94,6 @@ public class FromChangelogTestPrograms {
9394
+ "op_mapping => MAP['c, r', 'INSERT', 'ub', 'UPDATE_BEFORE', 'ua', 'UPDATE_AFTER', 'd', 'DELETE'])")
9495
.build();
9596

96-
public static final TableTestProgram UNMAPPED_CODES_DROPPED =
97-
TableTestProgram.of(
98-
"from-changelog-unmapped-codes-dropped",
99-
"unmapped op codes are silently dropped")
100-
.setupTableSource(
101-
SourceTestStep.newBuilder("cdc_stream")
102-
.addSchema(SIMPLE_CDC_SCHEMA)
103-
.producedValues(
104-
Row.of(1, "INSERT", "Alice"),
105-
Row.of(2, "INSERT", "Bob"),
106-
Row.of(1, "UNKNOWN", "Alice2"),
107-
Row.of(2, "DELETE", "Bob"))
108-
.build())
109-
.setupTableSink(
110-
SinkTestStep.newBuilder("sink")
111-
.addSchema("id INT", "name STRING")
112-
.consumedValues(
113-
Row.ofKind(RowKind.INSERT, 1, "Alice"),
114-
Row.ofKind(RowKind.INSERT, 2, "Bob"),
115-
Row.ofKind(RowKind.DELETE, 2, "Bob"))
116-
.build())
117-
.runSql(
118-
"INSERT INTO sink SELECT * FROM FROM_CHANGELOG("
119-
+ "input => TABLE cdc_stream)")
120-
.build();
121-
12297
/** Custom op column name via DESCRIPTOR. */
12398
public static final TableTestProgram CUSTOM_OP_NAME =
12499
TableTestProgram.of(
@@ -207,4 +182,50 @@ public class FromChangelogTestPrograms {
207182
"INSERT INTO sink SELECT * FROM FROM_CHANGELOG("
208183
+ "input => TABLE changelog_view)")
209184
.build();
185+
186+
// --------------------------------------------------------------------------------------------
187+
// Error validation tests
188+
// --------------------------------------------------------------------------------------------
189+
190+
public static final TableTestProgram INVALID_OP_CODE =
191+
TableTestProgram.of(
192+
"from-changelog-invalid-op-code",
193+
"fails when input contains an op code not in the mapping")
194+
.setupTableSource(
195+
SourceTestStep.newBuilder("cdc_stream")
196+
.addSchema(SIMPLE_CDC_SCHEMA)
197+
.producedValues(Row.of(1, "UNKNOWN", "Alice"))
198+
.build())
199+
.setupTableSink(
200+
SinkTestStep.newBuilder("sink")
201+
.addSchema("id INT", "name STRING")
202+
.consumedValues(new Row[0])
203+
.build())
204+
.runFailingSql(
205+
"INSERT INTO sink SELECT * FROM FROM_CHANGELOG("
206+
+ "input => TABLE cdc_stream)",
207+
TableRuntimeException.class,
208+
"Received invalid op code 'UNKNOWN'")
209+
.build();
210+
211+
public static final TableTestProgram NULL_OP_CODE =
212+
TableTestProgram.of(
213+
"from-changelog-null-op-code",
214+
"fails when input contains a NULL op code")
215+
.setupTableSource(
216+
SourceTestStep.newBuilder("cdc_stream")
217+
.addSchema(SIMPLE_CDC_SCHEMA)
218+
.producedValues(Row.of(1, null, "Alice"))
219+
.build())
220+
.setupTableSink(
221+
SinkTestStep.newBuilder("sink")
222+
.addSchema("id INT", "name STRING")
223+
.consumedValues(new Row[0])
224+
.build())
225+
.runFailingSql(
226+
"INSERT INTO sink SELECT * FROM FROM_CHANGELOG("
227+
+ "input => TABLE cdc_stream)",
228+
TableRuntimeException.class,
229+
"Received NULL op code")
230+
.build();
210231
}

flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/FromChangelogFunction.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.flink.table.runtime.functions.ptf;
2020

2121
import org.apache.flink.annotation.Internal;
22+
import org.apache.flink.table.api.TableRuntimeException;
2223
import org.apache.flink.table.data.MapData;
2324
import org.apache.flink.table.data.RowData;
2425
import org.apache.flink.table.data.StringData;
@@ -134,10 +135,17 @@ public void eval(
134135
final RowData input,
135136
@Nullable final ColumnList op,
136137
@Nullable final MapData opMapping) {
138+
if (input.isNullAt(opColumnIndex)) {
139+
throw new TableRuntimeException(
140+
"Received NULL op code. Every changelog row must carry an operation code.");
141+
}
137142
final StringData opCode = input.getString(opColumnIndex);
138143
final RowKind rowKind = opMap.get(opCode);
139144
if (rowKind == null) {
140-
return;
145+
throw new TableRuntimeException(
146+
String.format(
147+
"Received invalid op code '%s'. Defined op codes are: %s.",
148+
opCode, opMap.keySet()));
141149
}
142150

143151
projectedOutput.replaceRow(input);

0 commit comments

Comments
 (0)