add settings to delay or throw in case of too many mutations

This commit is contained in:
Anton Popov 2023-04-24 18:21:49 +00:00
parent e2e62b32e5
commit 38622d0770
16 changed files with 269 additions and 9 deletions

View File

@ -650,6 +650,7 @@
M(679, IO_URING_SUBMIT_ERROR) \
M(690, MIXED_ACCESS_PARAMETER_TYPES) \
M(691, UNKNOWN_ELEMENT_OF_ENUM) \
M(692, TOO_MANY_MUTATIONS) \
\
M(999, KEEPER_EXCEPTION) \
M(1000, POCO_EXCEPTION) \

View File

@ -102,6 +102,9 @@
M(DelayedInserts, "Number of times the INSERT of a block to a MergeTree table was throttled due to high number of active data parts for partition.") \
M(RejectedInserts, "Number of times the INSERT of a block to a MergeTree table was rejected with 'Too many parts' exception due to high number of active data parts for partition.") \
M(DelayedInsertsMilliseconds, "Total number of milliseconds spent while the INSERT of a block to a MergeTree table was throttled due to high number of active data parts for partition.") \
M(DelayedMutations, "Number of times the mutation of a MergeTree table was throttled due to high number of unfinished mutations for table.") \
M(RejectedMutations, "Number of times the mutation of a MergeTree table was rejected with 'Too many mutations' exception due to high number of unfinished mutations for table.") \
M(DelayedMutationsMilliseconds, "Total number of milliseconds spent while the mutation of a MergeTree table was throttled due to high number of unfinished mutations for table.") \
M(DistributedDelayedInserts, "Number of times the INSERT of a block to a Distributed table was throttled due to high number of pending bytes.") \
M(DistributedRejectedInserts, "Number of times the INSERT of a block to a Distributed table was rejected with 'Too many bytes' exception due to high number of pending bytes.") \
M(DistributedDelayedInsertsMilliseconds, "Total number of milliseconds spent while the INSERT of a block to a Distributed table was throttled due to high number of pending bytes.") \

View File

@ -275,6 +275,8 @@ class IColumn;
\
M(UInt64, parts_to_delay_insert, 0, "If the destination table contains at least that many active parts in a single partition, artificially slow down insert into table.", 0) \
M(UInt64, parts_to_throw_insert, 0, "If more than this number active parts in a single partition of the destination table, throw 'Too many parts ...' exception.", 0) \
M(UInt64, number_of_mutations_to_delay, 0, "If the mutated table contains at least that many unfinished mutations, artificially slow down mutations of table. 0 - disabled", 0) \
M(UInt64, number_of_mutations_to_throw, 0, "If the mutated table contains at least that many unfinished mutations, throw 'Too many mutations ...' exception. 0 - disabled", 0) \
M(Bool, insert_distributed_sync, false, "If setting is enabled, insert query into distributed waits until data will be sent to all nodes in cluster.", 0) \
M(UInt64, insert_distributed_timeout, 0, "Timeout for insert query into distributed. Setting is used only with insert_distributed_sync enabled. Zero value means no timeout.", 0) \
M(Int64, distributed_ddl_task_timeout, 180, "Timeout for DDL query responses from all hosts in cluster. If a ddl request has not been performed on all hosts, a response will contain a timeout error and a request will be executed in an async mode. Negative value means infinite. Zero means async mode.", 0) \

View File

