Merge pull request #5171 from yandex/max_parts_in_total

Added max_parts_in_total threshold to MergeTree tables
This commit is contained in:
alexey-milovidov 2019-05-02 19:27:32 +03:00 committed by GitHub
commit bb970dd419
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 53 additions and 16 deletions

View File

@ -2133,6 +2133,18 @@ size_t MergeTreeData::getTotalActiveSizeInBytes() const
}
size_t MergeTreeData::getPartsCount() const
{
auto lock = lockParts();
size_t res = 0;
for (const auto & part [[maybe_unused]] : getDataPartsStateRange(DataPartState::Committed))
++res;
return res;
}
size_t MergeTreeData::getMaxPartsCountForPartition() const
{
auto lock = lockParts();
@ -2165,7 +2177,7 @@ std::optional<Int64> MergeTreeData::getMinPartDataVersion() const
auto lock = lockParts();
std::optional<Int64> result;
for (const DataPartPtr & part : getDataPartsStateRange(DataPartState::Committed))
for (const auto & part : getDataPartsStateRange(DataPartState::Committed))
{
if (!result || *result > part->info.getDataVersion())
result = part->info.getDataVersion();
@ -2177,18 +2189,25 @@ std::optional<Int64> MergeTreeData::getMinPartDataVersion() const
void MergeTreeData::delayInsertOrThrowIfNeeded(Poco::Event *until) const
{
const size_t parts_count = getMaxPartsCountForPartition();
if (parts_count < settings.parts_to_delay_insert)
return;
if (parts_count >= settings.parts_to_throw_insert)
const size_t parts_count_in_total = getPartsCount();
if (parts_count_in_total >= settings.max_parts_in_total)
{
ProfileEvents::increment(ProfileEvents::RejectedInserts);
throw Exception("Too many parts (" + toString(parts_count) + "). Merges are processing significantly slower than inserts.", ErrorCodes::TOO_MANY_PARTS);
throw Exception("Too many parts (" + toString(parts_count_in_total) + ") in all partitions in total. This indicates wrong choice of partition key. The threshold can be modified with 'max_parts_in_total' setting in <merge_tree> element in config.xml or with per-table setting.", ErrorCodes::TOO_MANY_PARTS);
}
const size_t parts_count_in_partition = getMaxPartsCountForPartition();
if (parts_count_in_partition < settings.parts_to_delay_insert)
return;
if (parts_count_in_partition >= settings.parts_to_throw_insert)
{
ProfileEvents::increment(ProfileEvents::RejectedInserts);
throw Exception("Too many parts (" + toString(parts_count_in_partition) + "). Merges are processing significantly slower than inserts.", ErrorCodes::TOO_MANY_PARTS);
}
const size_t max_k = settings.parts_to_throw_insert - settings.parts_to_delay_insert; /// always > 0
const size_t k = 1 + parts_count - settings.parts_to_delay_insert; /// from 1 to max_k
const size_t k = 1 + parts_count_in_partition - settings.parts_to_delay_insert; /// from 1 to max_k
const double delay_milliseconds = ::pow(settings.max_delay_to_insert * 1000, static_cast<double>(k) / max_k);
ProfileEvents::increment(ProfileEvents::DelayedInserts);
@ -2197,7 +2216,7 @@ void MergeTreeData::delayInsertOrThrowIfNeeded(Poco::Event *until) const
CurrentMetrics::Increment metric_increment(CurrentMetrics::DelayedInserts);
LOG_INFO(log, "Delaying inserting block by "
<< std::fixed << std::setprecision(4) << delay_milliseconds << " ms. because there are " << parts_count << " parts");
<< std::fixed << std::setprecision(4) << delay_milliseconds << " ms. because there are " << parts_count_in_partition << " parts");
if (until)
until->tryWait(delay_milliseconds);
@ -2207,12 +2226,19 @@ void MergeTreeData::delayInsertOrThrowIfNeeded(Poco::Event *until) const
void MergeTreeData::throwInsertIfNeeded() const
{
const size_t parts_count = getMaxPartsCountForPartition();
if (parts_count >= settings.parts_to_throw_insert)
const size_t parts_count_in_total = getPartsCount();
if (parts_count_in_total >= settings.max_parts_in_total)
{
ProfileEvents::increment(ProfileEvents::RejectedInserts);
throw Exception("Too many parts (" + toString(parts_count) + "). Merges are processing significantly slower than inserts.", ErrorCodes::TOO_MANY_PARTS);
throw Exception("Too many parts (" + toString(parts_count_in_total) + ") in all partitions in total. This indicates wrong choice of partition key. The threshold can be modified with 'max_parts_in_total' setting in <merge_tree> element in config.xml or with per-table setting.", ErrorCodes::TOO_MANY_PARTS);
}
const size_t parts_count_in_partition = getMaxPartsCountForPartition();
if (parts_count_in_partition >= settings.parts_to_throw_insert)
{
ProfileEvents::increment(ProfileEvents::RejectedInserts);
throw Exception("Too many parts (" + toString(parts_count_in_partition) + "). Merges are processing significantly slower than inserts.", ErrorCodes::TOO_MANY_PARTS);
}
}

View File

@ -422,6 +422,7 @@ public:
/// Total size of active parts in bytes.
size_t getTotalActiveSizeInBytes() const;
size_t getPartsCount() const;
size_t getMaxPartsCountForPartition() const;
/// Get min value of part->info.getDataVersion() for all active parts.

View File

@ -36,9 +36,10 @@ struct MergeTreeSettings : public SettingsCollection<MergeTreeSettings>
M(SettingSeconds, temporary_directories_lifetime, 86400, "How many seconds to keep tmp_-directories.") \
\
/** Inserts settings. */ \
M(SettingUInt64, parts_to_delay_insert, 150, "If table contains at least that many active parts, artificially slow down insert into table.") \
M(SettingUInt64, parts_to_throw_insert, 300, "If more than this number active parts, throw 'Too many parts ...' exception.") \
M(SettingUInt64, max_delay_to_insert, 1, "Max delay of inserting data into MergeTree table in seconds, if there are a lot of unmerged parts.") \
M(SettingUInt64, parts_to_delay_insert, 150, "If table contains at least that many active parts in single partition, artificially slow down insert into table.") \
M(SettingUInt64, parts_to_throw_insert, 300, "If more than this number active parts in single partition, throw 'Too many parts ...' exception.") \
M(SettingUInt64, max_delay_to_insert, 1, "Max delay of inserting data into MergeTree table in seconds, if there are a lot of unmerged parts in single partition.") \
M(SettingUInt64, max_parts_in_total, 100000, "If more than this number active parts in all partitions in total, throw 'Too many parts ...' exception.") \
\
/** Replication settings. */ \
M(SettingUInt64, replicated_deduplication_window, 100, "How many last blocks of hashes should be kept in ZooKeeper (old blocks will be deleted).") \

View File

@ -0,0 +1,8 @@
drop table if exists max_parts_in_total;
create table max_parts_in_total (x UInt64) ENGINE = MergeTree PARTITION BY x ORDER BY x SETTINGS max_parts_in_total = 10;
INSERT INTO max_parts_in_total SELECT number FROM numbers(10);
SELECT 1;
INSERT INTO max_parts_in_total SELECT 123; -- { serverError 252 }
drop table max_parts_in_total;