mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-04 21:42:39 +00:00
Optional semantics for [Detached]MergeTreePartInfo
This commit is contained in:
parent
739caf86d5
commit
f85e2e027c
@ -2,18 +2,17 @@
|
||||
|
||||
#include <IO/ReadBufferFromMemory.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
/** Allows to read from std::string-like object.
|
||||
*/
|
||||
/// Allows to read from std::string-like object.
|
||||
class ReadBufferFromString : public ReadBufferFromMemory
|
||||
{
|
||||
public:
|
||||
/// std::string or something similar
|
||||
template <typename S>
|
||||
ReadBufferFromString(const S & s) : ReadBufferFromMemory(s.data(), s.size()) {}
|
||||
};
|
||||
|
||||
explicit ReadBufferFromString(std::string_view s) : ReadBufferFromMemory(s.data(), s.size()) {}
|
||||
};
|
||||
}
|
||||
|
@ -884,6 +884,7 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
|
||||
{
|
||||
/// Check extra parts at different disks, in order to not allow to miss data parts at undefined disks.
|
||||
std::unordered_set<String> defined_disk_names;
|
||||
|
||||
for (const auto & disk_ptr : disks)
|
||||
defined_disk_names.insert(disk_ptr->getName());
|
||||
|
||||
@ -893,9 +894,10 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
|
||||
{
|
||||
for (const auto it = disk->iterateDirectory(relative_data_path); it->isValid(); it->next())
|
||||
{
|
||||
MergeTreePartInfo part_info;
|
||||
if (MergeTreePartInfo::tryParsePartName(it->name(), &part_info, format_version))
|
||||
throw Exception("Part " + backQuote(it->name()) + " was found on disk " + backQuote(disk_name) + " which is not defined in the storage policy", ErrorCodes::UNKNOWN_DISK);
|
||||
if (MergeTreePartInfo::tryParsePartName(it->name(), format_version))
|
||||
throw Exception(ErrorCodes::UNKNOWN_DISK,
|
||||
"Part {} was found on disk {} which is not defined in the storage policy",
|
||||
backQuote(it->name()), backQuote(disk_name));
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -906,10 +908,13 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
|
||||
for (auto disk_it = disks.rbegin(); disk_it != disks.rend(); ++disk_it)
|
||||
{
|
||||
auto disk_ptr = *disk_it;
|
||||
|
||||
for (auto it = disk_ptr->iterateDirectory(relative_data_path); it->isValid(); it->next())
|
||||
{
|
||||
/// Skip temporary directories, file 'format_version.txt' and directory 'detached'.
|
||||
if (startsWith(it->name(), "tmp") || it->name() == MergeTreeData::FORMAT_VERSION_FILE_NAME || it->name() == MergeTreeData::DETACHED_DIR_NAME)
|
||||
if (startsWith(it->name(), "tmp")
|
||||
|| it->name() == MergeTreeData::FORMAT_VERSION_FILE_NAME
|
||||
|| it->name() == MergeTreeData::DETACHED_DIR_NAME)
|
||||
continue;
|
||||
|
||||
if (!startsWith(it->name(), MergeTreeWriteAheadLog::WAL_FILE_NAME))
|
||||
@ -956,25 +961,34 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
|
||||
{
|
||||
pool.scheduleOrThrowOnError([&]
|
||||
{
|
||||
const auto & part_name = part_names_with_disk.first;
|
||||
const auto part_disk_ptr = part_names_with_disk.second;
|
||||
const auto & [part_name, part_disk_ptr] = part_names_with_disk;
|
||||
|
||||
MergeTreePartInfo part_info;
|
||||
if (!MergeTreePartInfo::tryParsePartName(part_name, &part_info, format_version))
|
||||
auto part_opt = MergeTreePartInfo::tryParsePartName(part_name, format_version);
|
||||
|
||||
if (!part_opt)
|
||||
return;
|
||||
|
||||
auto single_disk_volume = std::make_shared<SingleDiskVolume>("volume_" + part_name, part_disk_ptr, 0);
|
||||
auto part = createPart(part_name, part_info, single_disk_volume, part_name);
|
||||
auto part = createPart(part_name, *part_opt, single_disk_volume, part_name);
|
||||
bool broken = false;
|
||||
|
||||
String part_path = fs::path(relative_data_path) / part_name;
|
||||
String marker_path = fs::path(part_path) / IMergeTreeDataPart::DELETE_ON_DESTROY_MARKER_FILE_NAME;
|
||||
|
||||
if (part_disk_ptr->exists(marker_path))
|
||||
{
|
||||
LOG_WARNING(log, "Detaching stale part {}{}, which should have been deleted after a move. That can only happen after unclean restart of ClickHouse after move of a part having an operation blocking that stale copy of part.", getFullPathOnDisk(part_disk_ptr), part_name);
|
||||
LOG_WARNING(log,
|
||||
"Detaching stale part {}{}, which should have been deleted after a move. That can only happen "
|
||||
"after unclean restart of ClickHouse after move of a part having an operation blocking that "
|
||||
"stale copy of part.",
|
||||
getFullPathOnDisk(part_disk_ptr), part_name);
|
||||
|
||||
std::lock_guard loading_lock(mutex);
|
||||
|
||||
broken_parts_to_detach.push_back(part);
|
||||
|
||||
++suspicious_broken_parts;
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
@ -1002,25 +1016,34 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
|
||||
/// Ignore broken parts that can appear as a result of hard server restart.
|
||||
if (broken)
|
||||
{
|
||||
LOG_ERROR(log, "Detaching broken part {}{}. If it happened after update, it is likely because of backward incompability. You need to resolve this manually", getFullPathOnDisk(part_disk_ptr), part_name);
|
||||
LOG_ERROR(log,
|
||||
"Detaching broken part {}{}. If it happened after update, it is likely because of backward "
|
||||
"incompatibility. You need to resolve this manually",
|
||||
getFullPathOnDisk(part_disk_ptr), part_name);
|
||||
|
||||
std::lock_guard loading_lock(mutex);
|
||||
|
||||
broken_parts_to_detach.push_back(part);
|
||||
|
||||
++suspicious_broken_parts;
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
if (!part->index_granularity_info.is_adaptive)
|
||||
has_non_adaptive_parts.store(true, std::memory_order_relaxed);
|
||||
else
|
||||
has_adaptive_parts.store(true, std::memory_order_relaxed);
|
||||
|
||||
part->modification_time = part_disk_ptr->getLastModified(fs::path(relative_data_path) / part_name).epochTime();
|
||||
|
||||
/// Assume that all parts are Committed, covered parts will be detected and marked as Outdated later
|
||||
part->setState(DataPartState::Committed);
|
||||
|
||||
std::lock_guard loading_lock(mutex);
|
||||
|
||||
if (!data_parts_indexes.insert(part).second)
|
||||
throw Exception("Part " + part->name + " already exists", ErrorCodes::DUPLICATE_DATA_PART);
|
||||
throw Exception(ErrorCodes::DUPLICATE_DATA_PART, "Part {} already exists", part->name);
|
||||
|
||||
addPartContributionToDataVolume(part);
|
||||
});
|
||||
@ -3292,11 +3315,12 @@ RestoreDataTasks MergeTreeData::restoreDataPartsFromBackup(const BackupPtr & bac
|
||||
Strings part_names = backup->list(data_path_in_backup);
|
||||
for (const String & part_name : part_names)
|
||||
{
|
||||
MergeTreePartInfo part_info;
|
||||
if (!MergeTreePartInfo::tryParsePartName(part_name, &part_info, format_version))
|
||||
const auto part_info = MergeTreePartInfo::tryParsePartName(part_name, format_version);
|
||||
|
||||
if (!part_info)
|
||||
continue;
|
||||
|
||||
if (!partition_ids.empty() && !partition_ids.contains(part_info.partition_id))
|
||||
if (!partition_ids.empty() && !partition_ids.contains(part_info->partition_id))
|
||||
continue;
|
||||
|
||||
UInt64 total_size_of_part = 0;
|
||||
@ -3333,7 +3357,7 @@ RestoreDataTasks MergeTreeData::restoreDataPartsFromBackup(const BackupPtr & bac
|
||||
}
|
||||
|
||||
auto single_disk_volume = std::make_shared<SingleDiskVolume>(disk->getName(), disk, 0);
|
||||
auto part = createPart(part_name, part_info, single_disk_volume, relative_temp_part_dir);
|
||||
auto part = createPart(part_name, *part_info, single_disk_volume, relative_temp_part_dir);
|
||||
part->loadColumnsChecksumsIndexes(false, true);
|
||||
renameTempPartAndAdd(part, increment);
|
||||
};
|
||||
@ -3545,11 +3569,8 @@ std::vector<DetachedPartInfo> MergeTreeData::getDetachedParts() const
|
||||
{
|
||||
for (auto it = disk->iterateDirectory(detached_path); it->isValid(); it->next())
|
||||
{
|
||||
res.emplace_back();
|
||||
auto & part = res.back();
|
||||
|
||||
DetachedPartInfo::tryParseDetachedPartName(it->name(), part, format_version);
|
||||
part.disk = disk->getName();
|
||||
auto res_it = res.emplace_back(DetachedPartInfo::parseDetachedPartName(it->name(), format_version));
|
||||
res_it.disk = disk->getName();
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -3620,7 +3641,7 @@ MergeTreeData::MutableDataPartsVector MergeTreeData::tryLoadPartsToAttach(const
|
||||
validateDetachedPartName(part_id);
|
||||
renamed_parts.addPart(part_id, "attaching_" + part_id);
|
||||
|
||||
if (MergeTreePartInfo::tryParsePartName(part_id, nullptr, format_version))
|
||||
if (MergeTreePartInfo::tryParsePartName(part_id, format_version))
|
||||
name_to_disk[part_id] = getDiskForPart(part_id, source_dir);
|
||||
}
|
||||
else
|
||||
@ -3636,12 +3657,11 @@ MergeTreeData::MutableDataPartsVector MergeTreeData::tryLoadPartsToAttach(const
|
||||
for (auto it = disk->iterateDirectory(relative_data_path + source_dir); it->isValid(); it->next())
|
||||
{
|
||||
const String & name = it->name();
|
||||
MergeTreePartInfo part_info;
|
||||
|
||||
// TODO what if name contains "_tryN" suffix?
|
||||
/// Parts with prefix in name (e.g. attaching_1_3_3_0, deleting_1_3_3_0) will be ignored
|
||||
if (!MergeTreePartInfo::tryParsePartName(name, &part_info, format_version)
|
||||
|| part_info.partition_id != partition_id)
|
||||
if (auto part_opt = MergeTreePartInfo::tryParsePartName(name, format_version);
|
||||
!part_opt || part_opt->partition_id != partition_id)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
@ -15,13 +15,12 @@ namespace ErrorCodes
|
||||
|
||||
MergeTreePartInfo MergeTreePartInfo::fromPartName(const String & part_name, MergeTreeDataFormatVersion format_version)
|
||||
{
|
||||
MergeTreePartInfo part_info;
|
||||
if (!tryParsePartName(part_name, &part_info, format_version))
|
||||
throw Exception("Unexpected part name: " + part_name, ErrorCodes::BAD_DATA_PART_NAME);
|
||||
return part_info;
|
||||
if (auto part_opt = tryParsePartName(part_name, format_version); !part_opt)
|
||||
throw Exception(ErrorCodes::BAD_DATA_PART_NAME, "Unexpected part name: {}", part_name);
|
||||
else
|
||||
return *part_opt;
|
||||
}
|
||||
|
||||
|
||||
void MergeTreePartInfo::validatePartitionID(const String & partition_id, MergeTreeDataFormatVersion format_version)
|
||||
{
|
||||
if (partition_id.empty())
|
||||
@ -43,22 +42,26 @@ void MergeTreePartInfo::validatePartitionID(const String & partition_id, MergeTr
|
||||
|
||||
}
|
||||
|
||||
bool MergeTreePartInfo::tryParsePartName(const String & part_name, MergeTreePartInfo * part_info, MergeTreeDataFormatVersion format_version)
|
||||
std::optional<MergeTreePartInfo> MergeTreePartInfo::tryParsePartName(
|
||||
std::string_view part_name, MergeTreeDataFormatVersion format_version)
|
||||
{
|
||||
ReadBufferFromString in(part_name);
|
||||
|
||||
String partition_id;
|
||||
|
||||
if (format_version < MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING)
|
||||
{
|
||||
UInt32 min_yyyymmdd = 0;
|
||||
UInt32 max_yyyymmdd = 0;
|
||||
|
||||
if (!tryReadIntText(min_yyyymmdd, in)
|
||||
|| !checkChar('_', in)
|
||||
|| !tryReadIntText(max_yyyymmdd, in)
|
||||
|| !checkChar('_', in))
|
||||
{
|
||||
return false;
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
partition_id = toString(min_yyyymmdd / 100);
|
||||
}
|
||||
else
|
||||
@ -76,9 +79,7 @@ bool MergeTreePartInfo::tryParsePartName(const String & part_name, MergeTreePart
|
||||
|
||||
/// Sanity check
|
||||
if (partition_id.empty())
|
||||
{
|
||||
return false;
|
||||
}
|
||||
return std::nullopt;
|
||||
|
||||
Int64 min_block_num = 0;
|
||||
Int64 max_block_num = 0;
|
||||
@ -91,14 +92,12 @@ bool MergeTreePartInfo::tryParsePartName(const String & part_name, MergeTreePart
|
||||
|| !checkChar('_', in)
|
||||
|| !tryReadIntText(level, in))
|
||||
{
|
||||
return false;
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
/// Sanity check
|
||||
if (min_block_num > max_block_num)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
return std::nullopt;
|
||||
|
||||
if (!in.eof())
|
||||
{
|
||||
@ -106,29 +105,30 @@ bool MergeTreePartInfo::tryParsePartName(const String & part_name, MergeTreePart
|
||||
|| !tryReadIntText(mutation, in)
|
||||
|| !in.eof())
|
||||
{
|
||||
return false;
|
||||
return std::nullopt;
|
||||
}
|
||||
}
|
||||
|
||||
if (part_info)
|
||||
MergeTreePartInfo part_info;
|
||||
|
||||
part_info.partition_id = std::move(partition_id);
|
||||
part_info.min_block = min_block_num;
|
||||
part_info.max_block = max_block_num;
|
||||
|
||||
if (level == LEGACY_MAX_LEVEL)
|
||||
{
|
||||
part_info->partition_id = std::move(partition_id);
|
||||
part_info->min_block = min_block_num;
|
||||
part_info->max_block = max_block_num;
|
||||
if (level == LEGACY_MAX_LEVEL)
|
||||
{
|
||||
/// We (accidentally) had two different max levels until 21.6 and it might cause logical errors like
|
||||
/// "Part 20170601_20170630_0_2_999999999 intersects 201706_0_1_4294967295".
|
||||
/// So we replace unexpected max level to make contains(...) method and comparison operators work
|
||||
/// correctly with such virtual parts. On part name serialization we will use legacy max level to keep the name unchanged.
|
||||
part_info->use_leagcy_max_level = true;
|
||||
level = MAX_LEVEL;
|
||||
}
|
||||
part_info->level = level;
|
||||
part_info->mutation = mutation;
|
||||
/// We (accidentally) had two different max levels until 21.6 and it might cause logical errors like
|
||||
/// "Part 20170601_20170630_0_2_999999999 intersects 201706_0_1_4294967295".
|
||||
/// So we replace unexpected max level to make contains(...) method and comparison operators work
|
||||
/// correctly with such virtual parts. On part name serialization we will use legacy max level to keep the name unchanged.
|
||||
part_info.use_leagcy_max_level = true;
|
||||
level = MAX_LEVEL;
|
||||
}
|
||||
|
||||
return true;
|
||||
part_info.level = level;
|
||||
part_info.mutation = mutation;
|
||||
|
||||
return part_info;
|
||||
}
|
||||
|
||||
|
||||
@ -235,55 +235,75 @@ String MergeTreePartInfo::getPartNameV0(DayNum left_date, DayNum right_date) con
|
||||
return wb.str();
|
||||
}
|
||||
|
||||
|
||||
const std::vector<String> DetachedPartInfo::DETACH_REASONS =
|
||||
{
|
||||
"broken",
|
||||
"unexpected",
|
||||
"noquorum",
|
||||
"ignored",
|
||||
"broken-on-start",
|
||||
"clone",
|
||||
"attaching",
|
||||
"deleting",
|
||||
"tmp-fetch",
|
||||
};
|
||||
|
||||
bool DetachedPartInfo::tryParseDetachedPartName(const String & dir_name, DetachedPartInfo & part_info,
|
||||
MergeTreeDataFormatVersion format_version)
|
||||
DetachedPartInfo DetachedPartInfo::parseDetachedPartName(
|
||||
std::string_view dir_name, MergeTreeDataFormatVersion format_version)
|
||||
{
|
||||
DetachedPartInfo part_info;
|
||||
|
||||
part_info.dir_name = dir_name;
|
||||
|
||||
/// First, try to find known prefix and parse dir_name as <prefix>_<partname>.
|
||||
/// First, try to find known prefix and parse dir_name as <prefix>_<part_name>.
|
||||
/// Arbitrary strings are not allowed for partition_id, so known_prefix cannot be confused with partition_id.
|
||||
for (const auto & known_prefix : DETACH_REASONS)
|
||||
for (std::string_view known_prefix : DETACH_REASONS)
|
||||
{
|
||||
if (dir_name.starts_with(known_prefix) && known_prefix.size() < dir_name.size() && dir_name[known_prefix.size()] == '_')
|
||||
if (dir_name.starts_with(known_prefix)
|
||||
&& known_prefix.size() < dir_name.size()
|
||||
&& dir_name[known_prefix.size()] == '_')
|
||||
{
|
||||
part_info.prefix = known_prefix;
|
||||
String part_name = dir_name.substr(known_prefix.size() + 1);
|
||||
bool parsed = MergeTreePartInfo::tryParsePartName(part_name, &part_info, format_version);
|
||||
return part_info.valid_name = parsed;
|
||||
|
||||
const std::string_view part_name = dir_name.substr(known_prefix.size() + 1);
|
||||
|
||||
if (auto part_opt = MergeTreePartInfo::tryParsePartName(part_name, format_version))
|
||||
{
|
||||
part_info.valid_name = true;
|
||||
part_info.addParsedPartInfo(*part_opt);
|
||||
}
|
||||
else
|
||||
part_info.valid_name = false;
|
||||
|
||||
return part_info;
|
||||
}
|
||||
}
|
||||
|
||||
/// Next, try to parse dir_name as <part_name>.
|
||||
if (MergeTreePartInfo::tryParsePartName(dir_name, &part_info, format_version))
|
||||
return part_info.valid_name = true;
|
||||
if (auto part_opt = MergeTreePartInfo::tryParsePartName(dir_name, format_version))
|
||||
{
|
||||
part_info.valid_name = true;
|
||||
part_info.addParsedPartInfo(*part_opt);
|
||||
return part_info;
|
||||
}
|
||||
|
||||
/// Next, as <prefix>_<partname>. Use entire name as prefix if it fails.
|
||||
part_info.prefix = dir_name;
|
||||
const auto first_separator = dir_name.find_first_of('_');
|
||||
|
||||
const size_t first_separator = dir_name.find_first_of('_');
|
||||
|
||||
if (first_separator == String::npos)
|
||||
return part_info.valid_name = false;
|
||||
{
|
||||
part_info.valid_name = false;
|
||||
return part_info;
|
||||
}
|
||||
|
||||
const auto part_name = dir_name.substr(first_separator + 1,
|
||||
dir_name.size() - first_separator - 1);
|
||||
if (!MergeTreePartInfo::tryParsePartName(part_name, &part_info, format_version))
|
||||
return part_info.valid_name = false;
|
||||
const std::string_view part_name = dir_name.substr(
|
||||
first_separator + 1,
|
||||
dir_name.size() - first_separator - 1);
|
||||
|
||||
part_info.prefix = dir_name.substr(0, first_separator);
|
||||
return part_info.valid_name = true;
|
||||
if (auto part_opt = MergeTreePartInfo::tryParsePartName(part_name, format_version))
|
||||
{
|
||||
part_info.valid_name = true;
|
||||
part_info.prefix = dir_name.substr(0, first_separator);
|
||||
part_info.addParsedPartInfo(*part_opt);
|
||||
}
|
||||
else
|
||||
part_info.valid_name = false;
|
||||
|
||||
return part_info;
|
||||
}
|
||||
|
||||
void DetachedPartInfo::addParsedPartInfo(const MergeTreePartInfo& part)
|
||||
{
|
||||
// Both class are aggregates so it's ok.
|
||||
static_cast<MergeTreePartInfo &>(*this) = part;
|
||||
}
|
||||
}
|
||||
|
@ -1,8 +1,10 @@
|
||||
#pragma once
|
||||
|
||||
#include <limits>
|
||||
#include <optional>
|
||||
#include <tuple>
|
||||
#include <vector>
|
||||
#include <array>
|
||||
#include <common/types.h>
|
||||
#include <common/DayNum.h>
|
||||
#include <Storages/MergeTree/MergeTreeDataFormatVersion.h>
|
||||
@ -98,7 +100,8 @@ struct MergeTreePartInfo
|
||||
|
||||
static MergeTreePartInfo fromPartName(const String & part_name, MergeTreeDataFormatVersion format_version); // -V1071
|
||||
|
||||
static bool tryParsePartName(const String & part_name, MergeTreePartInfo * part_info, MergeTreeDataFormatVersion format_version);
|
||||
static std::optional<MergeTreePartInfo> tryParsePartName(
|
||||
std::string_view part_name, MergeTreeDataFormatVersion format_version);
|
||||
|
||||
static void parseMinMaxDatesFromPartName(const String & part_name, DayNum & min_date, DayNum & max_date);
|
||||
|
||||
@ -122,11 +125,23 @@ struct DetachedPartInfo : public MergeTreePartInfo
|
||||
/// If false, MergeTreePartInfo is in invalid state (directory name was not successfully parsed).
|
||||
bool valid_name;
|
||||
|
||||
static const std::vector<String> DETACH_REASONS;
|
||||
static constexpr auto DETACH_REASONS = std::to_array<std::string_view>({
|
||||
"broken", "unexpected", "noquorum",
|
||||
"ignored", "broken-on-start", "clone",
|
||||
"attaching", "deleting", "tmp-fetch"
|
||||
});
|
||||
|
||||
/// NOTE: It may parse part info incorrectly.
|
||||
/// For example, if prefix contain '_' or if DETACH_REASONS doesn't contain prefix.
|
||||
static bool tryParseDetachedPartName(const String & dir_name, DetachedPartInfo & part_info, MergeTreeDataFormatVersion format_version);
|
||||
/// For example, if prefix contains '_' or if DETACH_REASONS doesn't contain prefix.
|
||||
// This method has different semantics with MergeTreePartInfo::tryParsePartName as
|
||||
// the former is used for (1) verifying that a part can be parsed without using the result and (2) parsing part
|
||||
// for later usage.
|
||||
// Detached parts are always parsed regardless of their validity (valid_name is used to check that), so we do not
|
||||
// need an std::optional
|
||||
static DetachedPartInfo parseDetachedPartName(std::string_view dir_name, MergeTreeDataFormatVersion format_version);
|
||||
|
||||
private:
|
||||
void addParsedPartInfo(const MergeTreePartInfo& part);
|
||||
};
|
||||
|
||||
using DetachedPartsInfo = std::vector<DetachedPartInfo>;
|
||||
|
@ -1163,10 +1163,8 @@ void StorageReplicatedMergeTree::checkParts(bool skip_sanity_checks)
|
||||
const UInt64 parts_to_fetch_blocks = std::accumulate(parts_to_fetch.cbegin(), parts_to_fetch.cend(), 0,
|
||||
[&](UInt64 acc, const String& part_name)
|
||||
{
|
||||
MergeTreePartInfo part_info;
|
||||
|
||||
if (MergeTreePartInfo::tryParsePartName(part_name, &part_info, format_version))
|
||||
return acc + part_info.getBlocksCount();
|
||||
if (const auto part_info = MergeTreePartInfo::tryParsePartName(part_name, format_version))
|
||||
return acc + part_info->getBlocksCount();
|
||||
|
||||
LOG_ERROR(log, "Unexpected part name: {}", part_name);
|
||||
return acc;
|
||||
@ -1456,13 +1454,12 @@ MergeTreeData::MutableDataPartPtr StorageReplicatedMergeTree::attachPartHelperFo
|
||||
for (const DiskPtr & disk : getStoragePolicy()->getDisks())
|
||||
for (const auto it = disk->iterateDirectory(fs::path(relative_data_path) / "detached/"); it->isValid(); it->next())
|
||||
{
|
||||
MergeTreePartInfo part_info;
|
||||
const auto part_info = MergeTreePartInfo::tryParsePartName(it->name(), format_version);
|
||||
|
||||
if (!MergeTreePartInfo::tryParsePartName(it->name(), &part_info, format_version) ||
|
||||
part_info.partition_id != actual_part_info.partition_id)
|
||||
if (!part_info || part_info->partition_id != actual_part_info.partition_id)
|
||||
continue;
|
||||
|
||||
const String part_old_name = part_info.getPartName();
|
||||
const String part_old_name = part_info->getPartName();
|
||||
const String part_path = fs::path("detached") / part_old_name;
|
||||
|
||||
const VolumePtr volume = std::make_shared<SingleDiskVolume>("volume_" + part_old_name, disk);
|
||||
@ -6328,6 +6325,7 @@ void StorageReplicatedMergeTree::getClearBlocksInPartitionOps(
|
||||
|
||||
String partition_prefix = partition_id + "_";
|
||||
zkutil::AsyncResponses<Coordination::GetResponse> get_futures;
|
||||
|
||||
for (const String & block_id : blocks)
|
||||
{
|
||||
if (startsWith(block_id, partition_prefix))
|
||||
@ -6346,9 +6344,10 @@ void StorageReplicatedMergeTree::getClearBlocksInPartitionOps(
|
||||
continue;
|
||||
|
||||
ReadBufferFromString buf(result.data);
|
||||
MergeTreePartInfo part_info;
|
||||
bool parsed = MergeTreePartInfo::tryParsePartName(result.data, &part_info, format_version);
|
||||
if (!parsed || (min_block_num <= part_info.min_block && part_info.max_block <= max_block_num))
|
||||
|
||||
const auto part_info = MergeTreePartInfo::tryParsePartName(result.data, format_version);
|
||||
|
||||
if (!part_info || (min_block_num <= part_info->min_block && part_info->max_block <= max_block_num))
|
||||
ops.emplace_back(zkutil::makeRemoveRequest(path, -1));
|
||||
}
|
||||
}
|
||||
@ -7441,12 +7440,15 @@ bool StorageReplicatedMergeTree::checkIfDetachedPartExists(const String & part_n
|
||||
bool StorageReplicatedMergeTree::checkIfDetachedPartitionExists(const String & partition_name)
|
||||
{
|
||||
fs::directory_iterator dir_end;
|
||||
|
||||
for (const std::string & path : getDataPaths())
|
||||
{
|
||||
for (fs::directory_iterator dir_it{fs::path(path) / "detached/"}; dir_it != dir_end; ++dir_it)
|
||||
{
|
||||
MergeTreePartInfo part_info;
|
||||
if (MergeTreePartInfo::tryParsePartName(dir_it->path().filename(), &part_info, format_version) && part_info.partition_id == partition_name)
|
||||
const String file_name = dir_it->path().filename().string();
|
||||
auto part_info = MergeTreePartInfo::tryParsePartName(file_name, format_version);
|
||||
|
||||
if (part_info && part_info->partition_id == partition_name)
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user