diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index 30d0570ff11..89d90011398 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -3701,7 +3701,7 @@ std::pair MergeTreeData::getMaxPartsCountAndSizeForPartition() c } -size_t MergeTreeData::getMaxInactivePartsCountForPartition() const +size_t MergeTreeData::getMaxOutdatedPartsCountForPartition() const { return getMaxPartsCountAndSizeForPartitionWithState(DataPartState::Outdated).first; } @@ -3722,70 +3722,93 @@ std::optional MergeTreeData::getMinPartDataVersion() const } -void MergeTreeData::delayInsertOrThrowIfNeeded(Poco::Event * until, ContextPtr query_context) const +void MergeTreeData::delayInsertOrThrowIfNeeded(Poco::Event * until, const ContextPtr & query_context) const { const auto settings = getSettings(); const auto & query_settings = query_context->getSettingsRef(); const size_t parts_count_in_total = getPartsCount(); + + /// check if have too many parts in total if (parts_count_in_total >= settings->max_parts_in_total) { ProfileEvents::increment(ProfileEvents::RejectedInserts); - throw Exception("Too many parts (" + toString(parts_count_in_total) + ") in all partitions in total. This indicates wrong choice of partition key. The threshold can be modified with 'max_parts_in_total' setting in element in config.xml or with per-table setting.", ErrorCodes::TOO_MANY_PARTS); + throw Exception( + ErrorCodes::TOO_MANY_PARTS, + "Too many parts ({}) in all partitions in total. This indicates wrong choice of partition key. The threshold can be modified " + "with 'max_parts_in_total' setting in element in config.xml or with per-table setting.", + toString(parts_count_in_total)); } - auto [parts_count_in_partition, size_of_partition] = getMaxPartsCountAndSizeForPartition(); - ssize_t k_inactive = -1; - if (settings->inactive_parts_to_throw_insert > 0 || settings->inactive_parts_to_delay_insert > 0) + size_t outdated_parts_over_threshold = [&]() -> size_t { - size_t inactive_parts_count_in_partition = getMaxInactivePartsCountForPartition(); - if (settings->inactive_parts_to_throw_insert > 0 && inactive_parts_count_in_partition >= settings->inactive_parts_to_throw_insert) + size_t outdated_parts_count_in_partition = 0; + if (settings->inactive_parts_to_throw_insert > 0 || settings->inactive_parts_to_delay_insert > 0) + outdated_parts_count_in_partition = getMaxOutdatedPartsCountForPartition(); + + if (settings->inactive_parts_to_throw_insert > 0 && outdated_parts_count_in_partition >= settings->inactive_parts_to_throw_insert) { ProfileEvents::increment(ProfileEvents::RejectedInserts); throw Exception( ErrorCodes::TOO_MANY_PARTS, "Too many inactive parts ({}). Parts cleaning are processing significantly slower than inserts", - inactive_parts_count_in_partition); + outdated_parts_count_in_partition); } - k_inactive = static_cast(inactive_parts_count_in_partition) - static_cast(settings->inactive_parts_to_delay_insert); - } + if (settings->inactive_parts_to_delay_insert > 0 && outdated_parts_count_in_partition >= settings->inactive_parts_to_delay_insert) + return outdated_parts_count_in_partition - settings->inactive_parts_to_delay_insert + 1; - auto parts_to_delay_insert = query_settings.parts_to_delay_insert ? query_settings.parts_to_delay_insert : settings->parts_to_delay_insert; - auto parts_to_throw_insert = query_settings.parts_to_throw_insert ? query_settings.parts_to_throw_insert : settings->parts_to_throw_insert; + return 0; + }(); + auto [parts_count_in_partition, size_of_partition] = getMaxPartsCountAndSizeForPartition(); size_t average_part_size = parts_count_in_partition ? size_of_partition / parts_count_in_partition : 0; - bool parts_are_large_enough_in_average = settings->max_avg_part_size_for_too_many_parts - && average_part_size > settings->max_avg_part_size_for_too_many_parts; - - if (parts_count_in_partition >= parts_to_throw_insert && !parts_are_large_enough_in_average) + const auto active_parts_to_delay_insert + = query_settings.parts_to_delay_insert ? query_settings.parts_to_delay_insert : settings->parts_to_delay_insert; + const auto active_parts_to_throw_insert + = query_settings.parts_to_throw_insert ? query_settings.parts_to_throw_insert : settings->parts_to_throw_insert; + size_t active_parts_over_threshold = [&](size_t parts_count) -> size_t { - ProfileEvents::increment(ProfileEvents::RejectedInserts); - throw Exception( - ErrorCodes::TOO_MANY_PARTS, - "Too many parts ({} with average size of {}). Merges are processing significantly slower than inserts", - parts_count_in_partition, ReadableSize(average_part_size)); - } + bool parts_are_large_enough_in_average + = settings->max_avg_part_size_for_too_many_parts && average_part_size > settings->max_avg_part_size_for_too_many_parts; - if (k_inactive < 0 && (parts_count_in_partition < parts_to_delay_insert || parts_are_large_enough_in_average)) + if (parts_count >= active_parts_to_throw_insert && !parts_are_large_enough_in_average) + { + ProfileEvents::increment(ProfileEvents::RejectedInserts); + throw Exception( + ErrorCodes::TOO_MANY_PARTS, + "Too many parts ({} with average size of {}). Merges are processing significantly slower than inserts", + parts_count, + ReadableSize(average_part_size)); + } + if (active_parts_to_delay_insert > 0 && parts_count >= active_parts_to_delay_insert && !parts_are_large_enough_in_average) + /// if parts_count == parts_to_delay_insert -> we're 1 part over threshold + return parts_count - active_parts_to_delay_insert + 1; + + return 0; + }(parts_count_in_partition); + + /// no need for delay + if (!active_parts_over_threshold && !outdated_parts_over_threshold) return; - const ssize_t k_active = ssize_t(parts_count_in_partition) - ssize_t(parts_to_delay_insert); - size_t max_k; - size_t k; - if (k_active > k_inactive) + const UInt64 delay_milliseconds = [&]() -> UInt64 { - max_k = parts_to_throw_insert - parts_to_delay_insert; - k = k_active + 1; - } - else - { - max_k = settings->inactive_parts_to_throw_insert - settings->inactive_parts_to_delay_insert; - k = k_inactive + 1; - } + size_t parts_over_threshold = std::max(active_parts_over_threshold, outdated_parts_over_threshold); + size_t allowed_parts_over_threshold = 1; + if (active_parts_over_threshold >= outdated_parts_over_threshold) + allowed_parts_over_threshold = active_parts_to_throw_insert - active_parts_to_delay_insert; + else + allowed_parts_over_threshold + = (settings->inactive_parts_to_throw_insert > 0 + ? settings->inactive_parts_to_throw_insert - settings->inactive_parts_to_delay_insert + : outdated_parts_over_threshold); - const UInt64 max_delay_milliseconds = (settings->max_delay_to_insert > 0 ? settings->max_delay_to_insert * 1000 : 1000); - /// min() as a save guard here - const UInt64 delay_milliseconds - = std::min(max_delay_milliseconds, static_cast(::pow(max_delay_milliseconds, static_cast(k) / max_k))); + chassert(parts_over_threshold <= allowed_parts_over_threshold); + + const UInt64 max_delay_milliseconds = (settings->max_delay_to_insert > 0 ? settings->max_delay_to_insert * 1000 : 1000); + double delay_factor = static_cast(parts_over_threshold) / allowed_parts_over_threshold; + /// min() as a save guard here + return std::min(max_delay_milliseconds, static_cast(max_delay_milliseconds * delay_factor)); + }(); ProfileEvents::increment(ProfileEvents::DelayedInserts); ProfileEvents::increment(ProfileEvents::DelayedInsertsMilliseconds, delay_milliseconds); diff --git a/src/Storages/MergeTree/MergeTreeData.h b/src/Storages/MergeTree/MergeTreeData.h index 670c755cf72..f846ba5e184 100644 --- a/src/Storages/MergeTree/MergeTreeData.h +++ b/src/Storages/MergeTree/MergeTreeData.h @@ -533,7 +533,7 @@ public: std::pair getMaxPartsCountAndSizeForPartitionWithState(DataPartState state) const; std::pair getMaxPartsCountAndSizeForPartition() const; - size_t getMaxInactivePartsCountForPartition() const; + size_t getMaxOutdatedPartsCountForPartition() const; /// Get min value of part->info.getDataVersion() for all active parts. /// Makes sense only for ordinary MergeTree engines because for them block numbering doesn't depend on partition. @@ -553,7 +553,7 @@ public: /// If the table contains too many active parts, sleep for a while to give them time to merge. /// If until is non-null, wake up from the sleep earlier if the event happened. - void delayInsertOrThrowIfNeeded(Poco::Event * until, ContextPtr query_context) const; + void delayInsertOrThrowIfNeeded(Poco::Event * until, const ContextPtr & query_context) const; /// Renames temporary part to a permanent part and adds it to the parts set. /// It is assumed that the part does not intersect with existing parts.