
PHOENIX-7751 : [SyncTable Tool] Feature to validate table data using PhoenixSyncTable tool b/w source and target cluster #2379

Open
rahulLiving wants to merge 26 commits into apache:master from rahulLiving:PHOENIX-7751

Conversation

@rahulLiving
Contributor

No description provided.

@rahulLiving rahulLiving marked this pull request as ready for review March 12, 2026 12:36

/**
* PhoenixSyncTableTool chunk metadata cell qualifiers. These define the wire protocol between
* hoenixSyncTableRegionScanner (server-side coprocessor) and PhoenixSyncTableMapper (client-side

Typo: missing 'P'; should be PhoenixSyncTableRegionScanner.


public static Long getPhoenixSyncTableFromTime(Configuration conf) {
Preconditions.checkNotNull(conf);
String value = conf.get(PHOENIX_SYNC_TABLE_FROM_TIME);

@tkhurana tkhurana Mar 18, 2026


Why didn't you use conf.getLong()?
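For illustration, one plausible reason for the manual parse (a sketch, not stated in the PR): Configuration.getLong(name, defaultValue) requires a primitive default and cannot return null when the key is unset, whereas this getter returns a nullable Long. The stand-in map and key name below are assumptions, used in place of Hadoop's Configuration so the sketch is self-contained:

```java
import java.util.HashMap;
import java.util.Map;

public class NullableConfLong {
    // Stand-in for org.apache.hadoop.conf.Configuration; the key value is hypothetical.
    static final Map<String, String> conf = new HashMap<>();
    static final String PHOENIX_SYNC_TABLE_FROM_TIME = "phoenix.synctable.fromtime";

    // Mirrors the PR's getter: returns null when the key is absent,
    // a case conf.getLong(key, default) cannot express without a sentinel.
    static Long getPhoenixSyncTableFromTime() {
        String value = conf.get(PHOENIX_SYNC_TABLE_FROM_TIME);
        return value == null ? null : Long.valueOf(value);
    }

    public static void main(String[] args) {
        System.out.println(getPhoenixSyncTableFromTime()); // prints null when unset
        conf.put(PHOENIX_SYNC_TABLE_FROM_TIME, "1710000000000");
        System.out.println(getPhoenixSyncTableFromTime());
    }
}
```

If the unset case is never meaningful, conf.getLong(key, sentinel) would indeed be simpler.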

conf.setLong(PHOENIX_SYNC_TABLE_TO_TIME, toTime);
}

public static Long getPhoenixSyncTableToTime(Configuration conf) {

@tkhurana tkhurana Mar 18, 2026


Here also, why didn't you use conf.getLong()?

return configuration.getBoolean(MAPREDUCE_RANDOMIZE_MAPPER_EXECUTION_ORDER,
DEFAULT_MAPREDUCE_RANDOMIZE_MAPPER_EXECUTION_ORDER);
}


IMO these APIs can remain in the PhoenixSyncTableTool class only; they are specific to the sync tool.

return false;
}

buildChunkMetadataResult(results, isTargetScan);

If we break out early due to a page timeout, won't we have a partial chunk?


It seems that isTargetScan serves a different purpose, or at least the naming can be improved.

Comment on lines +81 to +84
private byte[] chunkStartKey = null;
private byte[] chunkEndKey = null;
private long currentChunkSize = 0L;
private long currentChunkRowCount = 0L;

An improvement here would be to introduce the notion of a chunk object.
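As a sketch of the reviewer's suggestion, a chunk value object could bundle the four fields quoted above. The class and method names here are assumptions, not from the PR:

```java
import java.util.Arrays;

// Hypothetical Chunk object wrapping the four scanner fields above.
// With this in place, resetChunkState() becomes simply `new Chunk()`.
public class Chunk {
    private byte[] startKey;
    private byte[] endKey;
    private long sizeBytes;
    private long rowCount;

    void addRow(byte[] rowKey, long rowSizeBytes) {
        if (startKey == null) {
            startKey = Arrays.copyOf(rowKey, rowKey.length);
        }
        endKey = Arrays.copyOf(rowKey, rowKey.length);
        sizeBytes += rowSizeBytes;
        rowCount++;
    }

    long getSizeBytes() { return sizeBytes; }
    long getRowCount() { return rowCount; }
    byte[] getStartKey() { return startKey; }
    byte[] getEndKey() { return endKey; }

    public static void main(String[] args) {
        Chunk chunk = new Chunk();
        chunk.addRow(new byte[] { 1 }, 100);
        chunk.addRow(new byte[] { 2 }, 50);
        System.out.println(chunk.getSizeBytes() + " bytes, " + chunk.getRowCount() + " rows");
    }
}
```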

byte[] rowKey = CellUtil.cloneRow(rowCells.get(0));
long rowSize = calculateRowSize(rowCells);
addRowToChunk(rowKey, rowCells, rowSize);
if (!isTargetScan && willExceedChunkLimits(rowSize)) {

So addRowToChunk already adds rowSize to chunkSize, and then willExceedChunkLimits adds rowSize to chunkSize again.
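If the double count is real, one possible fix (a sketch; the field and method names are assumptions based on the snippet) is to make the limit check consult only the running totals, since the row has already been folded in:

```java
// Sketch: once addRowToChunk has folded rowSize into the running totals,
// the limit check looks only at those totals instead of adding rowSize again.
public class ChunkLimits {
    long currentChunkSize;
    long currentChunkRowCount;
    final long maxChunkSizeBytes;
    final long maxChunkRows;

    ChunkLimits(long maxChunkSizeBytes, long maxChunkRows) {
        this.maxChunkSizeBytes = maxChunkSizeBytes;
        this.maxChunkRows = maxChunkRows;
    }

    void addRowToChunk(long rowSize) {
        currentChunkSize += rowSize;
        currentChunkRowCount++;
    }

    // No rowSize parameter: the current row is already counted.
    boolean chunkLimitsReached() {
        return currentChunkSize >= maxChunkSizeBytes || currentChunkRowCount >= maxChunkRows;
    }

    public static void main(String[] args) {
        ChunkLimits limits = new ChunkLimits(100, 10);
        limits.addRowToChunk(60);
        System.out.println(limits.chunkLimitsReached()); // false: 60 < 100
        limits.addRowToChunk(60);
        System.out.println(limits.chunkLimitsReached()); // true: 120 >= 100
    }
}
```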

public boolean next(List<Cell> results, ScannerContext scannerContext) throws IOException {
region.startRegionOperation();
try {
resetChunkState();

@tkhurana tkhurana Mar 20, 2026


If you have the notion of a chunk object then you don't need reset; you can simply create a new chunk.

/**
* PhoenixSyncTableTool scan attributes for server-side chunk formation and checksum
*/
public static final String SYNC_TABLE_CHUNK_FORMATION = "_SyncTableChunkFormation";

Should all of these instead be named SYNC_TOOL?

Comment on lines +168 to +172
if (chunkStartKey == null) {
LOGGER.warn("Paging timed out while fetching first row of chunk, initStartRowKey: {}",
Bytes.toStringBinary(initStartRowKey));
updateDummyWithPrevRowKey(results, initStartRowKey, includeInitStartRowKey, scan);
return true;

Is this ever hit? Even with a 0 page timeout we get at least one row.

@Override
protected void map(NullWritable key, DBInputFormat.NullDBWritable value, Context context)
throws IOException, InterruptedException {
context.getCounter(PhoenixJobCounters.INPUT_RECORDS).increment(1);

What is the meaning of INPUT_RECORDS in the context of the sync tool?


if (sourceRowsProcessed > 0) {
if (mismatchedChunk == 0) {
context.getCounter(PhoenixJobCounters.OUTPUT_RECORDS).increment(1);

What does OUTPUT_RECORDS mean in the context of the sync tool?

+ " TO_TIME BIGINT NOT NULL,\n" + " START_ROW_KEY VARBINARY_ENCODED,\n"
+ " END_ROW_KEY VARBINARY_ENCODED,\n" + " IS_DRY_RUN BOOLEAN, \n"
+ " EXECUTION_START_TIME TIMESTAMP,\n" + " EXECUTION_END_TIME TIMESTAMP,\n"
+ " STATUS VARCHAR(20),\n" + " COUNTERS VARCHAR(255), \n"

I don't think Counters should have a fixed limit. Just make them VARCHAR so that we can add more counters in the future.


public enum Type {
CHUNK,
MAPPER_REGION

maybe just REGION
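The suggested rename would look like this (a sketch of the reviewer's naming proposal, not the PR's final code):

```java
// Sketch of the suggested rename: drop the MAPPER_ prefix.
public enum Type {
    CHUNK,
    REGION
}
```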


String query = "SELECT START_ROW_KEY, END_ROW_KEY FROM " + SYNC_TABLE_CHECKPOINT_TABLE_NAME
+ " WHERE TABLE_NAME = ? AND TARGET_CLUSTER = ?"
+ " AND TYPE = ? AND FROM_TIME = ? AND TO_TIME = ? AND STATUS IN ( ?, ?)";

There are only 2 possible statuses, so does it make sense to set them in the query? If you don't, then you are only querying PK columns without any filter.
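If the IN clause always binds both possible statuses, dropping it leaves a pure primary-key lookup, as the reviewer suggests. A sketch (the table name value below is an assumption; only the constant's name appears in the PR):

```java
public class CheckpointQuery {
    // Hypothetical value; the real constant is defined elsewhere in the PR.
    static final String SYNC_TABLE_CHECKPOINT_TABLE_NAME = "SYNC_TABLE_CHECKPOINT";

    // Sketch: with the STATUS filter removed, only PK columns are bound.
    static final String QUERY =
        "SELECT START_ROW_KEY, END_ROW_KEY FROM " + SYNC_TABLE_CHECKPOINT_TABLE_NAME
            + " WHERE TABLE_NAME = ? AND TARGET_CLUSTER = ?"
            + " AND TYPE = ? AND FROM_TIME = ? AND TO_TIME = ?";

    public static void main(String[] args) {
        System.out.println(QUERY);
    }
}
```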
