Review changes

This commit is contained in:
MikhailBurdukov 2024-02-18 13:37:12 +00:00
parent 97ee8c0901
commit b227e5dd3f
9 changed files with 70 additions and 45 deletions

View File

@ -1364,30 +1364,29 @@ protected:
const MergeListEntry * merge_entry,
std::shared_ptr<ProfileEvents::Counters::Snapshot> profile_counters);
class PartMutationBackoffPolicy : public WithContext
class PartMutationBackoffPolicy
{
struct PartMutationInfo
{
size_t retry_count;
Poco::Timestamp latest_fail_time;
UInt64 mutation_failure_version;
size_t latest_fail_time_us;
size_t max_postpone_time_ms;
size_t max_postpone_power;
PartMutationInfo(UInt64 mutation_failure_version_, size_t max_postpone_time_ms_)
PartMutationInfo(size_t max_postpone_time_ms_)
: retry_count(0ull)
, latest_fail_time(std::move(Poco::Timestamp()))
, mutation_failure_version(mutation_failure_version_)
, latest_fail_time_us(static_cast<size_t>(Poco::Timestamp().epochMicroseconds()))
, max_postpone_time_ms(max_postpone_time_ms_)
, max_postpone_power((max_postpone_time_ms_) ? (static_cast<size_t>(std::log2(max_postpone_time_ms_))) : (0ull))
{}
Poco::Timestamp getNextMinExecutionTime() const
size_t getNextMinExecutionTimeUsResolution() const
{
if (max_postpone_time_ms == 0)
return Poco::Timestamp();
return latest_fail_time + (1 << retry_count) * 1000ul;
return static_cast<size_t>(Poco::Timestamp().epochMicroseconds());
size_t current_backoff_interval_us = (1 << retry_count) * 1000ul;
return latest_fail_time_us + current_backoff_interval_us;
}
void addPartFailure()
@ -1395,7 +1394,7 @@ protected:
if (max_postpone_time_ms == 0)
return;
retry_count = std::min(max_postpone_power, retry_count + 1);
latest_fail_time = Poco::Timestamp();
latest_fail_time_us = static_cast<size_t>(Poco::Timestamp().epochMicroseconds());
}
bool partCanBeMutated()
@ -1403,8 +1402,8 @@ protected:
if (max_postpone_time_ms == 0)
return true;
auto current_time = Poco::Timestamp();
return current_time >= getNextMinExecutionTime();
auto current_time_us = static_cast<size_t>(Poco::Timestamp().epochMicroseconds());
return current_time_us >= getNextMinExecutionTimeUsResolution();
}
};
@ -1414,31 +1413,25 @@ protected:
public:
void removeFromFailedByVersion(UInt64 mutation_version)
void resetMutationFailures()
{
std::unique_lock _lock(parts_info_lock);
for (auto failed_part_it = failed_mutation_parts.begin(); failed_part_it != failed_mutation_parts.end();)
{
if (failed_part_it->second.mutation_failure_version == mutation_version)
failed_part_it = failed_mutation_parts.erase(failed_part_it);
else
++failed_part_it;
}
failed_mutation_parts.clear();
}
void removePartFromFailed(const String& part_name)
void removePartFromFailed(const String & part_name)
{
std::unique_lock _lock(parts_info_lock);
failed_mutation_parts.erase(part_name);
}
void addPartMutationFailure (const String& part_name, UInt64 mutation_failure_version_, size_t max_postpone_time_ms_)
void addPartMutationFailure (const String& part_name, size_t max_postpone_time_ms_)
{
std::unique_lock _lock(parts_info_lock);
auto part_info_it = failed_mutation_parts.find(part_name);
if (part_info_it == failed_mutation_parts.end())
{
auto [it, success] = failed_mutation_parts.emplace(part_name, PartMutationInfo(mutation_failure_version_, max_postpone_time_ms_));
auto [it, success] = failed_mutation_parts.emplace(part_name, PartMutationInfo(max_postpone_time_ms_));
std::swap(it, part_info_it);
}
auto& part_info = part_info_it->second;

View File

@ -146,7 +146,7 @@ struct Settings;
M(UInt64, vertical_merge_algorithm_min_rows_to_activate, 16 * 8192, "Minimal (approximate) sum of rows in merging parts to activate Vertical merge algorithm.", 0) \
M(UInt64, vertical_merge_algorithm_min_bytes_to_activate, 0, "Minimal (approximate) uncompressed size in bytes in merging parts to activate Vertical merge algorithm.", 0) \
M(UInt64, vertical_merge_algorithm_min_columns_to_activate, 11, "Minimal amount of non-PK columns to activate Vertical merge algorithm.", 0) \
M(UInt64, max_postpone_time_for_failed_mutations, 0ul, "The maximum postpone time for failed mutations in ms.", 0) \
M(UInt64, max_postpone_time_for_failed_mutations_ms, 0ul, "The maximum postpone time for failed mutations.", 0) \
\
/** Compatibility settings */ \
M(Bool, allow_suspicious_indices, false, "Reject primary/secondary indexes and sorting keys with identical expressions", 0) \

View File

@ -118,7 +118,7 @@ bool ReplicatedMergeMutateTaskBase::executeStep()
status.latest_fail_time = time(nullptr);
status.latest_fail_reason = getExceptionMessage(saved_exception, false);
if (result_data_version == it->first)
storage.mutation_backoff_policy.addPartMutationFailure(src_part, result_data_version, storage.getSettings()->max_postpone_time_for_failed_mutations);
storage.mutation_backoff_policy.addPartMutationFailure(src_part, storage.getSettings()->max_postpone_time_for_failed_mutations_ms);
}
}
}

