Backoff policy for failed mutation.

This commit is contained in:
MikhailBurdukov 2023-12-19 14:16:23 +00:00
parent 47c6f17aef
commit f6fb20d6cf
10 changed files with 267 additions and 7 deletions

View File

@ -2902,6 +2902,12 @@ BackgroundTaskSchedulingSettings Context::getBackgroundMoveTaskSchedulingSetting
return task_settings;
}
size_t Context::getMaxPostponeTimeForFailedMutations() const
{
const auto & config = getConfigRef();
return config.getUInt("max_postpone_time_for_failed_mutations", 0ull);
}
BackgroundSchedulePool & Context::getSchedulePool() const
{
callOnce(shared->schedule_pool_initialized, [&] {

View File

@ -1022,6 +1022,9 @@ public:
BackgroundTaskSchedulingSettings getBackgroundProcessingTaskSchedulingSettings() const;
BackgroundTaskSchedulingSettings getBackgroundMoveTaskSchedulingSettings() const;
// Setting for backoff policy for failed mutation tasks.
size_t getMaxPostponeTimeForFailedMutations() const;
BackgroundSchedulePool & getBufferFlushSchedulePool() const;
BackgroundSchedulePool & getSchedulePool() const;
BackgroundSchedulePool & getMessageBrokerSchedulePool() const;

View File

@ -363,6 +363,7 @@ MergeTreeData::MergeTreeData(
, parts_mover(this)
, background_operations_assignee(*this, BackgroundJobsAssignee::Type::DataProcessing, getContext())
, background_moves_assignee(*this, BackgroundJobsAssignee::Type::Moving, getContext())
, mutation_backoff_policy(getContext())
{
context_->getGlobalContext()->initializeBackgroundExecutorsIfNeeded();

View File

@ -35,7 +35,7 @@
#include <Storages/PartitionCommands.h>
#include <Interpreters/PartLog.h>
#include <Interpreters/threadPoolCallbackRunner.h>
#include <Poco/Timestamp.h>
#include <boost/multi_index_container.hpp>
#include <boost/multi_index/ordered_index.hpp>
@ -1348,6 +1348,92 @@ protected:
const MergeListEntry * merge_entry,
std::shared_ptr<ProfileEvents::Counters::Snapshot> profile_counters);
class PartMutationBackoffPolicy : public WithContext
{
struct PartMutationInfo
{
size_t retry_count = 0ul;
Poco::Timestamp latest_fail_time{};
UInt64 mutation_failure_version = 0ul;
Poco::Timestamp getNextMinExecutionTime() const
{
return latest_fail_time + (1 << retry_count) * 1000ul;
}
};
using DataPartsWithRetryInfo = std::unordered_map<String, PartMutationInfo>;
DataPartsWithRetryInfo failed_mutation_parts;
size_t max_pospone_power;
mutable std::mutex parts_info_lock;
public:
explicit PartMutationBackoffPolicy(ContextPtr global_context_)
: WithContext(global_context_)
{
size_t max_pospone_time_ms = global_context_->getMaxPostponeTimeForFailedMutations();
if (max_pospone_time_ms == 0)
max_pospone_power = 0;
else
max_pospone_power = static_cast<size_t>(std::log2(max_pospone_time_ms));
}
void removeFromFailedByVersion(UInt64 mutation_version)
{
if (max_pospone_power == 0)
return;
std::unique_lock _lock(parts_info_lock);
for (auto failed_part_it = failed_mutation_parts.begin(); failed_part_it != failed_mutation_parts.end();)
{
if (failed_part_it->second.mutation_failure_version == mutation_version)
failed_part_it = failed_mutation_parts.erase(failed_part_it);
else
++failed_part_it;
}
}
void removePartFromFailed(const String& part_name)
{
if (max_pospone_power == 0)
return;
std::unique_lock _lock(parts_info_lock);
failed_mutation_parts.erase(part_name);
}
void addPartMutationFailure (const String& part_name, UInt64 _mutation_failure_version)
{
if (max_pospone_power == 0)
return;
std::unique_lock _lock(parts_info_lock);
auto part_info_it = failed_mutation_parts.find(part_name);
if (part_info_it == failed_mutation_parts.end())
{
auto [it, success] = failed_mutation_parts.emplace(part_name, PartMutationInfo());
std::swap(it, part_info_it);
}
auto& part_info = part_info_it->second;
part_info.retry_count = std::min(max_pospone_power, part_info.retry_count + 1);
part_info.latest_fail_time = Poco::Timestamp();
part_info.mutation_failure_version = _mutation_failure_version;
}
bool partCanBeMutated(const String& part_name)
{
if (max_pospone_power == 0)
return true;
std::unique_lock _lock(parts_info_lock);
auto iter = failed_mutation_parts.find(part_name);
if (iter == failed_mutation_parts.end())
return true;
auto current_time = Poco::Timestamp();
return current_time >= iter->second.getNextMinExecutionTime();
}
};
/// Contolls postponing logic for failed mutations.
PartMutationBackoffPolicy mutation_backoff_policy;
/// If part is assigned to merge or mutation (possibly replicated)
/// Should be overridden by children, because they can have different
/// mechanisms for parts locking

View File

@ -1,6 +1,7 @@
#include <Storages/MergeTree/ReplicatedMergeMutateTaskBase.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/ReplicatedMergeTreeQueue.h>
#include <Common/ProfileEventsScope.h>
@ -110,11 +111,13 @@ bool ReplicatedMergeMutateTaskBase::executeStep()
auto mutations_end_it = in_partition->second.upper_bound(result_data_version);
for (auto it = mutations_begin_it; it != mutations_end_it; ++it)
{
auto & src_part = log_entry->source_parts.at(0);
ReplicatedMergeTreeQueue::MutationStatus & status = *it->second;
status.latest_failed_part = log_entry->source_parts.at(0);
status.latest_failed_part = src_part;
status.latest_failed_part_info = source_part_info;
status.latest_fail_time = time(nullptr);
status.latest_fail_reason = getExceptionMessage(saved_exception, false);
storage.mutation_backoff_policy.addPartMutationFailure(src_part, source_part_info.mutation + 1);
}
}
}
@ -142,6 +145,12 @@ bool ReplicatedMergeMutateTaskBase::executeImpl()
{
storage.queue.removeProcessedEntry(storage.getZooKeeper(), selected_entry->log_entry);
state = State::SUCCESS;
auto & log_entry = selected_entry->log_entry;
if (log_entry->type == ReplicatedMergeTreeLogEntryData::MUTATE_PART)
{
storage.mutation_backoff_policy.removePartFromFailed(log_entry->source_parts.at(0));
}
}
catch (...)
{

View File

@ -13,6 +13,7 @@
#include <base/sort.h>
#include <ranges>
#include <Poco/Timestamp.h>
namespace DB
{
@ -1350,9 +1351,17 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
sum_parts_size_in_bytes += part_in_memory->block.bytes();
else
sum_parts_size_in_bytes += part->getBytesOnDisk();
if (entry.type == LogEntry::MUTATE_PART && !storage.mutation_backoff_policy.partCanBeMutated(part->name))
{
constexpr auto fmt_string = "Not executing log entry {} of type {} for part {} "
"because recently it has failed. According to exponential backoff policy, put aside this log entry.";
LOG_DEBUG(LogToStr(out_postpone_reason, log), fmt_string, entry.znode_name, entry.typeToString(), entry.new_part_name);
return false;
}
}
}
if (merger_mutator.merges_blocker.isCancelled())
{
constexpr auto fmt_string = "Not executing log entry {} of type {} for part {} because merges and mutations are cancelled now.";

View File

@ -4,7 +4,7 @@
#include <optional>
#include <ranges>
#include <Poco/Timestamp.h>
#include <base/sort.h>
#include <Backups/BackupEntriesCollector.h>
#include <Databases/IDatabase.h>
@ -534,6 +534,7 @@ void StorageMergeTree::updateMutationEntriesErrors(FutureMergedMutatedPartPtr re
for (auto it = mutations_begin_it; it != mutations_end_it; ++it)
{
MergeTreeMutationEntry & entry = it->second;
auto failed_part = result_part->parts.at(0);
if (is_successful)
{
if (!entry.latest_failed_part.empty() && result_part->part_info.contains(entry.latest_failed_part_info))
@ -542,14 +543,16 @@ void StorageMergeTree::updateMutationEntriesErrors(FutureMergedMutatedPartPtr re
entry.latest_failed_part_info = MergeTreePartInfo();
entry.latest_fail_time = 0;
entry.latest_fail_reason.clear();
mutation_backoff_policy.removePartFromFailed(failed_part->name);
}
}
else
{
entry.latest_failed_part = result_part->parts.at(0)->name;
entry.latest_failed_part_info = result_part->parts.at(0)->info;
entry.latest_failed_part = failed_part->name;
entry.latest_failed_part_info = failed_part->info;
entry.latest_fail_time = time(nullptr);
entry.latest_fail_reason = exception_message;
mutation_backoff_policy.addPartMutationFailure(failed_part->name, sources_data_version + 1);
}
}
}
@ -816,6 +819,8 @@ CancellationCode StorageMergeTree::killMutation(const String & mutation_id)
}
}
mutation_backoff_policy.removeFromFailedByVersion(mutation_version);
if (!to_kill)
return CancellationCode::NotFound;
@ -1176,6 +1181,7 @@ MergeMutateSelectedEntryPtr StorageMergeTree::selectPartsToMutate(
CurrentlyMergingPartsTaggerPtr tagger;
bool exist_posponed_failed_part = false;
auto mutations_end_it = current_mutations_by_version.end();
for (const auto & part : getDataPartsVectorForInternalUsage())
{
@ -1200,6 +1206,13 @@ MergeMutateSelectedEntryPtr StorageMergeTree::selectPartsToMutate(
TransactionID first_mutation_tid = mutations_begin_it->second.tid;
MergeTreeTransactionPtr txn;
if (!mutation_backoff_policy.partCanBeMutated(part->name))
{
exist_posponed_failed_part = true;
LOG_DEBUG(log, "According to exponential backoff policy, do not perform mutations for the part {} yet. Put it aside.", part->name);
continue;
}
if (!first_mutation_tid.isPrehistoric())
{
@ -1306,7 +1319,8 @@ MergeMutateSelectedEntryPtr StorageMergeTree::selectPartsToMutate(
return std::make_shared<MergeMutateSelectedEntry>(future_part, std::move(tagger), commands, txn);
}
}
if (exist_posponed_failed_part)
mutation_wait_event.notify_all();
return {};
}

View File

@ -7438,6 +7438,8 @@ CancellationCode StorageReplicatedMergeTree::killMutation(const String & mutatio
if (!mutation_entry)
return CancellationCode::NotFound;
mutation_backoff_policy.removeFromFailedByVersion(static_cast<UInt64>(mutation_entry->alter_version));
/// After this point no new part mutations will start and part mutations that still exist
/// in the queue will be skipped.

View File

@ -0,0 +1,3 @@
<clickhouse>
<max_postpone_time_for_failed_mutations>60000</max_postpone_time_for_failed_mutations>
</clickhouse>

View File

@ -0,0 +1,127 @@
import logging
import random
import threading
import time
from collections import Counter
import pytest
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
node_with_backoff = cluster.add_instance(
"node_with_backoff",
macros={"cluster": "test_cluster"},
main_configs=["configs/config.d/backoff_mutation_policy.xml"],
with_zookeeper=True,
)
node_no_backoff = cluster.add_instance(
"node_no_backoff",
macros={"cluster": "test_cluster"},
with_zookeeper=True,
)
REPLICATED_POSPONE_MUTATION_LOG = (
"According to exponential backoff policy, put aside this log entry"
)
POSPONE_MUTATION_LOG = (
"According to exponential backoff policy, do not perform mutations for the part"
)
all_nodes = [node_with_backoff, node_no_backoff]
def prepare_cluster(use_replicated_table):
for node in all_nodes:
node.query("DROP TABLE IF EXISTS test_mutations SYNC")
engine = (
"ReplicatedMergeTree('/clickhouse/{cluster}/tables/test/test_mutations', '{instance}')"
if use_replicated_table
else "MergeTree()"
)
for node in all_nodes:
node.query(f"CREATE TABLE test_mutations(x UInt32) ENGINE {engine} ORDER BY x")
node.query("INSERT INTO test_mutations SELECT * FROM system.numbers LIMIT 10")
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
@pytest.mark.parametrize(
("node, found_in_log"),
[
(
node_with_backoff,
True,
),
(
node_no_backoff,
False,
),
],
)
def test_exponential_backoff_with_merge_tree(started_cluster, node, found_in_log):
prepare_cluster(False)
# Executing incorrect mutation.
node.query(
"ALTER TABLE test_mutations DELETE WHERE x IN (SELECT x FROM notexist_table) SETTINGS allow_nondeterministic_mutations=1"
)
assert node.contains_in_log(POSPONE_MUTATION_LOG) == found_in_log
node.rotate_logs()
time.sleep(5)
node.query("KILL MUTATION WHERE table='test_mutations'")
# Check that after kill new parts mutations are postponing.
node.query(
"ALTER TABLE test_mutations DELETE WHERE x IN (SELECT x FROM notexist_table) SETTINGS allow_nondeterministic_mutations=1"
)
assert node.contains_in_log(POSPONE_MUTATION_LOG) == found_in_log
def test_exponential_backoff_with_replicated_tree(started_cluster):
prepare_cluster(True)
node_no_backoff.query(
"ALTER TABLE test_mutations DELETE WHERE x IN (SELECT x FROM notexist_table) SETTINGS allow_nondeterministic_mutations=1"
)
time.sleep(5)
assert node_no_backoff.contains_in_log(REPLICATED_POSPONE_MUTATION_LOG) == False
assert node_with_backoff.contains_in_log(REPLICATED_POSPONE_MUTATION_LOG) == True
@pytest.mark.parametrize(
("node"),
[
(node_with_backoff),
],
)
def test_exponential_backoff_create_dependent_table(started_cluster, node):
prepare_cluster(False)
node.query("INSERT INTO test_mutations SELECT * FROM system.numbers LIMIT 10")
# Executing incorrect mutation.
node.query(
"ALTER TABLE test_mutations DELETE WHERE x IN (SELECT x FROM dep_table) SETTINGS allow_nondeterministic_mutations=1"
)
time.sleep(5)
# Creating dependent table for mutation.
node.query("CREATE TABLE dep_table(x UInt32) ENGINE MergeTree() ORDER BY x")
time.sleep(5)
assert node.query("SELECT count() FROM system.mutations WHERE is_done=0") == "0\n"