Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions src/Core/Settings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7400,10 +7400,6 @@ Throw an error if there are pending mutations when exporting a merge tree part.
)", 0) \
DECLARE(Bool, export_merge_tree_part_throw_on_pending_patch_parts, true, R"(
Throw an error if there are pending patch parts when exporting a merge tree part.
)", 0) \
DECLARE(Bool, export_merge_tree_partition_lock_inside_the_task, false, R"(
Only lock a part when the task is already running. This might help with busy waiting where the scheduler locks a part, but the task ends in the pending list.
On the other hand, there is a chance once the task executes that part has already been locked by another replica and the task will simply early exit.
)", 0) \
DECLARE(Bool, export_merge_tree_partition_system_table_prefer_remote_information, false, R"(
Controls whether the system.replicated_partition_exports will prefer to query ZooKeeper to get the most up to date information or use the local information.
Expand Down Expand Up @@ -7835,6 +7831,7 @@ Allow experimental database engine DataLakeCatalog with catalog_type = 'paimon_r
MAKE_OBSOLETE(M, Bool, allow_experimental_object_type, false) \
MAKE_OBSOLETE(M, BoolAuto, insert_select_deduplicate, Field{"auto"}) \
MAKE_OBSOLETE(M, Bool, allow_retries_in_cluster_requests, false) \
MAKE_OBSOLETE(M, Bool, export_merge_tree_partition_lock_inside_the_task, false) \
/** The section above is for obsolete settings. Do not add anything there. */
#endif /// __CLION_IDE__

Expand Down
1 change: 1 addition & 0 deletions src/Core/SettingsChangesHistory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
{"iceberg_metadata_staleness_ms", 0, 0, "New setting allowing using cached metadata version at READ operations to prevent fetching from remote catalog"},
{"export_merge_tree_partition_task_timeout_seconds", 0, 3600, "New setting to control the timeout for export partition tasks."},
{"export_merge_tree_partition_manifest_ttl", 180, 86400, "Reasonable default for real usage"},
{"export_merge_tree_partition_lock_inside_the_task", false, false, "Obsolete. No-op."},
});
addSettingsChanges(settings_changes_history, "26.1",
{
Expand Down
4 changes: 0 additions & 4 deletions src/Storages/ExportReplicatedMergeTreePartitionManifest.h
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,6 @@ struct ExportReplicatedMergeTreePartitionManifest
size_t max_rows_per_file;
MergeTreePartExportManifest::FileAlreadyExistsPolicy file_already_exists_policy;
String filename_pattern;
bool lock_inside_the_task; /// todo temporary
bool write_full_path_in_iceberg_metadata = false;
String iceberg_metadata_json;

Expand Down Expand Up @@ -154,7 +153,6 @@ struct ExportReplicatedMergeTreePartitionManifest
json.set("max_retries", max_retries);
json.set("ttl_seconds", ttl_seconds);
json.set("task_timeout_seconds", task_timeout_seconds);
json.set("lock_inside_the_task", lock_inside_the_task);
json.set("write_full_path_in_iceberg_metadata", write_full_path_in_iceberg_metadata);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Preserve lock_inside_the_task in serialized manifests

Dropping lock_inside_the_task from ExportReplicatedMergeTreePartitionManifest::toJsonString() breaks rolling upgrades: a new replica now writes metadata without this key, but older replicas still parsing the same ZooKeeper metadata.json call getValue<bool>("lock_inside_the_task") and throw on missing field. In a mixed-version cluster, that makes export-partition tasks unreadable/failing on old nodes until every replica is upgraded. Keep writing the field (e.g., always false) for one compatibility window before removing it.

Useful? React with 👍 / 👎.

std::ostringstream oss; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
oss.exceptions(std::ios::failbit);
Expand Down Expand Up @@ -208,8 +206,6 @@ struct ExportReplicatedMergeTreePartitionManifest
/// what to do if it's not a valid value?
}

manifest.lock_inside_the_task = json->getValue<bool>("lock_inside_the_task");

manifest.write_full_path_in_iceberg_metadata = json->getValue<bool>("write_full_path_in_iceberg_metadata");

return manifest;
Expand Down
71 changes: 0 additions & 71 deletions src/Storages/MergeTree/ExportPartFromPartitionExportTask.cpp

This file was deleted.

36 changes: 0 additions & 36 deletions src/Storages/MergeTree/ExportPartFromPartitionExportTask.h

This file was deleted.

96 changes: 25 additions & 71 deletions src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
#include <Common/ProfileEvents.h>
#include "Storages/MergeTree/ExportPartitionUtils.h"
#include "Storages/MergeTree/MergeTreePartExportManifest.h"
#include "Storages/MergeTree/ExportPartFromPartitionExportTask.h"
#include "Formats/FormatFactory.h"
#include <Core/Settings.h>

Expand Down Expand Up @@ -176,90 +175,45 @@ void ExportPartitionTaskScheduler::run()

auto context = ExportPartitionUtils::getContextCopyWithTaskSettings(storage.getContext(), manifest);

/// todo arthur this code path does not perform all the validations a simple part export does because we are not calling exportPartToTable directly.
/// the schema and everything else has been validated when the export partition task was created, but nothing prevents the destination table from being
/// recreated with a new schema before the export task is scheduled.
if (manifest.lock_inside_the_task)
try
{
LOG_INFO(storage.log, "ExportPartition scheduler task: Locking part export inside the task");
std::lock_guard part_export_lock(storage.export_manifests_mutex);
LOG_INFO(storage.log, "ExportPartition scheduler task: Exporting part to table");

MergeTreePartExportManifest part_export_manifest(
destination_storage,
part,
LOG_INFO(storage.log, "ExportPartition scheduler task: Attempting to lock part: {}", zk_part_name);

ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests);
ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperCreate);
if (Coordination::Error::ZOK != zk->tryCreate(fs::path(storage.zookeeper_path) / "exports" / key / "locks" / zk_part_name, storage.replica_name, zkutil::CreateMode::Ephemeral))
{
LOG_INFO(storage.log, "ExportPartition scheduler task: Failed to lock part {}, skipping", zk_part_name);
continue;
}

LOG_INFO(storage.log, "ExportPartition scheduler task: Locked part: {}", zk_part_name);

storage.exportPartToTable(
part->name,
destination_storage_id,
manifest.transaction_id,
manifest.query_id,
context->getSettingsRef()[Setting::export_merge_tree_part_file_already_exists_policy].value,
context->getSettingsCopy(),
storage.getInMemoryMetadataPtr(),
context,
manifest.iceberg_metadata_json,
/*allow_outdated_parts*/ true,
[this, key, zk_part_name, manifest, destination_storage]
(MergeTreePartExportManifest::CompletionCallbackResult result)
{
handlePartExportCompletion(key, zk_part_name, manifest, destination_storage, result);
});

part_export_manifest.task = std::make_shared<ExportPartFromPartitionExportTask>(storage, key, part_export_manifest);

/// todo arthur this might conflict with the standalone export part. what to do in this case?
if (!storage.export_manifests.emplace(part_export_manifest).second)
{
LOG_INFO(storage.log, "ExportPartition scheduler task: Part {} is already being exported, skipping", zk_part_name);
continue;
}

if (!storage.background_moves_assignee.scheduleMoveTask(part_export_manifest.task))
{
storage.export_manifests.erase(part_export_manifest);
LOG_INFO(storage.log, "ExportPartition scheduler task: Failed to schedule export part task, skipping");
return;
}

scheduled_exports_count++;
}
else
catch (const Exception &)
{
try
{
LOG_INFO(storage.log, "ExportPartition scheduler task: Exporting part to table");

LOG_INFO(storage.log, "ExportPartition scheduler task: Attempting to lock part: {}", zk_part_name);

ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests);
ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperCreate);
if (Coordination::Error::ZOK != zk->tryCreate(fs::path(storage.zookeeper_path) / "exports" / key / "locks" / zk_part_name, storage.replica_name, zkutil::CreateMode::Ephemeral))
{
LOG_INFO(storage.log, "ExportPartition scheduler task: Failed to lock part {}, skipping", zk_part_name);
continue;
}

