From f6fb20d6cfae39c0d2382375168b44f29d1451bc Mon Sep 17 00:00:00 2001
From: MikhailBurdukov
Date: Tue, 19 Dec 2023 14:16:23 +0000
Subject: [PATCH] Backoff policy for failed mutations.

---
 src/Interpreters/Context.cpp                  |   6 +
 src/Interpreters/Context.h                    |   3 +
 src/Storages/MergeTree/MergeTreeData.cpp      |   1 +
 src/Storages/MergeTree/MergeTreeData.h        |  88 +++++++++++-
 .../ReplicatedMergeMutateTaskBase.cpp         |  11 +-
 .../MergeTree/ReplicatedMergeTreeQueue.cpp    |  11 +-
 src/Storages/StorageMergeTree.cpp             |  22 ++-
 src/Storages/StorageReplicatedMergeTree.cpp   |   2 +
 .../config.d/backoff_mutation_policy.xml      |   3 +
 .../integration/test_failed_mutations/test.py | 127 ++++++++++++++++++
 10 files changed, 267 insertions(+), 7 deletions(-)
 create mode 100644 tests/integration/test_failed_mutations/configs/config.d/backoff_mutation_policy.xml
 create mode 100644 tests/integration/test_failed_mutations/test.py

diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp
index a9d5c815d3b..5debbc32b32 100644
--- a/src/Interpreters/Context.cpp
+++ b/src/Interpreters/Context.cpp
@@ -2902,6 +2902,12 @@ BackgroundTaskSchedulingSettings Context::getBackgroundMoveTaskSchedulingSetting
     return task_settings;
 }
 
+size_t Context::getMaxPostponeTimeForFailedMutations() const
+{
+    const auto & config = getConfigRef();
+    return config.getUInt("max_postpone_time_for_failed_mutations", 0ull);
+}
+
 BackgroundSchedulePool & Context::getSchedulePool() const
 {
     callOnce(shared->schedule_pool_initialized, [&] {
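For review context: `getUInt` falls back to `0` when `max_postpone_time_for_failed_mutations` is absent from the server config, and `0` keeps the feature switched off. Everything else in the patch derives its behaviour from this one number. A minimal standalone sketch of the derivation performed by the `PartMutationBackoffPolicy` constructor added below (illustrative only, not code from the patch):

```cpp
#include <cmath>
#include <cstddef>
#include <iostream>

/// Derive the cap on the backoff exponent the same way the patch does:
/// 0 disables the policy, otherwise the cap is floor(log2(max_postpone_time_ms)).
size_t backoffCap(size_t max_postpone_time_ms)
{
    if (max_postpone_time_ms == 0)
        return 0; /// policy disabled
    return static_cast<size_t>(std::log2(max_postpone_time_ms));
}

int main()
{
    std::cout << backoffCap(0) << '\n';     /// 0  -> backoff disabled
    std::cout << backoffCap(60000) << '\n'; /// 15 -> longest postponement is 2^15 ms
}
```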
diff --git a/src/Interpreters/Context.h b/src/Interpreters/Context.h
index b870fe5b250..b91b3d16f39 100644
--- a/src/Interpreters/Context.h
+++ b/src/Interpreters/Context.h
@@ -1022,6 +1022,9 @@ public:
     BackgroundTaskSchedulingSettings getBackgroundProcessingTaskSchedulingSettings() const;
     BackgroundTaskSchedulingSettings getBackgroundMoveTaskSchedulingSettings() const;
 
+    /// Server-level setting for the backoff policy applied to failed mutation tasks.
+    size_t getMaxPostponeTimeForFailedMutations() const;
+
     BackgroundSchedulePool & getBufferFlushSchedulePool() const;
     BackgroundSchedulePool & getSchedulePool() const;
     BackgroundSchedulePool & getMessageBrokerSchedulePool() const;
diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp
index e3de926570b..ee988f6b0ce 100644
--- a/src/Storages/MergeTree/MergeTreeData.cpp
+++ b/src/Storages/MergeTree/MergeTreeData.cpp
@@ -363,6 +363,7 @@ MergeTreeData::MergeTreeData(
     , parts_mover(this)
     , background_operations_assignee(*this, BackgroundJobsAssignee::Type::DataProcessing, getContext())
     , background_moves_assignee(*this, BackgroundJobsAssignee::Type::Moving, getContext())
+    , mutation_backoff_policy(getContext())
 {
     context_->getGlobalContext()->initializeBackgroundExecutorsIfNeeded();
diff --git a/src/Storages/MergeTree/MergeTreeData.h b/src/Storages/MergeTree/MergeTreeData.h
index f0dbaf0e307..c7e62da1b1a 100644
--- a/src/Storages/MergeTree/MergeTreeData.h
+++ b/src/Storages/MergeTree/MergeTreeData.h
@@ -35,7 +35,7 @@
 #include
 #include
 #include
-
+#include
 #include
 #include
 
@@ -1348,6 +1348,92 @@ protected:
         const MergeListEntry * merge_entry,
         std::shared_ptr<ProfileEvents::Counters::Snapshot> profile_counters);
 
+    class PartMutationBackoffPolicy : public WithContext
+    {
+        struct PartMutationInfo
+        {
+            size_t retry_count = 0ul;
+            Poco::Timestamp latest_fail_time{};
+            UInt64 mutation_failure_version = 0ul;
+
+            Poco::Timestamp getNextMinExecutionTime() const
+            {
+                return latest_fail_time + (1 << retry_count) * 1000ul; /// Poco::Timestamp is microsecond-based, so this is 2^retry_count ms.
+            }
+        };
+
+        using DataPartsWithRetryInfo = std::unordered_map<String, PartMutationInfo>;
+        DataPartsWithRetryInfo failed_mutation_parts;
+        size_t max_postpone_power;
+        mutable std::mutex parts_info_lock;
+
+    public:
+        explicit PartMutationBackoffPolicy(ContextPtr global_context_)
+            : WithContext(global_context_)
+        {
+            size_t max_postpone_time_ms = global_context_->getMaxPostponeTimeForFailedMutations();
+            if (max_postpone_time_ms == 0)
+                max_postpone_power = 0;
+            else
+                max_postpone_power = static_cast<size_t>(std::log2(max_postpone_time_ms));
+        }
+
+        void removeFromFailedByVersion(UInt64 mutation_version)
+        {
+            if (max_postpone_power == 0)
+                return;
+            std::unique_lock _lock(parts_info_lock);
+
+            for (auto failed_part_it = failed_mutation_parts.begin(); failed_part_it != failed_mutation_parts.end();)
+            {
+                if (failed_part_it->second.mutation_failure_version == mutation_version)
+                    failed_part_it = failed_mutation_parts.erase(failed_part_it);
+                else
+                    ++failed_part_it;
+            }
+        }
+
+        void removePartFromFailed(const String & part_name)
+        {
+            if (max_postpone_power == 0)
+                return;
+            std::unique_lock _lock(parts_info_lock);
+            failed_mutation_parts.erase(part_name);
+        }
+
+        void addPartMutationFailure(const String & part_name, UInt64 mutation_failure_version_)
+        {
+            if (max_postpone_power == 0)
+                return;
+            std::unique_lock _lock(parts_info_lock);
+            auto part_info_it = failed_mutation_parts.find(part_name);
+            if (part_info_it == failed_mutation_parts.end())
+            {
+                auto [it, success] = failed_mutation_parts.emplace(part_name, PartMutationInfo());
+                std::swap(it, part_info_it);
+            }
+            auto & part_info = part_info_it->second;
+            part_info.retry_count = std::min(max_postpone_power, part_info.retry_count + 1);
+            part_info.latest_fail_time = Poco::Timestamp();
+            part_info.mutation_failure_version = mutation_failure_version_;
+        }
+
+        bool partCanBeMutated(const String & part_name)
+        {
+            if (max_postpone_power == 0)
+                return true;
+            std::unique_lock _lock(parts_info_lock);
+            auto iter = failed_mutation_parts.find(part_name);
+            if (iter == failed_mutation_parts.end())
+                return true;
+
+            auto current_time = Poco::Timestamp();
+            return current_time >= iter->second.getNextMinExecutionTime();
+        }
+    };
+    /// Controls the postponing logic for failed mutations.
+    PartMutationBackoffPolicy mutation_backoff_policy;
+
     /// If part is assigned to merge or mutation (possibly replicated)
     /// Should be overridden by children, because they can have different
     /// mechanisms for parts locking
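The units in `getNextMinExecutionTime()` are easy to misread: `Poco::Timestamp` has microsecond resolution, so `(1 << retry_count) * 1000ul` postpones a part for 2^retry_count milliseconds, with `retry_count` capped at `max_postpone_power`. A standalone sketch of the resulting delay ladder (a simplification for illustration, not the patch's code):

```cpp
#include <algorithm>
#include <cstddef>
#include <iostream>

int main()
{
    /// Cap for max_postpone_time_for_failed_mutations = 60000 ms (see the test config below).
    const size_t max_postpone_power = 15;

    size_t retry_count = 0;
    for (int failure = 1; failure <= 18; ++failure)
    {
        /// Each failure bumps the exponent, as in addPartMutationFailure().
        retry_count = std::min(max_postpone_power, retry_count + 1);
        std::cout << "failure " << failure << ": part postponed for "
                  << (1ull << retry_count) << " ms\n";
    }
    /// Delays double from 2 ms up to 2^15 = 32768 ms and then stay capped.
}
```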
diff --git a/src/Storages/MergeTree/ReplicatedMergeMutateTaskBase.cpp b/src/Storages/MergeTree/ReplicatedMergeMutateTaskBase.cpp
index 94c069d789b..32f98cdfb6f 100644
--- a/src/Storages/MergeTree/ReplicatedMergeMutateTaskBase.cpp
+++ b/src/Storages/MergeTree/ReplicatedMergeMutateTaskBase.cpp
@@ -1,6 +1,7 @@
 #include
 
 #include
+#include
 #include
 #include
 
@@ -110,11 +111,13 @@ bool ReplicatedMergeMutateTaskBase::executeStep()
                 auto mutations_end_it = in_partition->second.upper_bound(result_data_version);
                 for (auto it = mutations_begin_it; it != mutations_end_it; ++it)
                 {
+                    auto & src_part = log_entry->source_parts.at(0);
                     ReplicatedMergeTreeQueue::MutationStatus & status = *it->second;
-                    status.latest_failed_part = log_entry->source_parts.at(0);
+                    status.latest_failed_part = src_part;
                     status.latest_failed_part_info = source_part_info;
                     status.latest_fail_time = time(nullptr);
                     status.latest_fail_reason = getExceptionMessage(saved_exception, false);
+                    storage.mutation_backoff_policy.addPartMutationFailure(src_part, source_part_info.mutation + 1);
                 }
             }
         }
@@ -142,6 +145,12 @@ bool ReplicatedMergeMutateTaskBase::executeImpl()
     {
         storage.queue.removeProcessedEntry(storage.getZooKeeper(), selected_entry->log_entry);
         state = State::SUCCESS;
+
+        auto & log_entry = selected_entry->log_entry;
+        if (log_entry->type == ReplicatedMergeTreeLogEntryData::MUTATE_PART)
+        {
+            storage.mutation_backoff_policy.removePartFromFailed(log_entry->source_parts.at(0));
+        }
     }
     catch (...)
     {
diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp
index a3afa8cd88a..b072795cbf3 100644
--- a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp
+++ b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp
@@ -13,6 +13,7 @@
 #include
 #include
+#include
 
 namespace DB
 {
@@ -1350,9 +1351,17 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
                     sum_parts_size_in_bytes += part_in_memory->block.bytes();
                 else
                     sum_parts_size_in_bytes += part->getBytesOnDisk();
+
+                if (entry.type == LogEntry::MUTATE_PART && !storage.mutation_backoff_policy.partCanBeMutated(part->name))
+                {
+                    constexpr auto fmt_string = "Not executing log entry {} of type {} for part {} "
+                                                "because it has failed recently. According to exponential backoff policy, put aside this log entry.";
+
+                    LOG_DEBUG(LogToStr(out_postpone_reason, log), fmt_string, entry.znode_name, entry.typeToString(), entry.new_part_name);
+                    return false;
+                }
             }
         }
-
     if (merger_mutator.merges_blocker.isCancelled())
     {
         constexpr auto fmt_string = "Not executing log entry {} of type {} for part {} because merges and mutations are cancelled now.";
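Taken together, the replicated path drives the policy from three places: a failed MUTATE_PART registers its source part under the mutation's result version (`source_part_info.mutation + 1`), a successful one clears the part, and `shouldExecuteLogEntry()` asks `partCanBeMutated()` before scheduling. A toy, map-based model of that bookkeeping (hypothetical helper names, deadline check omitted; the real class also tracks retry counts and timestamps):

```cpp
#include <cstdint>
#include <iostream>
#include <string>
#include <unordered_map>

/// part name -> version of the mutation that last failed on it
std::unordered_map<std::string, uint64_t> failed_parts;

void onMutationFailed(const std::string & part, uint64_t mutation_version) { failed_parts[part] = mutation_version; }
void onMutationSucceeded(const std::string & part) { failed_parts.erase(part); }
/// KILL MUTATION lifts the backoff for every part the killed mutation touched.
void onKillMutation(uint64_t mutation_version)
{
    std::erase_if(failed_parts, [&](const auto & kv) { return kv.second == mutation_version; });
}
bool partCanBeMutated(const std::string & part) { return !failed_parts.contains(part); }

int main()
{
    onMutationFailed("all_1_1_0", 2);                   /// MUTATE_PART threw
    std::cout << partCanBeMutated("all_1_1_0") << '\n'; /// 0 -> log entry is put aside
    onKillMutation(2);
    std::cout << partCanBeMutated("all_1_1_0") << '\n'; /// 1 -> schedulable again
}
```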
diff --git a/src/Storages/StorageMergeTree.cpp b/src/Storages/StorageMergeTree.cpp
index 4761ccd8b58..e99e73f38d8 100644
--- a/src/Storages/StorageMergeTree.cpp
+++ b/src/Storages/StorageMergeTree.cpp
@@ -4,7 +4,7 @@
 #include
 #include
 
-
+#include
 #include
 #include
 #include
@@ -534,6 +534,7 @@ void StorageMergeTree::updateMutationEntriesErrors(FutureMergedMutatedPartPtr re
     for (auto it = mutations_begin_it; it != mutations_end_it; ++it)
     {
         MergeTreeMutationEntry & entry = it->second;
+        auto failed_part = result_part->parts.at(0);
         if (is_successful)
         {
             if (!entry.latest_failed_part.empty() && result_part->part_info.contains(entry.latest_failed_part_info))
@@ -542,14 +543,16 @@ void StorageMergeTree::updateMutationEntriesErrors(FutureMergedMutatedPartPtr re
                 entry.latest_failed_part_info = MergeTreePartInfo();
                 entry.latest_fail_time = 0;
                 entry.latest_fail_reason.clear();
+                mutation_backoff_policy.removePartFromFailed(failed_part->name);
             }
         }
         else
         {
-            entry.latest_failed_part = result_part->parts.at(0)->name;
-            entry.latest_failed_part_info = result_part->parts.at(0)->info;
+            entry.latest_failed_part = failed_part->name;
+            entry.latest_failed_part_info = failed_part->info;
             entry.latest_fail_time = time(nullptr);
             entry.latest_fail_reason = exception_message;
+            mutation_backoff_policy.addPartMutationFailure(failed_part->name, sources_data_version + 1);
         }
     }
 }
@@ -816,6 +819,8 @@ CancellationCode StorageMergeTree::killMutation(const String & mutation_id)
         }
     }
 
+    mutation_backoff_policy.removeFromFailedByVersion(mutation_version);
+
     if (!to_kill)
         return CancellationCode::NotFound;
 
@@ -1176,6 +1181,7 @@ MergeMutateSelectedEntryPtr StorageMergeTree::selectPartsToMutate(
 
     CurrentlyMergingPartsTaggerPtr tagger;
 
+    bool exist_postponed_failed_part = false;
     auto mutations_end_it = current_mutations_by_version.end();
     for (const auto & part : getDataPartsVectorForInternalUsage())
     {
@@ -1200,6 +1206,13 @@ MergeMutateSelectedEntryPtr StorageMergeTree::selectPartsToMutate(
             TransactionID first_mutation_tid = mutations_begin_it->second.tid;
             MergeTreeTransactionPtr txn;
 
+            if (!mutation_backoff_policy.partCanBeMutated(part->name))
+            {
+                exist_postponed_failed_part = true;
+                LOG_DEBUG(log, "According to exponential backoff policy, do not perform mutations for the part {} yet. Put it aside.", part->name);
+                continue;
+            }
+
             if (!first_mutation_tid.isPrehistoric())
             {
 
@@ -1306,7 +1319,8 @@ MergeMutateSelectedEntryPtr StorageMergeTree::selectPartsToMutate(
             return std::make_shared<MergeMutateSelectedEntry>(future_part, std::move(tagger), commands, txn);
         }
     }
-
+    if (exist_postponed_failed_part)
+        mutation_wait_event.notify_all();
    return {};
 }
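A design note on the last hunk: `mutation_wait_event` is the condition variable that `StorageMergeTree::waitForMutation()` sleeps on, so notifying it whenever a part was skipped because of backoff lets synchronous waiters wake up and re-check the mutation's status (including `latest_fail_reason`) instead of blocking until the postponement expires. This is my reading of the surrounding code, not something the patch states explicitly.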
diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp
index 17dd995321d..ed65fbf932a 100644
--- a/src/Storages/StorageReplicatedMergeTree.cpp
+++ b/src/Storages/StorageReplicatedMergeTree.cpp
@@ -7438,6 +7438,8 @@ CancellationCode StorageReplicatedMergeTree::killMutation(const String & mutatio
     if (!mutation_entry)
         return CancellationCode::NotFound;
 
+    mutation_backoff_policy.removeFromFailedByVersion(static_cast<UInt64>(mutation_entry->alter_version));
+
     /// After this point no new part mutations will start and part mutations that still exist
     /// in the queue will be skipped.
 
diff --git a/tests/integration/test_failed_mutations/configs/config.d/backoff_mutation_policy.xml b/tests/integration/test_failed_mutations/configs/config.d/backoff_mutation_policy.xml
new file mode 100644
index 00000000000..78d14ce327d
--- /dev/null
+++ b/tests/integration/test_failed_mutations/configs/config.d/backoff_mutation_policy.xml
@@ -0,0 +1,3 @@
+<clickhouse>
+    <max_postpone_time_for_failed_mutations>60000</max_postpone_time_for_failed_mutations>
+</clickhouse>
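One observation about the value chosen here: with 60000 ms the constructor computes `log2(60000) ≈ 15.87`, truncated to a cap of 15, so the longest possible postponement is `2^15 = 32768` ms, roughly half the configured figure. In other words, the setting bounds the backoff exponent rather than acting as an exact ceiling on the delay.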
diff --git a/tests/integration/test_failed_mutations/test.py b/tests/integration/test_failed_mutations/test.py
new file mode 100644
index 00000000000..70f408db5f1
--- /dev/null
+++ b/tests/integration/test_failed_mutations/test.py
@@ -0,0 +1,127 @@
+import logging
+import random
+import threading
+import time
+from collections import Counter
+
+import pytest
+from helpers.cluster import ClickHouseCluster
+
+cluster = ClickHouseCluster(__file__)
+
+node_with_backoff = cluster.add_instance(
+    "node_with_backoff",
+    macros={"cluster": "test_cluster"},
+    main_configs=["configs/config.d/backoff_mutation_policy.xml"],
+    with_zookeeper=True,
+)
+
+node_no_backoff = cluster.add_instance(
+    "node_no_backoff",
+    macros={"cluster": "test_cluster"},
+    with_zookeeper=True,
+)
+
+REPLICATED_POSTPONE_MUTATION_LOG = (
+    "According to exponential backoff policy, put aside this log entry"
+)
+POSTPONE_MUTATION_LOG = (
+    "According to exponential backoff policy, do not perform mutations for the part"
+)
+
+all_nodes = [node_with_backoff, node_no_backoff]
+
+
+def prepare_cluster(use_replicated_table):
+    for node in all_nodes:
+        node.query("DROP TABLE IF EXISTS test_mutations SYNC")
+
+    engine = (
+        "ReplicatedMergeTree('/clickhouse/{cluster}/tables/test/test_mutations', '{instance}')"
+        if use_replicated_table
+        else "MergeTree()"
+    )
+
+    for node in all_nodes:
+        node.query(f"CREATE TABLE test_mutations(x UInt32) ENGINE {engine} ORDER BY x")
+        node.query("INSERT INTO test_mutations SELECT * FROM system.numbers LIMIT 10")
+
+
+@pytest.fixture(scope="module")
+def started_cluster():
+    try:
+        cluster.start()
+        yield cluster
+
+    finally:
+        cluster.shutdown()
+
+
+@pytest.mark.parametrize(
+    ("node, found_in_log"),
+    [
+        (
+            node_with_backoff,
+            True,
+        ),
+        (
+            node_no_backoff,
+            False,
+        ),
+    ],
+)
+def test_exponential_backoff_with_merge_tree(started_cluster, node, found_in_log):
+    prepare_cluster(False)
+
+    # Execute a mutation that cannot succeed: the table in the subquery does not exist.
+    node.query(
+        "ALTER TABLE test_mutations DELETE WHERE x IN (SELECT x FROM notexist_table) SETTINGS allow_nondeterministic_mutations=1"
+    )
+
+    assert node.contains_in_log(POSTPONE_MUTATION_LOG) == found_in_log
+    node.rotate_logs()
+
+    time.sleep(5)
+    node.query("KILL MUTATION WHERE table='test_mutations'")
+    # Check that mutations on new parts are still postponed after the kill.
+    node.query(
+        "ALTER TABLE test_mutations DELETE WHERE x IN (SELECT x FROM notexist_table) SETTINGS allow_nondeterministic_mutations=1"
+    )
+
+    assert node.contains_in_log(POSTPONE_MUTATION_LOG) == found_in_log
+
+
+def test_exponential_backoff_with_replicated_tree(started_cluster):
+
+    prepare_cluster(True)
+
+    node_no_backoff.query(
+        "ALTER TABLE test_mutations DELETE WHERE x IN (SELECT x FROM notexist_table) SETTINGS allow_nondeterministic_mutations=1"
+    )
+
+    time.sleep(5)
+    assert not node_no_backoff.contains_in_log(REPLICATED_POSTPONE_MUTATION_LOG)
+    assert node_with_backoff.contains_in_log(REPLICATED_POSTPONE_MUTATION_LOG)
+
+
+@pytest.mark.parametrize(
+    ("node"),
+    [
+        (node_with_backoff),
+    ],
+)
+def test_exponential_backoff_create_dependent_table(started_cluster, node):
+
+    prepare_cluster(False)
+
+    node.query("INSERT INTO test_mutations SELECT * FROM system.numbers LIMIT 10")
+    # Execute a mutation that keeps failing until dep_table exists.
+    node.query(
+        "ALTER TABLE test_mutations DELETE WHERE x IN (SELECT x FROM dep_table) SETTINGS allow_nondeterministic_mutations=1"
+    )
+    time.sleep(5)
+    # Create the dependent table so that the mutation can finally succeed.
+    node.query("CREATE TABLE dep_table(x UInt32) ENGINE MergeTree() ORDER BY x")
+
+    time.sleep(5)
+    assert node.query("SELECT count() FROM system.mutations WHERE is_done=0") == "0\n"