@ -114,6 +114,9 @@ namespace ProfileEvents
extern const Event MergedIntoWideParts;
extern const Event MergedIntoCompactParts;
extern const Event MergedIntoInMemoryParts;
extern const Event RejectedMutations;
extern const Event DelayedMutations;
extern const Event DelayedMutationsMilliseconds;
}
namespace CurrentMetrics
@ -171,6 +174,7 @@ namespace ErrorCodes
extern const int SERIALIZATION_ERROR;
extern const int NETWORK_ERROR;
extern const int SOCKET_TIMEOUT;
extern const int TOO_MANY_MUTATIONS;
}
@ -4296,6 +4300,51 @@ void MergeTreeData::delayInsertOrThrowIfNeeded(Poco::Event * until, const Contex
std::this_thread::sleep_for(std::chrono::milliseconds(static_cast<size_t>(delay_milliseconds)));
}
void MergeTreeData::delayMutationOrThrowIfNeeded(Poco::Event * until, const ContextPtr & query_context) const
{
const auto settings = getSettings();
const auto & query_settings = query_context->getSettingsRef();
size_t num_mutations_to_delay = query_settings.number_of_mutations_to_delay
? query_settings.number_of_mutations_to_delay
: settings->number_of_mutations_to_delay;
size_t num_mutations_to_throw = query_settings.number_of_mutations_to_throw
? query_settings.number_of_mutations_to_throw
: settings->number_of_mutations_to_throw;
if (!num_mutations_to_delay && !num_mutations_to_throw)
return;
size_t num_unfinished_mutations = getNumberOfUnfinishedMutations();
if (num_mutations_to_throw && num_unfinished_mutations >= num_mutations_to_throw)
{
ProfileEvents::increment(ProfileEvents::RejectedMutations);
throw Exception(ErrorCodes::TOO_MANY_MUTATIONS,
"Too many unfinished mutations ({}) in table {}",
num_unfinished_mutations, getLogName());
}
if (num_mutations_to_delay && num_unfinished_mutations >= num_mutations_to_delay)
{
if (!num_mutations_to_throw)
num_mutations_to_throw = num_mutations_to_delay * 2;
size_t mutations_over_threshold = num_unfinished_mutations - num_mutations_to_delay;
size_t allowed_mutations_over_threshold = num_mutations_to_throw - num_mutations_to_delay;
double delay_factor = std::min(static_cast<double>(mutations_over_threshold) / allowed_mutations_over_threshold, 1.0);
size_t delay_milliseconds = static_cast<size_t>(std::lerp(settings->min_delay_to_mutate_ms, settings->max_delay_to_mutate_ms, delay_factor));
ProfileEvents::increment(ProfileEvents::DelayedMutations);
ProfileEvents::increment(ProfileEvents::DelayedMutationsMilliseconds, delay_milliseconds);
if (until)
until->tryWait(delay_milliseconds);
else
std::this_thread::sleep_for(std::chrono::milliseconds(delay_milliseconds));
}
}
MergeTreeData::DataPartPtr MergeTreeData::getActiveContainingPart(
const MergeTreePartInfo & part_info, MergeTreeData::DataPartState state, DataPartsLock & /*lock*/) const

View File

@ -540,7 +540,6 @@ public:
/// Makes sense only for ordinary MergeTree engines because for them block numbering doesn't depend on partition.
std::optional<Int64> getMinPartDataVersion() const;
/// Returns all detached parts
DetachedPartsInfo getDetachedParts() const;
@ -551,11 +550,17 @@ public:
MutableDataPartsVector tryLoadPartsToAttach(const ASTPtr & partition, bool attach_part,
ContextPtr context, PartsTemporaryRename & renamed_parts);
/// If the table contains too many active parts, sleep for a while to give them time to merge.
/// If until is non-null, wake up from the sleep earlier if the event happened.
void delayInsertOrThrowIfNeeded(Poco::Event * until, const ContextPtr & query_context) const;
/// If the table contains too many unfinished mutations, sleep for a while to give them time to execute.
/// If until is non-null, wake up from the sleep earlier if the event happened.
void delayMutationOrThrowIfNeeded(Poco::Event * until, const ContextPtr & query_context) const;
/// Returns number of unfinished mutations (is_done = 0).
virtual size_t getNumberOfUnfinishedMutations() const = 0;
/// Renames temporary part to a permanent part and adds it to the parts set.
/// It is assumed that the part does not intersect with existing parts.
/// Adds the part in the PreActive state (the part will be added to the active set later with out_transaction->commit()).

View File

@ -83,6 +83,10 @@ struct Settings;
M(UInt64, max_delay_to_insert, 1, "Max delay of inserting data into MergeTree table in seconds, if there are a lot of unmerged parts in single partition.", 0) \
M(UInt64, min_delay_to_insert_ms, 10, "Min delay of inserting data into MergeTree table in milliseconds, if there are a lot of unmerged parts in single partition.", 0) \
M(UInt64, max_parts_in_total, 100000, "If more than this number active parts in all partitions in total, throw 'Too many parts ...' exception.", 0) \
M(UInt64, number_of_mutations_to_delay, 0, "If table has at least that many unfinished mutations, artificially slow down mutations of table. Disabled if set to 0", 0) \
M(UInt64, number_of_mutations_to_throw, 0, "If table has at least that many unfinished mutations, throw 'Too many mutations' exception. Disabled if set to 0", 0) \
M(UInt64, min_delay_to_mutate_ms, 10, "Min delay of mutating MergeTree table in milliseconds, if there are a lot of unfinished mutations", 0) \
M(UInt64, max_delay_to_mutate_ms, 1000, "Max delay of mutating MergeTree table in milliseconds, if there are a lot of unfinished mutations", 0) \
\
/* Part removal settings. */ \
M(UInt64, simultaneous_parts_removal_limit, 0, "Maximum number of parts to remove during one CleanupThread iteration (0 means unlimited).", 0) \

