PHOENIX-7751: [SyncTable Tool] Feature to validate table data using PhoenixSyncTable tool between source and target cluster (#2379)
Conversation
/**
 * PhoenixSyncTableTool chunk metadata cell qualifiers. These define the wire protocol between
 * PhoenixSyncTableRegionScanner (server-side coprocessor) and PhoenixSyncTableMapper (client-side
public static Long getPhoenixSyncTableFromTime(Configuration conf) {
  Preconditions.checkNotNull(conf);
  String value = conf.get(PHOENIX_SYNC_TABLE_FROM_TIME);
Why didn't you use conf.getLong()?
conf.setLong(PHOENIX_SYNC_TABLE_TO_TIME, toTime);
}
public static Long getPhoenixSyncTableToTime(Configuration conf) {
Here also, why didn't you use conf.getLong()?
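For context on the getLong question: Hadoop's conf.getLong(name, defaultValue) can never return null, so a getter that must distinguish "unset" from "set" has to go through conf.get(...) and parse. A minimal sketch of that distinction, using a plain java.util.Map to stand in for Hadoop's Configuration (the class and method names here are hypothetical, for illustration only):

```java
import java.util.HashMap;
import java.util.Map;

class NullableConfGetter {
    // Stand-in for Hadoop's Configuration: get(key) returns null when unset.
    private final Map<String, String> props = new HashMap<>();

    public void set(String key, String value) {
        props.put(key, value);
    }

    // Mirrors conf.getLong(key, defaultValue): never returns null, so
    // callers cannot tell "unset" apart from "explicitly set to the default".
    public long getLongOrDefault(String key, long defaultValue) {
        String value = props.get(key);
        return value == null ? defaultValue : Long.parseLong(value);
    }

    // Mirrors the PR's pattern (conf.get + parse): "unset" maps to null.
    public Long getNullableLong(String key) {
        String value = props.get(key);
        return value == null ? null : Long.parseLong(value);
    }
}
```

If a null return is not actually needed by the callers, conf.getLong with a sentinel default would indeed be simpler.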
qTable = SchemaUtil.getQualifiedTableName(schemaName, tableName);
qSchemaName = SchemaUtil.normalizeIdentifier(schemaName);
PhoenixMapReduceUtil.validateTimeRange(startTime, endTime, qTable);
PhoenixMapReduceUtil.validateMaxLookbackAge(configuration, endTime, qTable);
Do we need the end time to be within the max lookback window? How will the sync tool break if the end time is outside of the max lookback window?
Right, this check is not useful.
On the other hand, we should not only enforce that it is not outside the window, we should also enforce a "safety buffer" to accommodate the data in flight. Even when the endTime is within the window, if it is too close to the current time, it may miss data that is still in flight and cause false positives. In practice this may not matter, as the time it takes to set up and run could be on the order of several minutes, enough for the catch-up to complete, but I think it is better to make it explicit by enforcing a safety buffer and make this more deterministic.
If we remove this check and allow the endTime to be in the future, the possibility of false positives due to data in flight becomes a lot more pronounced. By enforcing both startTime and endTime, we can ensure a "consistent window" where data is guaranteed to be fully replicated and 'quiesced' on both sides. WDYT?
I was thinking more about the "consistent window" or "quiesced window" approach that I suggested above and realized this is actually a race against the sliding window during long-running jobs.
If a sync job takes several hours to complete, a startTime that was valid at the beginning of the job might actually 'slide' out of the lookback window by the time the final Mappers execute. Since HBase compactions on the source and target clusters aren't synchronized, couldn't this lead to false-positive mismatches if one cluster purges historical data mid-run while the other hasn't yet?
It may not always be possible to make the "safety buffer" on the startTime large enough to account for the job execution time; what if the max lookback window is only a few hours and the job itself takes hours? Does this require utilizing HBase snapshots to 'freeze' the data state for the duration of the sync? Are there existing patterns that other systems might have employed to handle this issue?
We need to think about this from two perspectives: running the sync job regularly as a cron, and using it for migration validation.
For migration validation, the start time would definitely be before maxLookbackAge. It is up to the owner whether they want to validate all versions and delete markers or just the latest version.
For a regular cron job to be used in PhoenixHA, we can configure the start/end time to be within maxLookbackAge.
Tanuj's suggestion of giving the user the flexibility to choose the rawScan and allVersion options would be helpful. And since we plan to fix the mismatched rows as well, we can consider the source as the source of truth and fix accordingly.
Though there can be instances where a row can't be fixed, e.g. the source has removed a delete marker via compaction but the target still has it. Such rows can be flagged as not fixable, as per the design.
Btw, the default endTime is (currentTime - 1 hour), to ensure the target has the desired data.
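The "safety buffer" proposal above can be made concrete with a small validation helper. This is only a sketch of the suggested check, not code from the PR; the class name, method name, and default buffer value are hypothetical (the 1-hour default mirrors the default endTime of currentTime - 1 hour mentioned above):

```java
final class SyncWindowValidator {
    // Hypothetical default: reject an endTime closer than 1 hour to "now",
    // matching the default endTime of (currentTime - 1 hour) noted above.
    static final long DEFAULT_SAFETY_BUFFER_MS = 60L * 60L * 1000L;

    private SyncWindowValidator() {
    }

    // Throws if [startTime, endTime) is not a sane, quiesced window.
    static void validate(long startTime, long endTime, long nowMs, long bufferMs) {
        if (startTime >= endTime) {
            throw new IllegalArgumentException("startTime must be before endTime");
        }
        // endTime must sit at least bufferMs in the past so in-flight
        // replication has had time to catch up on the target cluster.
        if (endTime > nowMs - bufferMs) {
            throw new IllegalArgumentException(
                "endTime is within the safety buffer; data may still be in flight");
        }
    }
}
```

This also rejects a future endTime for free, since a future endTime is necessarily inside the buffer.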
PhoenixConfigurationUtil.setPhoenixSyncTableChunkSizeBytes(configuration, chunkSizeBytes);
}
if (tenantId != null) {
  PhoenixConfigurationUtil.setTenantId(configuration, tenantId);
Can you verify if the tenant id is being correctly set as a key prefix on the scan?
If you have a table region with multiple tenants and we pass a tenant id, then our scan range should start with the tenant id prefix.
Yes, it only creates input ranges and scans for tenant-specific rows. We have an IT for the same.
 * Configures a Configuration object with ZooKeeper settings from a ZK quorum string.
 * @param baseConf Base configuration to create from (typically job configuration)
 * @param zkQuorum ZooKeeper quorum string in format: "zk_quorum:port:znode" Example:
 *                 "zk1,zk2,zk3:2181:/hbase"
This is actually not the only format for a ZK quorum. There are other valid formats where the port number is specified separately for each server. There is actually a very useful API in HBase called HBaseConfiguration.createClusterConf(job.getConfiguration(), targetZkQuorum). We should use that, as it also works for the ZK registry.
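To illustrate why the single "quorum:port:znode" assumption is fragile: HBase also accepts cluster keys where each server carries its own port, e.g. "zk1:2181,zk2:2182:/hbase", which HBaseConfiguration.createClusterConf handles but a naive colon-split does not. A small sketch of the pitfall (the helper class below is hypothetical, for illustration only):

```java
final class ClusterKeyExample {
    private ClusterKeyExample() {
    }

    // Naive parse assuming exactly "quorum:port:znode". Breaks when the
    // quorum part itself contains per-host ports (extra ':' separators).
    static String naiveZnode(String clusterKey) {
        String[] parts = clusterKey.split(":");
        return parts.length == 3 ? parts[2] : null;
    }

    // The znode is really everything after the last ':' separator, which is
    // closer to how HBase's own cluster-key parsing treats the string.
    static String znode(String clusterKey) {
        return clusterKey.substring(clusterKey.lastIndexOf(':') + 1);
    }
}
```

Using createClusterConf avoids hand-rolling this parsing entirely, and keeps working if the cluster uses a different connection registry.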
String query = "SELECT START_ROW_KEY, END_ROW_KEY FROM " + SYNC_TABLE_CHECKPOINT_TABLE_NAME
    + " WHERE TABLE_NAME = ? AND TARGET_CLUSTER = ?"
    + " AND TYPE = ? AND FROM_TIME = ? AND TO_TIME = ? AND STATUS IN ( ?, ?)";
I am not 100% positive that you can assume that the output of this query is always sorted by row key. You might have to add an ORDER BY clause here. If you are adding an ORDER BY clause it will be better to add all the PK columns to make the sorting more efficient.
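One way to act on this suggestion is to spell out an ORDER BY over all the PK columns so Phoenix can satisfy it from row-key order rather than a client-side sort. The sketch below assumes a PK of (TABLE_NAME, TARGET_CLUSTER, TYPE, FROM_TIME, TO_TIME, START_ROW_KEY); verify the column order against the actual checkpoint table DDL:

```java
final class CheckpointQuery {
    private CheckpointQuery() {
    }

    // Assumed checkpoint table name and PK order; check against the real DDL.
    static final String QUERY =
        "SELECT START_ROW_KEY, END_ROW_KEY FROM SYNC_TABLE_CHECKPOINT"
            + " WHERE TABLE_NAME = ? AND TARGET_CLUSTER = ?"
            + " AND TYPE = ? AND FROM_TIME = ? AND TO_TIME = ? AND STATUS IN (?, ?)"
            // Ordering by the full PK makes the sort order explicit instead of
            // relying on an unstated guarantee about result ordering.
            + " ORDER BY TABLE_NAME, TARGET_CLUSTER, TYPE, FROM_TIME, TO_TIME, START_ROW_KEY";
}
```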
int completedIdx = 0;

// Two pointer comparison across splitRange and completedRange
while (splitIdx < allSplits.size() && completedIdx < completedRegions.size()) {
I think you are assuming here that completedRegions is already sorted. Please see my comment on the getProcessedMapperRegions function.
Won't the results be sorted in the PK order already? I see that the new commit adds ORDER BY, but not sure why that is required.
PhoenixInputSplit split = (PhoenixInputSplit) allSplits.get(splitIdx);
KeyRange splitRange = split.getKeyRange();
KeyRange completedRange = completedRegions.get(completedIdx);
byte[] splitStart = splitRange.getLowerRange();
Will the end key of the split range always be exclusive? If yes, can you please add a comment.
Yes, for both splitRange and completedRange, the start key is always inclusive and the end key always exclusive. Will add a comment.
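A self-contained sketch of the two-pointer idea under discussion, with inclusive start keys and exclusive end keys as confirmed above. This is an illustration, not the PR's implementation: it subtracts a sorted, non-overlapping list of completed ranges from one split range, and for simplicity ignores HBase's empty-byte-array convention for open-ended keys:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class RangeSubtractor {
    private RangeSubtractor() {
    }

    // Ranges are byte[][] pairs {start, end}: start inclusive, end exclusive.
    // "completed" must be sorted by start key and non-overlapping.
    static List<byte[][]> unprocessed(byte[] start, byte[] end, List<byte[][]> completed) {
        List<byte[][]> gaps = new ArrayList<>();
        byte[] cursor = start;
        for (byte[][] done : completed) {
            // Emit the gap between the cursor and this completed range.
            if (Arrays.compareUnsigned(cursor, done[0]) < 0) {
                gaps.add(new byte[][] { cursor, min(done[0], end) });
            }
            // Advance the cursor past the completed range.
            if (Arrays.compareUnsigned(done[1], cursor) > 0) {
                cursor = done[1];
            }
            if (Arrays.compareUnsigned(cursor, end) >= 0) {
                return gaps; // Whole split covered.
            }
        }
        // Tail of the split that no completed range reached.
        if (Arrays.compareUnsigned(cursor, end) < 0) {
            gaps.add(new byte[][] { cursor, end });
        }
        return gaps;
    }

    private static byte[] min(byte[] a, byte[] b) {
        return Arrays.compareUnsigned(a, b) <= 0 ? a : b;
    }
}
```

The inclusive-start/exclusive-end convention is what lets the cursor be set directly to a completed range's end key without skipping or double-counting a row.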
 * @return List of (startKey, endKey) pairs representing unprocessed ranges
 */
@VisibleForTesting
public List<Pair<byte[], byte[]>> calculateUnprocessedRanges(byte[] mapperRegionStart,
Maybe we could return a List<KeyRange>
if (hasStartBoundary) {
  queryBuilder.append(" AND END_ROW_KEY >= ?");
}
queryBuilder.append(" AND STATUS IN (?, ?)");
Same as above, we don't need to pass status.
scan.setCacheBlocks(false);
scan.setTimeRange(fromTime, toTime);
if (isTargetScan) {
  scan.setLimit(1);
Can you add a comment here on why we are setting limit to 1 and caching to 1?
Scan scan = new Scan();
scan.withStartRow(startKey, isStartKeyInclusive);
scan.withStopRow(endKey, isEndKeyInclusive);
scan.setRaw(true);
Are we sure we have to do a raw scan?
Also, can we make this configurable via the SyncTool command line?
scan.withStartRow(startKey, isStartKeyInclusive);
scan.withStopRow(endKey, isEndKeyInclusive);
scan.setRaw(true);
scan.readAllVersions();
Same, can we make the behavior of reading all versions configurable?
@@ -0,0 +1,2267 @@
/*
Can you add a test where rows are deleted on both the source and target tables but you have run compaction on only one? We can actually have 2 cases: compaction run on the source but not on the target, and vice versa. I saw that you are doing a raw scan. Max lookback settings will also impact this.
haridsv left a comment:
I just skimmed through and left some comments at the surface level.
try (PreparedStatement ps = connection.prepareStatement(UPSERT_CHECKPOINT_SQL)) {
  ps.setString(1, row.getTableName());
  ps.setString(2, row.getTargetCluster());
  ps.setString(3, row.getType().name());
I would recommend storing a byte code rather than a long string to reduce the size of the row key.
Have you thought about this?
The 35 MB calculation was for the primary key. 100K regions, each region with 10 chunks, gives a total of 1.1M rows.
Each row with CHUNK/REGION will be 35 bytes (this is for all columns/cells).
Each row with C/R will be 7 bytes.
1.1M × 35 − 1.1M × 7 bytes, which roughly equals 30 MB for a table with 100K regions.
OK, I guess the difference will reduce further after compression; we should enable it along with column encoding as well.
By default, column encoding is set to 2 for user tables.
I just checked: none of the Phoenix system tables specify COMPRESSION in their DDL; maybe it depends on what compression they want to use.
So, COMPRESSION can be set explicitly as an HBase column family attribute.
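The byte-code suggestion could look something like the following enum mapping. The CHUNK/REGION names mirror the types mentioned above, but the single-byte codes ('C' and 'R') are hypothetical, chosen only to illustrate shrinking the row key from a full enum name to one byte:

```java
enum CheckpointType {
    // Single-byte codes stored in the row key instead of the full enum name.
    CHUNK((byte) 'C'),
    REGION((byte) 'R');

    private final byte code;

    CheckpointType(byte code) {
        this.code = code;
    }

    public byte getCode() {
        return code;
    }

    // Reverse lookup when reading the row key back.
    public static CheckpointType fromCode(byte code) {
        for (CheckpointType t : values()) {
            if (t.code == code) {
                return t;
            }
        }
        throw new IllegalArgumentException("Unknown checkpoint type code: " + code);
    }
}
```

Writing getCode() instead of name() in the upsert, and fromCode() when reading, keeps the stored key compact while the Java code stays readable.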
I see a generics compiler warning that can be fixed with the following change: The existing code also has a couple of warnings that can be fixed:
Fixed it for my changes.
// Not using try-with-resources since ChunkScannerContext owns the table lifecycle
Table hTable =
  conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(physicalTableName);
Scan scan =
@rahulLiving One thing I realized: we are not setting the ttl attribute on the scan. We should, so that we can mask expired rows. Also, add a test case for the same.
No description provided.