Proper implementation

This commit is contained in:
Alexander Sapin 2023-10-31 18:53:33 +01:00
parent f4081e3f85
commit 3a1182f6c0
8 changed files with 88 additions and 33 deletions

View File

@ -176,7 +176,8 @@ ActiveDataPartSet::getContainingPartImpl(const MergeTreePartInfo & part_info) co
return part_info_to_name.end();
}
Strings ActiveDataPartSet::getPartsCoveredBy(const MergeTreePartInfo & part_info) const
std::vector<std::map<MergeTreePartInfo, String>::const_iterator> ActiveDataPartSet::getPartsCoveredByImpl(const MergeTreePartInfo & part_info) const
{
auto it_middle = part_info_to_name.lower_bound(part_info);
auto begin = it_middle;
@ -207,13 +208,29 @@ Strings ActiveDataPartSet::getPartsCoveredBy(const MergeTreePartInfo & part_info
++end;
}
Strings covered;
std::vector<std::map<MergeTreePartInfo, String>::const_iterator> covered;
for (auto it = begin; it != end; ++it)
covered.push_back(it->second);
covered.push_back(it);
return covered;
}
/// Returns the names of the parts from this set that are covered by `part_info`.
Strings ActiveDataPartSet::getPartsCoveredBy(const MergeTreePartInfo & part_info) const
{
    const auto covered_entries = getPartsCoveredByImpl(part_info);

    Strings names;
    names.reserve(covered_entries.size());

    /// Every entry is an iterator into part_info_to_name: key is the part info, value is the part name.
    for (const auto & entry : covered_entries)
        names.emplace_back(entry->second);

    return names;
}
std::vector<MergeTreePartInfo> ActiveDataPartSet::getPartInfosCoveredBy(const MergeTreePartInfo & part_info) const
{
std::vector<MergeTreePartInfo> covered;
for (const auto & it : getPartsCoveredByImpl(part_info))
covered.push_back(it->first);
return covered;
}
Strings ActiveDataPartSet::getParts() const
{
Strings res;

View File

@ -91,6 +91,7 @@ public:
String getContainingPart(const String & name) const;
Strings getPartsCoveredBy(const MergeTreePartInfo & part_info) const;
/// Same as getPartsCoveredBy, but returns the infos of the covered parts instead of their names.
std::vector<MergeTreePartInfo> getPartInfosCoveredBy(const MergeTreePartInfo & part_info) const;
/// Returns parts in ascending order of the partition_id and block number.
Strings getParts() const;
@ -111,6 +112,8 @@ private:
MergeTreeDataFormatVersion format_version;
std::map<MergeTreePartInfo, String> part_info_to_name;
/// Common implementation for getPartsCoveredBy and getPartInfosCoveredBy:
/// returns iterators into part_info_to_name for the parts covered by part_info.
std::vector<std::map<MergeTreePartInfo, String>::const_iterator> getPartsCoveredByImpl(const MergeTreePartInfo & part_info) const;
std::map<MergeTreePartInfo, String>::const_iterator getContainingPartImpl(const MergeTreePartInfo & part_info) const;
};

View File

@ -1592,7 +1592,7 @@ void MergeTreeData::loadDataPartsFromWAL(MutableDataPartsVector & parts_from_wal
}
void MergeTreeData::loadDataParts(bool skip_sanity_checks, const ActiveDataPartSet & parts_can_be_restored)
void MergeTreeData::loadDataParts(bool skip_sanity_checks, std::optional<std::unordered_set<std::string>> expected_parts)
{
LOG_DEBUG(log, "Loading data parts");
@ -1716,8 +1716,8 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks, const ActiveDataPartS
size_t suspicious_broken_parts = 0;
size_t suspicious_broken_parts_bytes = 0;
size_t suspicious_broken_restorable_parts = 0;
size_t suspicious_broken_restorable_parts_bytes = 0;
size_t suspicious_broken_unexpected_parts = 0;
size_t suspicious_broken_unexpected_parts_bytes = 0;
bool have_adaptive_parts = false;
bool have_non_adaptive_parts = false;
bool have_lightweight_in_parts = false;
@ -1734,16 +1734,16 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks, const ActiveDataPartS
if (res.is_broken)
{
broken_parts_to_detach.push_back(res.part);
bool can_be_restored = !parts_can_be_restored.getContainingPart(res.part->info).empty();
if (can_be_restored)
++suspicious_broken_restorable_parts;
bool unexpected = expected_parts != std::nullopt && !expected_parts->contains(res.part->name);
if (unexpected)
++suspicious_broken_unexpected_parts;
else
++suspicious_broken_parts;
if (res.size_of_part)
{
if (can_be_restored)
++suspicious_broken_restorable_parts_bytes;
if (unexpected)
++suspicious_broken_unexpected_parts_bytes;
else
++suspicious_broken_parts_bytes;
}
@ -1784,8 +1784,8 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks, const ActiveDataPartS
{
if (settings->strict_suspicious_broken_parts_check_on_start)
{
suspicious_broken_parts += suspicious_broken_restorable_parts;
suspicious_broken_restorable_parts_bytes += suspicious_broken_restorable_parts_bytes;
suspicious_broken_parts += suspicious_broken_unexpected_parts;
suspicious_broken_parts_bytes += suspicious_broken_unexpected_parts_bytes;
}
if (suspicious_broken_parts > settings->max_suspicious_broken_parts)

View File

@ -465,7 +465,7 @@ public:
StorageSnapshotPtr getStorageSnapshotWithoutData(const StorageMetadataPtr & metadata_snapshot, ContextPtr query_context) const override;
/// Load the set of data parts from disk. Call once - immediately after the object is created.
void loadDataParts(bool skip_sanity_checks, const ActiveDataPartSet & parts_can_be_restored);
/// If `expected_parts` is set, a broken part whose name is not in this set is counted as
/// "unexpected" rather than as a suspicious broken part. Pass std::nullopt to treat every broken part as suspicious.
void loadDataParts(bool skip_sanity_checks, std::optional<std::unordered_set<std::string>> expected_parts);
String getLogName() const { return *std::atomic_load(&log_name); }

View File

@ -300,6 +300,33 @@ void MergeTreePartInfo::deserialize(ReadBuffer & in)
readBoolText(use_leagcy_max_level, in);
}
bool MergeTreePartInfo::areAllBlockNumbersCovered(const MergeTreePartInfo & blocks_range, std::vector<MergeTreePartInfo> candidates)
{
if (candidates.empty())
return false;
std::sort(candidates.begin(), candidates.end());
/// First doesn't cover left border
if (candidates[0].min_block > blocks_range.min_block)
return false;
int64_t current_right_block = candidates[0].min_block;
for (const auto & candidate : candidates)
{
if (candidate.min_block - current_right_block > 1)
return false;
current_right_block = candidate.max_block;
}
if (current_right_block < blocks_range.max_block)
return false;
return true;
}
DetachedPartInfo DetachedPartInfo::parseDetachedPartName(
const DiskPtr & disk, std::string_view dir_name, MergeTreeDataFormatVersion format_version)
{
@ -367,9 +394,10 @@ DetachedPartInfo DetachedPartInfo::parseDetachedPartName(
return part_info;
}
void DetachedPartInfo::addParsedPartInfo(const MergeTreePartInfo& part)
/// Overwrites the MergeTreePartInfo part of this object with the parsed `part`.
void DetachedPartInfo::addParsedPartInfo(const MergeTreePartInfo & part)
{
    /// Both classes are aggregates, so a plain assignment through the base reference is fine.
    auto & base_info = static_cast<MergeTreePartInfo &>(*this);
    base_info = part;
}
}

View File

@ -131,6 +131,8 @@ struct MergeTreePartInfo
static bool contains(const String & outer_part_name, const String & inner_part_name, MergeTreeDataFormatVersion format_version);
/// Returns true if `candidates` together cover all block numbers from blocks_range.min_block to
/// blocks_range.max_block with no gaps between consecutive candidates. Returns false if `candidates` is empty.
/// `candidates` is passed by value because the implementation sorts it.
static bool areAllBlockNumbersCovered(const MergeTreePartInfo & blocks_range, std::vector<MergeTreePartInfo> candidates);
static constexpr UInt32 MAX_LEVEL = 999999999;
static constexpr UInt32 MAX_BLOCK_NUMBER = 999999999;

View File

@ -112,8 +112,8 @@ StorageMergeTree::StorageMergeTree(
{
initializeDirectoriesAndFormatVersion(relative_data_path_, attach, date_column_name);
ActiveDataPartSet dummy(format_version);
loadDataParts(has_force_restore_data_flag, dummy);
loadDataParts(has_force_restore_data_flag, std::nullopt);
if (!attach && !getDataPartsForInternalUsage().empty() && !isStaticStorage())
throw Exception(ErrorCodes::INCORRECT_DATA,

View File

@ -389,6 +389,8 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
}
}
std::optional<std::unordered_set<std::string>> expected_parts_on_this_replica;
bool skip_sanity_checks = false;
/// It does not make sense for CREATE query
if (attach)
@ -417,6 +419,16 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
skip_sanity_checks = true;
LOG_WARNING(log, "Skipping the limits on severity of changes to data parts and columns (flag force_restore_data).");
} /// In case of force_restore it doesn't make sense to check anything
else if (current_zookeeper && current_zookeeper->exists(replica_path))
{
std::vector<std::string> parts_on_replica;
if (current_zookeeper->tryGetChildren(fs::path(replica_path) / "parts", parts_on_replica) == Coordination::Error::ZOK)
{
expected_parts_on_this_replica.emplace();
for (const auto & part : parts_on_replica)
expected_parts_on_this_replica->insert(part);
}
}
}
catch (const Coordination::Exception & e)
@ -427,22 +439,7 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
}
}
ActiveDataPartSet data_parts_on_other_replicas(format_version);
if (current_zookeeper)
{
auto replicas = current_zookeeper->getChildren(zookeeper_path);
for (const auto & child : replicas)
{
auto one_of_replicas_path = fs::path(zookeeper_path) / "replicas" / child;
if (one_of_replicas_path == replica_path)
continue;
for (const auto & part : one_of_replicas_path)
data_parts_on_other_replicas.add(part);
}
}
loadDataParts(skip_sanity_checks, data_parts_on_other_replicas);
loadDataParts(skip_sanity_checks, expected_parts_on_this_replica);
if (attach)
{
@ -1392,6 +1389,14 @@ void StorageReplicatedMergeTree::checkParts(bool skip_sanity_checks)
continue;
}
auto covered_parts = local_expected_parts_set.getPartInfosCoveredBy(part->info);
if (MergeTreePartInfo::areAllBlockNumbersCovered(part->info, covered_parts))
{
covered_unexpected_parts.push_back(part->name);
continue;
}
/// Part is unexpected and we don't have covering part: it's suspicious
uncovered_unexpected_parts.insert(part->name);
uncovered_unexpected_parts_rows += part->rows_count;