View File

@ -573,7 +573,7 @@ void StorageMergeTree::updateMutationEntriesErrors(FutureMergedMutatedPartPtr re
if (static_cast<UInt64>(result_part->part_info.mutation) == it->first)
{
mutation_backoff_policy.addPartMutationFailure(failed_part->name, it->first, getSettings()->max_postpone_time_for_failed_mutations);
mutation_backoff_policy.addPartMutationFailure(failed_part->name, getSettings()->max_postpone_time_for_failed_mutations_ms);
}
}
}
@ -845,7 +845,7 @@ CancellationCode StorageMergeTree::killMutation(const String & mutation_id)
}
}
mutation_backoff_policy.removeFromFailedByVersion(mutation_version);
mutation_backoff_policy.resetMutationFailures();
if (!to_kill)
return CancellationCode::NotFound;
@ -1207,7 +1207,6 @@ MergeMutateSelectedEntryPtr StorageMergeTree::selectPartsToMutate(
CurrentlyMergingPartsTaggerPtr tagger;
bool exist_postponed_failed_part = false;
auto mutations_end_it = current_mutations_by_version.end();
for (const auto & part : getDataPartsVectorForInternalUsage())
{
@ -1234,7 +1233,6 @@ MergeMutateSelectedEntryPtr StorageMergeTree::selectPartsToMutate(
if (!mutation_backoff_policy.partCanBeMutated(part->name))
{
exist_postponed_failed_part = true;
LOG_DEBUG(log, "According to exponential backoff policy, do not perform mutations for the part {} yet. Put it aside.", part->name);
continue;
}
@ -1345,11 +1343,7 @@ MergeMutateSelectedEntryPtr StorageMergeTree::selectPartsToMutate(
return std::make_shared<MergeMutateSelectedEntry>(future_part, std::move(tagger), commands, txn);
}
}
if (exist_postponed_failed_part)
{
std::lock_guard lock(mutation_wait_mutex);
mutation_wait_event.notify_all();
}
return {};
}

View File

@ -7491,6 +7491,7 @@ CancellationCode StorageReplicatedMergeTree::killMutation(const String & mutatio
Int64 block_number = pair.second;
getContext()->getMergeList().cancelPartMutations(getStorageID(), partition_id, block_number);
}
mutation_backoff_policy.resetMutationFailures();
return CancellationCode::CancelSent;
}

View File

@ -0,0 +1,5 @@
<clickhouse>
<merge_tree>
<max_postpone_time_for_failed_mutations_ms>200</max_postpone_time_for_failed_mutations_ms>
</merge_tree>
</clickhouse>

