Skip to content

Commit 004268a

Browse files
authored
[FLINK-38968][table] Add toCatalogTable method to CatalogMaterializedTable interface
1 parent 754254f commit 004268a

3 files changed

Lines changed: 52 additions & 10 deletions

File tree

flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogBaseTableResolutionTest.java

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -354,6 +354,45 @@ void testInvalidDistributionBucketCount() {
354354
+ "distributed table must be at least 1.");
355355
}
356356

357+
@Test
358+
void testCatalogMaterializedTableToCatalogTable() {
359+
final CatalogMaterializedTable materializedTable = catalogMaterializedTable();
360+
361+
final CatalogTable catalogTable = materializedTable.toCatalogTable();
362+
363+
// Verify the conversion preserves properties
364+
assertThat(catalogTable.getUnresolvedSchema())
365+
.isEqualTo(materializedTable.getUnresolvedSchema());
366+
assertThat(catalogTable.getComment()).isEqualTo(materializedTable.getComment());
367+
assertThat(catalogTable.getPartitionKeys()).isEqualTo(materializedTable.getPartitionKeys());
368+
assertThat(catalogTable.getOptions()).isEqualTo(materializedTable.getOptions());
369+
assertThat(catalogTable.getDistribution()).isEqualTo(materializedTable.getDistribution());
370+
assertThat(catalogTable.getSnapshot()).isEqualTo(materializedTable.getSnapshot());
371+
}
372+
373+
@Test
374+
void testResolvedCatalogMaterializedTableToResolvedCatalogTable() {
375+
final CatalogMaterializedTable materializedTable = catalogMaterializedTable();
376+
377+
final ResolvedCatalogMaterializedTable resolvedMaterializedTable =
378+
resolveCatalogBaseTable(ResolvedCatalogMaterializedTable.class, materializedTable);
379+
380+
final ResolvedCatalogTable resolvedCatalogTable =
381+
resolvedMaterializedTable.toResolvedCatalogTable();
382+
383+
// Verify schema is preserved
384+
assertThat(resolvedCatalogTable.getResolvedSchema())
385+
.isEqualTo(resolvedMaterializedTable.getResolvedSchema());
386+
387+
// Verify origin properties are preserved
388+
assertThat(resolvedCatalogTable.getComment())
389+
.isEqualTo(resolvedMaterializedTable.getComment());
390+
assertThat(resolvedCatalogTable.getPartitionKeys())
391+
.isEqualTo(resolvedMaterializedTable.getPartitionKeys());
392+
assertThat(resolvedCatalogTable.getOptions())
393+
.isEqualTo(resolvedMaterializedTable.getOptions());
394+
}
395+
357396
// --------------------------------------------------------------------------------------------
358397
// Utilities
359398
// --------------------------------------------------------------------------------------------

flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogMaterializedTable.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,18 @@ default String getDefinitionQuery() {
180180
@Nullable
181181
byte[] getSerializedRefreshHandler();
182182

183+
/** Convert this object to a {@link CatalogTable} object for planner optimize query. */
184+
default CatalogTable toCatalogTable() {
185+
return CatalogTable.newBuilder()
186+
.schema(getUnresolvedSchema())
187+
.comment(getComment())
188+
.distribution(getDistribution().orElse(null))
189+
.partitionKeys(getPartitionKeys())
190+
.options(getOptions())
191+
.snapshot(getSnapshot().orElse(null))
192+
.build();
193+
}
194+
183195
/** The logical refresh mode of materialized table. */
184196
@PublicEvolving
185197
enum LogicalRefreshMode {

flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ResolvedCatalogMaterializedTable.java

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -205,15 +205,6 @@ public String toString() {
205205

206206
/** Convert this object to a {@link ResolvedCatalogTable} object for planner optimize query. */
207207
public ResolvedCatalogTable toResolvedCatalogTable() {
208-
return new ResolvedCatalogTable(
209-
CatalogTable.newBuilder()
210-
.schema(getUnresolvedSchema())
211-
.comment(getComment())
212-
.distribution(getDistribution().orElse(null))
213-
.partitionKeys(getPartitionKeys())
214-
.options(getOptions())
215-
.snapshot(getSnapshot().orElse(null))
216-
.build(),
217-
getResolvedSchema());
208+
return new ResolvedCatalogTable(origin.toCatalogTable(), getResolvedSchema());
218209
}
219210
}

0 commit comments

Comments
 (0)