fix loading of parts with old syntax

This commit is contained in:
Anton Popov 2022-10-09 15:39:20 +00:00
parent 9f1104c2ad
commit 286da7ee19
2 changed files with 27 additions and 26 deletions

View File

@ -942,11 +942,11 @@ Int64 MergeTreeData::getMaxBlockNumber() const
return max_block_num;
}
void MergeTreeData::PartLoadingTree::add(const MergeTreePartInfo & info, const DiskPtr & disk)
void MergeTreeData::PartLoadingTree::add(const MergeTreePartInfo & info, const String & name, const DiskPtr & disk)
{
auto & current_ptr = root_by_partition[info.partition_id];
if (!current_ptr)
current_ptr = std::make_shared<Node>(MergeTreePartInfo{}, disk);
current_ptr = std::make_shared<Node>(MergeTreePartInfo{}, "", disk);
auto * current = current_ptr.get();
while (true)
@ -987,23 +987,24 @@ void MergeTreeData::PartLoadingTree::add(const MergeTreePartInfo & info, const D
}
}
current->children.emplace(info, std::make_shared<Node>(info, disk));
current->children.emplace(info, std::make_shared<Node>(info, name, disk));
break;
}
}
MergeTreeData::PartLoadingTree
MergeTreeData::PartLoadingTree::build(std::vector<PartInfoWithDisk> nodes)
MergeTreeData::PartLoadingTree::build(std::vector<PartLoadingInfo> nodes)
{
std::sort(nodes.begin(), nodes.end(),
[](const auto & lhs, const auto & rhs)
{
return lhs.first.level > rhs.first.level;
});
std::sort(nodes.begin(), nodes.end(), [](const auto & lhs, const auto & rhs)
{
const auto & lhs_info = std::get<0>(lhs);
const auto & rhs_info = std::get<0>(rhs);
return std::tie(lhs_info.level, lhs_info.mutation) > std::tie(rhs_info.level, rhs_info.mutation);
});
PartLoadingTree tree;
for (const auto & [info, disk] : nodes)
tree.add(info, disk);
for (const auto & [info, name, disk] : nodes)
tree.add(info, name, disk);
return tree;
}
@ -1083,9 +1084,8 @@ void MergeTreeData::loadDataPartsFromDisk(
std::atomic<bool> has_lightweight_in_parts = false;
std::mutex loading_mutex;
auto load_part = [&](const MergeTreePartInfo & part_info, const DiskPtr & part_disk_ptr)
auto load_part = [&](const MergeTreePartInfo & part_info, const String & part_name, const DiskPtr & part_disk_ptr)
{
auto part_name = part_info.getPartName();
LOG_TRACE(log, "Loading part {} from disk {}", part_name, part_disk_ptr->getName());
auto single_disk_volume = std::make_shared<SingleDiskVolume>("volume_" + part_name, part_disk_ptr, 0);
@ -1235,7 +1235,7 @@ void MergeTreeData::loadDataPartsFromDisk(
remaining_thread_parts.erase(thread_idx);
}
bool part_is_ok = load_part(thread_part->info, thread_part->disk);
bool part_is_ok = load_part(thread_part->info, thread_part->name, thread_part->disk);
if (!part_is_ok)
{
@ -1373,7 +1373,7 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
}
ThreadPool pool(disks.size());
std::vector<PartLoadingTree::PartInfoWithDisk> parts_with_disks;
std::vector<PartLoadingTree::PartLoadingInfo> parts_with_disks;
for (const auto & disk_ptr : disks)
{
@ -1386,12 +1386,12 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
{
/// Skip temporary directories, file 'format_version.txt' and directory 'detached'.
if (startsWith(it->name(), "tmp") || it->name() == MergeTreeData::FORMAT_VERSION_FILE_NAME
|| it->name() == MergeTreeData::DETACHED_DIR_NAME)
|| it->name() == MergeTreeData::DETACHED_DIR_NAME
|| startsWith(it->name(), MergeTreeWriteAheadLog::WAL_FILE_NAME))
continue;
if (!startsWith(it->name(), MergeTreeWriteAheadLog::WAL_FILE_NAME))
if (auto part_info = MergeTreePartInfo::tryParsePartName(it->name(), format_version))
parts_with_disks.emplace_back(*part_info, disk_ptr);
if (auto part_info = MergeTreePartInfo::tryParsePartName(it->name(), format_version))
parts_with_disks.emplace_back(*part_info, it->name(), disk_ptr);
}
});
}

View File

@ -1334,22 +1334,23 @@ private:
public:
struct Node
{
Node(const MergeTreePartInfo & info_, const DiskPtr & disk_)
: info(info_), disk(disk_)
Node(const MergeTreePartInfo & info_, const String & name_, const DiskPtr & disk_)
: info(info_), name(name_), disk(disk_)
{
}
MergeTreePartInfo info;
DiskPtr disk;
const MergeTreePartInfo info;
const String name;
const DiskPtr disk;
std::map<MergeTreePartInfo, std::shared_ptr<Node>> children;
};
using NodePtr = std::shared_ptr<Node>;
using PartInfoWithDisk = std::pair<MergeTreePartInfo, DiskPtr>;
using PartLoadingInfo = std::tuple<MergeTreePartInfo, String, DiskPtr>;
static PartLoadingTree build(std::vector<PartInfoWithDisk> nodes);
void add(const MergeTreePartInfo & info, const DiskPtr & disk);
static PartLoadingTree build(std::vector<PartLoadingInfo> nodes);
void add(const MergeTreePartInfo & info, const String & name, const DiskPtr & disk);
const std::unordered_map<String, NodePtr> & getRoots() const { return root_by_partition; }
private: