mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-20 16:50:48 +00:00
Merge pull request #56142 from ClickHouse/make_replicated_start_more_robust
Better process broken parts on table start for replicated tables
This commit is contained in:
commit
e88fa10aa2
@ -176,7 +176,8 @@ ActiveDataPartSet::getContainingPartImpl(const MergeTreePartInfo & part_info) co
|
|||||||
return part_info_to_name.end();
|
return part_info_to_name.end();
|
||||||
}
|
}
|
||||||
|
|
||||||
Strings ActiveDataPartSet::getPartsCoveredBy(const MergeTreePartInfo & part_info) const
|
|
||||||
|
std::vector<std::map<MergeTreePartInfo, String>::const_iterator> ActiveDataPartSet::getPartsCoveredByImpl(const MergeTreePartInfo & part_info) const
|
||||||
{
|
{
|
||||||
auto it_middle = part_info_to_name.lower_bound(part_info);
|
auto it_middle = part_info_to_name.lower_bound(part_info);
|
||||||
auto begin = it_middle;
|
auto begin = it_middle;
|
||||||
@ -207,13 +208,29 @@ Strings ActiveDataPartSet::getPartsCoveredBy(const MergeTreePartInfo & part_info
|
|||||||
++end;
|
++end;
|
||||||
}
|
}
|
||||||
|
|
||||||
Strings covered;
|
std::vector<std::map<MergeTreePartInfo, String>::const_iterator> covered;
|
||||||
for (auto it = begin; it != end; ++it)
|
for (auto it = begin; it != end; ++it)
|
||||||
covered.push_back(it->second);
|
covered.push_back(it);
|
||||||
|
|
||||||
return covered;
|
return covered;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Returns the names of the parts selected by getPartsCoveredByImpl() for `part_info`,
/// in the same order in which the implementation returns them.
Strings ActiveDataPartSet::getPartsCoveredBy(const MergeTreePartInfo & part_info) const
{
    const auto covered_iterators = getPartsCoveredByImpl(part_info);

    Strings part_names;
    /// Each element is an iterator into part_info_to_name; the mapped value is the part name.
    for (auto pos = covered_iterators.begin(); pos != covered_iterators.end(); ++pos)
        part_names.push_back((*pos)->second);

    return part_names;
}
|
||||||
|
|
||||||
|
std::vector<MergeTreePartInfo> ActiveDataPartSet::getPartInfosCoveredBy(const MergeTreePartInfo & part_info) const
|
||||||
|
{
|
||||||
|
std::vector<MergeTreePartInfo> covered;
|
||||||
|
for (const auto & it : getPartsCoveredByImpl(part_info))
|
||||||
|
covered.push_back(it->first);
|
||||||
|
return covered;
|
||||||
|
}
|
||||||
|
|
||||||
Strings ActiveDataPartSet::getParts() const
|
Strings ActiveDataPartSet::getParts() const
|
||||||
{
|
{
|
||||||
Strings res;
|
Strings res;
|
||||||
|
@ -91,6 +91,7 @@ public:
|
|||||||
String getContainingPart(const String & name) const;
|
String getContainingPart(const String & name) const;
|
||||||
|
|
||||||
Strings getPartsCoveredBy(const MergeTreePartInfo & part_info) const;
|
Strings getPartsCoveredBy(const MergeTreePartInfo & part_info) const;
|
||||||
|
std::vector<MergeTreePartInfo> getPartInfosCoveredBy(const MergeTreePartInfo & part_info) const;
|
||||||
|
|
||||||
/// Returns parts in ascending order of the partition_id and block number.
|
/// Returns parts in ascending order of the partition_id and block number.
|
||||||
Strings getParts() const;
|
Strings getParts() const;
|
||||||
@ -111,6 +112,8 @@ private:
|
|||||||
MergeTreeDataFormatVersion format_version;
|
MergeTreeDataFormatVersion format_version;
|
||||||
std::map<MergeTreePartInfo, String> part_info_to_name;
|
std::map<MergeTreePartInfo, String> part_info_to_name;
|
||||||
|
|
||||||
|
std::vector<std::map<MergeTreePartInfo, String>::const_iterator> getPartsCoveredByImpl(const MergeTreePartInfo & part_info) const;
|
||||||
|
|
||||||
std::map<MergeTreePartInfo, String>::const_iterator getContainingPartImpl(const MergeTreePartInfo & part_info) const;
|
std::map<MergeTreePartInfo, String>::const_iterator getContainingPartImpl(const MergeTreePartInfo & part_info) const;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -1592,7 +1592,7 @@ void MergeTreeData::loadDataPartsFromWAL(MutableDataPartsVector & parts_from_wal
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void MergeTreeData::loadDataParts(bool skip_sanity_checks)
|
void MergeTreeData::loadDataParts(bool skip_sanity_checks, std::optional<std::unordered_set<std::string>> expected_parts)
|
||||||
{
|
{
|
||||||
LOG_DEBUG(log, "Loading data parts");
|
LOG_DEBUG(log, "Loading data parts");
|
||||||
|
|
||||||
@ -1716,6 +1716,8 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
|
|||||||
|
|
||||||
size_t suspicious_broken_parts = 0;
|
size_t suspicious_broken_parts = 0;
|
||||||
size_t suspicious_broken_parts_bytes = 0;
|
size_t suspicious_broken_parts_bytes = 0;
|
||||||
|
size_t suspicious_broken_unexpected_parts = 0;
|
||||||
|
size_t suspicious_broken_unexpected_parts_bytes = 0;
|
||||||
bool have_adaptive_parts = false;
|
bool have_adaptive_parts = false;
|
||||||
bool have_non_adaptive_parts = false;
|
bool have_non_adaptive_parts = false;
|
||||||
bool have_lightweight_in_parts = false;
|
bool have_lightweight_in_parts = false;
|
||||||
@ -1732,10 +1734,24 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
|
|||||||
if (res.is_broken)
|
if (res.is_broken)
|
||||||
{
|
{
|
||||||
broken_parts_to_detach.push_back(res.part);
|
broken_parts_to_detach.push_back(res.part);
|
||||||
|
bool unexpected = expected_parts != std::nullopt && !expected_parts->contains(res.part->name);
|
||||||
|
if (unexpected)
|
||||||
|
{
|
||||||
|
LOG_DEBUG(log, "loadDataParts: Part {} is broken, but it's not expected to be in parts set, "
|
||||||
|
" will not count it as suspicious broken part", res.part->name);
|
||||||
|
++suspicious_broken_unexpected_parts;
|
||||||
|
}
|
||||||
|
else
|
||||||
++suspicious_broken_parts;
|
++suspicious_broken_parts;
|
||||||
|
|
||||||
if (res.size_of_part)
|
if (res.size_of_part)
|
||||||
|
{
|
||||||
|
if (unexpected)
|
||||||
|
suspicious_broken_unexpected_parts_bytes += *res.size_of_part;
|
||||||
|
else
|
||||||
suspicious_broken_parts_bytes += *res.size_of_part;
|
suspicious_broken_parts_bytes += *res.size_of_part;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
else if (res.part->is_duplicate)
|
else if (res.part->is_duplicate)
|
||||||
{
|
{
|
||||||
if (!is_static_storage)
|
if (!is_static_storage)
|
||||||
@ -1768,23 +1784,34 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
|
|||||||
has_lightweight_delete_parts = have_lightweight_in_parts;
|
has_lightweight_delete_parts = have_lightweight_in_parts;
|
||||||
transactions_enabled = have_parts_with_version_metadata;
|
transactions_enabled = have_parts_with_version_metadata;
|
||||||
|
|
||||||
if (suspicious_broken_parts > settings->max_suspicious_broken_parts && !skip_sanity_checks)
|
if (!skip_sanity_checks)
|
||||||
throw Exception(ErrorCodes::TOO_MANY_UNEXPECTED_DATA_PARTS,
|
{
|
||||||
|
if (suspicious_broken_parts > settings->max_suspicious_broken_parts)
|
||||||
|
throw Exception(
|
||||||
|
ErrorCodes::TOO_MANY_UNEXPECTED_DATA_PARTS,
|
||||||
"Suspiciously many ({} parts, {} in total) broken parts "
|
"Suspiciously many ({} parts, {} in total) broken parts "
|
||||||
"to remove while maximum allowed broken parts count is {}. You can change the maximum value "
|
"to remove while maximum allowed broken parts count is {}. You can change the maximum value "
|
||||||
"with merge tree setting 'max_suspicious_broken_parts' "
|
"with merge tree setting 'max_suspicious_broken_parts' in <merge_tree> configuration section or in table settings in .sql file "
|
||||||
"in <merge_tree> configuration section or in table settings in .sql file "
|
|
||||||
"(don't forget to return setting back to default value)",
|
"(don't forget to return setting back to default value)",
|
||||||
suspicious_broken_parts, formatReadableSizeWithBinarySuffix(suspicious_broken_parts_bytes),
|
suspicious_broken_parts,
|
||||||
|
formatReadableSizeWithBinarySuffix(suspicious_broken_parts_bytes),
|
||||||
settings->max_suspicious_broken_parts);
|
settings->max_suspicious_broken_parts);
|
||||||
|
|
||||||
if (suspicious_broken_parts_bytes > settings->max_suspicious_broken_parts_bytes && !skip_sanity_checks)
|
if (suspicious_broken_parts_bytes > settings->max_suspicious_broken_parts_bytes)
|
||||||
throw Exception(ErrorCodes::TOO_MANY_UNEXPECTED_DATA_PARTS,
|
throw Exception(
|
||||||
"Suspiciously big size ({} parts, {} in total) of all broken parts to remove while maximum allowed broken parts size is {}. "
|
ErrorCodes::TOO_MANY_UNEXPECTED_DATA_PARTS,
|
||||||
|
"Suspiciously big size ({} parts, {} in total) of all broken "
|
||||||
|
"parts to remove while maximum allowed broken parts size is {}. "
|
||||||
"You can change the maximum value with merge tree setting 'max_suspicious_broken_parts_bytes' in <merge_tree> configuration "
|
"You can change the maximum value with merge tree setting 'max_suspicious_broken_parts_bytes' in <merge_tree> configuration "
|
||||||
"section or in table settings in .sql file (don't forget to return setting back to default value)",
|
"section or in table settings in .sql file (don't forget to return setting back to default value)",
|
||||||
suspicious_broken_parts, formatReadableSizeWithBinarySuffix(suspicious_broken_parts_bytes),
|
suspicious_broken_parts,
|
||||||
|
formatReadableSizeWithBinarySuffix(suspicious_broken_parts_bytes),
|
||||||
formatReadableSizeWithBinarySuffix(settings->max_suspicious_broken_parts_bytes));
|
formatReadableSizeWithBinarySuffix(settings->max_suspicious_broken_parts_bytes));
|
||||||
|
}
|
||||||
|
|
||||||
|
if (suspicious_broken_unexpected_parts != 0)
|
||||||
|
LOG_WARNING(log, "Found suspicious broken unexpected parts {} with total rows count {}", suspicious_broken_unexpected_parts, suspicious_broken_unexpected_parts_bytes);
|
||||||
|
|
||||||
|
|
||||||
if (!is_static_storage)
|
if (!is_static_storage)
|
||||||
for (auto & part : broken_parts_to_detach)
|
for (auto & part : broken_parts_to_detach)
|
||||||
|
@ -13,6 +13,7 @@
|
|||||||
#include <DataTypes/DataTypesNumber.h>
|
#include <DataTypes/DataTypesNumber.h>
|
||||||
#include <Disks/StoragePolicy.h>
|
#include <Disks/StoragePolicy.h>
|
||||||
#include <Processors/Merges/Algorithms/Graphite.h>
|
#include <Processors/Merges/Algorithms/Graphite.h>
|
||||||
|
#include <Storages/MergeTree/ActiveDataPartSet.h>
|
||||||
#include <Storages/MergeTree/BackgroundJobsAssignee.h>
|
#include <Storages/MergeTree/BackgroundJobsAssignee.h>
|
||||||
#include <Storages/MergeTree/MergeTreeIndices.h>
|
#include <Storages/MergeTree/MergeTreeIndices.h>
|
||||||
#include <Storages/MergeTree/MergeTreePartInfo.h>
|
#include <Storages/MergeTree/MergeTreePartInfo.h>
|
||||||
@ -464,7 +465,7 @@ public:
|
|||||||
StorageSnapshotPtr getStorageSnapshotWithoutData(const StorageMetadataPtr & metadata_snapshot, ContextPtr query_context) const override;
|
StorageSnapshotPtr getStorageSnapshotWithoutData(const StorageMetadataPtr & metadata_snapshot, ContextPtr query_context) const override;
|
||||||
|
|
||||||
/// Load the set of data parts from disk. Call once - immediately after the object is created.
|
/// Load the set of data parts from disk. Call once - immediately after the object is created.
|
||||||
void loadDataParts(bool skip_sanity_checks);
|
void loadDataParts(bool skip_sanity_checks, std::optional<std::unordered_set<std::string>> expected_parts);
|
||||||
|
|
||||||
String getLogName() const { return *std::atomic_load(&log_name); }
|
String getLogName() const { return *std::atomic_load(&log_name); }
|
||||||
|
|
||||||
|
@ -300,6 +300,33 @@ void MergeTreePartInfo::deserialize(ReadBuffer & in)
|
|||||||
readBoolText(use_leagcy_max_level, in);
|
readBoolText(use_leagcy_max_level, in);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool MergeTreePartInfo::areAllBlockNumbersCovered(const MergeTreePartInfo & blocks_range, std::vector<MergeTreePartInfo> candidates)
|
||||||
|
{
|
||||||
|
if (candidates.empty())
|
||||||
|
return false;
|
||||||
|
|
||||||
|
std::sort(candidates.begin(), candidates.end());
|
||||||
|
|
||||||
|
/// First doesn't cover left border
|
||||||
|
if (candidates[0].min_block != blocks_range.min_block)
|
||||||
|
return false;
|
||||||
|
|
||||||
|
int64_t current_right_block = candidates[0].min_block - 1;
|
||||||
|
|
||||||
|
for (const auto & candidate : candidates)
|
||||||
|
{
|
||||||
|
if (current_right_block + 1 != candidate.min_block)
|
||||||
|
return false;
|
||||||
|
|
||||||
|
current_right_block = candidate.max_block;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (current_right_block != blocks_range.max_block)
|
||||||
|
return false;
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
DetachedPartInfo DetachedPartInfo::parseDetachedPartName(
|
DetachedPartInfo DetachedPartInfo::parseDetachedPartName(
|
||||||
const DiskPtr & disk, std::string_view dir_name, MergeTreeDataFormatVersion format_version)
|
const DiskPtr & disk, std::string_view dir_name, MergeTreeDataFormatVersion format_version)
|
||||||
{
|
{
|
||||||
@ -367,9 +394,10 @@ DetachedPartInfo DetachedPartInfo::parseDetachedPartName(
|
|||||||
return part_info;
|
return part_info;
|
||||||
}
|
}
|
||||||
|
|
||||||
void DetachedPartInfo::addParsedPartInfo(const MergeTreePartInfo& part)
|
void DetachedPartInfo::addParsedPartInfo(const MergeTreePartInfo & part)
|
||||||
{
|
{
|
||||||
// Both class are aggregates so it's ok.
|
// Both class are aggregates so it's ok.
|
||||||
static_cast<MergeTreePartInfo &>(*this) = part;
|
static_cast<MergeTreePartInfo &>(*this) = part;
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -131,6 +131,8 @@ struct MergeTreePartInfo
|
|||||||
|
|
||||||
static bool contains(const String & outer_part_name, const String & inner_part_name, MergeTreeDataFormatVersion format_version);
|
static bool contains(const String & outer_part_name, const String & inner_part_name, MergeTreeDataFormatVersion format_version);
|
||||||
|
|
||||||
|
static bool areAllBlockNumbersCovered(const MergeTreePartInfo & blocks_range, std::vector<MergeTreePartInfo> candidates);
|
||||||
|
|
||||||
static constexpr UInt32 MAX_LEVEL = 999999999;
|
static constexpr UInt32 MAX_LEVEL = 999999999;
|
||||||
static constexpr UInt32 MAX_BLOCK_NUMBER = 999999999;
|
static constexpr UInt32 MAX_BLOCK_NUMBER = 999999999;
|
||||||
|
|
||||||
|
@ -112,7 +112,8 @@ StorageMergeTree::StorageMergeTree(
|
|||||||
{
|
{
|
||||||
initializeDirectoriesAndFormatVersion(relative_data_path_, attach, date_column_name);
|
initializeDirectoriesAndFormatVersion(relative_data_path_, attach, date_column_name);
|
||||||
|
|
||||||
loadDataParts(has_force_restore_data_flag);
|
|
||||||
|
loadDataParts(has_force_restore_data_flag, std::nullopt);
|
||||||
|
|
||||||
if (!attach && !getDataPartsForInternalUsage().empty() && !isStaticStorage())
|
if (!attach && !getDataPartsForInternalUsage().empty() && !isStaticStorage())
|
||||||
throw Exception(ErrorCodes::INCORRECT_DATA,
|
throw Exception(ErrorCodes::INCORRECT_DATA,
|
||||||
|
@ -389,6 +389,8 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
std::optional<std::unordered_set<std::string>> expected_parts_on_this_replica;
|
||||||
bool skip_sanity_checks = false;
|
bool skip_sanity_checks = false;
|
||||||
/// It does not make sense for CREATE query
|
/// It does not make sense for CREATE query
|
||||||
if (attach)
|
if (attach)
|
||||||
@ -417,6 +419,16 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
|
|||||||
skip_sanity_checks = true;
|
skip_sanity_checks = true;
|
||||||
|
|
||||||
LOG_WARNING(log, "Skipping the limits on severity of changes to data parts and columns (flag force_restore_data).");
|
LOG_WARNING(log, "Skipping the limits on severity of changes to data parts and columns (flag force_restore_data).");
|
||||||
|
} /// In case of force_restore it doesn't make sense to check anything
|
||||||
|
else if (current_zookeeper && current_zookeeper->exists(replica_path))
|
||||||
|
{
|
||||||
|
std::vector<std::string> parts_on_replica;
|
||||||
|
if (current_zookeeper->tryGetChildren(fs::path(replica_path) / "parts", parts_on_replica) == Coordination::Error::ZOK)
|
||||||
|
{
|
||||||
|
expected_parts_on_this_replica.emplace();
|
||||||
|
for (const auto & part : parts_on_replica)
|
||||||
|
expected_parts_on_this_replica->insert(part);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
catch (const Coordination::Exception & e)
|
catch (const Coordination::Exception & e)
|
||||||
@ -427,7 +439,7 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
loadDataParts(skip_sanity_checks);
|
loadDataParts(skip_sanity_checks, expected_parts_on_this_replica);
|
||||||
|
|
||||||
if (attach)
|
if (attach)
|
||||||
{
|
{
|
||||||
@ -1362,6 +1374,7 @@ void StorageReplicatedMergeTree::checkParts(bool skip_sanity_checks)
|
|||||||
|
|
||||||
Strings covered_unexpected_parts;
|
Strings covered_unexpected_parts;
|
||||||
std::unordered_set<String> uncovered_unexpected_parts;
|
std::unordered_set<String> uncovered_unexpected_parts;
|
||||||
|
std::unordered_set<String> restorable_unexpected_parts;
|
||||||
UInt64 uncovered_unexpected_parts_rows = 0;
|
UInt64 uncovered_unexpected_parts_rows = 0;
|
||||||
|
|
||||||
for (const auto & part : unexpected_parts)
|
for (const auto & part : unexpected_parts)
|
||||||
@ -1377,6 +1390,14 @@ void StorageReplicatedMergeTree::checkParts(bool skip_sanity_checks)
|
|||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
auto covered_parts = local_expected_parts_set.getPartInfosCoveredBy(part->info);
|
||||||
|
|
||||||
|
if (MergeTreePartInfo::areAllBlockNumbersCovered(part->info, covered_parts))
|
||||||
|
{
|
||||||
|
restorable_unexpected_parts.insert(part->name);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
/// Part is unexpected and we don't have covering part: it's suspicious
|
/// Part is unexpected and we don't have covering part: it's suspicious
|
||||||
uncovered_unexpected_parts.insert(part->name);
|
uncovered_unexpected_parts.insert(part->name);
|
||||||
uncovered_unexpected_parts_rows += part->rows_count;
|
uncovered_unexpected_parts_rows += part->rows_count;
|
||||||
@ -1419,11 +1440,11 @@ void StorageReplicatedMergeTree::checkParts(bool skip_sanity_checks)
|
|||||||
"There are {} uncovered unexpected parts with {} rows ({} of them is not just-written with {} rows), "
|
"There are {} uncovered unexpected parts with {} rows ({} of them is not just-written with {} rows), "
|
||||||
"{} missing parts (with {} blocks), {} covered unexpected parts (with {} rows).";
|
"{} missing parts (with {} blocks), {} covered unexpected parts (with {} rows).";
|
||||||
|
|
||||||
constexpr auto sanity_report_debug_fmt = "Uncovered unexpected parts: {}. Missing parts: {}. Covered unexpected parts: {}. Expected parts: {}.";
|
constexpr auto sanity_report_debug_fmt = "Uncovered unexpected parts: {}. Restorable unexpected parts: {}. Missing parts: {}. Covered unexpected parts: {}. Expected parts: {}.";
|
||||||
|
|
||||||
if (insane && !skip_sanity_checks)
|
if (insane && !skip_sanity_checks)
|
||||||
{
|
{
|
||||||
LOG_DEBUG(log, sanity_report_debug_fmt, fmt::join(uncovered_unexpected_parts, ", "), fmt::join(parts_to_fetch, ", "),
|
LOG_DEBUG(log, sanity_report_debug_fmt, fmt::join(uncovered_unexpected_parts, ", "), fmt::join(restorable_unexpected_parts, ", "), fmt::join(parts_to_fetch, ", "),
|
||||||
fmt::join(covered_unexpected_parts, ", "), fmt::join(expected_parts, ", "));
|
fmt::join(covered_unexpected_parts, ", "), fmt::join(expected_parts, ", "));
|
||||||
throw Exception(ErrorCodes::TOO_MANY_UNEXPECTED_DATA_PARTS, sanity_report_fmt, getStorageID().getNameForLogs(),
|
throw Exception(ErrorCodes::TOO_MANY_UNEXPECTED_DATA_PARTS, sanity_report_fmt, getStorageID().getNameForLogs(),
|
||||||
formatReadableQuantity(uncovered_unexpected_parts_rows),
|
formatReadableQuantity(uncovered_unexpected_parts_rows),
|
||||||
@ -1435,7 +1456,7 @@ void StorageReplicatedMergeTree::checkParts(bool skip_sanity_checks)
|
|||||||
|
|
||||||
if (unexpected_parts_nonnew_rows > 0 || uncovered_unexpected_parts_rows > 0)
|
if (unexpected_parts_nonnew_rows > 0 || uncovered_unexpected_parts_rows > 0)
|
||||||
{
|
{
|
||||||
LOG_DEBUG(log, sanity_report_debug_fmt, fmt::join(uncovered_unexpected_parts, ", "), fmt::join(parts_to_fetch, ", "),
|
LOG_DEBUG(log, sanity_report_debug_fmt, fmt::join(uncovered_unexpected_parts, ", "), fmt::join(restorable_unexpected_parts, ", "), fmt::join(parts_to_fetch, ", "),
|
||||||
fmt::join(covered_unexpected_parts, ", "), fmt::join(expected_parts, ", "));
|
fmt::join(covered_unexpected_parts, ", "), fmt::join(expected_parts, ", "));
|
||||||
LOG_WARNING(log, sanity_report_fmt, getStorageID().getNameForLogs(),
|
LOG_WARNING(log, sanity_report_fmt, getStorageID().getNameForLogs(),
|
||||||
formatReadableQuantity(uncovered_unexpected_parts_rows), formatReadableQuantity(total_rows_on_filesystem),
|
formatReadableQuantity(uncovered_unexpected_parts_rows), formatReadableQuantity(total_rows_on_filesystem),
|
||||||
@ -1454,7 +1475,7 @@ void StorageReplicatedMergeTree::checkParts(bool skip_sanity_checks)
|
|||||||
/// Remove extra local parts.
|
/// Remove extra local parts.
|
||||||
for (const DataPartPtr & part : unexpected_parts)
|
for (const DataPartPtr & part : unexpected_parts)
|
||||||
{
|
{
|
||||||
bool restore_covered = uncovered_unexpected_parts.contains(part->name);
|
bool restore_covered = restorable_unexpected_parts.contains(part->name) || uncovered_unexpected_parts.contains(part->name);
|
||||||
LOG_ERROR(log, "Renaming unexpected part {} to ignored_{}{}", part->name, part->name, restore_covered ? ", restoring covered parts" : "");
|
LOG_ERROR(log, "Renaming unexpected part {} to ignored_{}{}", part->name, part->name, restore_covered ? ", restoring covered parts" : "");
|
||||||
forcefullyMovePartToDetachedAndRemoveFromMemory(part, "ignored", restore_covered);
|
forcefullyMovePartToDetachedAndRemoveFromMemory(part, "ignored", restore_covered);
|
||||||
}
|
}
|
||||||
|
@ -0,0 +1 @@
|
|||||||
|
#!/usr/bin/env python3
|
@ -0,0 +1,187 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
#
|
||||||
|
# pylint: disable=unused-argument
|
||||||
|
# pylint: disable=redefined-outer-name
|
||||||
|
# pylint: disable=line-too-long
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
import os
|
||||||
|
from helpers.client import QueryRuntimeException
|
||||||
|
from helpers.cluster import ClickHouseCluster
|
||||||
|
|
||||||
|
cluster = ClickHouseCluster(__file__)
|
||||||
|
node = cluster.add_instance("node", stay_alive=True, with_zookeeper=True)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(scope="module", autouse=True)
def start_cluster():
    """Module-wide fixture: start the cluster before the tests and always shut it down afterwards."""
    try:
        cluster.start()
        # Hand the running cluster to the tests of this module.
        yield cluster
    finally:
        # Runs even if start-up or a test failed.
        cluster.shutdown()
|
||||||
|
|
||||||
|
|
||||||
|
def break_part(table, part_name):
    """Delete columns.txt from the given part's directory inside the node's container."""
    columns_file = f"/var/lib/clickhouse/data/default/{table}/{part_name}/columns.txt"
    command = ["bash", "-c", f"rm {columns_file}"]
    node.exec_in_container(command)
|
||||||
|
|
||||||
|
|
||||||
|
def remove_part(table, part_name):
    """Recursively delete the given part's directory inside the node's container."""
    part_dir = f"/var/lib/clickhouse/data/default/{table}/{part_name}"
    command = ["bash", "-c", f"rm -r {part_dir}"]
    node.exec_in_container(command)
|
||||||
|
|
||||||
|
|
||||||
|
def get_count(table):
    """Return the result of `SELECT count()` on `table` as an int."""
    raw_result = node.query(f"SELECT count() FROM {table}")
    return int(raw_result.strip())
|
||||||
|
|
||||||
|
|
||||||
|
def detach_table(table):
    """Run DETACH TABLE for `table` on the node."""
    detach_query = f"DETACH TABLE {table}"
    node.query(detach_query)
|
||||||
|
|
||||||
|
|
||||||
|
def attach_table(table):
    """Run ATTACH TABLE for `table` on the node."""
    attach_query = f"ATTACH TABLE {table}"
    node.query(attach_query)
|
||||||
|
|
||||||
|
|
||||||
|
def remove_part_from_zookeeper(replica_path, part_name):
    """Delete the `parts/<part_name>` node under `replica_path` via a Kazoo client for "zoo1"."""
    part_znode = os.path.join(replica_path, f"parts/{part_name}")
    zk_client = cluster.get_kazoo_client("zoo1")
    zk_client.delete(part_znode)
|
||||||
|
|
||||||
|
|
||||||
|
def test_unexpected_uncommitted_merge():
    """The merged part is removed from ZooKeeper; after re-attach the source parts are active again.

    Two inserts are merged into all_0_1_1, then that part's node is deleted from the replica's
    parts in ZooKeeper. After DETACH/ATTACH the data must still sum to 190 and the active
    parts must be the two original ones.
    """
    node.query(
        """
    CREATE TABLE broken_table (key Int) ENGINE = ReplicatedMergeTree('/tables/broken', '1') ORDER BY tuple()
    SETTINGS max_suspicious_broken_parts = 0, replicated_max_ratio_of_wrong_parts=0"""
    )

    insert_queries = (
        "INSERT INTO broken_table SELECT number from numbers(10)",
        "INSERT INTO broken_table SELECT number from numbers(10, 10)",
    )
    for insert_query in insert_queries:
        node.query(insert_query)

    node.query("OPTIMIZE TABLE broken_table FINAL")

    sum_query = "SELECT sum(key) FROM broken_table"
    assert node.query(sum_query) == "190\n"

    parts_before = node.query(
        "SELECT name FROM system.parts where table = 'broken_table' and active"
    )
    assert parts_before == "all_0_1_1\n"

    # Make the merged part unexpected from ZooKeeper's point of view.
    remove_part_from_zookeeper("/tables/broken/replicas/1", "all_0_1_1")

    detach_table("broken_table")
    attach_table("broken_table")

    assert node.query(sum_query) == "190\n"

    parts_after = node.query(
        "SELECT name FROM system.parts where table = 'broken_table' and active order by name"
    )
    assert parts_after == "all_0_0_0\nall_1_1_0\n"
|
||||||
|
|
||||||
|
|
||||||
|
def test_corrupted_random_part():
    """A broken part that is still registered in ZooKeeper must make ATTACH fail.

    The table is created with max_suspicious_broken_parts = 0; after all_0_0_0 loses its
    columns.txt, re-attaching is expected to raise QueryRuntimeException.
    """
    node.query(
        """
    CREATE TABLE broken_table_1 (key Int) ENGINE = ReplicatedMergeTree('/tables/broken_1', '1') ORDER BY tuple()
    SETTINGS max_suspicious_broken_parts = 0, replicated_max_ratio_of_wrong_parts=0"""
    )

    insert_queries = (
        "INSERT INTO broken_table_1 SELECT number from numbers(10)",
        "INSERT INTO broken_table_1 SELECT number from numbers(10, 10)",
    )
    for insert_query in insert_queries:
        node.query(insert_query)

    assert node.query("SELECT sum(key) FROM broken_table_1") == "190\n"

    active_parts = node.query(
        "SELECT name FROM system.parts where table = 'broken_table_1' and active order by name"
    )
    assert active_parts == "all_0_0_0\nall_1_1_0\n"

    # Corrupt a part that the replica is expected to have.
    break_part("broken_table_1", "all_0_0_0")

    detach_table("broken_table_1")
    with pytest.raises(QueryRuntimeException):
        attach_table("broken_table_1")
|
||||||
|
|
||||||
|
|
||||||
|
def test_corrupted_unexpected_part():
    """A broken part that is no longer in ZooKeeper must not prevent ATTACH.

    After the merge into all_0_1_1, the source part all_0_0_0 is removed from ZooKeeper and
    corrupted on disk. Re-attaching must succeed with the data and the active part unchanged.
    """
    node.query(
        """
    CREATE TABLE broken_table_2 (key Int) ENGINE = ReplicatedMergeTree('/tables/broken_2', '1') ORDER BY tuple()
    SETTINGS max_suspicious_broken_parts = 0, replicated_max_ratio_of_wrong_parts=0"""
    )

    insert_queries = (
        "INSERT INTO broken_table_2 SELECT number from numbers(10)",
        "INSERT INTO broken_table_2 SELECT number from numbers(10, 10)",
    )
    for insert_query in insert_queries:
        node.query(insert_query)

    node.query("OPTIMIZE TABLE broken_table_2 FINAL")

    sum_query = "SELECT sum(key) FROM broken_table_2"
    parts_query = (
        "SELECT name FROM system.parts where table = 'broken_table_2' and active"
    )

    assert node.query(sum_query) == "190\n"
    assert node.query(parts_query) == "all_0_1_1\n"

    # The already-merged source part becomes both unexpected and broken.
    remove_part_from_zookeeper("/tables/broken_2/replicas/1", "all_0_0_0")
    break_part("broken_table_2", "all_0_0_0")

    detach_table("broken_table_2")
    attach_table("broken_table_2")

    assert node.query(sum_query) == "190\n"
    assert node.query(parts_query) == "all_0_1_1\n"
|
||||||
|
|
||||||
|
|
||||||
|
def test_corrupted_unexpected_part_ultimate():
    """Both the broken source part and the merged part are removed from ZooKeeper.

    all_0_0_0 is removed from ZooKeeper and corrupted on disk, and the merged all_0_1_1 is
    removed from ZooKeeper as well. After re-attach the replica is expected to report
    is_readonly = 1 while the data still sums to 190.
    """
    node.query(
        """
    CREATE TABLE broken_table_3 (key Int) ENGINE = ReplicatedMergeTree('/tables/broken_3', '1') ORDER BY tuple()
    SETTINGS max_suspicious_broken_parts = 0, replicated_max_ratio_of_wrong_parts=0"""
    )

    insert_queries = (
        "INSERT INTO broken_table_3 SELECT number from numbers(10)",
        "INSERT INTO broken_table_3 SELECT number from numbers(10, 10)",
    )
    for insert_query in insert_queries:
        node.query(insert_query)

    node.query("OPTIMIZE TABLE broken_table_3 FINAL")

    sum_query = "SELECT sum(key) FROM broken_table_3"
    assert node.query(sum_query) == "190\n"

    active_parts = node.query(
        "SELECT name FROM system.parts where table = 'broken_table_3' and active"
    )
    assert active_parts == "all_0_1_1\n"

    replica_path = "/tables/broken_3/replicas/1"
    remove_part_from_zookeeper(replica_path, "all_0_0_0")
    break_part("broken_table_3", "all_0_0_0")
    remove_part_from_zookeeper(replica_path, "all_0_1_1")

    detach_table("broken_table_3")
    attach_table("broken_table_3")

    readonly_flag = node.query(
        "SELECT is_readonly FROM system.replicas WHERE table = 'broken_table_3'"
    )
    assert readonly_flag == "1\n"

    assert node.query(sum_query) == "190\n"
|
Loading…
Reference in New Issue
Block a user