Merge pull request #55645 from ClickHouse/restore_top_level_parts

Restore only top-level parts instead of unexpected
This commit is contained in:
alesapin 2023-10-17 15:45:21 +02:00 committed by GitHub
commit abe6df6fbc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 219 additions and 79 deletions

View File

@ -21,18 +21,46 @@ ActiveDataPartSet::ActiveDataPartSet(MergeTreeDataFormatVersion format_version_,
add(name);
}
bool ActiveDataPartSet::add(const String & name, Strings * out_replaced_parts)
ActiveDataPartSet::AddPartOutcome ActiveDataPartSet::tryAddPart(const MergeTreePartInfo & part_info, String * out_reason)
{
auto part_info = MergeTreePartInfo::fromPartName(name, format_version);
return add(part_info, name, out_replaced_parts);
return addImpl(part_info, part_info.getPartNameAndCheckFormat(format_version), nullptr, out_reason);
}
bool ActiveDataPartSet::add(const MergeTreePartInfo & part_info, const String & name, Strings * out_replaced_parts)
{
String out_reason;
AddPartOutcome outcome = addImpl(part_info, name, out_replaced_parts, &out_reason);
if (outcome == AddPartOutcome::HasIntersectingPart)
{
chassert(!out_reason.empty());
throw Exception(ErrorCodes::LOGICAL_ERROR, fmt::runtime(out_reason));
}
return outcome == AddPartOutcome::Added;
}
/// Inserts a part given only its name.
/// The name is parsed with MergeTreePartInfo::fromPartName(name, format_version).
/// Returns true if the part was added, false if it was not added without an intersection
/// (HasCovering). Throws LOGICAL_ERROR if the part intersects a part already in the set.
bool ActiveDataPartSet::add(const String & name, Strings * out_replaced_parts)
{
    /// Delegate to the (part_info, name) overload so that the translation of AddPartOutcome
    /// into a return value / LOGICAL_ERROR is not duplicated here.
    return add(MergeTreePartInfo::fromPartName(name, format_version), name, out_replaced_parts);
}
ActiveDataPartSet::AddPartOutcome ActiveDataPartSet::addImpl(const MergeTreePartInfo & part_info, const String & name, Strings * out_replaced_parts, String * out_reason)
{
/// TODO make it exception safe (out_replaced_parts->push_back(...) may throw)
if (getContainingPartImpl(part_info) != part_info_to_name.end())
return false;
return AddPartOutcome::HasCovering;
/// Parts contained in `part` are located contiguously in `part_info_to_name`, overlapping with the place where the part itself would be inserted.
auto it = part_info_to_name.lower_bound(part_info);
@ -47,10 +75,15 @@ bool ActiveDataPartSet::add(const MergeTreePartInfo & part_info, const String &
if (!part_info.contains(it->first))
{
if (!part_info.isDisjoint(it->first))
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Part {} intersects previous part {}. "
"It is a bug or a result of manual intervention in the ZooKeeper data.",
part_info.getPartNameForLogs(), it->first.getPartNameForLogs());
{
if (out_reason != nullptr)
*out_reason = fmt::format(
"Part {} intersects previous part {}. "
"It is a bug or a result of manual intervention in the ZooKeeper data.",
part_info.getPartNameForLogs(),
it->first.getPartNameForLogs());
return AddPartOutcome::HasIntersectingPart;
}
++it;
break;
}
@ -73,18 +106,33 @@ bool ActiveDataPartSet::add(const MergeTreePartInfo & part_info, const String &
}
if (it != part_info_to_name.end() && !part_info.isDisjoint(it->first))
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Part {} intersects part {}. It is a bug or a result of manual intervention "
"in the ZooKeeper data.", name, it->first.getPartNameForLogs());
{
if (out_reason != nullptr)
*out_reason = fmt::format(
"Part {} intersects part {}. It is a bug or a result of manual intervention "
"in the ZooKeeper data.",
name,
it->first.getPartNameForLogs());
return AddPartOutcome::HasIntersectingPart;
}
part_info_to_name.emplace(part_info, name);
return true;
return AddPartOutcome::Added;
}
/// Inserts a part given only its info.
/// The part name is derived via part_info.getPartNameAndCheckFormat(format_version).
/// Returns true if the part was added, false if it was not added without an intersection
/// (HasCovering). Throws LOGICAL_ERROR if the part intersects a part already in the set.
bool ActiveDataPartSet::add(const MergeTreePartInfo & part_info, Strings * out_replaced_parts)
{
    /// The (part_info, name) overload already converts AddPartOutcome into the
    /// bool / exception contract, so nothing else is needed after this call.
    return add(part_info, part_info.getPartNameAndCheckFormat(format_version), out_replaced_parts);
}

