PHOENIX-7751 : [SyncTable Tool] Feature to validate table data using PhoenixSyncTable tool b/w source and target cluster#2379
Conversation
|
|
||
| /** | ||
| * PhoenixSyncTableTool chunk metadata cell qualifiers. These define the wire protocol between | ||
| * hoenixSyncTableRegionScanner (server-side coprocessor) and PhoenixSyncTableMapper (client-side |
|
|
||
| public static Long getPhoenixSyncTableFromTime(Configuration conf) { | ||
| Preconditions.checkNotNull(conf); | ||
| String value = conf.get(PHOENIX_SYNC_TABLE_FROM_TIME); |
There was a problem hiding this comment.
Why didn't you use conf.getLong() ?
| conf.setLong(PHOENIX_SYNC_TABLE_TO_TIME, toTime); | ||
| } | ||
|
|
||
| public static Long getPhoenixSyncTableToTime(Configuration conf) { |
There was a problem hiding this comment.
Here also why didn't you use conf.getLong ?
| return configuration.getBoolean(MAPREDUCE_RANDOMIZE_MAPPER_EXECUTION_ORDER, | ||
| DEFAULT_MAPREDUCE_RANDOMIZE_MAPPER_EXECUTION_ORDER); | ||
| } | ||
|
|
There was a problem hiding this comment.
IMO these APIs can remain in PhoenixSyncTableTool class only. They are specific to Sync tool
| return false; | ||
| } | ||
|
|
||
| buildChunkMetadataResult(results, isTargetScan); |
There was a problem hiding this comment.
If we break out early due to page timeout won't we have a partial chunk ?
There was a problem hiding this comment.
It seems that isTargetScan is for different purpose or at-least the naming can be improved.
| private byte[] chunkStartKey = null; | ||
| private byte[] chunkEndKey = null; | ||
| private long currentChunkSize = 0L; | ||
| private long currentChunkRowCount = 0L; |
There was a problem hiding this comment.
Improvement can be made here to introduce the notion of a chunk object
| byte[] rowKey = CellUtil.cloneRow(rowCells.get(0)); | ||
| long rowSize = calculateRowSize(rowCells); | ||
| addRowToChunk(rowKey, rowCells, rowSize); | ||
| if (!isTargetScan && willExceedChunkLimits(rowSize)) { |
There was a problem hiding this comment.
So addRowToChunk is already adding the rowSize to chunkSize and then willExceedChunkLimits is again adding rowSize to chunkSize
| public boolean next(List<Cell> results, ScannerContext scannerContext) throws IOException { | ||
| region.startRegionOperation(); | ||
| try { | ||
| resetChunkState(); |
There was a problem hiding this comment.
If you have a notion of a chunk object then you don't need reset you can simply create a new chunk
| /** | ||
| * PhoenixSyncTableTool scan attributes for server-side chunk formation and checksum | ||
| */ | ||
| public static final String SYNC_TABLE_CHUNK_FORMATION = "_SyncTableChunkFormation"; |
There was a problem hiding this comment.
Should all of these instead be named SYNC_TOOL ?
| if (chunkStartKey == null) { | ||
| LOGGER.warn("Paging timed out while fetching first row of chunk, initStartRowKey: {}", | ||
| Bytes.toStringBinary(initStartRowKey)); | ||
| updateDummyWithPrevRowKey(results, initStartRowKey, includeInitStartRowKey, scan); | ||
| return true; |
There was a problem hiding this comment.
Is this ever hit ? Even with 0 page timeout we get at least one row
| @Override | ||
| protected void map(NullWritable key, DBInputFormat.NullDBWritable value, Context context) | ||
| throws IOException, InterruptedException { | ||
| context.getCounter(PhoenixJobCounters.INPUT_RECORDS).increment(1); |
There was a problem hiding this comment.
What is the meaning of INPUT_RECORDS in the context of sync tool ?
|
|
||
| if (sourceRowsProcessed > 0) { | ||
| if (mismatchedChunk == 0) { | ||
| context.getCounter(PhoenixJobCounters.OUTPUT_RECORDS).increment(1); |
There was a problem hiding this comment.
What does the OUTPUT_RECORDS mean in the context of Sync tool ?
| + " TO_TIME BIGINT NOT NULL,\n" + " START_ROW_KEY VARBINARY_ENCODED,\n" | ||
| + " END_ROW_KEY VARBINARY_ENCODED,\n" + " IS_DRY_RUN BOOLEAN, \n" | ||
| + " EXECUTION_START_TIME TIMESTAMP,\n" + " EXECUTION_END_TIME TIMESTAMP,\n" | ||
| + " STATUS VARCHAR(20),\n" + " COUNTERS VARCHAR(255), \n" |
There was a problem hiding this comment.
I don't think Counters should have a fixed limit. Just make them VARCHAR so that we can add more counters in the future.
|
|
||
| public enum Type { | ||
| CHUNK, | ||
| MAPPER_REGION |
|
|
||
| String query = "SELECT START_ROW_KEY, END_ROW_KEY FROM " + SYNC_TABLE_CHECKPOINT_TABLE_NAME | ||
| + " WHERE TABLE_NAME = ? AND TARGET_CLUSTER = ?" | ||
| + " AND TYPE = ? AND FROM_TIME = ? AND TO_TIME = ? AND STATUS IN ( ?, ?)"; |
There was a problem hiding this comment.
There are only 2 possible status so does it make sense to set them in the query ? If you don't then you are only querying pk columns without any filter.
No description provided.