Merge pull request #44954 from ClickHouse/fix-insert-delay

Fix: insert delay calculation
Igor Nikonov, 2023-01-12 09:43:00 +01:00 (committed by GitHub)
commit edbcb24da0
5 changed files with 105 additions and 42 deletions

src/Storages/MergeTree/MergeTreeData.cpp

@@ -3780,7 +3780,7 @@ std::pair<size_t, size_t> MergeTreeData::getMaxPartsCountAndSizeForPartition() const
 }
 
-size_t MergeTreeData::getMaxInactivePartsCountForPartition() const
+size_t MergeTreeData::getMaxOutdatedPartsCountForPartition() const
 {
     return getMaxPartsCountAndSizeForPartitionWithState(DataPartState::Outdated).first;
 }
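
Both names return the same quantity: the largest number of Outdated parts (parts already replaced by a merge but not yet removed) in any single partition; the rename just aligns the method with DataPartState::Outdated. As a minimal illustration of the "max per partition" semantics, assuming a hypothetical parts_by_partition map rather than the real MergeTreeData layout:

    size_t max_outdated_parts = 0;
    for (const auto & [partition_id, outdated_parts] : parts_by_partition) /// hypothetical map
        max_outdated_parts = std::max(max_outdated_parts, outdated_parts.size());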
@@ -3801,70 +3801,102 @@ std::optional<Int64> MergeTreeData::getMinPartDataVersion() const
 }
 
-void MergeTreeData::delayInsertOrThrowIfNeeded(Poco::Event * until, ContextPtr query_context) const
+void MergeTreeData::delayInsertOrThrowIfNeeded(Poco::Event * until, const ContextPtr & query_context) const
 {
     const auto settings = getSettings();
     const auto & query_settings = query_context->getSettingsRef();
     const size_t parts_count_in_total = getPartsCount();
 
+    /// check if have too many parts in total
     if (parts_count_in_total >= settings->max_parts_in_total)
     {
         ProfileEvents::increment(ProfileEvents::RejectedInserts);
-        throw Exception("Too many parts (" + toString(parts_count_in_total) + ") in all partitions in total. This indicates wrong choice of partition key. The threshold can be modified with 'max_parts_in_total' setting in <merge_tree> element in config.xml or with per-table setting.", ErrorCodes::TOO_MANY_PARTS);
+        throw Exception(
+            ErrorCodes::TOO_MANY_PARTS,
+            "Too many parts ({}) in all partitions in total. This indicates wrong choice of partition key. The threshold can be modified "
+            "with 'max_parts_in_total' setting in <merge_tree> element in config.xml or with per-table setting.",
+            toString(parts_count_in_total));
     }
 
-    auto [parts_count_in_partition, size_of_partition] = getMaxPartsCountAndSizeForPartition();
-    ssize_t k_inactive = -1;
-    if (settings->inactive_parts_to_throw_insert > 0 || settings->inactive_parts_to_delay_insert > 0)
+    size_t outdated_parts_over_threshold = 0;
     {
-        size_t inactive_parts_count_in_partition = getMaxInactivePartsCountForPartition();
-        if (settings->inactive_parts_to_throw_insert > 0 && inactive_parts_count_in_partition >= settings->inactive_parts_to_throw_insert)
+        size_t outdated_parts_count_in_partition = 0;
+        if (settings->inactive_parts_to_throw_insert > 0 || settings->inactive_parts_to_delay_insert > 0)
+            outdated_parts_count_in_partition = getMaxOutdatedPartsCountForPartition();
+
+        if (settings->inactive_parts_to_throw_insert > 0 && outdated_parts_count_in_partition >= settings->inactive_parts_to_throw_insert)
         {
             ProfileEvents::increment(ProfileEvents::RejectedInserts);
             throw Exception(
                 ErrorCodes::TOO_MANY_PARTS,
                 "Too many inactive parts ({}). Parts cleaning are processing significantly slower than inserts",
-                inactive_parts_count_in_partition);
+                outdated_parts_count_in_partition);
         }
-        k_inactive = static_cast<ssize_t>(inactive_parts_count_in_partition) - static_cast<ssize_t>(settings->inactive_parts_to_delay_insert);
+
+        if (settings->inactive_parts_to_delay_insert > 0 && outdated_parts_count_in_partition >= settings->inactive_parts_to_delay_insert)
+            outdated_parts_over_threshold = outdated_parts_count_in_partition - settings->inactive_parts_to_delay_insert + 1;
     }
 
-    auto parts_to_delay_insert = query_settings.parts_to_delay_insert ? query_settings.parts_to_delay_insert : settings->parts_to_delay_insert;
-    auto parts_to_throw_insert = query_settings.parts_to_throw_insert ? query_settings.parts_to_throw_insert : settings->parts_to_throw_insert;
+    auto [parts_count_in_partition, size_of_partition] = getMaxPartsCountAndSizeForPartition();
     size_t average_part_size = parts_count_in_partition ? size_of_partition / parts_count_in_partition : 0;
-    bool parts_are_large_enough_in_average = settings->max_avg_part_size_for_too_many_parts
-        && average_part_size > settings->max_avg_part_size_for_too_many_parts;
-
-    if (parts_count_in_partition >= parts_to_throw_insert && !parts_are_large_enough_in_average)
+    const auto active_parts_to_delay_insert
+        = query_settings.parts_to_delay_insert ? query_settings.parts_to_delay_insert : settings->parts_to_delay_insert;
+    const auto active_parts_to_throw_insert
+        = query_settings.parts_to_throw_insert ? query_settings.parts_to_throw_insert : settings->parts_to_throw_insert;
+    size_t active_parts_over_threshold = 0;
     {
-        ProfileEvents::increment(ProfileEvents::RejectedInserts);
-        throw Exception(
-            ErrorCodes::TOO_MANY_PARTS,
-            "Too many parts ({} with average size of {}). Merges are processing significantly slower than inserts",
-            parts_count_in_partition, ReadableSize(average_part_size));
+        bool parts_are_large_enough_in_average
+            = settings->max_avg_part_size_for_too_many_parts && average_part_size > settings->max_avg_part_size_for_too_many_parts;
+
+        if (parts_count_in_partition >= active_parts_to_throw_insert && !parts_are_large_enough_in_average)
+        {
+            ProfileEvents::increment(ProfileEvents::RejectedInserts);
+            throw Exception(
+                ErrorCodes::TOO_MANY_PARTS,
+                "Too many parts ({} with average size of {}). Merges are processing significantly slower than inserts",
+                parts_count_in_partition,
+                ReadableSize(average_part_size));
+        }
+        if (active_parts_to_delay_insert > 0 && parts_count_in_partition >= active_parts_to_delay_insert
+            && !parts_are_large_enough_in_average)
+            /// if parts_count == parts_to_delay_insert -> we're 1 part over threshold
+            active_parts_over_threshold = parts_count_in_partition - active_parts_to_delay_insert + 1;
     }
 
-    if (k_inactive < 0 && (parts_count_in_partition < parts_to_delay_insert || parts_are_large_enough_in_average))
+    /// no need for delay
+    if (!active_parts_over_threshold && !outdated_parts_over_threshold)
         return;
 
-    const ssize_t k_active = ssize_t(parts_count_in_partition) - ssize_t(parts_to_delay_insert);
-    size_t max_k;
-    size_t k;
-    if (k_active > k_inactive)
+    UInt64 delay_milliseconds = 0;
     {
-        max_k = parts_to_throw_insert - parts_to_delay_insert;
-        k = k_active + 1;
-    }
-    else
-    {
-        max_k = settings->inactive_parts_to_throw_insert - settings->inactive_parts_to_delay_insert;
-        k = k_inactive + 1;
-    }
+        size_t parts_over_threshold = 0;
+        size_t allowed_parts_over_threshold = 1;
+        const bool use_active_parts_threshold = (active_parts_over_threshold >= outdated_parts_over_threshold);
+        if (use_active_parts_threshold)
+        {
+            parts_over_threshold = active_parts_over_threshold;
+            allowed_parts_over_threshold = active_parts_to_throw_insert - active_parts_to_delay_insert;
+        }
+        else
+        {
+            parts_over_threshold = outdated_parts_over_threshold;
+            allowed_parts_over_threshold = outdated_parts_over_threshold; /// if throw threshold is not set, will use max delay
+            if (settings->inactive_parts_to_throw_insert > 0)
+                allowed_parts_over_threshold = settings->inactive_parts_to_throw_insert - settings->inactive_parts_to_delay_insert;
+        }
 
-    const UInt64 max_delay_milliseconds = (settings->max_delay_to_insert > 0 ? settings->max_delay_to_insert * 1000 : 1000);
-    /// min() as a save guard here
-    const UInt64 delay_milliseconds
-        = std::min(max_delay_milliseconds, static_cast<UInt64>(::pow(max_delay_milliseconds, static_cast<double>(k) / max_k)));
+        if (allowed_parts_over_threshold == 0 || parts_over_threshold > allowed_parts_over_threshold) [[unlikely]]
+            throw Exception(
+                ErrorCodes::LOGICAL_ERROR,
+                "Incorrect calculation of {} parts over threshold: allowed_parts_over_threshold={}, parts_over_threshold={}",
+                (use_active_parts_threshold ? "active" : "inactive"),
+                allowed_parts_over_threshold,
+                parts_over_threshold);
+
+        const UInt64 max_delay_milliseconds = (settings->max_delay_to_insert > 0 ? settings->max_delay_to_insert * 1000 : 1000);
+        double delay_factor = static_cast<double>(parts_over_threshold) / allowed_parts_over_threshold;
+        const UInt64 min_delay_milliseconds = settings->min_delay_to_insert_ms;
+        delay_milliseconds = std::max(min_delay_milliseconds, static_cast<UInt64>(max_delay_milliseconds * delay_factor));
+    }
 
     ProfileEvents::increment(ProfileEvents::DelayedInserts);
     ProfileEvents::increment(ProfileEvents::DelayedInsertsMilliseconds, delay_milliseconds);
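
The heart of the fix is the delay computation at the end. The old code used delay = min(max_delay_ms, pow(max_delay_ms, k / max_k)): with the default max_delay_to_insert = 1 (1000 ms), an insert halfway between the delay and throw thresholds slept only pow(1000, 0.5), about 32 ms, so delays stayed negligible until a partition was almost at parts_to_throw_insert. The new code ramps linearly from the min_delay_to_insert_ms floor up to the max. A standalone sketch of the new curve (the free function and its name are illustrative, not part of MergeTreeData):

    #include <algorithm>
    #include <cstdint>

    /// parts_over_threshold: parts_count - parts_to_delay_insert + 1 (>= 1 when delaying)
    /// allowed_parts_over_threshold: parts_to_throw_insert - parts_to_delay_insert
    uint64_t insert_delay_ms(size_t parts_over_threshold, size_t allowed_parts_over_threshold,
                             uint64_t min_delay_ms, uint64_t max_delay_ms)
    {
        /// Linear ramp: each part over the threshold adds max_delay_ms / allowed_parts_over_threshold.
        double delay_factor = static_cast<double>(parts_over_threshold) / allowed_parts_over_threshold;
        return std::max(min_delay_ms, static_cast<uint64_t>(max_delay_ms * delay_factor));
    }

The new [[unlikely]] branch also turns an inconsistent threshold configuration, into which the old code could divide by zero via max_k, into an explicit LOGICAL_ERROR.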

src/Storages/MergeTree/MergeTreeData.h

@@ -535,7 +535,7 @@ public:
     std::pair<size_t, size_t> getMaxPartsCountAndSizeForPartitionWithState(DataPartState state) const;
     std::pair<size_t, size_t> getMaxPartsCountAndSizeForPartition() const;
 
-    size_t getMaxInactivePartsCountForPartition() const;
+    size_t getMaxOutdatedPartsCountForPartition() const;
 
     /// Get min value of part->info.getDataVersion() for all active parts.
     /// Makes sense only for ordinary MergeTree engines because for them block numbering doesn't depend on partition.
@@ -555,7 +555,7 @@ public:
 
     /// If the table contains too many active parts, sleep for a while to give them time to merge.
     /// If until is non-null, wake up from the sleep earlier if the event happened.
-    void delayInsertOrThrowIfNeeded(Poco::Event * until, ContextPtr query_context) const;
+    void delayInsertOrThrowIfNeeded(Poco::Event * until, const ContextPtr & query_context) const;
 
     /// Renames temporary part to a permanent part and adds it to the parts set.
     /// It is assumed that the part does not intersect with existing parts.
src/Storages/MergeTree/MergeTreeSettings.h

@@ -68,12 +68,13 @@ struct Settings;
     M(Bool, remove_rolled_back_parts_immediately, 1, "Setting for an incomplete experimental feature.", 0) \
     \
     /** Inserts settings. */ \
-    M(UInt64, parts_to_delay_insert, 150, "If table contains at least that many active parts in single partition, artificially slow down insert into table.", 0) \
+    M(UInt64, parts_to_delay_insert, 150, "If table contains at least that many active parts in single partition, artificially slow down insert into table. Disabled if set to 0", 0) \
     M(UInt64, inactive_parts_to_delay_insert, 0, "If table contains at least that many inactive parts in single partition, artificially slow down insert into table.", 0) \
     M(UInt64, parts_to_throw_insert, 300, "If more than this number active parts in single partition, throw 'Too many parts ...' exception.", 0) \
     M(UInt64, inactive_parts_to_throw_insert, 0, "If more than this number inactive parts in single partition, throw 'Too many inactive parts ...' exception.", 0) \
     M(UInt64, max_avg_part_size_for_too_many_parts, 10ULL * 1024 * 1024 * 1024, "The 'too many parts' check according to 'parts_to_delay_insert' and 'parts_to_throw_insert' will be active only if the average part size (in the relevant partition) is not larger than the specified threshold. If it is larger than the specified threshold, the INSERTs will be neither delayed or rejected. This allows to have hundreds of terabytes in a single table on a single server if the parts are successfully merged to larger parts. This does not affect the thresholds on inactive parts or total parts.", 0) \
     M(UInt64, max_delay_to_insert, 1, "Max delay of inserting data into MergeTree table in seconds, if there are a lot of unmerged parts in single partition.", 0) \
+    M(UInt64, min_delay_to_insert_ms, 10, "Min delay of inserting data into MergeTree table in milliseconds, if there are a lot of unmerged parts in single partition.", 0) \
     M(UInt64, max_parts_in_total, 100000, "If more than this number active parts in all partitions in total, throw 'Too many parts ...' exception.", 0) \
     \
     /* Part removal settings. */ \
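
With the defaults above (parts_to_delay_insert = 150, parts_to_throw_insert = 300, max_delay_to_insert = 1, min_delay_to_insert_ms = 10), a partition holding 200 active parts now delays each INSERT by max(10, 1000 * (200 - 150 + 1) / (300 - 150)) = 340 ms; in terms of the hypothetical insert_delay_ms() sketch above, insert_delay_ms(51, 150, 10, 1000) == 340.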

tests/queries/0_stateless/02521_insert_delay.reference

@@ -0,0 +1,6 @@
+0
+300
+500
+750
+1000
+TOO_MANY_PARTS
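
These expected values follow from the new formula and the test's settings (see the script below): parts_to_throw_insert = 5 minus parts_to_delay_insert = 1 gives allowed_parts_over_threshold = 4, i.e. 250 ms per part over the threshold. INSERT #0 finds an empty table, so no delay (0). INSERTs #1..#4 are 1..4 parts over the threshold, giving raw delays of 250/500/750/1000 ms, with the first clamped up to 300 ms by min_delay_to_insert_ms. The 6th INSERT finds 5 parts, reaching parts_to_throw_insert, and is rejected with TOO_MANY_PARTS.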

tests/queries/0_stateless/02521_insert_delay.sh

@@ -0,0 +1,24 @@
+#!/usr/bin/env bash
+
+CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
+# shellcheck source=../shell_config.sh
+. "$CUR_DIR"/../shell_config.sh
+
+$CLICKHOUSE_CLIENT -q "DROP TABLE IF EXISTS test_02521_insert_delay"
+# Create a MergeTree table whose settings allow at most 5 parts; the 6th INSERT throws TOO_MANY_PARTS
+$CLICKHOUSE_CLIENT -q "CREATE TABLE test_02521_insert_delay (key UInt32, value String) Engine=MergeTree() ORDER BY tuple() SETTINGS parts_to_delay_insert=1, parts_to_throw_insert=5, max_delay_to_insert=1, min_delay_to_insert_ms=300"
+$CLICKHOUSE_CLIENT -q "SYSTEM STOP MERGES test_02521_insert_delay"
+
+# Each delay grows by max_delay_to_insert*1000/(parts_to_throw_insert - parts_to_delay_insert), i.e. by 250ms here
+# INSERT #0 sees no delay; INSERT #1 gets 300ms instead of 250ms because of the min_delay_to_insert_ms floor
+for i in {0..4}
+do
+    query_id="${CLICKHOUSE_DATABASE}_02521_${i}_$RANDOM$RANDOM"
+    $CLICKHOUSE_CLIENT --query_id="$query_id" -q "INSERT INTO test_02521_insert_delay SELECT number, toString(number) FROM numbers(${i}, 1)"
+    $CLICKHOUSE_CLIENT -q "SYSTEM FLUSH LOGS"
+    $CLICKHOUSE_CLIENT --param_query_id="$query_id" -q "select ProfileEvents['DelayedInsertsMilliseconds'] as delay from system.query_log where event_date >= yesterday() and query_id = {query_id:String} order by delay desc limit 1"
+done
+
+$CLICKHOUSE_CLIENT -q "INSERT INTO test_02521_insert_delay VALUES(0, 'This query throws error')" 2>&1 | grep -o 'TOO_MANY_PARTS' | head -n 1
+
+$CLICKHOUSE_CLIENT -q "DROP TABLE test_02521_insert_delay"