diff --git a/src/IO/ReadBufferFromString.h b/src/IO/ReadBufferFromString.h index fac1b230a2c..1f8319d9a33 100644 --- a/src/IO/ReadBufferFromString.h +++ b/src/IO/ReadBufferFromString.h @@ -2,18 +2,17 @@ #include - namespace DB { -/** Allows to read from std::string-like object. - */ +/// Allows to read from std::string-like object. class ReadBufferFromString : public ReadBufferFromMemory { public: /// std::string or something similar template ReadBufferFromString(const S & s) : ReadBufferFromMemory(s.data(), s.size()) {} -}; + explicit ReadBufferFromString(std::string_view s) : ReadBufferFromMemory(s.data(), s.size()) {} +}; } diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index 764f5d7adf7..d461d55d161 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -884,6 +884,7 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks) { /// Check extra parts at different disks, in order to not allow to miss data parts at undefined disks. std::unordered_set defined_disk_names; + for (const auto & disk_ptr : disks) defined_disk_names.insert(disk_ptr->getName()); @@ -893,9 +894,10 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks) { for (const auto it = disk->iterateDirectory(relative_data_path); it->isValid(); it->next()) { - MergeTreePartInfo part_info; - if (MergeTreePartInfo::tryParsePartName(it->name(), &part_info, format_version)) - throw Exception("Part " + backQuote(it->name()) + " was found on disk " + backQuote(disk_name) + " which is not defined in the storage policy", ErrorCodes::UNKNOWN_DISK); + if (MergeTreePartInfo::tryParsePartName(it->name(), format_version)) + throw Exception(ErrorCodes::UNKNOWN_DISK, + "Part {} was found on disk {} which is not defined in the storage policy", + backQuote(it->name()), backQuote(disk_name)); } } } @@ -906,10 +908,13 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks) for (auto disk_it = disks.rbegin(); disk_it != disks.rend(); ++disk_it) { auto disk_ptr = *disk_it; + for (auto it = disk_ptr->iterateDirectory(relative_data_path); it->isValid(); it->next()) { /// Skip temporary directories, file 'format_version.txt' and directory 'detached'. - if (startsWith(it->name(), "tmp") || it->name() == MergeTreeData::FORMAT_VERSION_FILE_NAME || it->name() == MergeTreeData::DETACHED_DIR_NAME) + if (startsWith(it->name(), "tmp") + || it->name() == MergeTreeData::FORMAT_VERSION_FILE_NAME + || it->name() == MergeTreeData::DETACHED_DIR_NAME) continue; if (!startsWith(it->name(), MergeTreeWriteAheadLog::WAL_FILE_NAME)) @@ -956,25 +961,34 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks) { pool.scheduleOrThrowOnError([&] { - const auto & part_name = part_names_with_disk.first; - const auto part_disk_ptr = part_names_with_disk.second; + const auto & [part_name, part_disk_ptr] = part_names_with_disk; - MergeTreePartInfo part_info; - if (!MergeTreePartInfo::tryParsePartName(part_name, &part_info, format_version)) + auto part_opt = MergeTreePartInfo::tryParsePartName(part_name, format_version); + + if (!part_opt) return; auto single_disk_volume = std::make_shared("volume_" + part_name, part_disk_ptr, 0); - auto part = createPart(part_name, part_info, single_disk_volume, part_name); + auto part = createPart(part_name, *part_opt, single_disk_volume, part_name); bool broken = false; String part_path = fs::path(relative_data_path) / part_name; String marker_path = fs::path(part_path) / IMergeTreeDataPart::DELETE_ON_DESTROY_MARKER_FILE_NAME; + if (part_disk_ptr->exists(marker_path)) { - LOG_WARNING(log, "Detaching stale part {}{}, which should have been deleted after a move. That can only happen after unclean restart of ClickHouse after move of a part having an operation blocking that stale copy of part.", getFullPathOnDisk(part_disk_ptr), part_name); + LOG_WARNING(log, + "Detaching stale part {}{}, which should have been deleted after a move. That can only happen " + "after unclean restart of ClickHouse after move of a part having an operation blocking that " + "stale copy of part.", + getFullPathOnDisk(part_disk_ptr), part_name); + std::lock_guard loading_lock(mutex); + broken_parts_to_detach.push_back(part); + ++suspicious_broken_parts; + return; } @@ -1002,25 +1016,34 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks) /// Ignore broken parts that can appear as a result of hard server restart. if (broken) { - LOG_ERROR(log, "Detaching broken part {}{}. If it happened after update, it is likely because of backward incompability. You need to resolve this manually", getFullPathOnDisk(part_disk_ptr), part_name); + LOG_ERROR(log, + "Detaching broken part {}{}. If it happened after update, it is likely because of backward " + "incompatibility. You need to resolve this manually", + getFullPathOnDisk(part_disk_ptr), part_name); + std::lock_guard loading_lock(mutex); + broken_parts_to_detach.push_back(part); + ++suspicious_broken_parts; return; } + if (!part->index_granularity_info.is_adaptive) has_non_adaptive_parts.store(true, std::memory_order_relaxed); else has_adaptive_parts.store(true, std::memory_order_relaxed); part->modification_time = part_disk_ptr->getLastModified(fs::path(relative_data_path) / part_name).epochTime(); + /// Assume that all parts are Committed, covered parts will be detected and marked as Outdated later part->setState(DataPartState::Committed); std::lock_guard loading_lock(mutex); + if (!data_parts_indexes.insert(part).second) - throw Exception("Part " + part->name + " already exists", ErrorCodes::DUPLICATE_DATA_PART); + throw Exception(ErrorCodes::DUPLICATE_DATA_PART, "Part {} already exists", part->name); addPartContributionToDataVolume(part); }); @@ -3292,11 +3315,12 @@ RestoreDataTasks MergeTreeData::restoreDataPartsFromBackup(const BackupPtr & bac Strings part_names = backup->list(data_path_in_backup); for (const String & part_name : part_names) { - MergeTreePartInfo part_info; - if (!MergeTreePartInfo::tryParsePartName(part_name, &part_info, format_version)) + const auto part_info = MergeTreePartInfo::tryParsePartName(part_name, format_version); + + if (!part_info) continue; - if (!partition_ids.empty() && !partition_ids.contains(part_info.partition_id)) + if (!partition_ids.empty() && !partition_ids.contains(part_info->partition_id)) continue; UInt64 total_size_of_part = 0; @@ -3333,7 +3357,7 @@ RestoreDataTasks MergeTreeData::restoreDataPartsFromBackup(const BackupPtr & bac } auto single_disk_volume = std::make_shared(disk->getName(), disk, 0); - auto part = createPart(part_name, part_info, single_disk_volume, relative_temp_part_dir); + auto part = createPart(part_name, *part_info, single_disk_volume, relative_temp_part_dir); part->loadColumnsChecksumsIndexes(false, true); renameTempPartAndAdd(part, increment); }; @@ -3545,11 +3569,8 @@ std::vector MergeTreeData::getDetachedParts() const { for (auto it = disk->iterateDirectory(detached_path); it->isValid(); it->next()) { - res.emplace_back(); - auto & part = res.back(); - - DetachedPartInfo::tryParseDetachedPartName(it->name(), part, format_version); - part.disk = disk->getName(); + auto res_it = res.emplace_back(DetachedPartInfo::parseDetachedPartName(it->name(), format_version)); + res_it.disk = disk->getName(); } } } @@ -3620,7 +3641,7 @@ MergeTreeData::MutableDataPartsVector MergeTreeData::tryLoadPartsToAttach(const validateDetachedPartName(part_id); renamed_parts.addPart(part_id, "attaching_" + part_id); - if (MergeTreePartInfo::tryParsePartName(part_id, nullptr, format_version)) + if (MergeTreePartInfo::tryParsePartName(part_id, format_version)) name_to_disk[part_id] = getDiskForPart(part_id, source_dir); } else @@ -3636,12 +3657,11 @@ MergeTreeData::MutableDataPartsVector MergeTreeData::tryLoadPartsToAttach(const for (auto it = disk->iterateDirectory(relative_data_path + source_dir); it->isValid(); it->next()) { const String & name = it->name(); - MergeTreePartInfo part_info; // TODO what if name contains "_tryN" suffix? /// Parts with prefix in name (e.g. attaching_1_3_3_0, deleting_1_3_3_0) will be ignored - if (!MergeTreePartInfo::tryParsePartName(name, &part_info, format_version) - || part_info.partition_id != partition_id) + if (auto part_opt = MergeTreePartInfo::tryParsePartName(name, format_version); + !part_opt || part_opt->partition_id != partition_id) { continue; } diff --git a/src/Storages/MergeTree/MergeTreePartInfo.cpp b/src/Storages/MergeTree/MergeTreePartInfo.cpp index 6a98e666c34..39337fbd4c3 100644 --- a/src/Storages/MergeTree/MergeTreePartInfo.cpp +++ b/src/Storages/MergeTree/MergeTreePartInfo.cpp @@ -15,13 +15,12 @@ namespace ErrorCodes MergeTreePartInfo MergeTreePartInfo::fromPartName(const String & part_name, MergeTreeDataFormatVersion format_version) { - MergeTreePartInfo part_info; - if (!tryParsePartName(part_name, &part_info, format_version)) - throw Exception("Unexpected part name: " + part_name, ErrorCodes::BAD_DATA_PART_NAME); - return part_info; + if (auto part_opt = tryParsePartName(part_name, format_version); !part_opt) + throw Exception(ErrorCodes::BAD_DATA_PART_NAME, "Unexpected part name: {}", part_name); + else + return *part_opt; } - void MergeTreePartInfo::validatePartitionID(const String & partition_id, MergeTreeDataFormatVersion format_version) { if (partition_id.empty()) @@ -43,22 +42,26 @@ void MergeTreePartInfo::validatePartitionID(const String & partition_id, MergeTr } -bool MergeTreePartInfo::tryParsePartName(const String & part_name, MergeTreePartInfo * part_info, MergeTreeDataFormatVersion format_version) +std::optional MergeTreePartInfo::tryParsePartName( + std::string_view part_name, MergeTreeDataFormatVersion format_version) { ReadBufferFromString in(part_name); String partition_id; + if (format_version < MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING) { UInt32 min_yyyymmdd = 0; UInt32 max_yyyymmdd = 0; + if (!tryReadIntText(min_yyyymmdd, in) || !checkChar('_', in) || !tryReadIntText(max_yyyymmdd, in) || !checkChar('_', in)) { - return false; + return std::nullopt; } + partition_id = toString(min_yyyymmdd / 100); } else @@ -76,9 +79,7 @@ bool MergeTreePartInfo::tryParsePartName(const String & part_name, MergeTreePart /// Sanity check if (partition_id.empty()) - { - return false; - } + return std::nullopt; Int64 min_block_num = 0; Int64 max_block_num = 0; @@ -91,14 +92,12 @@ bool MergeTreePartInfo::tryParsePartName(const String & part_name, MergeTreePart || !checkChar('_', in) || !tryReadIntText(level, in)) { - return false; + return std::nullopt; } /// Sanity check if (min_block_num > max_block_num) - { - return false; - } + return std::nullopt; if (!in.eof()) { @@ -106,29 +105,30 @@ bool MergeTreePartInfo::tryParsePartName(const String & part_name, MergeTreePart || !tryReadIntText(mutation, in) || !in.eof()) { - return false; + return std::nullopt; } } - if (part_info) + MergeTreePartInfo part_info; + + part_info.partition_id = std::move(partition_id); + part_info.min_block = min_block_num; + part_info.max_block = max_block_num; + + if (level == LEGACY_MAX_LEVEL) { - part_info->partition_id = std::move(partition_id); - part_info->min_block = min_block_num; - part_info->max_block = max_block_num; - if (level == LEGACY_MAX_LEVEL) - { - /// We (accidentally) had two different max levels until 21.6 and it might cause logical errors like - /// "Part 20170601_20170630_0_2_999999999 intersects 201706_0_1_4294967295". - /// So we replace unexpected max level to make contains(...) method and comparison operators work - /// correctly with such virtual parts. On part name serialization we will use legacy max level to keep the name unchanged. - part_info->use_leagcy_max_level = true; - level = MAX_LEVEL; - } - part_info->level = level; - part_info->mutation = mutation; + /// We (accidentally) had two different max levels until 21.6 and it might cause logical errors like + /// "Part 20170601_20170630_0_2_999999999 intersects 201706_0_1_4294967295". + /// So we replace unexpected max level to make contains(...) method and comparison operators work + /// correctly with such virtual parts. On part name serialization we will use legacy max level to keep the name unchanged. + part_info.use_leagcy_max_level = true; + level = MAX_LEVEL; } - return true; + part_info.level = level; + part_info.mutation = mutation; + + return part_info; } @@ -235,55 +235,75 @@ String MergeTreePartInfo::getPartNameV0(DayNum left_date, DayNum right_date) con return wb.str(); } - -const std::vector DetachedPartInfo::DETACH_REASONS = - { - "broken", - "unexpected", - "noquorum", - "ignored", - "broken-on-start", - "clone", - "attaching", - "deleting", - "tmp-fetch", - }; - -bool DetachedPartInfo::tryParseDetachedPartName(const String & dir_name, DetachedPartInfo & part_info, - MergeTreeDataFormatVersion format_version) +DetachedPartInfo DetachedPartInfo::parseDetachedPartName( + std::string_view dir_name, MergeTreeDataFormatVersion format_version) { + DetachedPartInfo part_info; + part_info.dir_name = dir_name; - /// First, try to find known prefix and parse dir_name as _. + /// First, try to find known prefix and parse dir_name as _. /// Arbitrary strings are not allowed for partition_id, so known_prefix cannot be confused with partition_id. - for (const auto & known_prefix : DETACH_REASONS) + for (std::string_view known_prefix : DETACH_REASONS) { - if (dir_name.starts_with(known_prefix) && known_prefix.size() < dir_name.size() && dir_name[known_prefix.size()] == '_') + if (dir_name.starts_with(known_prefix) + && known_prefix.size() < dir_name.size() + && dir_name[known_prefix.size()] == '_') { part_info.prefix = known_prefix; - String part_name = dir_name.substr(known_prefix.size() + 1); - bool parsed = MergeTreePartInfo::tryParsePartName(part_name, &part_info, format_version); - return part_info.valid_name = parsed; + + const std::string_view part_name = dir_name.substr(known_prefix.size() + 1); + + if (auto part_opt = MergeTreePartInfo::tryParsePartName(part_name, format_version)) + { + part_info.valid_name = true; + part_info.addParsedPartInfo(*part_opt); + } + else + part_info.valid_name = false; + + return part_info; } } /// Next, try to parse dir_name as . - if (MergeTreePartInfo::tryParsePartName(dir_name, &part_info, format_version)) - return part_info.valid_name = true; + if (auto part_opt = MergeTreePartInfo::tryParsePartName(dir_name, format_version)) + { + part_info.valid_name = true; + part_info.addParsedPartInfo(*part_opt); + return part_info; + } /// Next, as _. Use entire name as prefix if it fails. part_info.prefix = dir_name; - const auto first_separator = dir_name.find_first_of('_'); + + const size_t first_separator = dir_name.find_first_of('_'); + if (first_separator == String::npos) - return part_info.valid_name = false; + { + part_info.valid_name = false; + return part_info; + } - const auto part_name = dir_name.substr(first_separator + 1, - dir_name.size() - first_separator - 1); - if (!MergeTreePartInfo::tryParsePartName(part_name, &part_info, format_version)) - return part_info.valid_name = false; + const std::string_view part_name = dir_name.substr( + first_separator + 1, + dir_name.size() - first_separator - 1); - part_info.prefix = dir_name.substr(0, first_separator); - return part_info.valid_name = true; + if (auto part_opt = MergeTreePartInfo::tryParsePartName(part_name, format_version)) + { + part_info.valid_name = true; + part_info.prefix = dir_name.substr(0, first_separator); + part_info.addParsedPartInfo(*part_opt); + } + else + part_info.valid_name = false; + + return part_info; } +void DetachedPartInfo::addParsedPartInfo(const MergeTreePartInfo& part) +{ + // Both class are aggregates so it's ok. + static_cast(*this) = part; +} } diff --git a/src/Storages/MergeTree/MergeTreePartInfo.h b/src/Storages/MergeTree/MergeTreePartInfo.h index 181fef7990c..58707b4dca3 100644 --- a/src/Storages/MergeTree/MergeTreePartInfo.h +++ b/src/Storages/MergeTree/MergeTreePartInfo.h @@ -1,8 +1,10 @@ #pragma once #include +#include #include #include +#include #include #include #include @@ -98,7 +100,8 @@ struct MergeTreePartInfo static MergeTreePartInfo fromPartName(const String & part_name, MergeTreeDataFormatVersion format_version); // -V1071 - static bool tryParsePartName(const String & part_name, MergeTreePartInfo * part_info, MergeTreeDataFormatVersion format_version); + static std::optional tryParsePartName( + std::string_view part_name, MergeTreeDataFormatVersion format_version); static void parseMinMaxDatesFromPartName(const String & part_name, DayNum & min_date, DayNum & max_date); @@ -122,11 +125,23 @@ struct DetachedPartInfo : public MergeTreePartInfo /// If false, MergeTreePartInfo is in invalid state (directory name was not successfully parsed). bool valid_name; - static const std::vector DETACH_REASONS; + static constexpr auto DETACH_REASONS = std::to_array({ + "broken", "unexpected", "noquorum", + "ignored", "broken-on-start", "clone", + "attaching", "deleting", "tmp-fetch" + }); /// NOTE: It may parse part info incorrectly. - /// For example, if prefix contain '_' or if DETACH_REASONS doesn't contain prefix. - static bool tryParseDetachedPartName(const String & dir_name, DetachedPartInfo & part_info, MergeTreeDataFormatVersion format_version); + /// For example, if prefix contains '_' or if DETACH_REASONS doesn't contain prefix. + // This method has different semantics with MergeTreePartInfo::tryParsePartName as + // the former is used for (1) verifying that a part can be parsed without using the result and (2) parsing part + // for later usage. + // Detached parts are always parsed regardless of their validity (valid_name is used to check that), so we do not + // need an std::optional + static DetachedPartInfo parseDetachedPartName(std::string_view dir_name, MergeTreeDataFormatVersion format_version); + +private: + void addParsedPartInfo(const MergeTreePartInfo& part); }; using DetachedPartsInfo = std::vector; diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index 3bdba5d1b35..27abc894354 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -1163,10 +1163,8 @@ void StorageReplicatedMergeTree::checkParts(bool skip_sanity_checks) const UInt64 parts_to_fetch_blocks = std::accumulate(parts_to_fetch.cbegin(), parts_to_fetch.cend(), 0, [&](UInt64 acc, const String& part_name) { - MergeTreePartInfo part_info; - - if (MergeTreePartInfo::tryParsePartName(part_name, &part_info, format_version)) - return acc + part_info.getBlocksCount(); + if (const auto part_info = MergeTreePartInfo::tryParsePartName(part_name, format_version)) + return acc + part_info->getBlocksCount(); LOG_ERROR(log, "Unexpected part name: {}", part_name); return acc; @@ -1456,13 +1454,12 @@ MergeTreeData::MutableDataPartPtr StorageReplicatedMergeTree::attachPartHelperFo for (const DiskPtr & disk : getStoragePolicy()->getDisks()) for (const auto it = disk->iterateDirectory(fs::path(relative_data_path) / "detached/"); it->isValid(); it->next()) { - MergeTreePartInfo part_info; + const auto part_info = MergeTreePartInfo::tryParsePartName(it->name(), format_version); - if (!MergeTreePartInfo::tryParsePartName(it->name(), &part_info, format_version) || - part_info.partition_id != actual_part_info.partition_id) + if (!part_info || part_info->partition_id != actual_part_info.partition_id) continue; - const String part_old_name = part_info.getPartName(); + const String part_old_name = part_info->getPartName(); const String part_path = fs::path("detached") / part_old_name; const VolumePtr volume = std::make_shared("volume_" + part_old_name, disk); @@ -6328,6 +6325,7 @@ void StorageReplicatedMergeTree::getClearBlocksInPartitionOps( String partition_prefix = partition_id + "_"; zkutil::AsyncResponses get_futures; + for (const String & block_id : blocks) { if (startsWith(block_id, partition_prefix)) @@ -6346,9 +6344,10 @@ void StorageReplicatedMergeTree::getClearBlocksInPartitionOps( continue; ReadBufferFromString buf(result.data); - MergeTreePartInfo part_info; - bool parsed = MergeTreePartInfo::tryParsePartName(result.data, &part_info, format_version); - if (!parsed || (min_block_num <= part_info.min_block && part_info.max_block <= max_block_num)) + + const auto part_info = MergeTreePartInfo::tryParsePartName(result.data, format_version); + + if (!part_info || (min_block_num <= part_info->min_block && part_info->max_block <= max_block_num)) ops.emplace_back(zkutil::makeRemoveRequest(path, -1)); } } @@ -7441,12 +7440,15 @@ bool StorageReplicatedMergeTree::checkIfDetachedPartExists(const String & part_n bool StorageReplicatedMergeTree::checkIfDetachedPartitionExists(const String & partition_name) { fs::directory_iterator dir_end; + for (const std::string & path : getDataPaths()) { for (fs::directory_iterator dir_it{fs::path(path) / "detached/"}; dir_it != dir_end; ++dir_it) { - MergeTreePartInfo part_info; - if (MergeTreePartInfo::tryParsePartName(dir_it->path().filename(), &part_info, format_version) && part_info.partition_id == partition_name) + const String file_name = dir_it->path().filename().string(); + auto part_info = MergeTreePartInfo::tryParsePartName(file_name, format_version); + + if (part_info && part_info->partition_id == partition_name) return true; } }