mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-12 01:12:12 +00:00
refactor attachPartition
This commit is contained in:
parent
2f33df1b2e
commit
c6717e0d3f
@ -2640,6 +2640,77 @@ void MergeTreeData::dropDetached(const ASTPtr & partition, bool part, const Cont
|
||||
renamed_parts.old_and_new_names.front().first.clear();
|
||||
}
|
||||
|
||||
MergeTreeData::MutableDataPartsVector MergeTreeData::tryLoadPartsToAttach(const ASTPtr & partition, bool attach_part,
|
||||
const Context & context, PartsTemporaryRename & renamed_parts)
|
||||
{
|
||||
String partition_id;
|
||||
|
||||
if (attach_part)
|
||||
partition_id = partition->as<ASTLiteral &>().value.safeGet<String>();
|
||||
else
|
||||
partition_id = getPartitionIDFromQuery(partition, context);
|
||||
|
||||
String source_dir = "detached/";
|
||||
|
||||
/// Let's compose a list of parts that should be added.
|
||||
Strings parts;
|
||||
if (attach_part)
|
||||
{
|
||||
validateDetachedPartName(partition_id);
|
||||
parts.push_back(partition_id);
|
||||
}
|
||||
else
|
||||
{
|
||||
LOG_DEBUG(log, "Looking for parts for partition " << partition_id << " in " << source_dir);
|
||||
ActiveDataPartSet active_parts(format_version);
|
||||
|
||||
std::set<String> part_names;
|
||||
for (Poco::DirectoryIterator it = Poco::DirectoryIterator(full_path + source_dir); it != Poco::DirectoryIterator(); ++it)
|
||||
{
|
||||
String name = it.name();
|
||||
MergeTreePartInfo part_info;
|
||||
// TODO what if name contains "_tryN" suffix?
|
||||
if (!MergeTreePartInfo::tryParsePartName(name, &part_info, format_version))
|
||||
continue;
|
||||
if (part_info.partition_id != partition_id)
|
||||
continue;
|
||||
LOG_DEBUG(log, "Found part " << name);
|
||||
active_parts.add(name);
|
||||
part_names.insert(name);
|
||||
}
|
||||
LOG_DEBUG(log, active_parts.size() << " of them are active");
|
||||
parts = active_parts.getParts();
|
||||
|
||||
/// Inactive parts rename so they can not be attached in case of repeated ATTACH.
|
||||
for (const auto & name : part_names)
|
||||
{
|
||||
// TODO maybe use PartsTemporaryRename here?
|
||||
String containing_part = active_parts.getContainingPart(name);
|
||||
if (!containing_part.empty() && containing_part != name)
|
||||
Poco::File(full_path + source_dir + name).renameTo(full_path + source_dir + "inactive_" + name);
|
||||
}
|
||||
}
|
||||
|
||||
/// Try to rename all parts before attaching to prevent race with DROP DETACHED and another ATTACH.
|
||||
for (const auto & source_part_name : parts)
|
||||
renamed_parts.addPart(source_part_name, "attaching_" + source_part_name);
|
||||
|
||||
/// Synchronously check that added parts exist and are not broken. We will write checksums.txt if it does not exist.
|
||||
LOG_DEBUG(log, "Checking parts");
|
||||
MutableDataPartsVector loaded_parts;
|
||||
loaded_parts.reserve(parts.size());
|
||||
for (const auto & part_names : renamed_parts.old_and_new_names)
|
||||
{
|
||||
LOG_DEBUG(log, "Checking part " << part_names.second);
|
||||
MutableDataPartPtr part = std::make_shared<DataPart>(*this, part_names.first);
|
||||
part->relative_path = source_dir + part_names.second;
|
||||
loadPartAndFixMetadata(part);
|
||||
loaded_parts.push_back(part);
|
||||
}
|
||||
|
||||
return loaded_parts;
|
||||
}
|
||||
|
||||
MergeTreeData::DataParts MergeTreeData::getDataParts(const DataPartStates & affordable_states) const
|
||||
{
|
||||
DataParts res;
|
||||
|
@ -407,6 +407,9 @@ public:
|
||||
|
||||
void dropDetached(const ASTPtr & partition, bool part, const Context & context);
|
||||
|
||||
MutableDataPartsVector tryLoadPartsToAttach(const ASTPtr & partition, bool attach_part,
|
||||
const Context & context, PartsTemporaryRename & renamed_parts);
|
||||
|
||||
/// Returns Committed parts
|
||||
DataParts getDataParts() const;
|
||||
DataPartsVector getDataPartsVector() const;
|
||||
|
@ -1018,59 +1018,8 @@ void StorageMergeTree::attachPartition(const ASTPtr & partition, bool attach_par
|
||||
{
|
||||
// TODO: should get some locks to prevent race with 'alter … modify column'
|
||||
|
||||
String partition_id;
|
||||
|
||||
if (attach_part)
|
||||
partition_id = partition->as<ASTLiteral &>().value.safeGet<String>();
|
||||
else
|
||||
partition_id = getPartitionIDFromQuery(partition, context);
|
||||
|
||||
String source_dir = "detached/";
|
||||
|
||||
/// Let's make a list of parts to add.
|
||||
Strings parts;
|
||||
if (attach_part)
|
||||
{
|
||||
validateDetachedPartName(partition_id);
|
||||
parts.push_back(partition_id);
|
||||
}
|
||||
else
|
||||
{
|
||||
LOG_DEBUG(log, "Looking for parts for partition " << partition_id << " in " << source_dir);
|
||||
ActiveDataPartSet active_parts(format_version);
|
||||
for (Poco::DirectoryIterator it = Poco::DirectoryIterator(full_path + source_dir); it != Poco::DirectoryIterator(); ++it)
|
||||
{
|
||||
const String & name = it.name();
|
||||
MergeTreePartInfo part_info;
|
||||
/// Parts with prefix in name (e.g. attaching_1_3_3_0, deleting_1_3_3_0) will be ignored
|
||||
// TODO what if name contains "_tryN" suffix?
|
||||
if (!MergeTreePartInfo::tryParsePartName(name, &part_info, format_version)
|
||||
|| part_info.partition_id != partition_id)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
LOG_DEBUG(log, "Found part " << name);
|
||||
active_parts.add(name);
|
||||
}
|
||||
LOG_DEBUG(log, active_parts.size() << " of them are active");
|
||||
parts = active_parts.getParts();
|
||||
|
||||
// TODO should we rename inactive parts? (see StorageReplicatedMergeTree::attachPartition)
|
||||
}
|
||||
|
||||
PartsTemporaryRename renamed_parts(full_path + source_dir);
|
||||
for (const auto & source_part_name : parts)
|
||||
renamed_parts.addPart(source_part_name, "attaching_" + source_part_name);
|
||||
|
||||
std::vector<MutableDataPartPtr> loaded_parts;
|
||||
for (const auto & part_names : renamed_parts.old_and_new_names)
|
||||
{
|
||||
LOG_DEBUG(log, "Checking data in " << part_names.second);
|
||||
MutableDataPartPtr part = std::make_shared<DataPart>(*this, part_names.first);
|
||||
part->relative_path = source_dir + part_names.second;
|
||||
loadPartAndFixMetadata(part);
|
||||
loaded_parts.push_back(part);
|
||||
}
|
||||
PartsTemporaryRename renamed_parts(full_path + "detached/");
|
||||
MutableDataPartsVector loaded_parts = tryLoadPartsToAttach(partition, attach_part, context, renamed_parts);
|
||||
|
||||
for (size_t i = 0; i < loaded_parts.size(); ++i)
|
||||
{
|
||||
|
@ -3545,70 +3545,16 @@ void StorageReplicatedMergeTree::attachPartition(const ASTPtr & partition, bool
|
||||
|
||||
assertNotReadonly();
|
||||
|
||||
String partition_id;
|
||||
|
||||
if (attach_part)
|
||||
partition_id = partition->as<ASTLiteral &>().value.safeGet<String>();
|
||||
else
|
||||
partition_id = getPartitionIDFromQuery(partition, query_context);
|
||||
|
||||
String source_dir = "detached/";
|
||||
|
||||
/// Let's compose a list of parts that should be added.
|
||||
Strings parts;
|
||||
if (attach_part)
|
||||
{
|
||||
validateDetachedPartName(partition_id);
|
||||
parts.push_back(partition_id);
|
||||
}
|
||||
else
|
||||
{
|
||||
LOG_DEBUG(log, "Looking for parts for partition " << partition_id << " in " << source_dir);
|
||||
ActiveDataPartSet active_parts(format_version);
|
||||
|
||||
std::set<String> part_names;
|
||||
for (Poco::DirectoryIterator it = Poco::DirectoryIterator(full_path + source_dir); it != Poco::DirectoryIterator(); ++it)
|
||||
{
|
||||
String name = it.name();
|
||||
MergeTreePartInfo part_info;
|
||||
// TODO what if name contains "_tryN" suffix?
|
||||
if (!MergeTreePartInfo::tryParsePartName(name, &part_info, format_version))
|
||||
continue;
|
||||
if (part_info.partition_id != partition_id)
|
||||
continue;
|
||||
LOG_DEBUG(log, "Found part " << name);
|
||||
active_parts.add(name);
|
||||
part_names.insert(name);
|
||||
}
|
||||
LOG_DEBUG(log, active_parts.size() << " of them are active");
|
||||
parts = active_parts.getParts();
|
||||
|
||||
/// Inactive parts rename so they can not be attached in case of repeated ATTACH.
|
||||
for (const auto & name : part_names)
|
||||
{
|
||||
String containing_part = active_parts.getContainingPart(name);
|
||||
if (!containing_part.empty() && containing_part != name)
|
||||
Poco::File(full_path + source_dir + name).renameTo(full_path + source_dir + "inactive_" + name);
|
||||
}
|
||||
}
|
||||
|
||||
/// Synchronously check that added parts exist and are not broken. We will write checksums.txt if it does not exist.
|
||||
LOG_DEBUG(log, "Checking parts");
|
||||
std::vector<MutableDataPartPtr> loaded_parts;
|
||||
for (const String & part : parts)
|
||||
{
|
||||
LOG_DEBUG(log, "Checking part " << part);
|
||||
loaded_parts.push_back(loadPartAndFixMetadata(source_dir + part));
|
||||
}
|
||||
|
||||
// TODO fix race with DROP DETACHED
|
||||
PartsTemporaryRename renamed_parts(full_path + "detached/");
|
||||
MutableDataPartsVector loaded_parts = tryLoadPartsToAttach(partition, attach_part, query_context, renamed_parts);
|
||||
|
||||
ReplicatedMergeTreeBlockOutputStream output(*this, 0, 0, 0, false); /// TODO Allow to use quorum here.
|
||||
for (auto & part : loaded_parts)
|
||||
for (size_t i = 0; i < loaded_parts.size(); ++i)
|
||||
{
|
||||
String old_name = part->name;
|
||||
output.writeExistingPart(part);
|
||||
LOG_DEBUG(log, "Attached part " << old_name << " as " << part->name);
|
||||
String old_name = loaded_parts[i]->name;
|
||||
output.writeExistingPart(loaded_parts[i]);
|
||||
renamed_parts.old_and_new_names[i].first.clear();
|
||||
LOG_DEBUG(log, "Attached part " << old_name << " as " << loaded_parts[i]->name);
|
||||
}
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user