From 624afd68c86bb4fc3eee76f3fb9779ffb2319c68 Mon Sep 17 00:00:00 2001 From: Kirill Nikiforov Date: Fri, 16 Aug 2024 01:24:52 +0400 Subject: [PATCH] parallel attach from multiple partitions --- src/Storages/StorageReplicatedMergeTree.cpp | 112 ++++++++++++++------ src/Storages/StorageReplicatedMergeTree.h | 13 +++ 2 files changed, 95 insertions(+), 30 deletions(-) diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index a3c1ab7cdff..67f51b186c5 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -8030,25 +8030,84 @@ void StorageReplicatedMergeTree::replacePartitionFrom( /// First argument is true, because we possibly will add new data to current table. auto lock1 = lockForShare(query_context->getCurrentQueryId(), query_context->getSettingsRef().lock_acquire_timeout); auto lock2 = source_table->lockForShare(query_context->getCurrentQueryId(), query_context->getSettingsRef().lock_acquire_timeout); - auto storage_settings_ptr = getSettings(); - auto source_metadata_snapshot = source_table->getInMemoryMetadataPtr(); - auto metadata_snapshot = getInMemoryMetadataPtr(); + const auto storage_settings_ptr = getSettings(); + const auto source_metadata_snapshot = source_table->getInMemoryMetadataPtr(); + const auto metadata_snapshot = getInMemoryMetadataPtr(); + const MergeTreeData & src_data = checkStructureAndGetMergeTreeData(source_table, source_metadata_snapshot, metadata_snapshot); - Stopwatch watch; + std::unordered_set partitions; + if (partition->as()->all) + { + if (replace) + throw DB::Exception(ErrorCodes::SUPPORT_IS_DISABLED, "Only support DROP/DETACH/ATTACH PARTITION ALL currently"); + + partitions = src_data.getAllPartitionIds(); + } + else + { + partitions = std::unordered_set(); + partitions.emplace(getPartitionIDFromQuery(partition, query_context)); + } + LOG_INFO(log, "Will try to attach {} partitions", partitions.size()); + + const Stopwatch watch; ProfileEventsScope profile_events_scope; + const auto zookeeper = getZooKeeper(); - MergeTreeData & src_data = checkStructureAndGetMergeTreeData(source_table, source_metadata_snapshot, metadata_snapshot); - String partition_id = getPartitionIDFromQuery(partition, query_context); + const bool zero_copy_enabled = storage_settings_ptr->allow_remote_fs_zero_copy_replication + || dynamic_cast(source_table.get())->getSettings()->allow_remote_fs_zero_copy_replication; + try + { + std::unique_ptr entries[partitions.size()]; + size_t idx = 0; + for (const auto & partition_id : partitions) + { + entries[idx] = replacePartitionFromImpl(watch, + profile_events_scope, + metadata_snapshot, + src_data, + partition_id, + zookeeper, + replace, + zero_copy_enabled, + storage_settings_ptr->always_use_copy_instead_of_hardlinks, + query_context); + ++idx; + } + + for (const auto & entry : entries) + waitForLogEntryToBeProcessedIfNecessary(*entry, query_context); + + lock2.reset(); + lock1.reset(); + } + catch(...) + { + lock2.reset(); + lock1.reset(); + + throw; + } +} + +std::unique_ptr StorageReplicatedMergeTree::replacePartitionFromImpl( + const Stopwatch & watch, + ProfileEventsScope & profile_events_scope, + const StorageMetadataPtr & metadata_snapshot, + const MergeTreeData & src_data, + const String & partition_id, + const ZooKeeperPtr & zookeeper, + bool replace, + const bool & zero_copy_enabled, + const bool & always_use_copy_instead_of_hardlinks, + const ContextPtr & query_context) +{ /// NOTE: Some covered parts may be missing in src_all_parts if corresponding log entries are not executed yet. DataPartsVector src_all_parts = src_data.getVisibleDataPartsVectorInPartition(query_context, partition_id); - LOG_DEBUG(log, "Cloning {} parts", src_all_parts.size()); - static const String TMP_PREFIX = "tmp_replace_from_"; - auto zookeeper = getZooKeeper(); - /// Retry if alter_partition_version changes for (size_t retry = 0; retry < 1000; ++retry) { @@ -8133,11 +8192,9 @@ void StorageReplicatedMergeTree::replacePartitionFrom( UInt64 index = lock->getNumber(); MergeTreePartInfo dst_part_info(partition_id, index, index, src_part->info.level); - bool zero_copy_enabled = storage_settings_ptr->allow_remote_fs_zero_copy_replication - || dynamic_cast(source_table.get())->getSettings()->allow_remote_fs_zero_copy_replication; IDataPartStorage::ClonePartParams clone_params { - .copy_instead_of_hardlink = storage_settings_ptr->always_use_copy_instead_of_hardlinks || (zero_copy_enabled && src_part->isStoredOnRemoteDiskWithZeroCopySupport()), + .copy_instead_of_hardlink = always_use_copy_instead_of_hardlinks || (zero_copy_enabled && src_part->isStoredOnRemoteDiskWithZeroCopySupport()), .metadata_version_to_write = metadata_snapshot->getMetadataVersion() }; if (replace) @@ -8145,7 +8202,7 @@ void StorageReplicatedMergeTree::replacePartitionFrom( /// Replace can only work on the same disk auto [dst_part, part_lock] = cloneAndLoadDataPart( src_part, - TMP_PREFIX, + TMP_PREFIX_REPLACE_PARTITION_FROM, dst_part_info, metadata_snapshot, clone_params, @@ -8160,7 +8217,7 @@ void StorageReplicatedMergeTree::replacePartitionFrom( /// Attach can work on another disk auto [dst_part, part_lock] = cloneAndLoadDataPart( src_part, - TMP_PREFIX, + TMP_PREFIX_REPLACE_PARTITION_FROM, dst_part_info, metadata_snapshot, clone_params, @@ -8176,15 +8233,16 @@ void StorageReplicatedMergeTree::replacePartitionFrom( part_checksums.emplace_back(hash_hex); } - ReplicatedMergeTreeLogEntryData entry; + //ReplicatedMergeTreeLogEntryData entry; + auto entry = std::make_unique(); { auto src_table_id = src_data.getStorageID(); - entry.type = ReplicatedMergeTreeLogEntryData::REPLACE_RANGE; - entry.source_replica = replica_name; - entry.create_time = time(nullptr); - entry.replace_range_entry = std::make_shared(); + entry->type = ReplicatedMergeTreeLogEntryData::REPLACE_RANGE; + entry->source_replica = replica_name; + entry->create_time = time(nullptr); + entry->replace_range_entry = std::make_shared(); - auto & entry_replace = *entry.replace_range_entry; + auto & entry_replace = *entry->replace_range_entry; entry_replace.drop_range_part_name = drop_range_fake_part_name; entry_replace.from_database = src_table_id.database_name; entry_replace.from_table = src_table_id.table_name; @@ -8225,7 +8283,7 @@ void StorageReplicatedMergeTree::replacePartitionFrom( ops.emplace_back(zkutil::makeSetRequest(alter_partition_version_path, "", alter_partition_version_stat.version)); /// Just update version, because merges assignment relies on it ops.emplace_back(zkutil::makeSetRequest(fs::path(zookeeper_path) / "log", "", -1)); - ops.emplace_back(zkutil::makeCreateRequest(fs::path(zookeeper_path) / "log/log-", entry.toString(), zkutil::CreateMode::PersistentSequential)); + ops.emplace_back(zkutil::makeCreateRequest(fs::path(zookeeper_path) / "log/log-", entry->toString(), zkutil::CreateMode::PersistentSequential)); Transaction transaction(*this, NO_TRANSACTION_RAW); { @@ -8275,14 +8333,11 @@ void StorageReplicatedMergeTree::replacePartitionFrom( } String log_znode_path = dynamic_cast(*op_results.back()).path_created; - entry.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1); + entry->znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1); for (auto & lock : ephemeral_locks) lock.assumeUnlocked(); - lock2.reset(); - lock1.reset(); - /// We need to pull the REPLACE_RANGE before cleaning the replaced parts (otherwise CHeckThread may decide that parts are lost) queue.pullLogsToQueue(getZooKeeperAndAssertNotReadonly(), {}, ReplicatedMergeTreeQueue::SYNC); // No need to block operations further, especially that in case we have to wait for mutation to finish, the intent would block @@ -8291,10 +8346,7 @@ void StorageReplicatedMergeTree::replacePartitionFrom( parts_holder.clear(); cleanup_thread.wakeup(); - - waitForLogEntryToBeProcessedIfNecessary(entry, query_context); - - return; + return entry; } throw Exception( diff --git a/src/Storages/StorageReplicatedMergeTree.h b/src/Storages/StorageReplicatedMergeTree.h index 2e54f17d5d5..4291aebf3e8 100644 --- a/src/Storages/StorageReplicatedMergeTree.h +++ b/src/Storages/StorageReplicatedMergeTree.h @@ -37,6 +37,7 @@ #include #include #include +#include #include #include @@ -1013,6 +1014,18 @@ private: DataPartsVector::const_iterator it; }; + const String TMP_PREFIX_REPLACE_PARTITION_FROM = "tmp_replace_from_"; + std::unique_ptr replacePartitionFromImpl( + const Stopwatch & watch, + ProfileEventsScope & profile_events_scope, + const StorageMetadataPtr & metadata_snapshot, + const MergeTreeData & src_data, + const String & partition_id, + const zkutil::ZooKeeperPtr & zookeeper, + bool replace, + const bool & zero_copy_enabled, + const bool & always_use_copy_instead_of_hardlinks, + const ContextPtr & query_context); }; String getPartNamePossiblyFake(MergeTreeDataFormatVersion format_version, const MergeTreePartInfo & part_info);