diff --git a/dbms/src/Storages/MergeTree/MergeTreeData.cpp b/dbms/src/Storages/MergeTree/MergeTreeData.cpp index 4b13ebaa99f..11ad7835b51 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeData.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeData.cpp @@ -2640,6 +2640,77 @@ void MergeTreeData::dropDetached(const ASTPtr & partition, bool part, const Cont renamed_parts.old_and_new_names.front().first.clear(); } +MergeTreeData::MutableDataPartsVector MergeTreeData::tryLoadPartsToAttach(const ASTPtr & partition, bool attach_part, + const Context & context, PartsTemporaryRename & renamed_parts) +{ + String partition_id; + + if (attach_part) + partition_id = partition->as().value.safeGet(); + else + partition_id = getPartitionIDFromQuery(partition, context); + + String source_dir = "detached/"; + + /// Let's compose a list of parts that should be added. + Strings parts; + if (attach_part) + { + validateDetachedPartName(partition_id); + parts.push_back(partition_id); + } + else + { + LOG_DEBUG(log, "Looking for parts for partition " << partition_id << " in " << source_dir); + ActiveDataPartSet active_parts(format_version); + + std::set part_names; + for (Poco::DirectoryIterator it = Poco::DirectoryIterator(full_path + source_dir); it != Poco::DirectoryIterator(); ++it) + { + String name = it.name(); + MergeTreePartInfo part_info; + // TODO what if name contains "_tryN" suffix? + if (!MergeTreePartInfo::tryParsePartName(name, &part_info, format_version)) + continue; + if (part_info.partition_id != partition_id) + continue; + LOG_DEBUG(log, "Found part " << name); + active_parts.add(name); + part_names.insert(name); + } + LOG_DEBUG(log, active_parts.size() << " of them are active"); + parts = active_parts.getParts(); + + /// Inactive parts rename so they can not be attached in case of repeated ATTACH. + for (const auto & name : part_names) + { + // TODO maybe use PartsTemporaryRename here? + String containing_part = active_parts.getContainingPart(name); + if (!containing_part.empty() && containing_part != name) + Poco::File(full_path + source_dir + name).renameTo(full_path + source_dir + "inactive_" + name); + } + } + + /// Try to rename all parts before attaching to prevent race with DROP DETACHED and another ATTACH. + for (const auto & source_part_name : parts) + renamed_parts.addPart(source_part_name, "attaching_" + source_part_name); + + /// Synchronously check that added parts exist and are not broken. We will write checksums.txt if it does not exist. + LOG_DEBUG(log, "Checking parts"); + MutableDataPartsVector loaded_parts; + loaded_parts.reserve(parts.size()); + for (const auto & part_names : renamed_parts.old_and_new_names) + { + LOG_DEBUG(log, "Checking part " << part_names.second); + MutableDataPartPtr part = std::make_shared(*this, part_names.first); + part->relative_path = source_dir + part_names.second; + loadPartAndFixMetadata(part); + loaded_parts.push_back(part); + } + + return loaded_parts; +} + MergeTreeData::DataParts MergeTreeData::getDataParts(const DataPartStates & affordable_states) const { DataParts res; diff --git a/dbms/src/Storages/MergeTree/MergeTreeData.h b/dbms/src/Storages/MergeTree/MergeTreeData.h index 62cebf32f76..3592164fed5 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeData.h +++ b/dbms/src/Storages/MergeTree/MergeTreeData.h @@ -407,6 +407,9 @@ public: void dropDetached(const ASTPtr & partition, bool part, const Context & context); + MutableDataPartsVector tryLoadPartsToAttach(const ASTPtr & partition, bool attach_part, + const Context & context, PartsTemporaryRename & renamed_parts); + /// Returns Committed parts DataParts getDataParts() const; DataPartsVector getDataPartsVector() const; diff --git a/dbms/src/Storages/StorageMergeTree.cpp b/dbms/src/Storages/StorageMergeTree.cpp index db5632c3fe9..3464255e1b8 100644 --- a/dbms/src/Storages/StorageMergeTree.cpp +++ b/dbms/src/Storages/StorageMergeTree.cpp @@ -1018,59 +1018,8 @@ void StorageMergeTree::attachPartition(const ASTPtr & partition, bool attach_par { // TODO: should get some locks to prevent race with 'alter … modify column' - String partition_id; - - if (attach_part) - partition_id = partition->as().value.safeGet(); - else - partition_id = getPartitionIDFromQuery(partition, context); - - String source_dir = "detached/"; - - /// Let's make a list of parts to add. - Strings parts; - if (attach_part) - { - validateDetachedPartName(partition_id); - parts.push_back(partition_id); - } - else - { - LOG_DEBUG(log, "Looking for parts for partition " << partition_id << " in " << source_dir); - ActiveDataPartSet active_parts(format_version); - for (Poco::DirectoryIterator it = Poco::DirectoryIterator(full_path + source_dir); it != Poco::DirectoryIterator(); ++it) - { - const String & name = it.name(); - MergeTreePartInfo part_info; - /// Parts with prefix in name (e.g. attaching_1_3_3_0, deleting_1_3_3_0) will be ignored - // TODO what if name contains "_tryN" suffix? - if (!MergeTreePartInfo::tryParsePartName(name, &part_info, format_version) - || part_info.partition_id != partition_id) - { - continue; - } - LOG_DEBUG(log, "Found part " << name); - active_parts.add(name); - } - LOG_DEBUG(log, active_parts.size() << " of them are active"); - parts = active_parts.getParts(); - - // TODO should we rename inactive parts? (see StorageReplicatedMergeTree::attachPartition) - } - - PartsTemporaryRename renamed_parts(full_path + source_dir); - for (const auto & source_part_name : parts) - renamed_parts.addPart(source_part_name, "attaching_" + source_part_name); - - std::vector loaded_parts; - for (const auto & part_names : renamed_parts.old_and_new_names) - { - LOG_DEBUG(log, "Checking data in " << part_names.second); - MutableDataPartPtr part = std::make_shared(*this, part_names.first); - part->relative_path = source_dir + part_names.second; - loadPartAndFixMetadata(part); - loaded_parts.push_back(part); - } + PartsTemporaryRename renamed_parts(full_path + "detached/"); + MutableDataPartsVector loaded_parts = tryLoadPartsToAttach(partition, attach_part, context, renamed_parts); for (size_t i = 0; i < loaded_parts.size(); ++i) { diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp index 67577dee2b6..7e192d77a33 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp +++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp @@ -3545,70 +3545,16 @@ void StorageReplicatedMergeTree::attachPartition(const ASTPtr & partition, bool assertNotReadonly(); - String partition_id; - - if (attach_part) - partition_id = partition->as().value.safeGet(); - else - partition_id = getPartitionIDFromQuery(partition, query_context); - - String source_dir = "detached/"; - - /// Let's compose a list of parts that should be added. - Strings parts; - if (attach_part) - { - validateDetachedPartName(partition_id); - parts.push_back(partition_id); - } - else - { - LOG_DEBUG(log, "Looking for parts for partition " << partition_id << " in " << source_dir); - ActiveDataPartSet active_parts(format_version); - - std::set part_names; - for (Poco::DirectoryIterator it = Poco::DirectoryIterator(full_path + source_dir); it != Poco::DirectoryIterator(); ++it) - { - String name = it.name(); - MergeTreePartInfo part_info; - // TODO what if name contains "_tryN" suffix? - if (!MergeTreePartInfo::tryParsePartName(name, &part_info, format_version)) - continue; - if (part_info.partition_id != partition_id) - continue; - LOG_DEBUG(log, "Found part " << name); - active_parts.add(name); - part_names.insert(name); - } - LOG_DEBUG(log, active_parts.size() << " of them are active"); - parts = active_parts.getParts(); - - /// Inactive parts rename so they can not be attached in case of repeated ATTACH. - for (const auto & name : part_names) - { - String containing_part = active_parts.getContainingPart(name); - if (!containing_part.empty() && containing_part != name) - Poco::File(full_path + source_dir + name).renameTo(full_path + source_dir + "inactive_" + name); - } - } - - /// Synchronously check that added parts exist and are not broken. We will write checksums.txt if it does not exist. - LOG_DEBUG(log, "Checking parts"); - std::vector loaded_parts; - for (const String & part : parts) - { - LOG_DEBUG(log, "Checking part " << part); - loaded_parts.push_back(loadPartAndFixMetadata(source_dir + part)); - } - - // TODO fix race with DROP DETACHED + PartsTemporaryRename renamed_parts(full_path + "detached/"); + MutableDataPartsVector loaded_parts = tryLoadPartsToAttach(partition, attach_part, query_context, renamed_parts); ReplicatedMergeTreeBlockOutputStream output(*this, 0, 0, 0, false); /// TODO Allow to use quorum here. - for (auto & part : loaded_parts) + for (size_t i = 0; i < loaded_parts.size(); ++i) { - String old_name = part->name; - output.writeExistingPart(part); - LOG_DEBUG(log, "Attached part " << old_name << " as " << part->name); + String old_name = loaded_parts[i]->name; + output.writeExistingPart(loaded_parts[i]); + renamed_parts.old_and_new_names[i].first.clear(); + LOG_DEBUG(log, "Attached part " << old_name << " as " << loaded_parts[i]->name); } }