diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp
index ab0ace7a9a0..2a341b6f1de 100644
--- a/src/Storages/MergeTree/MergeTreeData.cpp
+++ b/src/Storages/MergeTree/MergeTreeData.cpp
@@ -3780,7 +3780,7 @@ std::pair<size_t, size_t> MergeTreeData::getMaxPartsCountAndSizeForPartition() const
 }
 
-size_t MergeTreeData::getMaxInactivePartsCountForPartition() const
+size_t MergeTreeData::getMaxOutdatedPartsCountForPartition() const
 {
     return getMaxPartsCountAndSizeForPartitionWithState(DataPartState::Outdated).first;
 }
 
@@ -3801,70 +3801,102 @@ std::optional<Int64> MergeTreeData::getMinPartDataVersion() const
 }
 
-void MergeTreeData::delayInsertOrThrowIfNeeded(Poco::Event * until, ContextPtr query_context) const
+void MergeTreeData::delayInsertOrThrowIfNeeded(Poco::Event * until, const ContextPtr & query_context) const
 {
     const auto settings = getSettings();
     const auto & query_settings = query_context->getSettingsRef();
     const size_t parts_count_in_total = getPartsCount();
+
+    /// check if have too many parts in total
     if (parts_count_in_total >= settings->max_parts_in_total)
     {
         ProfileEvents::increment(ProfileEvents::RejectedInserts);
-        throw Exception("Too many parts (" + toString(parts_count_in_total) + ") in all partitions in total. This indicates wrong choice of partition key. The threshold can be modified with 'max_parts_in_total' setting in <merge_tree> element in config.xml or with per-table setting.", ErrorCodes::TOO_MANY_PARTS);
+        throw Exception(
+            ErrorCodes::TOO_MANY_PARTS,
+            "Too many parts ({}) in all partitions in total. This indicates wrong choice of partition key. The threshold can be modified "
+            "with 'max_parts_in_total' setting in <merge_tree> element in config.xml or with per-table setting.",
+            toString(parts_count_in_total));
     }
 
-    auto [parts_count_in_partition, size_of_partition] = getMaxPartsCountAndSizeForPartition();
-    ssize_t k_inactive = -1;
-    if (settings->inactive_parts_to_throw_insert > 0 || settings->inactive_parts_to_delay_insert > 0)
+    size_t outdated_parts_over_threshold = 0;
     {
-        size_t inactive_parts_count_in_partition = getMaxInactivePartsCountForPartition();
-        if (settings->inactive_parts_to_throw_insert > 0 && inactive_parts_count_in_partition >= settings->inactive_parts_to_throw_insert)
+        size_t outdated_parts_count_in_partition = 0;
+        if (settings->inactive_parts_to_throw_insert > 0 || settings->inactive_parts_to_delay_insert > 0)
+            outdated_parts_count_in_partition = getMaxOutdatedPartsCountForPartition();
+
+        if (settings->inactive_parts_to_throw_insert > 0 && outdated_parts_count_in_partition >= settings->inactive_parts_to_throw_insert)
        {
             ProfileEvents::increment(ProfileEvents::RejectedInserts);
             throw Exception(
                 ErrorCodes::TOO_MANY_PARTS,
                 "Too many inactive parts ({}). Parts cleaning are processing significantly slower than inserts",
-                inactive_parts_count_in_partition);
+                outdated_parts_count_in_partition);
         }
-        k_inactive = static_cast<ssize_t>(inactive_parts_count_in_partition) - static_cast<ssize_t>(settings->inactive_parts_to_delay_insert);
+        if (settings->inactive_parts_to_delay_insert > 0 && outdated_parts_count_in_partition >= settings->inactive_parts_to_delay_insert)
+            outdated_parts_over_threshold = outdated_parts_count_in_partition - settings->inactive_parts_to_delay_insert + 1;
     }
 
-    auto parts_to_delay_insert = query_settings.parts_to_delay_insert ? query_settings.parts_to_delay_insert : settings->parts_to_delay_insert;
-    auto parts_to_throw_insert = query_settings.parts_to_throw_insert ? query_settings.parts_to_throw_insert : settings->parts_to_throw_insert;
-
+    auto [parts_count_in_partition, size_of_partition] = getMaxPartsCountAndSizeForPartition();
     size_t average_part_size = parts_count_in_partition ? size_of_partition / parts_count_in_partition : 0;
-    bool parts_are_large_enough_in_average = settings->max_avg_part_size_for_too_many_parts
-        && average_part_size > settings->max_avg_part_size_for_too_many_parts;
-
-    if (parts_count_in_partition >= parts_to_throw_insert && !parts_are_large_enough_in_average)
+    const auto active_parts_to_delay_insert
+        = query_settings.parts_to_delay_insert ? query_settings.parts_to_delay_insert : settings->parts_to_delay_insert;
+    const auto active_parts_to_throw_insert
+        = query_settings.parts_to_throw_insert ? query_settings.parts_to_throw_insert : settings->parts_to_throw_insert;
+    size_t active_parts_over_threshold = 0;
     {
-        ProfileEvents::increment(ProfileEvents::RejectedInserts);
-        throw Exception(
-            ErrorCodes::TOO_MANY_PARTS,
-            "Too many parts ({} with average size of {}). Merges are processing significantly slower than inserts",
-            parts_count_in_partition, ReadableSize(average_part_size));
+        bool parts_are_large_enough_in_average
+            = settings->max_avg_part_size_for_too_many_parts && average_part_size > settings->max_avg_part_size_for_too_many_parts;
+
+        if (parts_count_in_partition >= active_parts_to_throw_insert && !parts_are_large_enough_in_average)
+        {
+            ProfileEvents::increment(ProfileEvents::RejectedInserts);
+            throw Exception(
+                ErrorCodes::TOO_MANY_PARTS,
+                "Too many parts ({} with average size of {}). Merges are processing significantly slower than inserts",
+                parts_count_in_partition,
+                ReadableSize(average_part_size));
+        }
+        if (active_parts_to_delay_insert > 0 && parts_count_in_partition >= active_parts_to_delay_insert
+            && !parts_are_large_enough_in_average)
+            /// if parts_count == parts_to_delay_insert -> we're 1 part over threshold
+            active_parts_over_threshold = parts_count_in_partition - active_parts_to_delay_insert + 1;
     }
 
-    if (k_inactive < 0 && (parts_count_in_partition < parts_to_delay_insert || parts_are_large_enough_in_average))
+    /// no need for delay
+    if (!active_parts_over_threshold && !outdated_parts_over_threshold)
         return;
 
-    const ssize_t k_active = ssize_t(parts_count_in_partition) - ssize_t(parts_to_delay_insert);
-    size_t max_k;
-    size_t k;
-    if (k_active > k_inactive)
+    UInt64 delay_milliseconds = 0;
     {
-        max_k = parts_to_throw_insert - parts_to_delay_insert;
-        k = k_active + 1;
-    }
-    else
-    {
-        max_k = settings->inactive_parts_to_throw_insert - settings->inactive_parts_to_delay_insert;
-        k = k_inactive + 1;
-    }
+        size_t parts_over_threshold = 0;
+        size_t allowed_parts_over_threshold = 1;
+        const bool use_active_parts_threshold = (active_parts_over_threshold >= outdated_parts_over_threshold);
+        if (use_active_parts_threshold)
+        {
+            parts_over_threshold = active_parts_over_threshold;
+            allowed_parts_over_threshold = active_parts_to_throw_insert - active_parts_to_delay_insert;
+        }
+        else
+        {
+            parts_over_threshold = outdated_parts_over_threshold;
+            allowed_parts_over_threshold = outdated_parts_over_threshold; /// if throw threshold is not set, will use max delay
+            if (settings->inactive_parts_to_throw_insert > 0)
+                allowed_parts_over_threshold = settings->inactive_parts_to_throw_insert - settings->inactive_parts_to_delay_insert;
+        }
 
-    const UInt64 max_delay_milliseconds = (settings->max_delay_to_insert > 0 ? settings->max_delay_to_insert * 1000 : 1000);
-    /// min() as a save guard here
-    const UInt64 delay_milliseconds
-        = std::min(max_delay_milliseconds, static_cast<UInt64>(::pow(max_delay_milliseconds, static_cast<double>(k) / max_k)));
+        if (allowed_parts_over_threshold == 0 || parts_over_threshold > allowed_parts_over_threshold) [[unlikely]]
+            throw Exception(
+                ErrorCodes::LOGICAL_ERROR,
+                "Incorrect calculation of {} parts over threshold: allowed_parts_over_threshold={}, parts_over_threshold={}",
+                (use_active_parts_threshold ? "active" : "inactive"),
+                allowed_parts_over_threshold,
+                parts_over_threshold);
+
+        const UInt64 max_delay_milliseconds = (settings->max_delay_to_insert > 0 ? settings->max_delay_to_insert * 1000 : 1000);
+        double delay_factor = static_cast<double>(parts_over_threshold) / allowed_parts_over_threshold;
+        const UInt64 min_delay_milliseconds = settings->min_delay_to_insert_ms;
+        delay_milliseconds = std::max(min_delay_milliseconds, static_cast<UInt64>(max_delay_milliseconds * delay_factor));
+    }
 
     ProfileEvents::increment(ProfileEvents::DelayedInserts);
     ProfileEvents::increment(ProfileEvents::DelayedInsertsMilliseconds, delay_milliseconds);
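Review note on the hunk above: the old exponential backoff (`::pow(max_delay_milliseconds, k / max_k)`, which stays far below the maximum for most of the range) is replaced by a linear ramp between the new `min_delay_to_insert_ms` floor and `max_delay_to_insert`. A minimal standalone sketch of that ramp for sanity-checking the arithmetic, with illustrative names rather than the actual ClickHouse helpers:

```cpp
#include <algorithm>
#include <cstdint>

// Sketch of the linear delay ramp introduced above (illustrative, not the ClickHouse API).
// parts_over_threshold is 1-based: sitting exactly at parts_to_delay_insert counts as 1 part over.
uint64_t compute_insert_delay_ms(
    uint64_t parts_over_threshold,          // how far past the delay threshold we are
    uint64_t allowed_parts_over_threshold,  // distance between the delay and throw thresholds
    uint64_t max_delay_ms,                  // max_delay_to_insert * 1000
    uint64_t min_delay_ms)                  // min_delay_to_insert_ms
{
    // The patch guarantees parts_over_threshold <= allowed_parts_over_threshold
    // (otherwise it throws LOGICAL_ERROR), so the factor is in (0, 1].
    double delay_factor = static_cast<double>(parts_over_threshold) / allowed_parts_over_threshold;
    return std::max(min_delay_ms, static_cast<uint64_t>(max_delay_ms * delay_factor));
}
```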
diff --git a/src/Storages/MergeTree/MergeTreeData.h b/src/Storages/MergeTree/MergeTreeData.h
index 19efd8f908a..e09af181591 100644
--- a/src/Storages/MergeTree/MergeTreeData.h
+++ b/src/Storages/MergeTree/MergeTreeData.h
@@ -535,7 +535,7 @@ public:
     std::pair<size_t, size_t> getMaxPartsCountAndSizeForPartitionWithState(DataPartState state) const;
     std::pair<size_t, size_t> getMaxPartsCountAndSizeForPartition() const;
 
-    size_t getMaxInactivePartsCountForPartition() const;
+    size_t getMaxOutdatedPartsCountForPartition() const;
 
     /// Get min value of part->info.getDataVersion() for all active parts.
     /// Makes sense only for ordinary MergeTree engines because for them block numbering doesn't depend on partition.
@@ -555,7 +555,7 @@ public:
 
     /// If the table contains too many active parts, sleep for a while to give them time to merge.
     /// If until is non-null, wake up from the sleep earlier if the event happened.
-    void delayInsertOrThrowIfNeeded(Poco::Event * until, ContextPtr query_context) const;
+    void delayInsertOrThrowIfNeeded(Poco::Event * until, const ContextPtr & query_context) const;
 
     /// Renames temporary part to a permanent part and adds it to the parts set.
     /// It is assumed that the part does not intersect with existing parts.
diff --git a/src/Storages/MergeTree/MergeTreeSettings.h b/src/Storages/MergeTree/MergeTreeSettings.h
index 37e9bf5779c..d1f957740e2 100644
--- a/src/Storages/MergeTree/MergeTreeSettings.h
+++ b/src/Storages/MergeTree/MergeTreeSettings.h
@@ -68,12 +68,13 @@ struct Settings;
     M(Bool, remove_rolled_back_parts_immediately, 1, "Setting for an incomplete experimental feature.", 0) \
     \
     /** Inserts settings. */ \
-    M(UInt64, parts_to_delay_insert, 150, "If table contains at least that many active parts in single partition, artificially slow down insert into table.", 0) \
+    M(UInt64, parts_to_delay_insert, 150, "If table contains at least that many active parts in single partition, artificially slow down insert into table. Disabled if set to 0", 0) \
     M(UInt64, inactive_parts_to_delay_insert, 0, "If table contains at least that many inactive parts in single partition, artificially slow down insert into table.", 0) \
     M(UInt64, parts_to_throw_insert, 300, "If more than this number active parts in single partition, throw 'Too many parts ...' exception.", 0) \
     M(UInt64, inactive_parts_to_throw_insert, 0, "If more than this number inactive parts in single partition, throw 'Too many inactive parts ...' exception.", 0) \
     M(UInt64, max_avg_part_size_for_too_many_parts, 10ULL * 1024 * 1024 * 1024, "The 'too many parts' check according to 'parts_to_delay_insert' and 'parts_to_throw_insert' will be active only if the average part size (in the relevant partition) is not larger than the specified threshold. If it is larger than the specified threshold, the INSERTs will be neither delayed or rejected. This allows to have hundreds of terabytes in a single table on a single server if the parts are successfully merged to larger parts. This does not affect the thresholds on inactive parts or total parts.", 0) \
     M(UInt64, max_delay_to_insert, 1, "Max delay of inserting data into MergeTree table in seconds, if there are a lot of unmerged parts in single partition.", 0) \
+    M(UInt64, min_delay_to_insert_ms, 10, "Min delay of inserting data into MergeTree table in milliseconds, if there are a lot of unmerged parts in single partition.", 0) \
     M(UInt64, max_parts_in_total, 100000, "If more than this number active parts in all partitions in total, throw 'Too many parts ...' exception.", 0) \
     \
     /* Part removal settings. */ \
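For scale, with the default values above (parts_to_delay_insert=150, parts_to_throw_insert=300, max_delay_to_insert=1) the raw ramp at the delay threshold would come out to roughly 6 ms, which the new min_delay_to_insert_ms=10 floor rounds up. A rough worked check under those assumed values, mirroring the formula from the sketch earlier:

```cpp
#include <algorithm>
#include <cstdint>
#include <cstdio>

int main()
{
    // Worked example with the default MergeTree settings (assumed scenario, not ClickHouse code):
    // parts_to_delay_insert = 150, parts_to_throw_insert = 300,
    // max_delay_to_insert = 1 (i.e. 1000 ms), min_delay_to_insert_ms = 10.
    const uint64_t delay_threshold = 150, throw_threshold = 300;
    const uint64_t max_delay_ms = 1000, min_delay_ms = 10;
    const uint64_t allowed_over = throw_threshold - delay_threshold;  // 150

    const uint64_t parts_counts[] = {150, 225, 299};
    for (uint64_t parts : parts_counts)
    {
        const uint64_t over = parts - delay_threshold + 1;  // 1-based, as in the patch
        const uint64_t delay = std::max(
            min_delay_ms, static_cast<uint64_t>(max_delay_ms * (static_cast<double>(over) / allowed_over)));
        std::printf("%llu parts -> %llu ms\n",
                    static_cast<unsigned long long>(parts),
                    static_cast<unsigned long long>(delay));
    }
    // Prints: 150 parts -> 10 ms, 225 parts -> 506 ms, 299 parts -> 1000 ms.
    return 0;
}
```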
diff --git a/tests/queries/0_stateless/02521_incorrect_dealy_for_insert_bug_44902.reference b/tests/queries/0_stateless/02521_incorrect_dealy_for_insert_bug_44902.reference
new file mode 100644
index 00000000000..c104ff58aff
--- /dev/null
+++ b/tests/queries/0_stateless/02521_incorrect_dealy_for_insert_bug_44902.reference
@@ -0,0 +1,6 @@
+0
+300
+500
+750
+1000
+TOO_MANY_PARTS
diff --git a/tests/queries/0_stateless/02521_incorrect_dealy_for_insert_bug_44902.sh b/tests/queries/0_stateless/02521_incorrect_dealy_for_insert_bug_44902.sh
new file mode 100755
index 00000000000..5f91ef19a5a
--- /dev/null
+++ b/tests/queries/0_stateless/02521_incorrect_dealy_for_insert_bug_44902.sh
@@ -0,0 +1,24 @@
+#!/usr/bin/env bash
+
+CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
+# shellcheck source=../shell_config.sh
+. "$CUR_DIR"/../shell_config.sh
+
+$CLICKHOUSE_CLIENT -q "DROP TABLE IF EXISTS test_02521_insert_delay"
+# Create MergeTree with settings which allow to insert maximum 5 parts, on 6th it'll throw TOO_MANY_PARTS
+$CLICKHOUSE_CLIENT -q "CREATE TABLE test_02521_insert_delay (key UInt32, value String) Engine=MergeTree() ORDER BY tuple() SETTINGS parts_to_delay_insert=1, parts_to_throw_insert=5, max_delay_to_insert=1, min_delay_to_insert_ms=300"
+$CLICKHOUSE_CLIENT -q "SYSTEM STOP MERGES test_02521_insert_delay"
+
+# Every delay is increased by max_delay_to_insert*1000/(parts_to_throw_insert - parts_to_delay_insert), here it's 250ms
+# 0-indexed INSERT - no delay, 1-indexed INSERT - 300ms instead of 250ms due to min_delay_to_insert_ms
+for i in {0..4}
+do
+    query_id="${CLICKHOUSE_DATABASE}_02521_${i}_$RANDOM$RANDOM"
+    $CLICKHOUSE_CLIENT --query_id="$query_id" -q "INSERT INTO test_02521_insert_delay SELECT number, toString(number) FROM numbers(${i}, 1)"
+    $CLICKHOUSE_CLIENT -q "SYSTEM FLUSH LOGS"
+    $CLICKHOUSE_CLIENT --param_query_id="$query_id" -q "select ProfileEvents['DelayedInsertsMilliseconds'] as delay from system.query_log where event_date >= yesterday() and query_id = {query_id:String} order by delay desc limit 1"
+done
+
+$CLICKHOUSE_CLIENT -q "INSERT INTO test_02521_insert_delay VALUES(0, 'This query throws error')" 2>&1 | grep -o 'TOO_MANY_PARTS' | head -n 1
+
+$CLICKHOUSE_CLIENT -q "DROP TABLE test_02521_insert_delay"