LOG_INFO(storage.log, "ExportPartition scheduler task: Locked part: {}", zk_part_name);

storage.exportPartToTable(
part->name,
destination_storage_id,
manifest.transaction_id,
context,
manifest.iceberg_metadata_json,
/*allow_outdated_parts*/ true,
[this, key, zk_part_name, manifest, destination_storage]
(MergeTreePartExportManifest::CompletionCallbackResult result)
{
handlePartExportCompletion(key, zk_part_name, manifest, destination_storage, result);
});

scheduled_exports_count++;
}
catch (const Exception &)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests);
ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRemove);
zk->tryRemove(fs::path(storage.zookeeper_path) / "exports" / key / "locks" / zk_part_name);
/// we should not increment retry_count because the node might just be full
}
tryLogCurrentException(__PRETTY_FUNCTION__);
ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests);
ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRemove);
zk->tryRemove(fs::path(storage.zookeeper_path) / "exports" / key / "locks" / zk_part_name);
/// we should not increment retry_count because the node might just be full
}

}
}
}
Expand Down
1 change: 0 additions & 1 deletion src/Storages/MergeTree/MergeTreeData.h
Original file line number Diff line number Diff line change
Expand Up @@ -1406,7 +1406,6 @@ class MergeTreeData : public IStorage, public WithMutableContext
friend class IMergedBlockOutputStream; // for access to log
friend struct DataPartsLock; // for access to shared_parts_list/shared_ranges_in_parts
friend class ExportPartTask;
friend class ExportPartFromPartitionExportTask;