View File

@ -30,6 +30,7 @@ ln -sf $SRC_PATH/config.d/graphite_alternative.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/database_atomic.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/max_concurrent_queries.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/merge_tree_settings.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/backoff_failed_mutation.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/merge_tree_old_dirs_cleanup.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/test_cluster_with_incorrect_pw.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/keeper_port.xml $DEST_SERVER_PATH/config.d/

View File

@ -1,5 +1,5 @@
<clickhouse>
<merge_tree>
<max_postpone_time_for_failed_mutations>60000</max_postpone_time_for_failed_mutations>
<max_postpone_time_for_failed_mutations_ms>60000</max_postpone_time_for_failed_mutations_ms>
</merge_tree>
</clickhouse>

View File

@ -1,9 +1,4 @@
import logging
import random
import threading
import time
from collections import Counter
import pytest
from helpers.cluster import ClickHouseCluster
@ -83,7 +78,6 @@ def test_exponential_backoff_with_merge_tree(started_cluster, node, found_in_log
assert node.contains_in_log(POSPONE_MUTATION_LOG) == found_in_log
node.rotate_logs()
time.sleep(5)
node.query("KILL MUTATION WHERE table='test_mutations'")
# Check that after kill new parts mutations are postponing.
node.query(
@ -117,12 +111,18 @@ def test_exponential_backoff_create_dependent_table(started_cluster, node):
node.query(
"ALTER TABLE test_mutations DELETE WHERE x IN (SELECT x FROM dep_table) SETTINGS allow_nondeterministic_mutations=1"
)
time.sleep(5)
# Creating dependent table for mutation.
node.query("CREATE TABLE dep_table(x UInt32) ENGINE MergeTree() ORDER BY x")
time.sleep(5)
assert node.query("SELECT count() FROM system.mutations WHERE is_done=0") == "0\n"
retry_count = 100
no_unfinished_mutation = False
for _ in range(0,retry_count):
if node.query("SELECT count() FROM system.mutations WHERE is_done=0") == "0\n":
no_unfinished_mutation = True
break
assert no_unfinished_mutation
node.query("DROP TABLE IF EXISTS dep_table SYNC")
@ -131,7 +131,7 @@ def test_exponential_backoff_setting_override(started_cluster):
node.rotate_logs()
node.query("DROP TABLE IF EXISTS test_mutations SYNC")
node.query(
"CREATE TABLE test_mutations(x UInt32) ENGINE=MergeTree() ORDER BY x SETTINGS max_postpone_time_for_failed_mutations=0"
"CREATE TABLE test_mutations(x UInt32) ENGINE=MergeTree() ORDER BY x SETTINGS max_postpone_time_for_failed_mutations_ms=0"
)
node.query("INSERT INTO test_mutations SELECT * FROM system.numbers LIMIT 10")
@ -167,3 +167,34 @@ def test_backoff_clickhouse_restart(started_cluster, replicated_table):
assert node.wait_for_log_line(
REPLICATED_POSPONE_MUTATION_LOG if replicated_table else POSPONE_MUTATION_LOG
)
@pytest.mark.parametrize(
("replicated_table"),
[
(False),
(True),
],
)
def test_no_backoff_after_killing_mutation(started_cluster, replicated_table):
prepare_cluster(replicated_table)
node = node_with_backoff
# Executing incorrect mutation.
node.query(
"ALTER TABLE test_mutations DELETE WHERE x IN (SELECT x FROM dep_table) SETTINGS allow_nondeterministic_mutations=1"
)
# Executing correct mutation.
node.query(
"ALTER TABLE test_mutations DELETE WHERE x=1"
)
assert node.wait_for_log_line(
REPLICATED_POSPONE_MUTATION_LOG if replicated_table else POSPONE_MUTATION_LOG
)
mutation_ids = node.query('select mutation_id from system.mutations').split()
node.query(
f"KILL MUTATION WHERE table = 'test_mutations' AND mutation_id = '{mutation_ids[0]}'"
)
node.rotate_logs()
assert not node.contains_in_log(REPLICATED_POSPONE_MUTATION_LOG if replicated_table else POSPONE_MUTATION_LOG)