Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 12 additions & 6 deletions docs/layouts/shortcodes/generated/core_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,12 @@
<td>MemorySize</td>
<td>Memory page size for caching.</td>
</tr>
<tr>
<td><h5>chain-table.chain-partition-keys</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Partition keys that participate in chain logic. Must be a contiguous suffix of the table's partition keys. Comma-separated. If not set, all partition keys participate in chain.</td>
</tr>
<tr>
<td><h5>chain-table.enabled</h5></td>
<td style="word-wrap: break-word;">false</td>
Expand Down Expand Up @@ -1187,12 +1193,6 @@
<td>String</td>
<td>When a batch job queries from a chain table, if a partition does not exist in the main branch, the reader will try to get this partition from chain snapshot branch.</td>
</tr>
<tr>
<td><h5>chain-table.chain-partition-keys</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Partition keys that participate in chain logic. Must be a contiguous suffix of the table's partition keys. Comma-separated. If not set, all partition keys participate in chain.</td>
</tr>
<tr>
<td><h5>scan.file-creation-time-millis</h5></td>
<td style="word-wrap: break-word;">(none)</td>
Expand Down Expand Up @@ -1235,6 +1235,12 @@
<td>Boolean</td>
<td>Whether to sort plan files by partition fields, this allows you to read according to the partition order, even if your partition writes are out of order.<br />It is recommended that you use this for streaming read of the 'append-only' table. By default, streaming read will read the full snapshot first. In order to avoid the disorder reading for partitions, you can open this option.</td>
</tr>
<tr>
<td><h5>scan.primary-branch</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>When a batch job queries from a table, if a partition exists in the primary branch, the reader will read this partition from the primary branch. Otherwise, the reader will read this partition from the current branch.</td>
</tr>
<tr>
<td><h5>scan.snapshot-id</h5></td>
<td style="word-wrap: break-word;">(none)</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -363,4 +363,4 @@
<td>Defines a custom parallelism for the unaware-bucket table compaction job. By default, if this option is not defined, the planner will derive the parallelism for each statement individually by also considering the global configuration.</td>
</tr>
</tbody>
</table>
</table>
9 changes: 9 additions & 0 deletions paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -1961,6 +1961,15 @@ public InlineElement getDescription() {
"When a batch job queries from a table, if a partition does not exist in the current branch, "
+ "the reader will try to get this partition from this fallback branch.");

public static final ConfigOption<String> SCAN_PRIMARY_BRANCH =
key("scan.primary-branch")
.stringType()
.noDefaultValue()
.withDescription(
"When a batch job queries from a table, if a partition exists in the primary branch, "
+ "the reader will read this partition from the primary branch. "
+ "Otherwise, the reader will read this partition from the current branch.");

public static final ConfigOption<Boolean> ASYNC_FILE_WRITE =
key("async-file-write")
.booleanType()
Expand Down
5 changes: 3 additions & 2 deletions paimon-api/src/main/java/org/apache/paimon/schema/Schema.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.paimon.types.ReassignFieldId;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.StringUtils;