View File

@ -1727,18 +1727,30 @@ size_t ReplicatedMergeTreeQueue::countMutations() const
return mutations_by_znode.size();
}
size_t ReplicatedMergeTreeQueue::countFinishedMutations() const
{
std::lock_guard lock(state_mutex);
size_t count = 0;
for (const auto & pair : mutations_by_znode)
for (const auto & [_, status] : mutations_by_znode)
{
const auto & mutation = pair.second;
if (!mutation.is_done)
if (!status.is_done)
break;
++count;
}
return count;
}
size_t ReplicatedMergeTreeQueue::countUnfinishedMutations() const
{
std::lock_guard lock(state_mutex);
size_t count = 0;
for (const auto & [_, status] : mutations_by_znode | std::views::reverse)
{
if (status.is_done)
break;
++count;
}

View File

@ -386,6 +386,8 @@ public:
/// Count the total number of active mutations that are finished (is_done = true).
size_t countFinishedMutations() const;
/// Count the total number of active mutations that are not finished (is_done = false).
size_t countUnfinishedMutations() const;
/// Returns functor which used by MergeTreeMergerMutator to select parts for merge
ReplicatedMergeTreeMergePredicate getMergePredicate(zkutil::ZooKeeperPtr & zookeeper, PartitionIdsHint && partition_ids_hint);

View File

@ -3,6 +3,7 @@
#include "Storages/MergeTree/IMergeTreeDataPart.h"
#include <optional>
#include <ranges>
#include <base/sort.h>
#include <Backups/BackupEntriesCollector.h>
@ -313,7 +314,11 @@ void StorageMergeTree::alter(
StorageInMemoryMetadata new_metadata = getInMemoryMetadata();
StorageInMemoryMetadata old_metadata = getInMemoryMetadata();
auto maybe_mutation_commands = commands.getMutationCommands(new_metadata, local_context->getSettingsRef().materialize_ttl_after_modify, local_context);
if (!maybe_mutation_commands.empty())
delayMutationOrThrowIfNeeded(nullptr, local_context);
Int64 mutation_version = -1;
commands.apply(new_metadata, local_context);
@ -321,7 +326,6 @@ void StorageMergeTree::alter(
if (commands.isSettingsAlter())
{
changeSettings(new_metadata.settings_changes, table_lock_holder);
DatabaseCatalog::instance().getDatabase(table_id.database_name)->alterTable(local_context, table_id, new_metadata);
}
else
@ -587,11 +591,12 @@ void StorageMergeTree::setMutationCSN(const String & mutation_id, CSN csn)
void StorageMergeTree::mutate(const MutationCommands & commands, ContextPtr query_context)
{
delayMutationOrThrowIfNeeded(nullptr, query_context);
/// Validate partition IDs (if any) before starting mutation
getPartitionIdsAffectedByCommands(commands, query_context);
Int64 version = startMutation(commands, query_context);
if (query_context->getSettingsRef().mutations_sync > 0 || query_context->getCurrentTransaction())
waitForMutation(version);
}
@ -1332,6 +1337,24 @@ bool StorageMergeTree::scheduleDataProcessingJob(BackgroundJobsAssignee & assign
return scheduled;
}
size_t StorageMergeTree::getNumberOfUnfinishedMutations() const
{
size_t count = 0;
for (const auto & [version, _] : current_mutations_by_version | std::views::reverse)
{
auto status = getIncompleteMutationsStatus(version);
if (!status)
continue;
if (status->is_done)
break;
++count;
}
return count;
}
UInt64 StorageMergeTree::getCurrentMutationVersion(
const DataPartPtr & part,
std::unique_lock<std::mutex> & /*currently_processing_in_background_mutex_lock*/) const

View File

@ -113,6 +113,8 @@ public:
bool scheduleDataProcessingJob(BackgroundJobsAssignee & assignee) override;
size_t getNumberOfUnfinishedMutations() const override;
MergeTreeDeduplicationLog * getDeduplicationLog() { return deduplication_log.get(); }
private:

View File

@ -5215,7 +5215,10 @@ void StorageReplicatedMergeTree::alter(
alter_entry->create_time = time(nullptr);
auto maybe_mutation_commands = commands.getMutationCommands(
*current_metadata, query_context->getSettingsRef().materialize_ttl_after_modify, query_context);
*current_metadata,
query_context->getSettingsRef().materialize_ttl_after_modify,
query_context);
bool have_mutation = !maybe_mutation_commands.empty();
alter_entry->have_mutation = have_mutation;
@ -5226,6 +5229,7 @@ void StorageReplicatedMergeTree::alter(
PartitionBlockNumbersHolder partition_block_numbers_holder;
if (have_mutation)
{
delayMutationOrThrowIfNeeded(&partial_shutdown_event, query_context);
const String mutations_path(fs::path(zookeeper_path) / "mutations");
ReplicatedMergeTreeMutationEntry mutation_entry;
@ -6406,6 +6410,8 @@ void StorageReplicatedMergeTree::mutate(const MutationCommands & commands, Conte
/// After all needed parts are mutated (i.e. all active parts have the mutation version greater than
/// the version of this mutation), the mutation is considered done and can be deleted.
delayMutationOrThrowIfNeeded(&partial_shutdown_event, query_context);
ReplicatedMergeTreeMutationEntry mutation_entry;
mutation_entry.source_replica = replica_name;
mutation_entry.commands = commands;
@ -8036,6 +8042,10 @@ String StorageReplicatedMergeTree::getTableSharedID() const
return toString(table_shared_id);
}
size_t StorageReplicatedMergeTree::getNumberOfUnfinishedMutations() const
{
return queue.countUnfinishedMutations();
}
void StorageReplicatedMergeTree::createTableSharedID() const
{

View File

@ -311,6 +311,8 @@ public:
// Return table id, common for different replicas
String getTableSharedID() const override;
size_t getNumberOfUnfinishedMutations() const override;
/// Returns the same as getTableSharedID(), but extracts it from a create query.
static std::optional<String> tryGetTableSharedIDFromCreateQuery(const IAST & create_query, const ContextPtr & global_context);

View File

@ -0,0 +1,8 @@
1 2
4
1 6
0
ALTER TABLE t_delay_mutations UPDATE v = 3 WHERE 1; 0 0
ALTER TABLE t_delay_mutations UPDATE v = 4 WHERE 1; 0 0
ALTER TABLE t_delay_mutations UPDATE v = 5 WHERE 1; 1 1
ALTER TABLE t_delay_mutations UPDATE v = 6 WHERE 1; 1 1

View File

@ -0,0 +1,59 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
# shellcheck source=./mergetree_mutations.lib
. "$CURDIR"/mergetree_mutations.lib
${CLICKHOUSE_CLIENT} -n --query "
DROP TABLE IF EXISTS t_delay_mutations SYNC;
CREATE TABLE t_delay_mutations (id UInt64, v UInt64)
ENGINE = MergeTree ORDER BY id
SETTINGS
number_of_mutations_to_delay = 2,
number_of_mutations_to_throw = 10,
min_delay_to_mutate_ms = 10,
min_delay_to_mutate_ms = 1000;
SET mutations_sync = 0;
SYSTEM STOP MERGES t_delay_mutations;
INSERT INTO t_delay_mutations VALUES (1, 2);
ALTER TABLE t_delay_mutations UPDATE v = 3 WHERE 1;
ALTER TABLE t_delay_mutations UPDATE v = 4 WHERE 1;
ALTER TABLE t_delay_mutations UPDATE v = 5 WHERE 1;
ALTER TABLE t_delay_mutations UPDATE v = 6 WHERE 1;
SELECT * FROM t_delay_mutations ORDER BY id;
SELECT count() FROM system.mutations WHERE database = currentDatabase() AND table = 't_delay_mutations' AND NOT is_done;
"
${CLICKHOUSE_CLIENT} --query "SYSTEM START MERGES t_delay_mutations"
wait_for_mutation "t_delay_mutations" "mutation_5.txt"
${CLICKHOUSE_CLIENT} -n --query "
SELECT * FROM t_delay_mutations ORDER BY id;
SELECT count() FROM system.mutations WHERE database = currentDatabase() AND table = 't_delay_mutations' AND NOT is_done;
DROP TABLE IF EXISTS t_delay_mutations SYNC;
"
${CLICKHOUSE_CLIENT} -n --query "
SYSTEM FLUSH LOGS;
SELECT
query,
ProfileEvents['DelayedMutations'],
ProfileEvents['DelayedMutationsMilliseconds'] BETWEEN 10 AND 1000
FROM system.query_log
WHERE
type = 'QueryFinish' AND
current_database = '$CLICKHOUSE_DATABASE' AND
query ILIKE 'ALTER TABLE t_delay_mutations UPDATE%'
ORDER BY query;
"

View File

@ -0,0 +1,9 @@
1 2
2
CREATE TABLE default.t_limit_mutations\n(\n `id` UInt64,\n `v` UInt64\n)\nENGINE = ReplicatedMergeTree(\'/clickhouse/tables/t_limit_mutations/\', \'1\')\nORDER BY id\nSETTINGS number_of_mutations_to_throw = 2, index_granularity = 8192
1 2
4
CREATE TABLE default.t_limit_mutations\n(\n `id` UInt64,\n `v` String\n)\nENGINE = ReplicatedMergeTree(\'/clickhouse/tables/t_limit_mutations/\', \'1\')\nORDER BY id\nSETTINGS number_of_mutations_to_throw = 2, index_granularity = 8192
1 6
0
CREATE TABLE default.t_limit_mutations\n(\n `id` UInt64,\n `v` String\n)\nENGINE = ReplicatedMergeTree(\'/clickhouse/tables/t_limit_mutations/\', \'1\')\nORDER BY id\nSETTINGS number_of_mutations_to_throw = 2, index_granularity = 8192

View File

@ -0,0 +1,69 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
# shellcheck source=./mergetree_mutations.lib
. "$CURDIR"/mergetree_mutations.lib
function wait_for_alter()
{
type=$1
for i in {1..100}; do
sleep 0.1
${CLICKHOUSE_CLIENT} --query "SHOW CREATE TABLE t_limit_mutations" | grep -q "\`v\` $type" && break;
if [[ $i -eq 100 ]]; then
echo "Timed out while waiting for alter to execute"
fi
done
}
${CLICKHOUSE_CLIENT} -n --query "
DROP TABLE IF EXISTS t_limit_mutations SYNC;
CREATE TABLE t_limit_mutations (id UInt64, v UInt64)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/t_limit_mutations/', '1') ORDER BY id
SETTINGS number_of_mutations_to_throw = 2;
SET mutations_sync = 0;
SYSTEM STOP MERGES t_limit_mutations;
INSERT INTO t_limit_mutations VALUES (1, 2);
ALTER TABLE t_limit_mutations UPDATE v = 3 WHERE 1;
ALTER TABLE t_limit_mutations UPDATE v = 4 WHERE 1;
ALTER TABLE t_limit_mutations UPDATE v = 5 WHERE 1; -- { serverError TOO_MANY_MUTATIONS }
ALTER TABLE t_limit_mutations MODIFY COLUMN v String; -- { serverError TOO_MANY_MUTATIONS }
SELECT * FROM t_limit_mutations ORDER BY id;
SELECT count() FROM system.mutations WHERE database = currentDatabase() AND table = 't_limit_mutations' AND NOT is_done;
SHOW CREATE TABLE t_limit_mutations;
"
${CLICKHOUSE_CLIENT} -n --query "
ALTER TABLE t_limit_mutations UPDATE v = 6 WHERE 1 SETTINGS number_of_mutations_to_throw = 100;
ALTER TABLE t_limit_mutations MODIFY COLUMN v String SETTINGS number_of_mutations_to_throw = 100, alter_sync = 0;
"
wait_for_alter "String"
${CLICKHOUSE_CLIENT} -n --query "
SELECT * FROM t_limit_mutations ORDER BY id;
SELECT count() FROM system.mutations WHERE database = currentDatabase() AND table = 't_limit_mutations' AND NOT is_done;
SHOW CREATE TABLE t_limit_mutations;
"
${CLICKHOUSE_CLIENT} --query "SYSTEM START MERGES t_limit_mutations"
wait_for_mutation "t_limit_mutations" "0000000003"
${CLICKHOUSE_CLIENT} -n --query "
SELECT * FROM t_limit_mutations ORDER BY id;
SELECT count() FROM system.mutations WHERE database = currentDatabase() AND table = 't_limit_mutations' AND NOT is_done;
SHOW CREATE TABLE t_limit_mutations;
DROP TABLE IF EXISTS t_limit_mutations SYNC;
"