Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
* | +--------------------------------------+ |
* | | PER-CELL DATA (repeated) | |
* | | +––––––––––––----------------–--–+ | |
* | | | CELL TIMESTAMP (long) | | |
* | | | COLUMN QUALIFIER LENGTH (vint) | | |
* | | | COLUMN QUALIFIER (byte[]) | | |
* | | | VALUE LENGTH (vint) | | |
Expand Down Expand Up @@ -139,6 +140,7 @@ public void write(LogFile.Record record) throws IOException {
List<Cell> cells = entry.getValue();
WritableUtils.writeVInt(recordOut, cells.size());
for (Cell cell : cells) {
recordOut.writeLong(cell.getTimestamp());
WritableUtils.writeVInt(recordOut, cell.getQualifierLength());
recordOut.write(cell.getQualifierArray(), cell.getQualifierOffset(),
cell.getQualifierLength());
Expand Down Expand Up @@ -225,6 +227,8 @@ public boolean advance() throws IOException {
// Qualifiers+Values Count
int columnValuePairsCount = WritableUtils.readVInt(in);
for (int j = 0; j < columnValuePairsCount; j++) {
// Cell timestamp
long cellTs = in.readLong();
// Qualifier name
int qualLen = WritableUtils.readVInt(in);
byte[] qual = new byte[qualLen];
Expand All @@ -239,17 +243,17 @@ public boolean advance() throws IOException {
}
switch (type) {
case PUT:
((Put) mutation).addColumn(cf, qual, ts, value);
((Put) mutation).addColumn(cf, qual, cellTs, value);
break;
case DELETE:
case DELETECOLUMN:
((Delete) mutation).addColumn(cf, qual, ts);
((Delete) mutation).addColumn(cf, qual, cellTs);
break;
case DELETEFAMILYVERSION:
((Delete) mutation).addFamilyVersion(cf, ts);
((Delete) mutation).addFamilyVersion(cf, cellTs);
break;
case DELETEFAMILY:
((Delete) mutation).addFamily(cf);
((Delete) mutation).addFamily(cf, cellTs);
break;
default:
throw new UnsupportedOperationException("Unhandled mutation type " + type);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -911,7 +911,7 @@ public void testCreateCommandNewHAGroup() throws Exception {
int ret = ToolRunner.run(adminTool,
new String[] { "create", "-g", createHaGroupName, "-p", "FAILOVER", "-zk1",
CLUSTERS.getZkUrl1(), "-c1", CLUSTERS.getMasterAddress1(), "-cr1", "ACTIVE", "-zk2",
CLUSTERS.getZkUrl2(), "-c2", CLUSTERS.getMasterAddress2(), "-cr2", "STANDBY", "-hdfs1",
CLUSTERS.getZkUrl2(), "-c2", CLUSTERS.getMasterAddress2(), "-cr2", "STANDBY", "-hdfs1",
CLUSTERS.getHdfsUrl1(), "-hdfs2", CLUSTERS.getHdfsUrl2() });

assertEquals("create command should succeed", RET_SUCCESS, ret);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -310,4 +310,95 @@ public void testCodecWithLargeValue() throws IOException {
new LogFileRecord().setHBaseTableName("TBLLVAL").setCommitId(1L).setMutation(put));
}

// Cell timestamp preservation tests
// These verify that per-cell timestamps survive a codec round-trip when they differ from the
// mutation-level timestamp. Before the fix the encoder omitted cell.getTimestamp() entirely
// and the decoder fell back to the mutation-level timestamp (or HConstants.LATEST_TIMESTAMP
// for addFamily), so any divergence produced wrong timestamps on the standby cluster.

@Test
public void testPutCellTimestampsDifferFromMutationTimestamp() throws IOException {
long mutationTs = 99999L;
Put put = new Put(Bytes.toBytes("row"));
put.setTimestamp(mutationTs);
// Each cell gets its own timestamp, all different from mutationTs and from each other
put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q1"), 11111L, Bytes.toBytes("v1"));
put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q2"), 22222L, Bytes.toBytes("v2"));
put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q3"), 33333L, Bytes.toBytes("v3"));
singleRecordTest(
new LogFileRecord().setHBaseTableName("TBLPUTTS").setCommitId(1L).setMutation(put));
}

@Test
public void testDeleteColumnCellTimestampDiffersFromMutationTimestamp() throws IOException {
long mutationTs = 99999L;
long cellTs = 11111L;
Delete delete = new Delete(Bytes.toBytes("row"));
delete.setTimestamp(mutationTs);
delete.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q"), cellTs);
singleRecordTest(
new LogFileRecord().setHBaseTableName("TBLDELCOLTS").setCommitId(1L).setMutation(delete));
}

@Test
public void testDeleteFamilyCellTimestampDiffersFromMutationTimestamp() throws IOException {
// This is the most direct regression test for the bug: addFamily(cf) was called without ts,
// defaulting to HConstants.LATEST_TIMESTAMP instead of preserving the original cell timestamp.
long mutationTs = 99999L;
long cellTs = 11111L;
Delete delete = new Delete(Bytes.toBytes("row"));
delete.setTimestamp(mutationTs);
delete.addFamily(Bytes.toBytes("cf"), cellTs); // explicit cell ts != mutationTs
singleRecordTest(
new LogFileRecord().setHBaseTableName("TBLDELFAMTS").setCommitId(1L).setMutation(delete));
}

@Test
public void testDeleteFamilyVersionCellTimestampDiffersFromMutationTimestamp()
throws IOException {
long mutationTs = 99999L;
long cellTs = 11111L;
Delete delete = new Delete(Bytes.toBytes("row"));
delete.setTimestamp(mutationTs);
delete.addFamilyVersion(Bytes.toBytes("cf"), cellTs); // explicit cell ts != mutationTs
singleRecordTest(
new LogFileRecord().setHBaseTableName("TBLDELFAMVERTS").setCommitId(1L).setMutation(delete));
}

@Test
public void testMultipleCellsWithDistinctTimestampsPreserved() throws IOException {
// Multiple cells in the same mutation each carry a unique timestamp; all must survive
// the round-trip intact.
long mutationTs = 50000L;
Put put = new Put(Bytes.toBytes("row"));
put.setTimestamp(mutationTs);
for (int i = 0; i < 10; i++) {
put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q" + i), (long) (i * 1000),
Bytes.toBytes("v" + i));
}
LogFileCodec codec = new LogFileCodec();
LogFile.Record original =
new LogFileRecord().setHBaseTableName("TBLDISTINCT").setCommitId(1L).setMutation(put);

ByteArrayOutputStream baos = new ByteArrayOutputStream();
codec.getEncoder(new DataOutputStream(baos)).write(original);
LogFile.Codec.Decoder decoder =
codec.getDecoder(new DataInputStream(new ByteArrayInputStream(baos.toByteArray())));

assertTrue(decoder.advance());
LogFile.Record decoded = decoder.current();
LogFileTestUtil.assertRecordEquals("All per-cell timestamps must be preserved", original,
decoded);
// Also verify each cell timestamp explicitly
java.util.List<org.apache.hadoop.hbase.Cell> originalCells =
put.getFamilyCellMap().get(Bytes.toBytes("cf"));
java.util.List<org.apache.hadoop.hbase.Cell> decodedCells =
decoded.getMutation().getFamilyCellMap().get(Bytes.toBytes("cf"));
assertEquals("Cell count must match", originalCells.size(), decodedCells.size());
for (int i = 0; i < originalCells.size(); i++) {
assertEquals("Cell " + i + " timestamp must be preserved",
originalCells.get(i).getTimestamp(), decodedCells.get(i).getTimestamp());
}
}

}