bool require_part_metadata;

Expand Down
2 changes: 0 additions & 2 deletions src/Storages/StorageReplicatedMergeTree.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,6 @@ namespace Setting
extern const SettingsUInt64 export_merge_tree_part_max_rows_per_file;
extern const SettingsBool export_merge_tree_part_throw_on_pending_mutations;
extern const SettingsBool export_merge_tree_part_throw_on_pending_patch_parts;
extern const SettingsBool export_merge_tree_partition_lock_inside_the_task;
extern const SettingsString export_merge_tree_part_filename_pattern;
extern const SettingsBool write_full_path_in_iceberg_metadata;
extern const SettingsBool allow_experimental_insert_into_iceberg;
Expand Down Expand Up @@ -8264,7 +8263,6 @@ void StorageReplicatedMergeTree::exportPartitionToTable(const PartitionCommand &
manifest.parquet_parallel_encoding = query_context->getSettingsRef()[Setting::output_format_parquet_parallel_encoding];
manifest.max_bytes_per_file = query_context->getSettingsRef()[Setting::export_merge_tree_part_max_bytes_per_file];
manifest.max_rows_per_file = query_context->getSettingsRef()[Setting::export_merge_tree_part_max_rows_per_file];
manifest.lock_inside_the_task = query_context->getSettingsRef()[Setting::export_merge_tree_partition_lock_inside_the_task];

manifest.file_already_exists_policy = query_context->getSettingsRef()[Setting::export_merge_tree_part_file_already_exists_policy].value;
manifest.filename_pattern = query_context->getSettingsRef()[Setting::export_merge_tree_part_filename_pattern].value;
Expand Down
1 change: 0 additions & 1 deletion src/Storages/StorageReplicatedMergeTree.h
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,6 @@ class StorageReplicatedMergeTree final : public MergeTreeData
friend class ReplicatedMergeMutateTaskBase;
friend class ExportPartitionManifestUpdatingTask;
friend class ExportPartitionTaskScheduler;
friend class ExportPartFromPartitionExportTask;

using MergeStrategyPicker = ReplicatedMergeTreeMergeStrategyPicker;
using LogEntry = ReplicatedMergeTreeLogEntry;
Expand Down
Loading