Merge pull request #28085 from myrrc/improvement/merge-tree-part-opt-parse

optional<> semantics for parsing MergeTreePartInfo and DetachedPartInfo
This commit is contained in:
alexey-milovidov 2021-09-10 00:04:03 +03:00 committed by GitHub
commit 2236a5df98
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 170 additions and 108 deletions

View File

@ -2,18 +2,17 @@
#include <IO/ReadBufferFromMemory.h>
namespace DB
{
/** Allows to read from std::string-like object.
*/
/// Allows to read from std::string-like object.
class ReadBufferFromString : public ReadBufferFromMemory
{
public:
/// std::string or something similar
template <typename S>
ReadBufferFromString(const S & s) : ReadBufferFromMemory(s.data(), s.size()) {}
};
explicit ReadBufferFromString(std::string_view s) : ReadBufferFromMemory(s.data(), s.size()) {}
};
}

View File

@ -908,6 +908,7 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
{
/// Check for parts on disks that are not defined in the storage policy, so that data parts on undefined disks are not missed.
std::unordered_set<String> defined_disk_names;
for (const auto & disk_ptr : disks)
defined_disk_names.insert(disk_ptr->getName());
@ -917,9 +918,10 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
{
for (const auto it = disk->iterateDirectory(relative_data_path); it->isValid(); it->next())
{
MergeTreePartInfo part_info;
if (MergeTreePartInfo::tryParsePartName(it->name(), &part_info, format_version))
throw Exception("Part " + backQuote(it->name()) + " was found on disk " + backQuote(disk_name) + " which is not defined in the storage policy", ErrorCodes::UNKNOWN_DISK);
if (MergeTreePartInfo::tryParsePartName(it->name(), format_version))
throw Exception(ErrorCodes::UNKNOWN_DISK,
"Part {} was found on disk {} which is not defined in the storage policy",
backQuote(it->name()), backQuote(disk_name));
}
}
}
@ -930,10 +932,13 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
for (auto disk_it = disks.rbegin(); disk_it != disks.rend(); ++disk_it)
{
auto disk_ptr = *disk_it;
for (auto it = disk_ptr->iterateDirectory(relative_data_path); it->isValid(); it->next())
{
/// Skip temporary directories, file 'format_version.txt' and directory 'detached'.
if (startsWith(it->name(), "tmp") || it->name() == MergeTreeData::FORMAT_VERSION_FILE_NAME || it->name() == MergeTreeData::DETACHED_DIR_NAME)
if (startsWith(it->name(), "tmp")
|| it->name() == MergeTreeData::FORMAT_VERSION_FILE_NAME
|| it->name() == MergeTreeData::DETACHED_DIR_NAME)
continue;
if (!startsWith(it->name(), MergeTreeWriteAheadLog::WAL_FILE_NAME))
@ -980,25 +985,34 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
{
pool.scheduleOrThrowOnError([&]
{
const auto & part_name = part_names_with_disk.first;
const auto part_disk_ptr = part_names_with_disk.second;
const auto & [part_name, part_disk_ptr] = part_names_with_disk;
MergeTreePartInfo part_info;
if (!MergeTreePartInfo::tryParsePartName(part_name, &part_info, format_version))
auto part_opt = MergeTreePartInfo::tryParsePartName(part_name, format_version);
if (!part_opt)
return;
auto single_disk_volume = std::make_shared<SingleDiskVolume>("volume_" + part_name, part_disk_ptr, 0);
auto part = createPart(part_name, part_info, single_disk_volume, part_name);
auto part = createPart(part_name, *part_opt, single_disk_volume, part_name);
bool broken = false;
String part_path = fs::path(relative_data_path) / part_name;
String marker_path = fs::path(part_path) / IMergeTreeDataPart::DELETE_ON_DESTROY_MARKER_FILE_NAME;
if (part_disk_ptr->exists(marker_path))
{
LOG_WARNING(log, "Detaching stale part {}{}, which should have been deleted after a move. That can only happen after unclean restart of ClickHouse after move of a part having an operation blocking that stale copy of part.", getFullPathOnDisk(part_disk_ptr), part_name);
LOG_WARNING(log,
"Detaching stale part {}{}, which should have been deleted after a move. That can only happen "
"after unclean restart of ClickHouse after move of a part having an operation blocking that "
"stale copy of part.",
getFullPathOnDisk(part_disk_ptr), part_name);
std::lock_guard loading_lock(mutex);
broken_parts_to_detach.push_back(part);
++suspicious_broken_parts;
return;
}
@ -1026,25 +1040,34 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
/// Ignore broken parts that can appear as a result of hard server restart.
if (broken)
{
LOG_ERROR(log, "Detaching broken part {}{}. If it happened after update, it is likely because of backward incompability. You need to resolve this manually", getFullPathOnDisk(part_disk_ptr), part_name);
LOG_ERROR(log,
"Detaching broken part {}{}. If it happened after update, it is likely because of backward "
"incompatibility. You need to resolve this manually",
getFullPathOnDisk(part_disk_ptr), part_name);
std::lock_guard loading_lock(mutex);
broken_parts_to_detach.push_back(part);
++suspicious_broken_parts;
return;
}
if (!part->index_granularity_info.is_adaptive)
has_non_adaptive_parts.store(true, std::memory_order_relaxed);
else
has_adaptive_parts.store(true, std::memory_order_relaxed);
part->modification_time = part_disk_ptr->getLastModified(fs::path(relative_data_path) / part_name).epochTime();
/// Assume that all parts are Committed, covered parts will be detected and marked as Outdated later
part->setState(DataPartState::Committed);
std::lock_guard loading_lock(mutex);
if (!data_parts_indexes.insert(part).second)
throw Exception("Part " + part->name + " already exists", ErrorCodes::DUPLICATE_DATA_PART);
throw Exception(ErrorCodes::DUPLICATE_DATA_PART, "Part {} already exists", part->name);
addPartContributionToDataVolume(part);
});
@ -3328,11 +3351,12 @@ RestoreDataTasks MergeTreeData::restoreDataPartsFromBackup(const BackupPtr & bac
Strings part_names = backup->list(data_path_in_backup);
for (const String & part_name : part_names)
{
MergeTreePartInfo part_info;
if (!MergeTreePartInfo::tryParsePartName(part_name, &part_info, format_version))
const auto part_info = MergeTreePartInfo::tryParsePartName(part_name, format_version);
if (!part_info)
continue;
if (!partition_ids.empty() && !partition_ids.contains(part_info.partition_id))
if (!partition_ids.empty() && !partition_ids.contains(part_info->partition_id))
continue;
UInt64 total_size_of_part = 0;
@ -3369,7 +3393,7 @@ RestoreDataTasks MergeTreeData::restoreDataPartsFromBackup(const BackupPtr & bac
}
auto single_disk_volume = std::make_shared<SingleDiskVolume>(disk->getName(), disk, 0);
auto part = createPart(part_name, part_info, single_disk_volume, relative_temp_part_dir);
auto part = createPart(part_name, *part_info, single_disk_volume, relative_temp_part_dir);
part->loadColumnsChecksumsIndexes(false, true);
renameTempPartAndAdd(part, increment);
};
@ -3581,11 +3605,10 @@ std::vector<DetachedPartInfo> MergeTreeData::getDetachedParts() const
{
for (auto it = disk->iterateDirectory(detached_path); it->isValid(); it->next())
{
res.emplace_back();
auto & part = res.back();
DetachedPartInfo::tryParseDetachedPartName(it->name(), part, format_version);
auto part = DetachedPartInfo::parseDetachedPartName(it->name(), format_version);
part.disk = disk->getName();
res.push_back(std::move(part));
}
}
}
@ -3656,7 +3679,7 @@ MergeTreeData::MutableDataPartsVector MergeTreeData::tryLoadPartsToAttach(const
validateDetachedPartName(part_id);
renamed_parts.addPart(part_id, "attaching_" + part_id);
if (MergeTreePartInfo::tryParsePartName(part_id, nullptr, format_version))
if (MergeTreePartInfo::tryParsePartName(part_id, format_version))
name_to_disk[part_id] = getDiskForPart(part_id, source_dir);
}
else
@ -3672,12 +3695,11 @@ MergeTreeData::MutableDataPartsVector MergeTreeData::tryLoadPartsToAttach(const
for (auto it = disk->iterateDirectory(relative_data_path + source_dir); it->isValid(); it->next())
{
const String & name = it->name();
MergeTreePartInfo part_info;
// TODO what if name contains "_tryN" suffix?
/// Parts with prefix in name (e.g. attaching_1_3_3_0, deleting_1_3_3_0) will be ignored
if (!MergeTreePartInfo::tryParsePartName(name, &part_info, format_version)
|| part_info.partition_id != partition_id)
if (auto part_opt = MergeTreePartInfo::tryParsePartName(name, format_version);
!part_opt || part_opt->partition_id != partition_id)
{
continue;
}

View File

@ -15,13 +15,12 @@ namespace ErrorCodes
MergeTreePartInfo MergeTreePartInfo::fromPartName(const String & part_name, MergeTreeDataFormatVersion format_version)
{
MergeTreePartInfo part_info;
if (!tryParsePartName(part_name, &part_info, format_version))
throw Exception("Unexpected part name: " + part_name, ErrorCodes::BAD_DATA_PART_NAME);
return part_info;
if (auto part_opt = tryParsePartName(part_name, format_version))
return *part_opt;
else
throw Exception(ErrorCodes::BAD_DATA_PART_NAME, "Unexpected part name: {}", part_name);
}
void MergeTreePartInfo::validatePartitionID(const String & partition_id, MergeTreeDataFormatVersion format_version)
{
if (partition_id.empty())
@ -43,22 +42,26 @@ void MergeTreePartInfo::validatePartitionID(const String & partition_id, MergeTr
}
bool MergeTreePartInfo::tryParsePartName(const String & part_name, MergeTreePartInfo * part_info, MergeTreeDataFormatVersion format_version)
std::optional<MergeTreePartInfo> MergeTreePartInfo::tryParsePartName(
std::string_view part_name, MergeTreeDataFormatVersion format_version)
{
ReadBufferFromString in(part_name);
String partition_id;
if (format_version < MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING)
{
UInt32 min_yyyymmdd = 0;
UInt32 max_yyyymmdd = 0;
if (!tryReadIntText(min_yyyymmdd, in)
|| !checkChar('_', in)
|| !tryReadIntText(max_yyyymmdd, in)
|| !checkChar('_', in))
{
return false;
return std::nullopt;
}
partition_id = toString(min_yyyymmdd / 100);
}
else
@ -76,9 +79,7 @@ bool MergeTreePartInfo::tryParsePartName(const String & part_name, MergeTreePart
/// Sanity check
if (partition_id.empty())
{
return false;
}
return std::nullopt;
Int64 min_block_num = 0;
Int64 max_block_num = 0;
@ -91,14 +92,12 @@ bool MergeTreePartInfo::tryParsePartName(const String & part_name, MergeTreePart
|| !checkChar('_', in)
|| !tryReadIntText(level, in))
{
return false;
return std::nullopt;
}
/// Sanity check
if (min_block_num > max_block_num)
{
return false;
}
return std::nullopt;
if (!in.eof())
{
@ -106,29 +105,30 @@ bool MergeTreePartInfo::tryParsePartName(const String & part_name, MergeTreePart
|| !tryReadIntText(mutation, in)
|| !in.eof())
{
return false;
return std::nullopt;
}
}
if (part_info)
MergeTreePartInfo part_info;
part_info.partition_id = std::move(partition_id);
part_info.min_block = min_block_num;
part_info.max_block = max_block_num;
if (level == LEGACY_MAX_LEVEL)
{
part_info->partition_id = std::move(partition_id);
part_info->min_block = min_block_num;
part_info->max_block = max_block_num;
if (level == LEGACY_MAX_LEVEL)
{
/// We (accidentally) had two different max levels until 21.6 and it might cause logical errors like
/// "Part 20170601_20170630_0_2_999999999 intersects 201706_0_1_4294967295".
/// So we replace unexpected max level to make contains(...) method and comparison operators work
/// correctly with such virtual parts. On part name serialization we will use legacy max level to keep the name unchanged.
part_info->use_leagcy_max_level = true;
level = MAX_LEVEL;
}
part_info->level = level;
part_info->mutation = mutation;
/// We (accidentally) had two different max levels until 21.6 and it might cause logical errors like
/// "Part 20170601_20170630_0_2_999999999 intersects 201706_0_1_4294967295".
/// So we replace unexpected max level to make contains(...) method and comparison operators work
/// correctly with such virtual parts. On part name serialization we will use legacy max level to keep the name unchanged.
part_info.use_leagcy_max_level = true;
level = MAX_LEVEL;
}
return true;
part_info.level = level;
part_info.mutation = mutation;
return part_info;
}
@ -235,55 +235,75 @@ String MergeTreePartInfo::getPartNameV0(DayNum left_date, DayNum right_date) con
return wb.str();
}
const std::vector<String> DetachedPartInfo::DETACH_REASONS =
{
"broken",
"unexpected",
"noquorum",
"ignored",
"broken-on-start",
"clone",
"attaching",
"deleting",
"tmp-fetch",
};
bool DetachedPartInfo::tryParseDetachedPartName(const String & dir_name, DetachedPartInfo & part_info,
MergeTreeDataFormatVersion format_version)
DetachedPartInfo DetachedPartInfo::parseDetachedPartName(
std::string_view dir_name, MergeTreeDataFormatVersion format_version)
{
DetachedPartInfo part_info;
part_info.dir_name = dir_name;
/// First, try to find known prefix and parse dir_name as <prefix>_<partname>.
/// First, try to find known prefix and parse dir_name as <prefix>_<part_name>.
/// Arbitrary strings are not allowed for partition_id, so known_prefix cannot be confused with partition_id.
for (const auto & known_prefix : DETACH_REASONS)
for (std::string_view known_prefix : DETACH_REASONS)
{
if (dir_name.starts_with(known_prefix) && known_prefix.size() < dir_name.size() && dir_name[known_prefix.size()] == '_')
if (dir_name.starts_with(known_prefix)
&& known_prefix.size() < dir_name.size()
&& dir_name[known_prefix.size()] == '_')
{
part_info.prefix = known_prefix;
String part_name = dir_name.substr(known_prefix.size() + 1);
bool parsed = MergeTreePartInfo::tryParsePartName(part_name, &part_info, format_version);
return part_info.valid_name = parsed;
const std::string_view part_name = dir_name.substr(known_prefix.size() + 1);
if (auto part_opt = MergeTreePartInfo::tryParsePartName(part_name, format_version))
{
part_info.valid_name = true;
part_info.addParsedPartInfo(*part_opt);
}
else
part_info.valid_name = false;
return part_info;
}
}
/// Next, try to parse dir_name as <part_name>.
if (MergeTreePartInfo::tryParsePartName(dir_name, &part_info, format_version))
return part_info.valid_name = true;
if (auto part_opt = MergeTreePartInfo::tryParsePartName(dir_name, format_version))
{
part_info.valid_name = true;
part_info.addParsedPartInfo(*part_opt);
return part_info;
}
/// Next, as <prefix>_<partname>. Use entire name as prefix if it fails.
part_info.prefix = dir_name;
const auto first_separator = dir_name.find_first_of('_');
const size_t first_separator = dir_name.find_first_of('_');
if (first_separator == String::npos)
return part_info.valid_name = false;
{
part_info.valid_name = false;
return part_info;
}
const auto part_name = dir_name.substr(first_separator + 1,
dir_name.size() - first_separator - 1);
if (!MergeTreePartInfo::tryParsePartName(part_name, &part_info, format_version))
return part_info.valid_name = false;
const std::string_view part_name = dir_name.substr(
first_separator + 1,
dir_name.size() - first_separator - 1);
part_info.prefix = dir_name.substr(0, first_separator);
return part_info.valid_name = true;
if (auto part_opt = MergeTreePartInfo::tryParsePartName(part_name, format_version))
{
part_info.valid_name = true;
part_info.prefix = dir_name.substr(0, first_separator);
part_info.addParsedPartInfo(*part_opt);
}
else
part_info.valid_name = false;
return part_info;
}
void DetachedPartInfo::addParsedPartInfo(const MergeTreePartInfo& part)
{
// Both class are aggregates so it's ok.
static_cast<MergeTreePartInfo &>(*this) = part;
}
}

View File

@ -1,8 +1,10 @@
#pragma once
#include <limits>
#include <optional>
#include <tuple>
#include <vector>
#include <array>
#include <common/types.h>
#include <common/DayNum.h>
#include <Storages/MergeTree/MergeTreeDataFormatVersion.h>
@ -98,7 +100,8 @@ struct MergeTreePartInfo
static MergeTreePartInfo fromPartName(const String & part_name, MergeTreeDataFormatVersion format_version); // -V1071
static bool tryParsePartName(const String & part_name, MergeTreePartInfo * part_info, MergeTreeDataFormatVersion format_version);
static std::optional<MergeTreePartInfo> tryParsePartName(
std::string_view part_name, MergeTreeDataFormatVersion format_version);
static void parseMinMaxDatesFromPartName(const String & part_name, DayNum & min_date, DayNum & max_date);
@ -122,11 +125,27 @@ struct DetachedPartInfo : public MergeTreePartInfo
/// If false, MergeTreePartInfo is in invalid state (directory name was not successfully parsed).
bool valid_name;
static const std::vector<String> DETACH_REASONS;
/// Known prefixes of detached part directory names. parseDetachedPartName first tries to
/// interpret a directory name as <prefix>_<part_name> using each of these prefixes in order.
static constexpr auto DETACH_REASONS = std::to_array<std::string_view>({
"broken",
"unexpected",
"noquorum",
"ignored",
"broken-on-start",
"clone",
"attaching",
"deleting",
"tmp-fetch"
});
/// NOTE: It may parse part info incorrectly.
/// For example, if prefix contain '_' or if DETACH_REASONS doesn't contain prefix.
static bool tryParseDetachedPartName(const String & dir_name, DetachedPartInfo & part_info, MergeTreeDataFormatVersion format_version);
/// For example, if prefix contains '_' or if DETACH_REASONS doesn't contain prefix.
// This method has different semantics from MergeTreePartInfo::tryParsePartName.
// Detached parts are always parsed regardless of their validity.
// DetachedPartInfo::valid_name field specifies whether parsing was successful or not.
static DetachedPartInfo parseDetachedPartName(std::string_view dir_name, MergeTreeDataFormatVersion format_version);
private:
void addParsedPartInfo(const MergeTreePartInfo& part);
};
using DetachedPartsInfo = std::vector<DetachedPartInfo>;

View File

@ -1161,10 +1161,8 @@ void StorageReplicatedMergeTree::checkParts(bool skip_sanity_checks)
const UInt64 parts_to_fetch_blocks = std::accumulate(parts_to_fetch.cbegin(), parts_to_fetch.cend(), 0,
[&](UInt64 acc, const String& part_name)
{
MergeTreePartInfo part_info;
if (MergeTreePartInfo::tryParsePartName(part_name, &part_info, format_version))
return acc + part_info.getBlocksCount();
if (const auto part_info = MergeTreePartInfo::tryParsePartName(part_name, format_version))
return acc + part_info->getBlocksCount();
LOG_ERROR(log, "Unexpected part name: {}", part_name);
return acc;
@ -1454,13 +1452,12 @@ MergeTreeData::MutableDataPartPtr StorageReplicatedMergeTree::attachPartHelperFo
for (const DiskPtr & disk : getStoragePolicy()->getDisks())
for (const auto it = disk->iterateDirectory(fs::path(relative_data_path) / "detached/"); it->isValid(); it->next())
{
MergeTreePartInfo part_info;
const auto part_info = MergeTreePartInfo::tryParsePartName(it->name(), format_version);
if (!MergeTreePartInfo::tryParsePartName(it->name(), &part_info, format_version) ||
part_info.partition_id != actual_part_info.partition_id)
if (!part_info || part_info->partition_id != actual_part_info.partition_id)
continue;
const String part_old_name = part_info.getPartName();
const String part_old_name = part_info->getPartName();
const String part_path = fs::path("detached") / part_old_name;
const VolumePtr volume = std::make_shared<SingleDiskVolume>("volume_" + part_old_name, disk);
@ -6310,6 +6307,7 @@ void StorageReplicatedMergeTree::getClearBlocksInPartitionOps(
String partition_prefix = partition_id + "_";
zkutil::AsyncResponses<Coordination::GetResponse> get_futures;
for (const String & block_id : blocks)
{
if (startsWith(block_id, partition_prefix))
@ -6328,9 +6326,10 @@ void StorageReplicatedMergeTree::getClearBlocksInPartitionOps(
continue;
ReadBufferFromString buf(result.data);
MergeTreePartInfo part_info;
bool parsed = MergeTreePartInfo::tryParsePartName(result.data, &part_info, format_version);
if (!parsed || (min_block_num <= part_info.min_block && part_info.max_block <= max_block_num))
const auto part_info = MergeTreePartInfo::tryParsePartName(result.data, format_version);
if (!part_info || (min_block_num <= part_info->min_block && part_info->max_block <= max_block_num))
ops.emplace_back(zkutil::makeRemoveRequest(path, -1));
}
}
@ -7422,12 +7421,15 @@ bool StorageReplicatedMergeTree::checkIfDetachedPartExists(const String & part_n
bool StorageReplicatedMergeTree::checkIfDetachedPartitionExists(const String & partition_name)
{
fs::directory_iterator dir_end;
for (const std::string & path : getDataPaths())
{
for (fs::directory_iterator dir_it{fs::path(path) / "detached/"}; dir_it != dir_end; ++dir_it)
{
MergeTreePartInfo part_info;
if (MergeTreePartInfo::tryParsePartName(dir_it->path().filename(), &part_info, format_version) && part_info.partition_id == partition_name)
const String file_name = dir_it->path().filename().string();
auto part_info = MergeTreePartInfo::tryParsePartName(file_name, format_version);
if (part_info && part_info->partition_id == partition_name)
return true;
}
}