diff --git a/src/Storages/MergeTree/ActiveDataPartSet.cpp b/src/Storages/MergeTree/ActiveDataPartSet.cpp index 5b7965bc3a0..a1746cc1746 100644 --- a/src/Storages/MergeTree/ActiveDataPartSet.cpp +++ b/src/Storages/MergeTree/ActiveDataPartSet.cpp @@ -21,18 +21,46 @@ ActiveDataPartSet::ActiveDataPartSet(MergeTreeDataFormatVersion format_version_, add(name); } -bool ActiveDataPartSet::add(const String & name, Strings * out_replaced_parts) +ActiveDataPartSet::AddPartOutcome ActiveDataPartSet::tryAddPart(const MergeTreePartInfo & part_info, String * out_reason) { - auto part_info = MergeTreePartInfo::fromPartName(name, format_version); - return add(part_info, name, out_replaced_parts); + return addImpl(part_info, part_info.getPartNameAndCheckFormat(format_version), nullptr, out_reason); } bool ActiveDataPartSet::add(const MergeTreePartInfo & part_info, const String & name, Strings * out_replaced_parts) +{ + String out_reason; + AddPartOutcome outcome = addImpl(part_info, name, out_replaced_parts, &out_reason); + if (outcome == AddPartOutcome::HasIntersectingPart) + { + chassert(!out_reason.empty()); + throw Exception(ErrorCodes::LOGICAL_ERROR, fmt::runtime(out_reason)); + } + + return outcome == AddPartOutcome::Added; +} + + +bool ActiveDataPartSet::add(const String & name, Strings * out_replaced_parts) +{ + auto part_info = MergeTreePartInfo::fromPartName(name, format_version); + String out_reason; + AddPartOutcome outcome = addImpl(part_info, name, out_replaced_parts, &out_reason); + if (outcome == AddPartOutcome::HasIntersectingPart) + { + chassert(!out_reason.empty()); + throw Exception(ErrorCodes::LOGICAL_ERROR, fmt::runtime(out_reason)); + } + + return outcome == AddPartOutcome::Added; +} + + +ActiveDataPartSet::AddPartOutcome ActiveDataPartSet::addImpl(const MergeTreePartInfo & part_info, const String & name, Strings * out_replaced_parts, String * out_reason) { /// TODO make it exception safe (out_replaced_parts->push_back(...) 
may throw) if (getContainingPartImpl(part_info) != part_info_to_name.end()) - return false; + return AddPartOutcome::HasCovering; /// Parts contained in `part` are located contiguously in `part_info_to_name`, overlapping with the place where the part itself would be inserted. auto it = part_info_to_name.lower_bound(part_info); @@ -47,10 +75,15 @@ bool ActiveDataPartSet::add(const MergeTreePartInfo & part_info, const String & if (!part_info.contains(it->first)) { if (!part_info.isDisjoint(it->first)) - throw Exception(ErrorCodes::LOGICAL_ERROR, - "Part {} intersects previous part {}. " - "It is a bug or a result of manual intervention in the ZooKeeper data.", - part_info.getPartNameForLogs(), it->first.getPartNameForLogs()); + { + if (out_reason != nullptr) + *out_reason = fmt::format( + "Part {} intersects previous part {}. " + "It is a bug or a result of manual intervention in the ZooKeeper data.", + part_info.getPartNameForLogs(), + it->first.getPartNameForLogs()); + return AddPartOutcome::HasIntersectingPart; + } ++it; break; } @@ -73,18 +106,33 @@ bool ActiveDataPartSet::add(const MergeTreePartInfo & part_info, const String & } if (it != part_info_to_name.end() && !part_info.isDisjoint(it->first)) - throw Exception(ErrorCodes::LOGICAL_ERROR, - "Part {} intersects part {}. It is a bug or a result of manual intervention " - "in the ZooKeeper data.", name, it->first.getPartNameForLogs()); + { + if (out_reason != nullptr) + *out_reason = fmt::format( + "Part {} intersects part {}. 
It is a bug or a result of manual intervention " + "in the ZooKeeper data.", + name, + it->first.getPartNameForLogs()); + + return AddPartOutcome::HasIntersectingPart; + } part_info_to_name.emplace(part_info, name); - return true; + return AddPartOutcome::Added; } bool ActiveDataPartSet::add(const MergeTreePartInfo & part_info, Strings * out_replaced_parts) { - return add(part_info, part_info.getPartNameAndCheckFormat(format_version), out_replaced_parts); + String out_reason; + AddPartOutcome outcome = addImpl(part_info, part_info.getPartNameAndCheckFormat(format_version), out_replaced_parts, &out_reason); + if (outcome == AddPartOutcome::HasIntersectingPart) + { + chassert(!out_reason.empty()); + throw Exception(ErrorCodes::LOGICAL_ERROR, fmt::runtime(out_reason)); + } + + return outcome == AddPartOutcome::Added; } diff --git a/src/Storages/MergeTree/ActiveDataPartSet.h b/src/Storages/MergeTree/ActiveDataPartSet.h index f3cd6b0019d..662309f64cf 100644 --- a/src/Storages/MergeTree/ActiveDataPartSet.h +++ b/src/Storages/MergeTree/ActiveDataPartSet.h @@ -22,6 +22,13 @@ using Strings = std::vector; class ActiveDataPartSet { public: + enum class AddPartOutcome + { + Added, + HasCovering, + HasIntersectingPart, + }; + explicit ActiveDataPartSet(MergeTreeDataFormatVersion format_version_) : format_version(format_version_) {} ActiveDataPartSet(MergeTreeDataFormatVersion format_version_, const Strings & names); @@ -43,6 +50,8 @@ public: bool add(const MergeTreePartInfo & part_info, const String & name, Strings * out_replaced_parts = nullptr); bool add(const MergeTreePartInfo & part_info, Strings * out_replaced_parts = nullptr); + AddPartOutcome tryAddPart(const MergeTreePartInfo & part_info, String * out_reason = nullptr); + bool remove(const MergeTreePartInfo & part_info) { return part_info_to_name.erase(part_info) > 0; @@ -97,6 +106,8 @@ public: MergeTreeDataFormatVersion getFormatVersion() const { return format_version; } private: + + AddPartOutcome addImpl(const 
MergeTreePartInfo & part_info, const String & name, Strings * out_replaced_parts = nullptr, String * out_reason = nullptr); MergeTreeDataFormatVersion format_version; std::map part_info_to_name; diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index 62c30d184f4..d6c39da48e2 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -20,6 +20,7 @@ #include #include #include +#include #include #include #include @@ -76,6 +77,7 @@ #include #include #include +#include #include #include #include @@ -96,6 +98,7 @@ #include #include #include +#include #include #include #include @@ -3915,25 +3918,17 @@ void MergeTreeData::forcefullyMovePartToDetachedAndRemoveFromMemory(const MergeT return; } + /// Let's restore some parts covered by unexpected to avoid partial data if (restore_covered) { Strings restored; - bool error = false; - String error_parts; - - Int64 pos = part->info.min_block; + Strings error_parts; auto is_appropriate_state = [] (DataPartState state) { return state == DataPartState::Active || state == DataPartState::Outdated; }; - auto update_error = [&] (DataPartIteratorByInfo it) - { - error = true; - error_parts += (*it)->getNameWithState() + " "; - }; - auto activate_part = [this, &restored_active_part](auto it) { /// It's not clear what to do if we try to activate part that was removed in transaction. @@ -3951,68 +3946,90 @@ void MergeTreeData::forcefullyMovePartToDetachedAndRemoveFromMemory(const MergeT restored_active_part = true; }; - auto it_middle = data_parts_by_info.lower_bound(part->info); + /// ActiveDataPartSet allows to restore most top-level parts instead of unexpected. + /// It can be important in case of assigned merges. If unexpected part is result of some + /// finished, but not committed merge then we should restore (at least try to restore) + /// closest ancestors for the unexpected part to be able to execute it. 
+ /// However it's not guaranteed because outdated parts can intersect + ActiveDataPartSet parts_for_replacement(format_version); + auto range = getDataPartsPartitionRange(part->info.partition_id); + DataPartsVector parts_candidates(range.begin(), range.end()); - /// Restore the leftmost part covered by the part - if (it_middle != data_parts_by_info.begin()) + /// In case of intersecting outdated parts we want to add bigger parts (with higher level) first + auto comparator = [] (const DataPartPtr left, const DataPartPtr right) -> bool { - auto it = std::prev(it_middle); - - if (part->contains(**it) && is_appropriate_state((*it)->getState())) - { - /// Maybe, we must consider part level somehow - if ((*it)->info.min_block != part->info.min_block) - update_error(it); - - if ((*it)->getState() != DataPartState::Active) - activate_part(it); - - pos = (*it)->info.max_block + 1; - restored.push_back((*it)->name); - } - else if ((*it)->info.partition_id == part->info.partition_id) - update_error(it); + if (left->info.level < right->info.level) + return true; + else if (left->info.level > right->info.level) + return false; else - error = true; + return left->info.mutation < right->info.mutation; + }; + std::sort(parts_candidates.begin(), parts_candidates.end(), comparator); + /// From larger to smaller parts + for (const auto & part_candidate_in_partition : parts_candidates | std::views::reverse) + { + if (part->info.contains(part_candidate_in_partition->info) + && is_appropriate_state(part_candidate_in_partition->getState())) + { + String out_reason; + /// Outdated parts can intersect legally (because of DROP_PART); here it's okay, we + /// are trying to do our best to restore covered parts. 
+ auto outcome = parts_for_replacement.tryAddPart(part_candidate_in_partition->info, &out_reason); + if (outcome == ActiveDataPartSet::AddPartOutcome::HasIntersectingPart) + { + error_parts.push_back(part->name); + LOG_ERROR(log, "Failed to restore part {}, because of intersection reason '{}'", part->name, out_reason); + } + } + } + + if (parts_for_replacement.size() > 0) + { + std::vector> holes_list; + /// Most part of the code below is just to write pretty message + auto part_infos = parts_for_replacement.getPartInfos(); + int64_t current_right_block = part_infos[0].min_block; + for (const auto & top_level_part_to_replace : part_infos) + { + auto data_part_it = data_parts_by_info.find(top_level_part_to_replace); + if (data_part_it == data_parts_by_info.end()) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot find part {} in own set", top_level_part_to_replace.getPartNameForLogs()); + activate_part(data_part_it); + restored.push_back((*data_part_it)->name); + if (top_level_part_to_replace.min_block - current_right_block > 1) + holes_list.emplace_back(current_right_block, top_level_part_to_replace.min_block); + current_right_block = top_level_part_to_replace.max_block; + } + if (part->info.max_block != current_right_block) + holes_list.emplace_back(current_right_block, part->info.max_block); + + for (const String & name : restored) + LOG_INFO(log, "Activated part {} in place of unexpected {}", name, part->name); + + if (!error_parts.empty() || !holes_list.empty()) + { + std::string error_parts_message, holes_list_message; + if (!error_parts.empty()) + error_parts_message = fmt::format(" Parts failed to restore because of intersection: [{}]", fmt::join(error_parts, ", ")); + if (!holes_list.empty()) + { + if (!error_parts.empty()) + holes_list_message = "."; + + Strings holes_list_pairs; + for (const auto & [left_side, right_side] : holes_list) + holes_list_pairs.push_back(fmt::format("({}, {})", left_side + 1, right_side - 1)); + holes_list_message += 
fmt::format(" Block ranges failed to restore: [{}]", fmt::join(holes_list_pairs, ", ")); + } + LOG_WARNING(log, "The set of parts restored in place of {} looks incomplete. " + "SELECT queries may observe gaps in data until this replica is synchronized with other replicas.{}{}", + part->name, error_parts_message, holes_list_message); + } } else - error = true; - - /// Restore "right" parts - for (auto it = it_middle; it != data_parts_by_info.end() && part->contains(**it); ++it) { - if ((*it)->info.min_block < pos) - continue; - - if (!is_appropriate_state((*it)->getState())) - { - update_error(it); - continue; - } - - if ((*it)->info.min_block > pos) - update_error(it); - - if ((*it)->getState() != DataPartState::Active) - activate_part(it); - - pos = (*it)->info.max_block + 1; - restored.push_back((*it)->name); - } - - if (pos != part->info.max_block + 1) - error = true; - - for (const String & name : restored) - { - LOG_INFO(log, "Activated part {}", name); - } - - if (error) - { - LOG_WARNING(log, "The set of parts restored in place of {} looks incomplete. " - "SELECT queries may observe gaps in data until this replica is synchronized with other replicas.{}", - part->name, (error_parts.empty() ? 
"" : " Suspicious parts: " + error_parts)); + LOG_INFO(log, "Don't find any parts for replacement instead of unexpected {}", part->name); } } diff --git a/tests/queries/0_stateless/02899_restore_parts_replicated_merge_tree.reference b/tests/queries/0_stateless/02899_restore_parts_replicated_merge_tree.reference new file mode 100644 index 00000000000..7326d960397 --- /dev/null +++ b/tests/queries/0_stateless/02899_restore_parts_replicated_merge_tree.reference @@ -0,0 +1 @@ +Ok diff --git a/tests/queries/0_stateless/02899_restore_parts_replicated_merge_tree.sh b/tests/queries/0_stateless/02899_restore_parts_replicated_merge_tree.sh new file mode 100755 index 00000000000..c6165c1e983 --- /dev/null +++ b/tests/queries/0_stateless/02899_restore_parts_replicated_merge_tree.sh @@ -0,0 +1,63 @@ +#!/usr/bin/env bash + +CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +# shellcheck source=../shell_config.sh +. "$CURDIR"/../shell_config.sh + + +$CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS table_with_unsuccessful_commits" + +# will be flaky in 2031 +$CLICKHOUSE_CLIENT --query "CREATE TABLE table_with_unsuccessful_commits (key UInt64, value String) ENGINE ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/unsuccessful', '1') ORDER BY tuple() SETTINGS cleanup_delay_period=1000, max_cleanup_delay_period=1000, old_parts_lifetime = 1949748529, remove_rolled_back_parts_immediately=0, replicated_max_ratio_of_wrong_parts=1, max_suspicious_broken_parts=1000000, max_suspicious_broken_parts_bytes=10000000000" + +$CLICKHOUSE_CLIENT --query "INSERT INTO table_with_unsuccessful_commits SELECT rand(), toString(rand()) FROM numbers(10)" + +$CLICKHOUSE_CLIENT --query "INSERT INTO table_with_unsuccessful_commits SELECT rand(), toString(rand()) FROM numbers(10)" + + +for i in {0..10}; do + $CLICKHOUSE_CLIENT --query "OPTIMIZE TABLE table_with_unsuccessful_commits FINAL" +done + +$CLICKHOUSE_CLIENT --query "ALTER TABLE table_with_unsuccessful_commits DELETE WHERE 
value = 'hello' SETTINGS mutations_sync=2" +$CLICKHOUSE_CLIENT --query "ALTER TABLE table_with_unsuccessful_commits DELETE WHERE value = 'hello' SETTINGS mutations_sync=2" + +$CLICKHOUSE_CLIENT --query "INSERT INTO table_with_unsuccessful_commits SELECT rand(), toString(rand()) FROM numbers(10)" + +$CLICKHOUSE_CLIENT --query "ALTER TABLE table_with_unsuccessful_commits DELETE WHERE value = 'hello' SETTINGS mutations_sync=2" + +original_parts=$($CLICKHOUSE_CLIENT --query "SELECT name FROM system.parts where table = 'table_with_unsuccessful_commits' and database = currentDatabase() and active order by name") + +$CLICKHOUSE_CLIENT --query "ALTER TABLE table_with_unsuccessful_commits MODIFY SETTING fault_probability_before_part_commit=1" + +$CLICKHOUSE_CLIENT --query "OPTIMIZE TABLE table_with_unsuccessful_commits FINAL SETTINGS alter_sync=0" + +i=0 retries=300 + +while [[ $i -lt $retries ]]; do + result=$($CLICKHOUSE_CLIENT --query "SELECT last_exception FROM system.replication_queue WHERE table = 'table_with_unsuccessful_commits' and database=currentDatabase()") + + if [[ $result ]]; then + break + fi + + ((++i)) +done + +parts_after_mutation=$($CLICKHOUSE_CLIENT --query "SELECT name FROM system.parts where table = 'table_with_unsuccessful_commits' and database = currentDatabase() and active order by name") + +$CLICKHOUSE_CLIENT --query "DETACH TABLE table_with_unsuccessful_commits" + +$CLICKHOUSE_CLIENT --query "ATTACH TABLE table_with_unsuccessful_commits" + +parts_after_detach_attach=$($CLICKHOUSE_CLIENT --query "SELECT name FROM system.parts where table = 'table_with_unsuccessful_commits' and database = currentDatabase() and active order by name") + +if [[ "$parts_after_detach_attach" == "$parts_after_mutation" && "$parts_after_mutation" == "$original_parts" ]]; then + echo "Ok" +else + echo "Original parts $original_parts" + echo "Parts after mutation $parts_after_mutation" + echo "Parts after detach attach $parts_after_detach_attach" +fi + +$CLICKHOUSE_CLIENT 
--query "DROP TABLE table_with_unsuccessful_commits"