import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
Expand Down Expand Up @@ -186,11 +187,11 @@ private static List<DataField> normalizeFields(

private List<String> normalizePrimaryKeys(List<String> primaryKeys) {
if (options.containsKey(CoreOptions.PRIMARY_KEY.key())) {
if (!primaryKeys.isEmpty()) {
String pk = options.get(CoreOptions.PRIMARY_KEY.key());
if (!primaryKeys.isEmpty() && !StringUtils.isEmpty(pk)) {
Copy link

Copilot AI Mar 31, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If table options contain primary-key with an empty value (e.g. ''), the new condition !primaryKeys.isEmpty() && !StringUtils.isEmpty(pk) will skip the conflict check and then overwrite the DDL-defined primary key with an empty list (because pk.split(",") yields only empty tokens). That silently drops the primary key definition. It would be safer to treat null/blank pk as “option not set” (ignore it and keep the DDL primaryKeys) instead of normalizing it into an empty PK list.

Suggested change
if (!primaryKeys.isEmpty() && !StringUtils.isEmpty(pk)) {
// Treat null/blank primary-key option as "not set" and keep DDL-defined primary keys.
if (StringUtils.isEmpty(pk)) {
return primaryKeys;
}
if (!primaryKeys.isEmpty()) {

Copilot uses AI. Check for mistakes.
throw new RuntimeException(
"Cannot define primary key on DDL and table options at the same time.");
}
String pk = options.get(CoreOptions.PRIMARY_KEY.key());
primaryKeys =
Arrays.stream(pk.split(","))
.map(String::trim)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -288,14 +288,34 @@ public static void validateTableSchema(TableSchema schema) {

public static void validateFallbackBranch(SchemaManager schemaManager, TableSchema schema) {
String fallbackBranch = schema.options().get(CoreOptions.SCAN_FALLBACK_BRANCH.key());
String primaryBranch = schema.options().get(CoreOptions.SCAN_PRIMARY_BRANCH.key());

if (!StringUtils.isNullOrWhitespaceOnly(fallbackBranch)
&& !StringUtils.isNullOrWhitespaceOnly(primaryBranch)) {
throw new IllegalArgumentException(
String.format(
"Cannot set both '%s' and '%s' at the same time.",
CoreOptions.SCAN_FALLBACK_BRANCH.key(),
CoreOptions.SCAN_PRIMARY_BRANCH.key()));
}

if (!StringUtils.isNullOrWhitespaceOnly(fallbackBranch)) {
checkArgument(
schemaManager.copyWithBranch(fallbackBranch).latest().isPresent(),
"Cannot set '%s' = '%s' because the branch '%s' isn't existed.",
"Cannot set '%s' = '%s' because the branch '%s' does not exist.",
CoreOptions.SCAN_FALLBACK_BRANCH.key(),
fallbackBranch,
fallbackBranch);
}

if (!StringUtils.isNullOrWhitespaceOnly(primaryBranch)) {
checkArgument(
schemaManager.copyWithBranch(primaryBranch).latest().isPresent(),
"Cannot set '%s' = '%s' because the branch '%s' does not exist.",
CoreOptions.SCAN_PRIMARY_BRANCH.key(),
primaryBranch,
primaryBranch);
}
}

private static void validateOnlyContainPrimitiveType(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -709,10 +709,19 @@ public void deleteBranch(String branchName) {
&& branchName.equals(fallbackBranch)) {
throw new IllegalArgumentException(
String.format(
"can not delete the fallback branch. "
+ "branchName to be deleted is %s. you have set 'scan.fallback-branch' = '%s'. "
+ "you should reset 'scan.fallback-branch' before deleting this branch.",
branchName, fallbackBranch));
"Cannot delete branch '%s' because it is configured as"
+ " 'scan.fallback-branch'. Unset 'scan.fallback-branch' first.",
branchName));
}

String primaryBranch = coreOptions().toConfiguration().get(CoreOptions.SCAN_PRIMARY_BRANCH);
if (!StringUtils.isNullOrWhitespaceOnly(primaryBranch)
&& branchName.equals(primaryBranch)) {
throw new IllegalArgumentException(
String.format(
Comment on lines +717 to +721
Copy link

Copilot AI Mar 31, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new guard preventing deletion of the scan.primary-branch is not covered by tests (there are existing tests around branch deletion / fallback-branch). Please add a test asserting deleteBranch(primaryBranch) fails with the expected message when scan.primary-branch is set, similar to the existing fallback-branch coverage.

Copilot uses AI. Check for mistakes.
"Cannot delete branch '%s' because it is configured as"
+ " 'scan.primary-branch'. Unset 'scan.primary-branch' first.",
branchName));
}

branchManager().dropBranch(branchName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@
public class ChainGroupReadTable extends FallbackReadFileStoreTable {

public ChainGroupReadTable(FileStoreTable snapshotStoreTable, FileStoreTable deltaStoreTable) {
super(snapshotStoreTable, deltaStoreTable);
super(snapshotStoreTable, deltaStoreTable, true);
checkArgument(snapshotStoreTable instanceof PrimaryKeyFileStoreTable);
checkArgument(deltaStoreTable instanceof PrimaryKeyFileStoreTable);
}
Expand All @@ -77,7 +77,7 @@ public DataTableScan newScan() {
super.validateSchema();
return new ChainTableBatchScan(
wrapped.newScan(),
fallback().newScan(),
other().newScan(),
((AbstractFileStoreTable) wrapped).tableSchema,
this);
}
Expand All @@ -87,42 +87,38 @@ private DataTableScan newSnapshotScan() {
}

private DataTableScan newDeltaScan() {
return fallback().newScan();
return other().newScan();
}

@Override
public FileStoreTable copy(Map<String, String> dynamicOptions) {
return new ChainGroupReadTable(
wrapped.copy(dynamicOptions),
fallback().copy(rewriteFallbackOptions(dynamicOptions)));
wrapped.copy(dynamicOptions), other().copy(rewriteOtherOptions(dynamicOptions)));
}

@Override
public FileStoreTable copy(TableSchema newTableSchema) {
return new ChainGroupReadTable(
wrapped.copy(newTableSchema),
fallback()
.copy(
newTableSchema.copy(
rewriteFallbackOptions(newTableSchema.options()))));
other().copy(newTableSchema.copy(rewriteOtherOptions(newTableSchema.options()))));
}

@Override
public FileStoreTable copyWithoutTimeTravel(Map<String, String> dynamicOptions) {
return new ChainGroupReadTable(
wrapped.copyWithoutTimeTravel(dynamicOptions),
fallback().copyWithoutTimeTravel(rewriteFallbackOptions(dynamicOptions)));
other().copyWithoutTimeTravel(rewriteOtherOptions(dynamicOptions)));
}

@Override
public FileStoreTable copyWithLatestSchema() {
return new ChainGroupReadTable(
wrapped.copyWithLatestSchema(), fallback().copyWithLatestSchema());
wrapped.copyWithLatestSchema(), other().copyWithLatestSchema());
}

@Override
public FileStoreTable switchToBranch(String branchName) {
return new ChainGroupReadTable(switchWrappedToBranch(branchName), fallback());
return new ChainGroupReadTable(switchWrappedToBranch(branchName), other());
}

/** Scan implementation for {@link ChainGroupReadTable}. */
Expand All @@ -146,7 +142,7 @@ public ChainTableBatchScan(
mainScan,
fallbackScan,
chainGroupReadTable.wrapped,
chainGroupReadTable.fallback(),
chainGroupReadTable.other(),
tableSchema);
this.options = CoreOptions.fromMap(tableSchema.options());
this.chainGroupReadTable = chainGroupReadTable;
Expand Down Expand Up @@ -500,7 +496,7 @@ private class Read implements InnerTableRead {

private Read() {
this.mainRead = wrapped.newRead();
this.fallbackRead = fallback().newRead();
this.fallbackRead = other().newRead();
}

@Override
Expand Down
Loading
Loading