PHOENIX-7751: [SyncTable Tool] Feature to validate table data using PhoenixSyncTable tool b/w source and target cluster #2379

Open
rahulLiving wants to merge 35 commits into apache:master from rahulLiving:PHOENIX-7751

Conversation

@rahulLiving
Contributor

No description provided.

@rahulLiving rahulLiving marked this pull request as ready for review March 12, 2026 12:36

/**
* PhoenixSyncTableTool chunk metadata cell qualifiers. These define the wire protocol between
* hoenixSyncTableRegionScanner (server-side coprocessor) and PhoenixSyncTableMapper (client-side
Contributor

Typo: missing 'P'.


public static Long getPhoenixSyncTableFromTime(Configuration conf) {
Preconditions.checkNotNull(conf);
String value = conf.get(PHOENIX_SYNC_TABLE_FROM_TIME);
Contributor

@tkhurana tkhurana Mar 18, 2026

Why didn't you use conf.getLong()?

conf.setLong(PHOENIX_SYNC_TABLE_TO_TIME, toTime);
}

public static Long getPhoenixSyncTableToTime(Configuration conf) {
Contributor

@tkhurana tkhurana Mar 18, 2026

Here also, why didn't you use conf.getLong()?

qTable = SchemaUtil.getQualifiedTableName(schemaName, tableName);
qSchemaName = SchemaUtil.normalizeIdentifier(schemaName);
PhoenixMapReduceUtil.validateTimeRange(startTime, endTime, qTable);
PhoenixMapReduceUtil.validateMaxLookbackAge(configuration, endTime, qTable);
Contributor

Do we need the end time to be within the max lookback window? How will the sync tool break if the end time is outside of the max lookback window?

Contributor Author

@rahulLiving rahulLiving Mar 24, 2026

Right, this check is not useful.

Contributor

On the other hand, we should not only enforce that it is not outside the window, we should also enforce a "safety buffer" to accommodate data in flight. Even when the endTime is within the window, if it is too close to the current time, it may miss data that is still in flight and cause false positives. In practice this may not matter, since the time it takes to set up and run could be on the order of several minutes and thus enough for the catch-up to complete, but I think it is better to make it explicit by enforcing a safety buffer and make this more deterministic.

If we remove this check and allow the endTime to be in the future, the possibility of false positives due to data in flight becomes a lot more pronounced. By enforcing both startTime and endTime, we can ensure a "consistent window" where data is guaranteed to be fully replicated and 'quiesced' on both sides. WDYT?
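A minimal sketch of the safety-buffer idea described above, in plain Java. The class name, buffer size, and method are illustrative assumptions, not part of the patch; the point is only that endTime must sit at least one buffer interval behind "now" so in-flight replication can settle.

```java
// Hypothetical sketch: reject an endTime that is closer to "now" than the safety buffer.
// SAFETY_BUFFER_MS and the class/method names are illustrative, not from the patch.
public class SyncWindowValidator {
    static final long SAFETY_BUFFER_MS = 60 * 60 * 1000L; // assumed 1-hour buffer

    static long validateEndTime(long requestedEndTime, long nowMs) {
        long latestSafe = nowMs - SAFETY_BUFFER_MS;
        if (requestedEndTime > latestSafe) {
            throw new IllegalArgumentException(
                "endTime is within the safety buffer; in-flight data may cause false positives");
        }
        return requestedEndTime;
    }
}
```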

Contributor

I was thinking more about the "consistent window" or "quiesced window" approach that I suggested above and realized this is actually a race against sliding window during long-running jobs.

If a sync job takes several hours to complete, a startTime that was valid at the beginning of the job might actually 'slide' out of the lookback window by the time the final Mappers execute. Since HBase compactions on the Source and Target clusters aren't synchronized, couldn't this lead to false-positive mismatches if one cluster purges historical data mid-run while the other hasn't yet?

It may not always be possible to make the "safety buffer" on the startTime large enough to account for the job execution time: what if the max lookback window is only a few hours and the job itself takes hours? Does this require utilizing HBase Snapshots to 'freeze' the data state for the duration of the sync? Are there existing patterns that other systems might have employed to handle this issue?

@kadirozde @tkhurana

Contributor Author

We need to think about this from two perspectives: running the sync job regularly as a cron, and using it for migration validation.
For migration validation, the start time would definitely be before maxLookbackAge. It is up to the owner whether they want to validate all versions and delete markers or just the latest version.
For a regular cron job used in PhoenixHA, we can configure the start/end time to be within maxLookbackAge.
Tanuj's suggestion of giving the user the flexibility to choose the rawScan & allVersion options would be helpful. And since we plan to fix the mismatched rows as well, we can consider the source as SOT and fix accordingly.
Though there can be instances where it can't be fixed, e.g. the source has removed a delete marker via compaction but the target still has it. Such rows can be flagged as not fixable as per design.

Btw, the default endTime is (currentTime - 1 hour), to ensure the target has the desired data.

PhoenixConfigurationUtil.setPhoenixSyncTableChunkSizeBytes(configuration, chunkSizeBytes);
}
if (tenantId != null) {
PhoenixConfigurationUtil.setTenantId(configuration, tenantId);
Contributor

Can you verify if the tenantid is being correctly set as a key prefix on the scan ?

Contributor

If you have a table region with multiple tenants and we pass a tenant id then our scan range should start with the tenantid prefix.
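To illustrate the reviewer's point, here is a self-contained sketch of deriving tenant-bounded scan boundaries. The class and method names are hypothetical; the stop-key logic is a simplification of HBase's "next row after prefix" computation (it ignores 0xFF carry), not the actual Phoenix implementation.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical sketch: when a tenantId is supplied, the scan range should be bounded by
// the tenant-id key prefix, since rowkeys in multi-tenant Phoenix tables lead with it.
public class TenantScanBounds {
    // Inclusive start of the tenant's key space: the tenant id bytes themselves.
    static byte[] tenantStartKey(String tenantId) {
        return tenantId.getBytes(StandardCharsets.UTF_8);
    }

    // Exclusive stop: tenant id bytes with the last byte incremented.
    // Simplified "next prefix" logic; a real implementation must handle 0xFF carry.
    static byte[] tenantStopKey(String tenantId) {
        byte[] stop = Arrays.copyOf(
            tenantId.getBytes(StandardCharsets.UTF_8), tenantId.length());
        stop[stop.length - 1]++;
        return stop;
    }
}
```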

Contributor Author

@rahulLiving rahulLiving Mar 24, 2026

Yes, it only creates input ranges and scans for tenant-specific rows. We have an IT for the same.

* Configures a Configuration object with ZooKeeper settings from a ZK quorum string.
* @param baseConf Base configuration to create from (typically job configuration)
* @param zkQuorum ZooKeeper quorum string in format: "zk_quorum:port:znode" Example:
* "zk1,zk2,zk3:2181:/hbase"
Contributor

This is actually not the only format for a zk quorum. There are other valid formats where the port number is specified separately for each server. There is a very useful API in HBase called HBaseConfiguration.createClusterConf(job.getConfiguration(), targetZkQuorum). We should use that, as it also works for the zk registry.
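A plain-Java illustration (no HBase dependency) of the two quorum shapes the comment refers to. HBaseConfiguration.createClusterConf handles both; this sketch only shows why a parser that assumes the single "host,host:port:znode" shape is insufficient. The class and method are illustrative.

```java
// Illustrative sketch of the two valid zk quorum shapes:
//   shared-port form:  "zk1,zk2,zk3:2181:/hbase"  -> hosts share one trailing port/znode
//   per-server form:   "zk1:2181,zk2:2182,zk3:2183" -> each host carries its own port
public class QuorumFormats {
    static boolean hasPerServerPorts(String quorum) {
        // In the per-server form, every comma-separated entry (including the first)
        // contains its own colon-delimited port.
        String firstEntry = quorum.split(",")[0];
        return firstEntry.contains(":");
    }
}
```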


String query = "SELECT START_ROW_KEY, END_ROW_KEY FROM " + SYNC_TABLE_CHECKPOINT_TABLE_NAME
+ " WHERE TABLE_NAME = ? AND TARGET_CLUSTER = ?"
+ " AND TYPE = ? AND FROM_TIME = ? AND TO_TIME = ? AND STATUS IN ( ?, ?)";
Contributor

I am not 100% positive that you can assume that the output of this query is always sorted by row key. You might have to add an ORDER BY clause here. If you are adding an ORDER BY clause it will be better to add all the PK columns to make the sorting more efficient.
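A sketch of the reviewer's suggestion applied to the quoted checkpoint query. The column and table names come from the snippet above; the full PK column list used in the ORDER BY is an assumption, not taken from the actual checkpoint table DDL.

```java
// Hypothetical sketch: append an ORDER BY over the (assumed) full primary key so the
// row-key ordering of the checkpoint query results is an explicit guarantee rather
// than an implementation detail.
public class CheckpointQuery {
    static String withOrderBy(String baseQuery) {
        // Ordering by the full PK should let Phoenix satisfy the sort from the
        // natural row order instead of buffering and re-sorting results.
        return baseQuery
            + " ORDER BY TABLE_NAME, TARGET_CLUSTER, TYPE, FROM_TIME, TO_TIME, START_ROW_KEY";
    }
}
```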

int completedIdx = 0;

// Two pointer comparison across splitRange and completedRange
while (splitIdx < allSplits.size() && completedIdx < completedRegions.size()) {
Contributor

I think you are assuming here that completedRegions is already sorted. Please see my comment on the getProcessedMapperRegions function.

Contributor

Won't the results be sorted in the PK order already? I see that the new commit adds ORDER BY, but not sure why that is required.

PhoenixInputSplit split = (PhoenixInputSplit) allSplits.get(splitIdx);
KeyRange splitRange = split.getKeyRange();
KeyRange completedRange = completedRegions.get(completedIdx);
byte[] splitStart = splitRange.getLowerRange();
Contributor

Will the end key of the split range always be exclusive? If yes, can you please add a comment.

Contributor Author

Yes, for both splitRange and completedRange, the start key is always inclusive and the end key always exclusive. Will add a comment.

* @return List of (startKey, endKey) pairs representing unprocessed ranges
*/
@VisibleForTesting
public List<Pair<byte[], byte[]>> calculateUnprocessedRanges(byte[] mapperRegionStart,
Contributor

@tkhurana tkhurana Mar 24, 2026

Maybe we could return a List<KeyRange>
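A simplified, self-contained sketch of the two-pointer walk discussed in this thread, using int ranges in place of row-key byte[]s. Both split and completed ranges are treated as [start, end) (start inclusive, end exclusive, matching the semantics confirmed above) and are assumed sorted and non-overlapping; the class and method names are illustrative.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch: subtract sorted, completed [start, end) ranges from sorted split ranges,
// returning the unprocessed gaps. int[]{start, end} stands in for row-key pairs.
public class UnprocessedRanges {
    static List<int[]> subtract(List<int[]> splits, List<int[]> completed) {
        List<int[]> out = new ArrayList<>();
        int c = 0; // pointer into completed ranges
        for (int[] s : splits) {
            int cursor = s[0];
            // Skip completed ranges that end before this split begins.
            while (c < completed.size() && completed.get(c)[1] <= cursor) c++;
            int ci = c;
            // Walk completed ranges overlapping this split, emitting the gaps.
            while (ci < completed.size() && completed.get(ci)[0] < s[1]) {
                if (completed.get(ci)[0] > cursor) {
                    out.add(new int[] { cursor, completed.get(ci)[0] });
                }
                cursor = Math.max(cursor, completed.get(ci)[1]);
                ci++;
            }
            if (cursor < s[1]) {
                out.add(new int[] { cursor, s[1] });
            }
        }
        return out;
    }
}
```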

if (hasStartBoundary) {
queryBuilder.append(" AND END_ROW_KEY >= ?");
}
queryBuilder.append(" AND STATUS IN (?, ?)");
Contributor

Same as above we don't need to pass status

scan.setCacheBlocks(false);
scan.setTimeRange(fromTime, toTime);
if (isTargetScan) {
scan.setLimit(1);
Contributor

Can you add a comment here why we are setting limit to 1 and caching to 1

Scan scan = new Scan();
scan.withStartRow(startKey, isStartKeyInclusive);
scan.withStopRow(endKey, isEndKeyInclusive);
scan.setRaw(true);
Contributor

Are we sure we have to do a raw scan?

Contributor

Also, can we make this configurable via the SyncTool command line?

scan.withStartRow(startKey, isStartKeyInclusive);
scan.withStopRow(endKey, isEndKeyInclusive);
scan.setRaw(true);
scan.readAllVersions();
Contributor

Same can we make the behavior of reading all versions configurable.

@@ -0,0 +1,2267 @@
/*
Contributor

Can you add a test where rows are deleted on both the source and target tables but you have run compaction on only one. We can have actually 2 cases where compaction is run on the source but not on target and vice versa. I saw that you are doing raw scan. Maxlookback settings will also impact this.

Contributor

@haridsv haridsv left a comment

I just skimmed through and left some comments at the surface level.


try (PreparedStatement ps = connection.prepareStatement(UPSERT_CHECKPOINT_SQL)) {
ps.setString(1, row.getTableName());
ps.setString(2, row.getTargetCluster());
ps.setString(3, row.getType().name());
Contributor

I would recommend storing a byte code rather a long string to reduce the size of the row key.

Contributor

Have you thought about this?

Contributor Author

The 35 MB calculation was for the primary key. 100K regions, each region with 10 chunks, gives a total of 1.1M rows.
Each row with CHUNK/REGION will be 35 bytes (this is for all columns/cells); each row with C/R will be 7 bytes.
1.1M × 35 − 1.1M × 7 bytes, which roughly equals 35 MB for a table with 100K regions.

Contributor

@haridsv haridsv Apr 9, 2026

OK, I guess the difference will reduce further after compression; we should enable it along with column encoding as well.

Contributor Author

By default, column encoding is set to 2 for user tables.
I just checked: none of the Phoenix system tables specify COMPRESSION in their DDL; maybe it depends on what compression they want to use.
So COMPRESSION can be set explicitly as an HBase column family attribute.


@haridsv
Contributor

haridsv commented Mar 31, 2026

I see a generics compiler warning that can be fixed with the following change:

-public class PhoenixSyncTableInputFormat extends PhoenixInputFormat {
+public class PhoenixSyncTableInputFormat extends PhoenixInputFormat<DBWritable> {

The existing code also has a couple of warnings that can be fixed:

-public class PhoenixServerBuildIndexInputFormat<T extends DBWritable> extends PhoenixInputFormat {
+public class PhoenixServerBuildIndexInputFormat<T extends DBWritable> extends PhoenixInputFormat<T> {
-  extends PhoenixServerBuildIndexInputFormat {
+  extends PhoenixServerBuildIndexInputFormat<T> {

@rahulLiving
Contributor Author

I see a generics compiler warning that can be fixed with the following change:

Fixed it for my changes.

// Not using try-with-resources since ChunkScannerContext owns the table lifecycle
Table hTable =
conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(physicalTableName);
Scan scan =
Contributor

@rahulLiving One thing I realized: we are not setting the ttl attribute on the scan. We should, so that we can mask expired rows. Also, add a test case for the same.
