parallel attach from multiple partitions

This commit is contained in:
Kirill Nikiforov 2024-08-16 01:24:52 +04:00
parent ee89a36b4a
commit 624afd68c8
2 changed files with 95 additions and 30 deletions

View File

@ -8030,25 +8030,84 @@ void StorageReplicatedMergeTree::replacePartitionFrom(
/// First argument is true, because we possibly will add new data to current table.
auto lock1 = lockForShare(query_context->getCurrentQueryId(), query_context->getSettingsRef().lock_acquire_timeout);
auto lock2 = source_table->lockForShare(query_context->getCurrentQueryId(), query_context->getSettingsRef().lock_acquire_timeout);
auto storage_settings_ptr = getSettings();
auto source_metadata_snapshot = source_table->getInMemoryMetadataPtr();
auto metadata_snapshot = getInMemoryMetadataPtr();
const auto storage_settings_ptr = getSettings();
const auto source_metadata_snapshot = source_table->getInMemoryMetadataPtr();
const auto metadata_snapshot = getInMemoryMetadataPtr();
const MergeTreeData & src_data = checkStructureAndGetMergeTreeData(source_table, source_metadata_snapshot, metadata_snapshot);
Stopwatch watch;
std::unordered_set<String> partitions;
if (partition->as<ASTPartition>()->all)
{
if (replace)
throw DB::Exception(ErrorCodes::SUPPORT_IS_DISABLED, "Only support DROP/DETACH/ATTACH PARTITION ALL currently");
partitions = src_data.getAllPartitionIds();
}
else
{
partitions = std::unordered_set<String>();
partitions.emplace(getPartitionIDFromQuery(partition, query_context));
}
LOG_INFO(log, "Will try to attach {} partitions", partitions.size());
const Stopwatch watch;
ProfileEventsScope profile_events_scope;
const auto zookeeper = getZooKeeper();
MergeTreeData & src_data = checkStructureAndGetMergeTreeData(source_table, source_metadata_snapshot, metadata_snapshot);
String partition_id = getPartitionIDFromQuery(partition, query_context);
const bool zero_copy_enabled = storage_settings_ptr->allow_remote_fs_zero_copy_replication
|| dynamic_cast<const MergeTreeData *>(source_table.get())->getSettings()->allow_remote_fs_zero_copy_replication;
try
{
std::unique_ptr<ReplicatedMergeTreeLogEntryData> entries[partitions.size()];
size_t idx = 0;
for (const auto & partition_id : partitions)
{
entries[idx] = replacePartitionFromImpl(watch,
profile_events_scope,
metadata_snapshot,
src_data,
partition_id,
zookeeper,
replace,
zero_copy_enabled,
storage_settings_ptr->always_use_copy_instead_of_hardlinks,
query_context);
++idx;
}
for (const auto & entry : entries)
waitForLogEntryToBeProcessedIfNecessary(*entry, query_context);
lock2.reset();
lock1.reset();
}
catch(...)
{
lock2.reset();
lock1.reset();
throw;
}
}
std::unique_ptr<ReplicatedMergeTreeLogEntryData> StorageReplicatedMergeTree::replacePartitionFromImpl(
const Stopwatch & watch,
ProfileEventsScope & profile_events_scope,
const StorageMetadataPtr & metadata_snapshot,
const MergeTreeData & src_data,
const String & partition_id,
const ZooKeeperPtr & zookeeper,
bool replace,
const bool & zero_copy_enabled,
const bool & always_use_copy_instead_of_hardlinks,
const ContextPtr & query_context)
{
/// NOTE: Some covered parts may be missing in src_all_parts if corresponding log entries are not executed yet.
DataPartsVector src_all_parts = src_data.getVisibleDataPartsVectorInPartition(query_context, partition_id);
LOG_DEBUG(log, "Cloning {} parts", src_all_parts.size());
static const String TMP_PREFIX = "tmp_replace_from_";
auto zookeeper = getZooKeeper();
/// Retry if alter_partition_version changes
for (size_t retry = 0; retry < 1000; ++retry)
{
@ -8133,11 +8192,9 @@ void StorageReplicatedMergeTree::replacePartitionFrom(
UInt64 index = lock->getNumber();
MergeTreePartInfo dst_part_info(partition_id, index, index, src_part->info.level);
bool zero_copy_enabled = storage_settings_ptr->allow_remote_fs_zero_copy_replication
|| dynamic_cast<const MergeTreeData *>(source_table.get())->getSettings()->allow_remote_fs_zero_copy_replication;
IDataPartStorage::ClonePartParams clone_params
{
.copy_instead_of_hardlink = storage_settings_ptr->always_use_copy_instead_of_hardlinks || (zero_copy_enabled && src_part->isStoredOnRemoteDiskWithZeroCopySupport()),
.copy_instead_of_hardlink = always_use_copy_instead_of_hardlinks || (zero_copy_enabled && src_part->isStoredOnRemoteDiskWithZeroCopySupport()),
.metadata_version_to_write = metadata_snapshot->getMetadataVersion()
};
if (replace)
@ -8145,7 +8202,7 @@ void StorageReplicatedMergeTree::replacePartitionFrom(
/// Replace can only work on the same disk
auto [dst_part, part_lock] = cloneAndLoadDataPart(
src_part,
TMP_PREFIX,
TMP_PREFIX_REPLACE_PARTITION_FROM,
dst_part_info,
metadata_snapshot,
clone_params,
@ -8160,7 +8217,7 @@ void StorageReplicatedMergeTree::replacePartitionFrom(
/// Attach can work on another disk
auto [dst_part, part_lock] = cloneAndLoadDataPart(
src_part,
TMP_PREFIX,
TMP_PREFIX_REPLACE_PARTITION_FROM,
dst_part_info,
metadata_snapshot,
clone_params,
@ -8176,15 +8233,16 @@ void StorageReplicatedMergeTree::replacePartitionFrom(
part_checksums.emplace_back(hash_hex);
}
ReplicatedMergeTreeLogEntryData entry;
//ReplicatedMergeTreeLogEntryData entry;
auto entry = std::make_unique<ReplicatedMergeTreeLogEntryData>();
{
auto src_table_id = src_data.getStorageID();
entry.type = ReplicatedMergeTreeLogEntryData::REPLACE_RANGE;
entry.source_replica = replica_name;
entry.create_time = time(nullptr);
entry.replace_range_entry = std::make_shared<ReplicatedMergeTreeLogEntryData::ReplaceRangeEntry>();
entry->type = ReplicatedMergeTreeLogEntryData::REPLACE_RANGE;
entry->source_replica = replica_name;
entry->create_time = time(nullptr);
entry->replace_range_entry = std::make_shared<ReplicatedMergeTreeLogEntryData::ReplaceRangeEntry>();
auto & entry_replace = *entry.replace_range_entry;
auto & entry_replace = *entry->replace_range_entry;
entry_replace.drop_range_part_name = drop_range_fake_part_name;
entry_replace.from_database = src_table_id.database_name;
entry_replace.from_table = src_table_id.table_name;
@ -8225,7 +8283,7 @@ void StorageReplicatedMergeTree::replacePartitionFrom(
ops.emplace_back(zkutil::makeSetRequest(alter_partition_version_path, "", alter_partition_version_stat.version));
/// Just update version, because merges assignment relies on it
ops.emplace_back(zkutil::makeSetRequest(fs::path(zookeeper_path) / "log", "", -1));
ops.emplace_back(zkutil::makeCreateRequest(fs::path(zookeeper_path) / "log/log-", entry.toString(), zkutil::CreateMode::PersistentSequential));
ops.emplace_back(zkutil::makeCreateRequest(fs::path(zookeeper_path) / "log/log-", entry->toString(), zkutil::CreateMode::PersistentSequential));
Transaction transaction(*this, NO_TRANSACTION_RAW);
{
@ -8275,14 +8333,11 @@ void StorageReplicatedMergeTree::replacePartitionFrom(
}
String log_znode_path = dynamic_cast<const Coordination::CreateResponse &>(*op_results.back()).path_created;
entry.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1);
entry->znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1);
for (auto & lock : ephemeral_locks)
lock.assumeUnlocked();
lock2.reset();
lock1.reset();
/// We need to pull the REPLACE_RANGE before cleaning the replaced parts (otherwise CHeckThread may decide that parts are lost)
queue.pullLogsToQueue(getZooKeeperAndAssertNotReadonly(), {}, ReplicatedMergeTreeQueue::SYNC);
// No need to block operations further, especially that in case we have to wait for mutation to finish, the intent would block
@ -8291,10 +8346,7 @@ void StorageReplicatedMergeTree::replacePartitionFrom(
parts_holder.clear();
cleanup_thread.wakeup();
waitForLogEntryToBeProcessedIfNecessary(entry, query_context);
return;
return entry;
}
throw Exception(

View File

@ -37,6 +37,7 @@
#include <base/defines.h>
#include <Core/BackgroundSchedulePool.h>
#include <QueryPipeline/Pipe.h>
#include <Common/ProfileEventsScope.h>
#include <Storages/MergeTree/BackgroundJobsAssignee.h>
#include <Parsers/SyncReplicaMode.h>
@ -1013,6 +1014,18 @@ private:
DataPartsVector::const_iterator it;
};
const String TMP_PREFIX_REPLACE_PARTITION_FROM = "tmp_replace_from_";
std::unique_ptr<ReplicatedMergeTreeLogEntryData> replacePartitionFromImpl(
const Stopwatch & watch,
ProfileEventsScope & profile_events_scope,
const StorageMetadataPtr & metadata_snapshot,
const MergeTreeData & src_data,
const String & partition_id,
const zkutil::ZooKeeperPtr & zookeeper,
bool replace,
const bool & zero_copy_enabled,
const bool & always_use_copy_instead_of_hardlinks,
const ContextPtr & query_context);
};
String getPartNamePossiblyFake(MergeTreeDataFormatVersion format_version, const MergeTreePartInfo & part_info);