Fix: insert delay calculation

Igor Nikonov 2023-01-05 22:43:41 +00:00
parent 078f4d947a
commit a67afdff6a
2 changed files with 65 additions and 42 deletions

MergeTreeData.cpp

@@ -3701,7 +3701,7 @@ std::pair<size_t, size_t> MergeTreeData::getMaxPartsCountAndSizeForPartition() const
 }
 
-size_t MergeTreeData::getMaxInactivePartsCountForPartition() const
+size_t MergeTreeData::getMaxOutdatedPartsCountForPartition() const
 {
     return getMaxPartsCountAndSizeForPartitionWithState(DataPartState::Outdated).first;
 }
@@ -3722,70 +3722,93 @@ std::optional<Int64> MergeTreeData::getMinPartDataVersion() const
 }
 
-void MergeTreeData::delayInsertOrThrowIfNeeded(Poco::Event * until, ContextPtr query_context) const
+void MergeTreeData::delayInsertOrThrowIfNeeded(Poco::Event * until, const ContextPtr & query_context) const
 {
     const auto settings = getSettings();
     const auto & query_settings = query_context->getSettingsRef();
     const size_t parts_count_in_total = getPartsCount();
 
+    /// check if have too many parts in total
     if (parts_count_in_total >= settings->max_parts_in_total)
     {
         ProfileEvents::increment(ProfileEvents::RejectedInserts);
-        throw Exception("Too many parts (" + toString(parts_count_in_total) + ") in all partitions in total. This indicates wrong choice of partition key. The threshold can be modified with 'max_parts_in_total' setting in <merge_tree> element in config.xml or with per-table setting.", ErrorCodes::TOO_MANY_PARTS);
+        throw Exception(
+            ErrorCodes::TOO_MANY_PARTS,
+            "Too many parts ({}) in all partitions in total. This indicates wrong choice of partition key. The threshold can be modified "
+            "with 'max_parts_in_total' setting in <merge_tree> element in config.xml or with per-table setting.",
+            toString(parts_count_in_total));
     }
 
-    auto [parts_count_in_partition, size_of_partition] = getMaxPartsCountAndSizeForPartition();
-    ssize_t k_inactive = -1;
-    if (settings->inactive_parts_to_throw_insert > 0 || settings->inactive_parts_to_delay_insert > 0)
+    size_t outdated_parts_over_threshold = [&]() -> size_t
     {
-        size_t inactive_parts_count_in_partition = getMaxInactivePartsCountForPartition();
-        if (settings->inactive_parts_to_throw_insert > 0 && inactive_parts_count_in_partition >= settings->inactive_parts_to_throw_insert)
+        size_t outdated_parts_count_in_partition = 0;
+        if (settings->inactive_parts_to_throw_insert > 0 || settings->inactive_parts_to_delay_insert > 0)
+            outdated_parts_count_in_partition = getMaxOutdatedPartsCountForPartition();
+
+        if (settings->inactive_parts_to_throw_insert > 0 && outdated_parts_count_in_partition >= settings->inactive_parts_to_throw_insert)
         {
             ProfileEvents::increment(ProfileEvents::RejectedInserts);
             throw Exception(
                 ErrorCodes::TOO_MANY_PARTS,
                 "Too many inactive parts ({}). Parts cleaning are processing significantly slower than inserts",
-                inactive_parts_count_in_partition);
+                outdated_parts_count_in_partition);
         }
-        k_inactive = static_cast<ssize_t>(inactive_parts_count_in_partition) - static_cast<ssize_t>(settings->inactive_parts_to_delay_insert);
-    }
+        if (settings->inactive_parts_to_delay_insert > 0 && outdated_parts_count_in_partition >= settings->inactive_parts_to_delay_insert)
+            return outdated_parts_count_in_partition - settings->inactive_parts_to_delay_insert + 1;
+        return 0;
+    }();
 
-    auto parts_to_delay_insert = query_settings.parts_to_delay_insert ? query_settings.parts_to_delay_insert : settings->parts_to_delay_insert;
-    auto parts_to_throw_insert = query_settings.parts_to_throw_insert ? query_settings.parts_to_throw_insert : settings->parts_to_throw_insert;
+    auto [parts_count_in_partition, size_of_partition] = getMaxPartsCountAndSizeForPartition();
     size_t average_part_size = parts_count_in_partition ? size_of_partition / parts_count_in_partition : 0;
-    bool parts_are_large_enough_in_average = settings->max_avg_part_size_for_too_many_parts
-        && average_part_size > settings->max_avg_part_size_for_too_many_parts;
-
-    if (parts_count_in_partition >= parts_to_throw_insert && !parts_are_large_enough_in_average)
+    const auto active_parts_to_delay_insert
+        = query_settings.parts_to_delay_insert ? query_settings.parts_to_delay_insert : settings->parts_to_delay_insert;
+    const auto active_parts_to_throw_insert
+        = query_settings.parts_to_throw_insert ? query_settings.parts_to_throw_insert : settings->parts_to_throw_insert;
+    size_t active_parts_over_threshold = [&](size_t parts_count) -> size_t
     {
-        ProfileEvents::increment(ProfileEvents::RejectedInserts);
-        throw Exception(
-            ErrorCodes::TOO_MANY_PARTS,
-            "Too many parts ({} with average size of {}). Merges are processing significantly slower than inserts",
-            parts_count_in_partition, ReadableSize(average_part_size));
-    }
+        bool parts_are_large_enough_in_average
            = settings->max_avg_part_size_for_too_many_parts && average_part_size > settings->max_avg_part_size_for_too_many_parts;
 
-    if (k_inactive < 0 && (parts_count_in_partition < parts_to_delay_insert || parts_are_large_enough_in_average))
+        if (parts_count >= active_parts_to_throw_insert && !parts_are_large_enough_in_average)
+        {
+            ProfileEvents::increment(ProfileEvents::RejectedInserts);
+            throw Exception(
+                ErrorCodes::TOO_MANY_PARTS,
+                "Too many parts ({} with average size of {}). Merges are processing significantly slower than inserts",
+                parts_count,
+                ReadableSize(average_part_size));
+        }
+        if (active_parts_to_delay_insert > 0 && parts_count >= active_parts_to_delay_insert && !parts_are_large_enough_in_average)
+            /// if parts_count == parts_to_delay_insert -> we're 1 part over threshold
+            return parts_count - active_parts_to_delay_insert + 1;
+        return 0;
+    }(parts_count_in_partition);
+
+    /// no need for delay
+    if (!active_parts_over_threshold && !outdated_parts_over_threshold)
         return;
 
-    const ssize_t k_active = ssize_t(parts_count_in_partition) - ssize_t(parts_to_delay_insert);
-    size_t max_k;
-    size_t k;
-    if (k_active > k_inactive)
+    const UInt64 delay_milliseconds = [&]() -> UInt64
     {
-        max_k = parts_to_throw_insert - parts_to_delay_insert;
-        k = k_active + 1;
-    }
-    else
-    {
-        max_k = settings->inactive_parts_to_throw_insert - settings->inactive_parts_to_delay_insert;
-        k = k_inactive + 1;
-    }
+        size_t parts_over_threshold = std::max(active_parts_over_threshold, outdated_parts_over_threshold);
+        size_t allowed_parts_over_threshold = 1;
+        if (active_parts_over_threshold >= outdated_parts_over_threshold)
+            allowed_parts_over_threshold = active_parts_to_throw_insert - active_parts_to_delay_insert;
+        else
+            allowed_parts_over_threshold
+                = (settings->inactive_parts_to_throw_insert > 0
+                       ? settings->inactive_parts_to_throw_insert - settings->inactive_parts_to_delay_insert
+                       : outdated_parts_over_threshold);
 
-    const UInt64 max_delay_milliseconds = (settings->max_delay_to_insert > 0 ? settings->max_delay_to_insert * 1000 : 1000);
-    /// min() as a save guard here
-    const UInt64 delay_milliseconds
-        = std::min(max_delay_milliseconds, static_cast<UInt64>(::pow(max_delay_milliseconds, static_cast<double>(k) / max_k)));
+        chassert(parts_over_threshold <= allowed_parts_over_threshold);
+
+        const UInt64 max_delay_milliseconds = (settings->max_delay_to_insert > 0 ? settings->max_delay_to_insert * 1000 : 1000);
+        double delay_factor = static_cast<double>(parts_over_threshold) / allowed_parts_over_threshold;
+        /// min() as a save guard here
+        return std::min(max_delay_milliseconds, static_cast<UInt64>(max_delay_milliseconds * delay_factor));
+    }();
 
     ProfileEvents::increment(ProfileEvents::DelayedInserts);
     ProfileEvents::increment(ProfileEvents::DelayedInsertsMilliseconds, delay_milliseconds);
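
For illustration, a minimal standalone sketch of the delay formula introduced above (not ClickHouse code; the helper name delayMilliseconds and the example values 150 / 300 parts and 1 s are assumptions chosen for the example). It shows the key change: the delay now grows linearly with how far the parts count is over the delay threshold, capped at max_delay_to_insert, instead of the previous ::pow(max_delay_milliseconds, k / max_k) curve.

#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <iostream>

/// Sketch of the new calculation: a linear ramp up to max_delay_to_insert seconds.
/// parts_over_threshold         = how many parts we are past parts_to_delay_insert (>= 1 when delaying)
/// allowed_parts_over_threshold = parts_to_throw_insert - parts_to_delay_insert
static uint64_t delayMilliseconds(std::size_t parts_over_threshold, std::size_t allowed_parts_over_threshold, uint64_t max_delay_to_insert_sec)
{
    const uint64_t max_delay_ms = max_delay_to_insert_sec > 0 ? max_delay_to_insert_sec * 1000 : 1000;
    const double delay_factor = static_cast<double>(parts_over_threshold) / allowed_parts_over_threshold;
    /// min() guards against parts_over_threshold exceeding the allowed range.
    return std::min(max_delay_ms, static_cast<uint64_t>(max_delay_ms * delay_factor));
}

int main()
{
    /// Example: parts_to_delay_insert = 150, parts_to_throw_insert = 300, max_delay_to_insert = 1 s.
    /// With 200 active parts in the partition we are 51 parts over the delay threshold,
    /// out of 150 allowed before throwing, so the insert waits about a third of the maximum delay.
    std::cout << delayMilliseconds(51, 150, 1) << " ms\n";
}

Compiled as C++17, this prints 340 ms for the example: 51 parts over a 150-part window is roughly a third of the way to the throw threshold, so the insert is delayed by roughly a third of the maximum delay.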

MergeTreeData.h

@@ -533,7 +533,7 @@ public:
     std::pair<size_t, size_t> getMaxPartsCountAndSizeForPartitionWithState(DataPartState state) const;
     std::pair<size_t, size_t> getMaxPartsCountAndSizeForPartition() const;
 
-    size_t getMaxInactivePartsCountForPartition() const;
+    size_t getMaxOutdatedPartsCountForPartition() const;
 
     /// Get min value of part->info.getDataVersion() for all active parts.
     /// Makes sense only for ordinary MergeTree engines because for them block numbering doesn't depend on partition.
@@ -553,7 +553,7 @@ public:
     /// If the table contains too many active parts, sleep for a while to give them time to merge.
     /// If until is non-null, wake up from the sleep earlier if the event happened.
-    void delayInsertOrThrowIfNeeded(Poco::Event * until, ContextPtr query_context) const;
+    void delayInsertOrThrowIfNeeded(Poco::Event * until, const ContextPtr & query_context) const;
 
     /// Renames temporary part to a permanent part and adds it to the parts set.
     /// It is assumed that the part does not intersect with existing parts.