diff --git a/src/Storages/MergeTree/ActiveDataPartSet.cpp b/src/Storages/MergeTree/ActiveDataPartSet.cpp index a1746cc1746..89e01008945 100644 --- a/src/Storages/MergeTree/ActiveDataPartSet.cpp +++ b/src/Storages/MergeTree/ActiveDataPartSet.cpp @@ -176,7 +176,8 @@ ActiveDataPartSet::getContainingPartImpl(const MergeTreePartInfo & part_info) co return part_info_to_name.end(); } -Strings ActiveDataPartSet::getPartsCoveredBy(const MergeTreePartInfo & part_info) const + +std::vector::const_iterator> ActiveDataPartSet::getPartsCoveredByImpl(const MergeTreePartInfo & part_info) const { auto it_middle = part_info_to_name.lower_bound(part_info); auto begin = it_middle; @@ -207,13 +208,29 @@ Strings ActiveDataPartSet::getPartsCoveredBy(const MergeTreePartInfo & part_info ++end; } - Strings covered; + std::vector::const_iterator> covered; for (auto it = begin; it != end; ++it) - covered.push_back(it->second); + covered.push_back(it); return covered; } +Strings ActiveDataPartSet::getPartsCoveredBy(const MergeTreePartInfo & part_info) const +{ + Strings covered; + for (const auto & it : getPartsCoveredByImpl(part_info)) + covered.push_back(it->second); + return covered; +} + +std::vector ActiveDataPartSet::getPartInfosCoveredBy(const MergeTreePartInfo & part_info) const +{ + std::vector covered; + for (const auto & it : getPartsCoveredByImpl(part_info)) + covered.push_back(it->first); + return covered; +} + Strings ActiveDataPartSet::getParts() const { Strings res; diff --git a/src/Storages/MergeTree/ActiveDataPartSet.h b/src/Storages/MergeTree/ActiveDataPartSet.h index 662309f64cf..98a06e02ae8 100644 --- a/src/Storages/MergeTree/ActiveDataPartSet.h +++ b/src/Storages/MergeTree/ActiveDataPartSet.h @@ -91,6 +91,7 @@ public: String getContainingPart(const String & name) const; Strings getPartsCoveredBy(const MergeTreePartInfo & part_info) const; + std::vector getPartInfosCoveredBy(const MergeTreePartInfo & part_info) const; /// Returns parts in ascending order of the partition_id and block number. Strings getParts() const; @@ -111,6 +112,8 @@ private: MergeTreeDataFormatVersion format_version; std::map part_info_to_name; + std::vector::const_iterator> getPartsCoveredByImpl(const MergeTreePartInfo & part_info) const; + std::map::const_iterator getContainingPartImpl(const MergeTreePartInfo & part_info) const; }; diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index 49a70fab263..55ce3f4476d 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -1592,7 +1592,7 @@ void MergeTreeData::loadDataPartsFromWAL(MutableDataPartsVector & parts_from_wal } -void MergeTreeData::loadDataParts(bool skip_sanity_checks, const ActiveDataPartSet & parts_can_be_restored) +void MergeTreeData::loadDataParts(bool skip_sanity_checks, std::optional> expected_parts) { LOG_DEBUG(log, "Loading data parts"); @@ -1716,8 +1716,8 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks, const ActiveDataPartS size_t suspicious_broken_parts = 0; size_t suspicious_broken_parts_bytes = 0; - size_t suspicious_broken_restorable_parts = 0; - size_t suspicious_broken_restorable_parts_bytes = 0; + size_t suspicious_broken_unexpected_parts = 0; + size_t suspicious_broken_unexpected_parts_bytes = 0; bool have_adaptive_parts = false; bool have_non_adaptive_parts = false; bool have_lightweight_in_parts = false; @@ -1734,16 +1734,16 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks, const ActiveDataPartS if (res.is_broken) { broken_parts_to_detach.push_back(res.part); - bool can_be_restored = !parts_can_be_restored.getContainingPart(res.part->info).empty(); - if (can_be_restored) - ++suspicious_broken_restorable_parts; + bool unexpected = expected_parts != std::nullopt && !expected_parts->contains(res.part->name); + if (unexpected) + ++suspicious_broken_unexpected_parts; else ++suspicious_broken_parts; if (res.size_of_part) { - if (can_be_restored) - ++suspicious_broken_restorable_parts_bytes; + if (unexpected) + ++suspicious_broken_unexpected_parts_bytes; else ++suspicious_broken_parts_bytes; } @@ -1784,8 +1784,8 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks, const ActiveDataPartS { if (settings->strict_suspicious_broken_parts_check_on_start) { - suspicious_broken_parts += suspicious_broken_restorable_parts; - suspicious_broken_restorable_parts_bytes += suspicious_broken_restorable_parts_bytes; + suspicious_broken_parts += suspicious_broken_unexpected_parts; + suspicious_broken_parts_bytes += suspicious_broken_unexpected_parts_bytes; } if (suspicious_broken_parts > settings->max_suspicious_broken_parts) diff --git a/src/Storages/MergeTree/MergeTreeData.h b/src/Storages/MergeTree/MergeTreeData.h index bfc5d7ba25b..24ae9825325 100644 --- a/src/Storages/MergeTree/MergeTreeData.h +++ b/src/Storages/MergeTree/MergeTreeData.h @@ -465,7 +465,7 @@ public: StorageSnapshotPtr getStorageSnapshotWithoutData(const StorageMetadataPtr & metadata_snapshot, ContextPtr query_context) const override; /// Load the set of data parts from disk. Call once - immediately after the object is created. - void loadDataParts(bool skip_sanity_checks, const ActiveDataPartSet & parts_can_be_restored); + void loadDataParts(bool skip_sanity_checks, std::optional> parts_can_be_restored); String getLogName() const { return *std::atomic_load(&log_name); } diff --git a/src/Storages/MergeTree/MergeTreePartInfo.cpp b/src/Storages/MergeTree/MergeTreePartInfo.cpp index 16c119c642a..9246ebba470 100644 --- a/src/Storages/MergeTree/MergeTreePartInfo.cpp +++ b/src/Storages/MergeTree/MergeTreePartInfo.cpp @@ -300,6 +300,33 @@ void MergeTreePartInfo::deserialize(ReadBuffer & in) readBoolText(use_leagcy_max_level, in); } +bool MergeTreePartInfo::areAllBlockNumbersCovered(const MergeTreePartInfo & blocks_range, std::vector candidates) +{ + if (candidates.empty()) + return false; + + std::sort(candidates.begin(), candidates.end()); + + /// First doesn't cover left border + if (candidates[0].min_block > blocks_range.min_block) + return false; + + int64_t current_right_block = candidates[0].min_block; + + for (const auto & candidate : candidates) + { + if (candidate.min_block - current_right_block > 1) + return false; + + current_right_block = candidate.max_block; + } + + if (current_right_block < blocks_range.max_block) + return false; + + return true; +} + DetachedPartInfo DetachedPartInfo::parseDetachedPartName( const DiskPtr & disk, std::string_view dir_name, MergeTreeDataFormatVersion format_version) { @@ -367,9 +394,10 @@ DetachedPartInfo DetachedPartInfo::parseDetachedPartName( return part_info; } -void DetachedPartInfo::addParsedPartInfo(const MergeTreePartInfo& part) +void DetachedPartInfo::addParsedPartInfo(const MergeTreePartInfo & part) { // Both class are aggregates so it's ok. static_cast(*this) = part; } + } diff --git a/src/Storages/MergeTree/MergeTreePartInfo.h b/src/Storages/MergeTree/MergeTreePartInfo.h index d359e23901f..5fbb5d70bf3 100644 --- a/src/Storages/MergeTree/MergeTreePartInfo.h +++ b/src/Storages/MergeTree/MergeTreePartInfo.h @@ -131,6 +131,8 @@ struct MergeTreePartInfo static bool contains(const String & outer_part_name, const String & inner_part_name, MergeTreeDataFormatVersion format_version); + static bool areAllBlockNumbersCovered(const MergeTreePartInfo & blocks_range, std::vector candidates); + static constexpr UInt32 MAX_LEVEL = 999999999; static constexpr UInt32 MAX_BLOCK_NUMBER = 999999999; diff --git a/src/Storages/StorageMergeTree.cpp b/src/Storages/StorageMergeTree.cpp index 908ea96612c..804f65fbe22 100644 --- a/src/Storages/StorageMergeTree.cpp +++ b/src/Storages/StorageMergeTree.cpp @@ -112,8 +112,8 @@ StorageMergeTree::StorageMergeTree( { initializeDirectoriesAndFormatVersion(relative_data_path_, attach, date_column_name); - ActiveDataPartSet dummy(format_version); - loadDataParts(has_force_restore_data_flag, dummy); + + loadDataParts(has_force_restore_data_flag, std::nullopt); if (!attach && !getDataPartsForInternalUsage().empty() && !isStaticStorage()) throw Exception(ErrorCodes::INCORRECT_DATA, diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index bd7001bcb11..14aaafeff16 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -389,6 +389,8 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree( } } + + std::optional> expected_parts_on_this_replica; bool skip_sanity_checks = false; /// It does not make sense for CREATE query if (attach) @@ -417,6 +419,16 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree( skip_sanity_checks = true; LOG_WARNING(log, "Skipping the limits on severity of changes to data parts and columns (flag force_restore_data)."); + } /// In case of force_restore it doesn't make sense to check anything + else if (current_zookeeper && current_zookeeper->exists(replica_path)) + { + std::vector parts_on_replica; + if (current_zookeeper->tryGetChildren(fs::path(replica_path) / "parts", parts_on_replica) == Coordination::Error::ZOK) + { + expected_parts_on_this_replica.emplace(); + for (const auto & part : parts_on_replica) + expected_parts_on_this_replica->insert(part); + } } } catch (const Coordination::Exception & e) @@ -427,22 +439,7 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree( } } - ActiveDataPartSet data_parts_on_other_replicas(format_version); - if (current_zookeeper) - { - auto replicas = current_zookeeper->getChildren(zookeeper_path); - for (const auto & child : replicas) - { - auto one_of_replicas_path = fs::path(zookeeper_path) / "replicas" / child; - if (one_of_replicas_path == replica_path) - continue; - - for (const auto & part : one_of_replicas_path) - data_parts_on_other_replicas.add(part); - } - } - - loadDataParts(skip_sanity_checks, data_parts_on_other_replicas); + loadDataParts(skip_sanity_checks, expected_parts_on_this_replica); if (attach) { @@ -1392,6 +1389,14 @@ void StorageReplicatedMergeTree::checkParts(bool skip_sanity_checks) continue; } + auto covered_parts = local_expected_parts_set.getPartInfosCoveredBy(part->info); + + if (MergeTreePartInfo::areAllBlockNumbersCovered(part->info, covered_parts)) + { + covered_unexpected_parts.push_back(part->name); + continue; + } + /// Part is unexpected and we don't have covering part: it's suspicious uncovered_unexpected_parts.insert(part->name); uncovered_unexpected_parts_rows += part->rows_count;