View File

@ -22,6 +22,13 @@ using Strings = std::vector<String>;
class ActiveDataPartSet
{
public:
enum class AddPartOutcome
{
Added,
HasCovering,
HasIntersectingPart,
};
explicit ActiveDataPartSet(MergeTreeDataFormatVersion format_version_) : format_version(format_version_) {}
ActiveDataPartSet(MergeTreeDataFormatVersion format_version_, const Strings & names);
@ -43,6 +50,8 @@ public:
bool add(const MergeTreePartInfo & part_info, const String & name, Strings * out_replaced_parts = nullptr);
bool add(const MergeTreePartInfo & part_info, Strings * out_replaced_parts = nullptr);
AddPartOutcome tryAddPart(const MergeTreePartInfo & part_info, String * out_reason = nullptr);
bool remove(const MergeTreePartInfo & part_info)
{
return part_info_to_name.erase(part_info) > 0;
@ -97,6 +106,8 @@ public:
MergeTreeDataFormatVersion getFormatVersion() const { return format_version; }
private:
AddPartOutcome addImpl(const MergeTreePartInfo & part_info, const String & name, Strings * out_replaced_parts = nullptr, String * out_reason = nullptr);
MergeTreeDataFormatVersion format_version;
std::map<MergeTreePartInfo, String> part_info_to_name;

View File

@ -20,6 +20,7 @@
#include <Common/ThreadFuzzer.h>
#include <Common/getNumberOfPhysicalCPUCores.h>
#include <Common/Config/ConfigHelper.h>
#include <Storages/MergeTree/RangesInDataPart.h>
#include <Compression/CompressedReadBuffer.h>
#include <Core/QueryProcessingStage.h>
#include <DataTypes/DataTypeEnum.h>
@ -76,6 +77,7 @@
#include <Storages/MergeTree/MergeTreeDataPartInMemory.h>
#include <Storages/MergeTree/MergeTreeDataPartWide.h>
#include <Storages/MergeTree/DataPartStorageOnDiskFull.h>
#include <Storages/MergeTree/ActiveDataPartSet.h>
#include <Storages/StorageMergeTree.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/VirtualColumnUtils.h>
@ -96,6 +98,7 @@
#include <iomanip>
#include <limits>
#include <optional>
#include <ranges>
#include <set>
#include <thread>
#include <typeinfo>
@ -3915,25 +3918,17 @@ void MergeTreeData::forcefullyMovePartToDetachedAndRemoveFromMemory(const MergeT
return;
}
/// Let's restore some parts covered by unexpected to avoid partial data
if (restore_covered)
{
Strings restored;
bool error = false;
String error_parts;
Int64 pos = part->info.min_block;
Strings error_parts;
auto is_appropriate_state = [] (DataPartState state)
{
return state == DataPartState::Active || state == DataPartState::Outdated;
};
auto update_error = [&] (DataPartIteratorByInfo it)
{
error = true;
error_parts += (*it)->getNameWithState() + " ";
};
auto activate_part = [this, &restored_active_part](auto it)
{
/// It's not clear what to do if we try to activate part that was removed in transaction.
@ -3951,68 +3946,90 @@ void MergeTreeData::forcefullyMovePartToDetachedAndRemoveFromMemory(const MergeT
restored_active_part = true;
};
auto it_middle = data_parts_by_info.lower_bound(part->info);
/// ActiveDataPartSet allows us to restore the most top-level parts instead of the unexpected one.
/// This can be important in case of assigned merges. If the unexpected part is the result of some
/// finished, but not committed, merge, then we should restore (or at least try to restore)
/// the closest ancestors of the unexpected part to be able to execute that merge.
/// However, this is not guaranteed, because outdated parts can intersect.
ActiveDataPartSet parts_for_replacement(format_version);
auto range = getDataPartsPartitionRange(part->info.partition_id);
DataPartsVector parts_candidates(range.begin(), range.end());
/// Restore the leftmost part covered by the part
if (it_middle != data_parts_by_info.begin())
/// In case of intersecting outdated parts we want to add bigger parts (with higher level) first
auto comparator = [] (const DataPartPtr left, const DataPartPtr right) -> bool
{
auto it = std::prev(it_middle);
if (part->contains(**it) && is_appropriate_state((*it)->getState()))
{
/// Maybe, we must consider part level somehow
if ((*it)->info.min_block != part->info.min_block)
update_error(it);
if ((*it)->getState() != DataPartState::Active)
activate_part(it);
pos = (*it)->info.max_block + 1;
restored.push_back((*it)->name);
}
else if ((*it)->info.partition_id == part->info.partition_id)
update_error(it);
if (left->info.level < right->info.level)
return true;
else if (left->info.level > right->info.level)
return false;
else
error = true;
return left->info.mutation < right->info.mutation;
};
std::sort(parts_candidates.begin(), parts_candidates.end(), comparator);
/// From larger to smaller parts
for (const auto & part_candidate_in_partition : parts_candidates | std::views::reverse)
{
if (part->info.contains(part_candidate_in_partition->info)
&& is_appropriate_state(part_candidate_in_partition->getState()))
{
String out_reason;
/// Outdated parts can intersect legally (because of DROP_PART). That is okay here: we
/// are trying to do our best to restore covered parts.
auto outcome = parts_for_replacement.tryAddPart(part_candidate_in_partition->info, &out_reason);
if (outcome == ActiveDataPartSet::AddPartOutcome::HasIntersectingPart)
{
error_parts.push_back(part->name);
LOG_ERROR(log, "Failed to restore part {}, because of intersection reason '{}'", part->name, out_reason);
}
}
}
if (parts_for_replacement.size() > 0)
{
std::vector<std::pair<uint64_t, uint64_t>> holes_list;
/// Most of the code below is just to write a pretty message
auto part_infos = parts_for_replacement.getPartInfos();
int64_t current_right_block = part_infos[0].min_block;
for (const auto & top_level_part_to_replace : part_infos)
{
auto data_part_it = data_parts_by_info.find(top_level_part_to_replace);
if (data_part_it == data_parts_by_info.end())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot find part {} in own set", top_level_part_to_replace.getPartNameForLogs());
activate_part(data_part_it);
restored.push_back((*data_part_it)->name);
if (top_level_part_to_replace.min_block - current_right_block > 1)
holes_list.emplace_back(current_right_block, top_level_part_to_replace.min_block);
current_right_block = top_level_part_to_replace.max_block;
}
if (part->info.max_block != current_right_block)
holes_list.emplace_back(current_right_block, part->info.max_block);
for (const String & name : restored)
LOG_INFO(log, "Activated part {} in place of unexpected {}", name, part->name);
if (!error_parts.empty() || !holes_list.empty())
{
std::string error_parts_message, holes_list_message;
if (!error_parts.empty())
error_parts_message = fmt::format(" Parts failed to restore because of intersection: [{}]", fmt::join(error_parts, ", "));
if (!holes_list.empty())
{
if (!error_parts.empty())
holes_list_message = ".";
Strings holes_list_pairs;
for (const auto & [left_side, right_side] : holes_list)
holes_list_pairs.push_back(fmt::format("({}, {})", left_side + 1, right_side - 1));
holes_list_message += fmt::format(" Block ranges failed to restore: [{}]", fmt::join(holes_list_pairs, ", "));
}
LOG_WARNING(log, "The set of parts restored in place of {} looks incomplete. "
"SELECT queries may observe gaps in data until this replica is synchronized with other replicas.{}{}",
part->name, error_parts_message, holes_list_message);
}
}
else
error = true;
/// Restore "right" parts
for (auto it = it_middle; it != data_parts_by_info.end() && part->contains(**it); ++it)
{
if ((*it)->info.min_block < pos)
continue;
if (!is_appropriate_state((*it)->getState()))
{
update_error(it);
continue;
}
if ((*it)->info.min_block > pos)
update_error(it);
if ((*it)->getState() != DataPartState::Active)
activate_part(it);
pos = (*it)->info.max_block + 1;
restored.push_back((*it)->name);
}
if (pos != part->info.max_block + 1)
error = true;
for (const String & name : restored)
{
LOG_INFO(log, "Activated part {}", name);
}
if (error)
{
LOG_WARNING(log, "The set of parts restored in place of {} looks incomplete. "
"SELECT queries may observe gaps in data until this replica is synchronized with other replicas.{}",
part->name, (error_parts.empty() ? "" : " Suspicious parts: " + error_parts));
LOG_INFO(log, "Don't find any parts for replacement instead of unexpected {}", part->name);
}
}

View File

@ -0,0 +1,63 @@
#!/usr/bin/env bash
# Scenario: build a ReplicatedMergeTree table with several generations of parts, turn on
# fault_probability_before_part_commit=1 (presumably makes every part commit fail), start a merge,
# and check that the set of active parts is identical before the merge attempt, after it,
# and after a DETACH/ATTACH cycle. Prints "Ok" on success, otherwise the three part lists.

CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh

# Prints the names of the active parts of the test table in the current database, sorted by name.
function list_active_parts()
{
    $CLICKHOUSE_CLIENT --query "SELECT name FROM system.parts where table = 'table_with_unsuccessful_commits' and database = currentDatabase() and active order by name"
}

$CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS table_with_unsuccessful_commits"

# will be flaky in 2031
# (old_parts_lifetime below is 1949748529 seconds; read as a Unix timestamp that is in 2031 -
# presumably the reason for the note above.)
$CLICKHOUSE_CLIENT --query "CREATE TABLE table_with_unsuccessful_commits (key UInt64, value String) ENGINE ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/unsuccessful', '1') ORDER BY tuple() SETTINGS cleanup_delay_period=1000, max_cleanup_delay_period=1000, old_parts_lifetime = 1949748529, remove_rolled_back_parts_immediately=0, replicated_max_ratio_of_wrong_parts=1, max_suspicious_broken_parts=1000000, max_suspicious_broken_parts_bytes=10000000000"

# Two inserts followed by repeated OPTIMIZE ... FINAL calls.
$CLICKHOUSE_CLIENT --query "INSERT INTO table_with_unsuccessful_commits SELECT rand(), toString(rand()) FROM numbers(10)"
$CLICKHOUSE_CLIENT --query "INSERT INTO table_with_unsuccessful_commits SELECT rand(), toString(rand()) FROM numbers(10)"

for _ in {0..10}; do
    $CLICKHOUSE_CLIENT --query "OPTIMIZE TABLE table_with_unsuccessful_commits FINAL"
done

# Synchronous mutations, with one more insert in between.
$CLICKHOUSE_CLIENT --query "ALTER TABLE table_with_unsuccessful_commits DELETE WHERE value = 'hello' SETTINGS mutations_sync=2"
$CLICKHOUSE_CLIENT --query "ALTER TABLE table_with_unsuccessful_commits DELETE WHERE value = 'hello' SETTINGS mutations_sync=2"
$CLICKHOUSE_CLIENT --query "INSERT INTO table_with_unsuccessful_commits SELECT rand(), toString(rand()) FROM numbers(10)"
$CLICKHOUSE_CLIENT --query "ALTER TABLE table_with_unsuccessful_commits DELETE WHERE value = 'hello' SETTINGS mutations_sync=2"

# Reference state: active parts before the fault setting is enabled.
original_parts=$(list_active_parts)

$CLICKHOUSE_CLIENT --query "ALTER TABLE table_with_unsuccessful_commits MODIFY SETTING fault_probability_before_part_commit=1"

# alter_sync=0: do not wait for the OPTIMIZE to finish.
$CLICKHOUSE_CLIENT --query "OPTIMIZE TABLE table_with_unsuccessful_commits FINAL SETTINGS alter_sync=0"

# Poll the replication queue (at most 300 times) until some entry reports a non-empty last_exception.
max_attempts=300
for ((attempt = 0; attempt < max_attempts; ++attempt)); do
    last_exception=$($CLICKHOUSE_CLIENT --query "SELECT last_exception FROM system.replication_queue WHERE table = 'table_with_unsuccessful_commits' and database=currentDatabase()")
    if [[ -n $last_exception ]]; then
        break
    fi
done

# Active parts after the OPTIMIZE attempt.
parts_after_failed_merge=$(list_active_parts)

$CLICKHOUSE_CLIENT --query "DETACH TABLE table_with_unsuccessful_commits"
$CLICKHOUSE_CLIENT --query "ATTACH TABLE table_with_unsuccessful_commits"

# Active parts after the table has been reloaded.
parts_after_reattach=$(list_active_parts)

# All three snapshots must be equal.
if [[ "$parts_after_reattach" == "$parts_after_failed_merge" && "$parts_after_failed_merge" == "$original_parts" ]]; then
    echo "Ok"
else
    echo "Original parts $original_parts"
    echo "Parts after mutation $parts_after_failed_merge"
    echo "Parts after detach attach $parts_after_reattach"
fi

$CLICKHOUSE_CLIENT --query "DROP TABLE table_with_unsuccessful_commits"