Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,16 @@ public class ConfigNodeConfig {
private long schemaRegionRatisInitialSleepTimeMs = 100;
private long schemaRegionRatisMaxSleepTimeMs = 10000;

/**
* RatisConsensus protocol, max retry attempts for a configuration change (add/remove peer). Uses
* a fixed 2s retry interval; bounding the attempts stops a killed ADDING peer from blocking the
* reconfiguration -- and hence a region migration -- forever.
*/
private int configNodeRatisReconfigurationMaxRetryAttempts = 600;

private int dataRegionRatisReconfigurationMaxRetryAttempts = 600;
private int schemaRegionRatisReconfigurationMaxRetryAttempts = 600;

private long configNodeRatisPreserveLogsWhenPurge = 1000;
private long schemaRegionRatisPreserveLogsWhenPurge = 1000;
private long dataRegionRatisPreserveLogsWhenPurge = 1000;
Expand Down Expand Up @@ -1117,6 +1127,36 @@ public void setSchemaRegionRatisMaxRetryAttempts(int schemaRegionRatisMaxRetryAt
this.schemaRegionRatisMaxRetryAttempts = schemaRegionRatisMaxRetryAttempts;
}

/**
 * Returns the maximum number of retry attempts allowed for a Ratis configuration change
 * (add/remove peer) in the ConfigNode consensus group.
 *
 * @return the configured attempt bound (field default is 600)
 */
public int getConfigNodeRatisReconfigurationMaxRetryAttempts() {
return configNodeRatisReconfigurationMaxRetryAttempts;
}

/**
 * Sets the maximum number of retry attempts allowed for a Ratis configuration change
 * (add/remove peer) in the ConfigNode consensus group. The value is stored as given; no range
 * check is performed here.
 *
 * @param configNodeRatisReconfigurationMaxRetryAttempts the new attempt bound
 */
public void setConfigNodeRatisReconfigurationMaxRetryAttempts(
int configNodeRatisReconfigurationMaxRetryAttempts) {
this.configNodeRatisReconfigurationMaxRetryAttempts =
configNodeRatisReconfigurationMaxRetryAttempts;
}

/**
 * Returns the maximum number of retry attempts allowed for a Ratis configuration change
 * (add/remove peer) of a data region.
 *
 * @return the configured attempt bound (field default is 600)
 */
public int getDataRegionRatisReconfigurationMaxRetryAttempts() {
return dataRegionRatisReconfigurationMaxRetryAttempts;
}

/**
 * Sets the maximum number of retry attempts allowed for a Ratis configuration change
 * (add/remove peer) of a data region. The value is stored as given; no range check is performed
 * here.
 *
 * @param dataRegionRatisReconfigurationMaxRetryAttempts the new attempt bound
 */
public void setDataRegionRatisReconfigurationMaxRetryAttempts(
int dataRegionRatisReconfigurationMaxRetryAttempts) {
this.dataRegionRatisReconfigurationMaxRetryAttempts =
dataRegionRatisReconfigurationMaxRetryAttempts;
}

/**
 * Returns the maximum number of retry attempts allowed for a Ratis configuration change
 * (add/remove peer) of a schema region.
 *
 * @return the configured attempt bound (field default is 600)
 */
public int getSchemaRegionRatisReconfigurationMaxRetryAttempts() {
return schemaRegionRatisReconfigurationMaxRetryAttempts;
}

/**
 * Sets the maximum number of retry attempts allowed for a Ratis configuration change
 * (add/remove peer) of a schema region. The value is stored as given; no range check is
 * performed here.
 *
 * @param schemaRegionRatisReconfigurationMaxRetryAttempts the new attempt bound
 */
public void setSchemaRegionRatisReconfigurationMaxRetryAttempts(
int schemaRegionRatisReconfigurationMaxRetryAttempts) {
this.schemaRegionRatisReconfigurationMaxRetryAttempts =
schemaRegionRatisReconfigurationMaxRetryAttempts;
}

public long getSchemaRegionRatisInitialSleepTimeMs() {
return schemaRegionRatisInitialSleepTimeMs;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -629,6 +629,11 @@ private void loadRatisConsensusConfig(TrimProperties properties) {
properties.getProperty(
"config_node_ratis_max_retry_attempts",
String.valueOf(conf.getConfigNodeRatisMaxRetryAttempts()))));
conf.setConfigNodeRatisReconfigurationMaxRetryAttempts(
Integer.parseInt(
properties.getProperty(
"config_node_ratis_reconfiguration_max_retry_attempts",
String.valueOf(conf.getConfigNodeRatisReconfigurationMaxRetryAttempts()))));
conf.setConfigNodeRatisInitialSleepTimeMs(
Long.parseLong(
properties.getProperty(
Expand All @@ -645,6 +650,11 @@ private void loadRatisConsensusConfig(TrimProperties properties) {
properties.getProperty(
"data_region_ratis_max_retry_attempts",
String.valueOf(conf.getDataRegionRatisMaxRetryAttempts()))));
conf.setDataRegionRatisReconfigurationMaxRetryAttempts(
Integer.parseInt(
properties.getProperty(
"data_region_ratis_reconfiguration_max_retry_attempts",
String.valueOf(conf.getDataRegionRatisReconfigurationMaxRetryAttempts()))));
conf.setDataRegionRatisInitialSleepTimeMs(
Long.parseLong(
properties.getProperty(
Expand All @@ -661,6 +671,11 @@ private void loadRatisConsensusConfig(TrimProperties properties) {
properties.getProperty(
"schema_region_ratis_max_retry_attempts",
String.valueOf(conf.getSchemaRegionRatisMaxRetryAttempts()))));
conf.setSchemaRegionRatisReconfigurationMaxRetryAttempts(
Integer.parseInt(
properties.getProperty(
"schema_region_ratis_reconfiguration_max_retry_attempts",
String.valueOf(conf.getSchemaRegionRatisReconfigurationMaxRetryAttempts()))));
conf.setSchemaRegionRatisInitialSleepTimeMs(
Long.parseLong(
properties.getProperty(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,8 @@ private void setConsensusLayer(ConfigRegionStateMachine stateMachine) {
.setClientRetryMaxSleepTimeMs(
CONF.getConfigNodeRatisMaxSleepTimeMs())
.setMaxClientNumForEachNode(CONF.getMaxClientNumForEachNode())
.setReconfigurationMaxRetryAttempts(
CONF.getConfigNodeRatisReconfigurationMaxRetryAttempts())
.build())
.setImpl(
RatisConfig.Impl.newBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,10 @@ private void setRatisConfig(ConfigurationResp dataSet) {
ratisConfig.setSchemaMaxRetryAttempts(conf.getSchemaRegionRatisMaxRetryAttempts());
ratisConfig.setSchemaInitialSleepTime(conf.getSchemaRegionRatisInitialSleepTimeMs());
ratisConfig.setSchemaMaxSleepTime(conf.getSchemaRegionRatisMaxSleepTimeMs());
ratisConfig.setDataReconfigurationMaxRetryAttempts(
conf.getDataRegionRatisReconfigurationMaxRetryAttempts());
ratisConfig.setSchemaReconfigurationMaxRetryAttempts(
conf.getSchemaRegionRatisReconfigurationMaxRetryAttempts());

ratisConfig.setSchemaPreserveWhenPurge(conf.getSchemaRegionRatisPreserveLogsWhenPurge());
ratisConfig.setDataPreserveWhenPurge(conf.getDataRegionRatisPreserveLogsWhenPurge());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -865,18 +865,21 @@ public static class Client {
private final long clientRetryInitialSleepTimeMs;
private final long clientRetryMaxSleepTimeMs;
private final int maxClientNumForEachNode;
private final int reconfigurationMaxRetryAttempts;

public Client(
long clientRequestTimeoutMillis,
int clientMaxRetryAttempt,
long clientRetryInitialSleepTimeMs,
long clientRetryMaxSleepTimeMs,
int maxClientNumForEachNode) {
int maxClientNumForEachNode,
int reconfigurationMaxRetryAttempts) {
this.clientRequestTimeoutMillis = clientRequestTimeoutMillis;
this.clientMaxRetryAttempt = clientMaxRetryAttempt;
this.clientRetryInitialSleepTimeMs = clientRetryInitialSleepTimeMs;
this.clientRetryMaxSleepTimeMs = clientRetryMaxSleepTimeMs;
this.maxClientNumForEachNode = maxClientNumForEachNode;
this.reconfigurationMaxRetryAttempts = reconfigurationMaxRetryAttempts;
}

public long getClientRequestTimeoutMillis() {
Expand All @@ -899,6 +902,10 @@ public int getMaxClientNumForEachNode() {
return maxClientNumForEachNode;
}

/**
 * Returns the bound on retry attempts for a Ratis configuration change (add/remove peer). The
 * reconfiguration retry policy passes this value to its bounded, fixed-sleep retry policy.
 *
 * @return the maximum number of reconfiguration retry attempts
 */
public int getReconfigurationMaxRetryAttempts() {
return reconfigurationMaxRetryAttempts;
}

/**
 * Creates a fresh {@link Builder} for assembling a {@code Client} configuration.
 *
 * @return a new builder instance
 */
public static Client.Builder newBuilder() {
return new Builder();
}
Expand All @@ -910,14 +917,20 @@ public static class Builder {
private long clientRetryInitialSleepTimeMs = 100;
private long clientRetryMaxSleepTimeMs = 10000;
private int maxClientNumForEachNode = DefaultProperty.MAX_CLIENT_NUM_FOR_EACH_NODE;
// A Ratis configuration change (add/remove peer) retries the "in progress / not ready"
// failures with a fixed 2s interval. Bounding the number of attempts (instead of retrying
// forever) prevents a killed ADDING peer that can never catch up from blocking the
// reconfiguration -- and hence the region migration -- indefinitely. 600 attempts ~= 20min.
private int reconfigurationMaxRetryAttempts = 600;

public Client build() {
return new Client(
clientRequestTimeoutMillis,
clientMaxRetryAttempt,
clientRetryInitialSleepTimeMs,
clientRetryMaxSleepTimeMs,
maxClientNumForEachNode);
maxClientNumForEachNode,
reconfigurationMaxRetryAttempts);
}

public Builder setClientRequestTimeoutMillis(long clientRequestTimeoutMillis) {
Expand All @@ -944,6 +957,11 @@ public Builder setMaxClientNumForEachNode(int maxClientNumForEachNode) {
this.maxClientNumForEachNode = maxClientNumForEachNode;
return this;
}

/**
 * Sets the bound on retry attempts for a Ratis configuration change (add/remove peer). If this
 * setter is never called the builder keeps its default of 600. The value is stored as given; no
 * range check is performed here.
 *
 * @param reconfigurationMaxRetryAttempts the maximum number of reconfiguration retry attempts
 * @return this builder, to allow call chaining
 */
public Builder setReconfigurationMaxRetryAttempts(int reconfigurationMaxRetryAttempts) {
this.reconfigurationMaxRetryAttempts = reconfigurationMaxRetryAttempts;
return this;
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,14 +132,14 @@ public boolean validateObject(RaftGroup key, PooledObject<RatisClient> pooledObj
}
}

static class EndlessRetryFactory extends BaseClientFactory<RaftGroup, RatisClient> {
static class ReconfigurationRetryFactory extends BaseClientFactory<RaftGroup, RatisClient> {

private final RaftProperties raftProperties;
private final RaftClientRpc clientRpc;
private final RatisConfig.Client config;
private final Parameters parameters;

public EndlessRetryFactory(
public ReconfigurationRetryFactory(
ClientManager<RaftGroup, RatisClient> clientManager,
RaftProperties raftProperties,
RaftClientRpc clientRpc,
Expand All @@ -165,7 +165,7 @@ public PooledObject<RatisClient> makeObject(RaftGroup group) {
RaftClient.newBuilder()
.setProperties(raftProperties)
.setRaftGroup(group)
.setRetryPolicy(new RatisEndlessRetryPolicy(config))
.setRetryPolicy(new RatisReconfigurationRetryPolicy(config))
.setParameters(parameters)
.setClientRpc(clientRpc)
.build(),
Expand Down Expand Up @@ -226,16 +226,23 @@ public Action handleAttemptFailure(Event event) {
}

/** This retry policy is used for Raft configuration changes (add/remove peer). */
private static class RatisEndlessRetryPolicy implements RetryPolicy {

private static final Logger logger = LoggerFactory.getLogger(RatisEndlessRetryPolicy.class);
// for reconfiguration request, we use different retry policy
private final RetryPolicy endlessPolicy;
private static class RatisReconfigurationRetryPolicy implements RetryPolicy {

private static final Logger logger =
LoggerFactory.getLogger(RatisReconfigurationRetryPolicy.class);
// For a reconfiguration request we retry the "in progress / not ready" failures with a fixed
// 2s interval, but only up to a bounded number of attempts. An unbounded retry (the previous
// behavior) would block the setConfiguration call forever when a newly ADDING peer is killed
// and can never catch up, leaving the region migration permanently stuck. After the bound is
// exhausted the last failure is propagated, so the upper layer can fail and roll back.
private final RetryPolicy reconfigurationPolicy;
private final RetryPolicy defaultPolicy;

RatisEndlessRetryPolicy(RatisConfig.Client config) {
endlessPolicy =
RetryPolicies.retryForeverWithSleep(TimeDuration.valueOf(2, TimeUnit.SECONDS));
RatisReconfigurationRetryPolicy(RatisConfig.Client config) {
reconfigurationPolicy =
RetryPolicies.retryUpToMaximumCountWithFixedSleep(
config.getReconfigurationMaxRetryAttempts(),
TimeDuration.valueOf(2, TimeUnit.SECONDS));
defaultPolicy = new RatisRetryPolicy(config);
}

Expand All @@ -248,7 +255,7 @@ public Action handleAttemptFailure(Event event) {
|| cause instanceof LeaderSteppingDownException
|| cause instanceof ServerNotReadyException
|| cause instanceof NotLeaderException) {
return endlessPolicy.handleAttemptFailure(event);
return reconfigurationPolicy.handleAttemptFailure(event);
}

return defaultPolicy.handleAttemptFailure(event);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1023,7 +1023,7 @@ public GenericKeyedObjectPool<RaftGroup, RatisClient> createClientPool(
GenericKeyedObjectPool<RaftGroup, RatisClient> clientPool =
new GenericKeyedObjectPool<>(
isReconfiguration
? new RatisClient.EndlessRetryFactory(
? new RatisClient.ReconfigurationRetryFactory(
manager, properties, clientRpc, config.getClient(), parameters)
: new RatisClient.Factory(
manager, properties, clientRpc, config.getClient(), parameters),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1062,6 +1062,15 @@ public class IoTDBConfig {

private int dataRatisConsensusMaxRetryAttempts = 10;
private int schemaRatisConsensusMaxRetryAttempts = 10;

/**
* RatisConsensus protocol, max retry attempts for a configuration change (add/remove peer). Uses
* a fixed 2s retry interval; bounding the attempts stops a killed ADDING peer from blocking the
* reconfiguration -- and hence a region migration -- forever. Pushed from the ConfigNode.
*/
private int dataRatisConsensusReconfigurationMaxRetryAttempts = 600;

private int schemaRatisConsensusReconfigurationMaxRetryAttempts = 600;
private long dataRatisConsensusInitialSleepTimeMs = 100L;
private long schemaRatisConsensusInitialSleepTimeMs = 100L;
private long dataRatisConsensusMaxSleepTimeMs = 10000L;
Expand Down Expand Up @@ -3832,6 +3841,26 @@ public void setSchemaRatisConsensusMaxRetryAttempts(int schemaRatisConsensusMaxR
this.schemaRatisConsensusMaxRetryAttempts = schemaRatisConsensusMaxRetryAttempts;
}

/**
 * Returns the maximum number of retry attempts allowed for a Ratis configuration change
 * (add/remove peer) of a data region on this DataNode.
 *
 * @return the attempt bound (local default is 600 unless overridden by the ConfigNode)
 */
public int getDataRatisConsensusReconfigurationMaxRetryAttempts() {
return dataRatisConsensusReconfigurationMaxRetryAttempts;
}

/**
 * Sets the maximum number of retry attempts allowed for a Ratis configuration change
 * (add/remove peer) of a data region. The value is stored as given; no range check is performed
 * here.
 *
 * @param dataRatisConsensusReconfigurationMaxRetryAttempts the new attempt bound
 */
public void setDataRatisConsensusReconfigurationMaxRetryAttempts(
int dataRatisConsensusReconfigurationMaxRetryAttempts) {
this.dataRatisConsensusReconfigurationMaxRetryAttempts =
dataRatisConsensusReconfigurationMaxRetryAttempts;
}

/**
 * Returns the maximum number of retry attempts allowed for a Ratis configuration change
 * (add/remove peer) of a schema region on this DataNode.
 *
 * @return the attempt bound (local default is 600 unless overridden by the ConfigNode)
 */
public int getSchemaRatisConsensusReconfigurationMaxRetryAttempts() {
return schemaRatisConsensusReconfigurationMaxRetryAttempts;
}

/**
 * Sets the maximum number of retry attempts allowed for a Ratis configuration change
 * (add/remove peer) of a schema region. The value is stored as given; no range check is
 * performed here.
 *
 * @param schemaRatisConsensusReconfigurationMaxRetryAttempts the new attempt bound
 */
public void setSchemaRatisConsensusReconfigurationMaxRetryAttempts(
int schemaRatisConsensusReconfigurationMaxRetryAttempts) {
this.schemaRatisConsensusReconfigurationMaxRetryAttempts =
schemaRatisConsensusReconfigurationMaxRetryAttempts;
}

public long getDataRatisConsensusInitialSleepTimeMs() {
return dataRatisConsensusInitialSleepTimeMs;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2970,6 +2970,17 @@ public void loadRatisConfig(TRatisConfig ratisConfig) {
conf.setSchemaRatisConsensusInitialSleepTimeMs(ratisConfig.getSchemaInitialSleepTime());
conf.setSchemaRatisConsensusMaxSleepTimeMs(ratisConfig.getSchemaMaxSleepTime());

// Optional fields: an old ConfigNode (rolling upgrade) will not set them, in which case the
// DataNode keeps its local default instead of overwriting it with 0.
if (ratisConfig.isSetDataReconfigurationMaxRetryAttempts()) {
conf.setDataRatisConsensusReconfigurationMaxRetryAttempts(
ratisConfig.getDataReconfigurationMaxRetryAttempts());
}
if (ratisConfig.isSetSchemaReconfigurationMaxRetryAttempts()) {
conf.setSchemaRatisConsensusReconfigurationMaxRetryAttempts(
ratisConfig.getSchemaReconfigurationMaxRetryAttempts());
}

conf.setDataRatisConsensusPreserveWhenPurge(ratisConfig.getDataPreserveWhenPurge());
conf.setSchemaRatisConsensusPreserveWhenPurge(ratisConfig.getSchemaPreserveWhenPurge());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,8 @@ private static ConsensusConfig buildConsensusConfig() {
CONF.getDataRatisConsensusInitialSleepTimeMs())
.setClientRetryMaxSleepTimeMs(CONF.getDataRatisConsensusMaxSleepTimeMs())
.setMaxClientNumForEachNode(CONF.getMaxClientNumForEachNode())
.setReconfigurationMaxRetryAttempts(
CONF.getDataRatisConsensusReconfigurationMaxRetryAttempts())
.build())
.setImpl(
RatisConfig.Impl.newBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,9 @@
.setClientRetryMaxSleepTimeMs(
CONF.getDataRatisConsensusMaxSleepTimeMs())
.setMaxClientNumForEachNode(CONF.getMaxClientNumForEachNode())
.setReconfigurationMaxRetryAttempts(
CONF
.getSchemaRatisConsensusReconfigurationMaxRetryAttempts())

Check warning on line 160 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/SchemaRegionConsensusImpl.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Line is longer than 100 characters (found 104).

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ6vUF81wmy3pcsxseXO&open=AZ6vUF81wmy3pcsxseXO&pullRequest=17895
.build())
.setImpl(
RatisConfig.Impl.newBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2056,6 +2056,17 @@ data_region_ratis_max_retry_attempts=10
data_region_ratis_initial_sleep_time_ms=100
data_region_ratis_max_sleep_time_ms=10000

# Max retry attempts for a Ratis configuration change (add/remove peer), e.g. during region
# migration or cluster scale-in/out. Unlike the request retry policy above, reconfiguration retries
# use a fixed 2s interval, so this roughly caps the wait at (attempts * 2s). Bounding it (instead of
# retrying forever) prevents a killed ADDING peer that can never catch up from blocking the
# reconfiguration -- and therefore the whole region migration -- indefinitely.
# effectiveMode: restart
# Datatype: int
config_node_ratis_reconfiguration_max_retry_attempts=600
schema_region_ratis_reconfiguration_max_retry_attempts=600
data_region_ratis_reconfiguration_max_retry_attempts=600

# first election timeout
# effectiveMode: restart
# Datatype: int
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,12 @@ struct TRatisConfig {
34: required i64 dataRegionPeriodicSnapshotInterval

35: required i32 ratisTransferLeaderTimeoutMs;

// Bound the retry attempts of a Ratis configuration change (add/remove peer) so a killed ADDING
// peer cannot block the reconfiguration forever. Optional for rolling-upgrade compatibility: an
// old ConfigNode will not set them and the DataNode falls back to its local default.
36: optional i32 schemaReconfigurationMaxRetryAttempts
37: optional i32 dataReconfigurationMaxRetryAttempts
}

struct TCQConfig {
Expand Down
Loading