mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-21 15:12:02 +00:00
Added max_parts_in_total threshold to MergeTree tables
This commit is contained in:
parent
478b721b36
commit
6a271adad3
@ -2133,6 +2133,18 @@ size_t MergeTreeData::getTotalActiveSizeInBytes() const
|
||||
}
|
||||
|
||||
|
||||
size_t MergeTreeData::getPartsCount() const
|
||||
{
|
||||
auto lock = lockParts();
|
||||
|
||||
size_t res = 0;
|
||||
for (const auto & part [[maybe_unused]] : getDataPartsStateRange(DataPartState::Committed))
|
||||
++res;
|
||||
|
||||
return res;
|
||||
}
|
||||
|
||||
|
||||
size_t MergeTreeData::getMaxPartsCountForPartition() const
|
||||
{
|
||||
auto lock = lockParts();
|
||||
@ -2165,7 +2177,7 @@ std::optional<Int64> MergeTreeData::getMinPartDataVersion() const
|
||||
auto lock = lockParts();
|
||||
|
||||
std::optional<Int64> result;
|
||||
for (const DataPartPtr & part : getDataPartsStateRange(DataPartState::Committed))
|
||||
for (const auto & part : getDataPartsStateRange(DataPartState::Committed))
|
||||
{
|
||||
if (!result || *result > part->info.getDataVersion())
|
||||
result = part->info.getDataVersion();
|
||||
@ -2177,18 +2189,25 @@ std::optional<Int64> MergeTreeData::getMinPartDataVersion() const
|
||||
|
||||
void MergeTreeData::delayInsertOrThrowIfNeeded(Poco::Event *until) const
|
||||
{
|
||||
const size_t parts_count = getMaxPartsCountForPartition();
|
||||
if (parts_count < settings.parts_to_delay_insert)
|
||||
return;
|
||||
|
||||
if (parts_count >= settings.parts_to_throw_insert)
|
||||
const size_t parts_count_in_total = getPartsCount();
|
||||
if (parts_count_in_total >= settings.max_parts_in_total)
|
||||
{
|
||||
ProfileEvents::increment(ProfileEvents::RejectedInserts);
|
||||
throw Exception("Too many parts (" + toString(parts_count) + "). Merges are processing significantly slower than inserts.", ErrorCodes::TOO_MANY_PARTS);
|
||||
throw Exception("Too many parts (" + toString(parts_count_in_total) + ") in all partitions in total. This indicates wrong choice of partition key. The threshold can be modified with 'max_parts_in_total' setting in <merge_tree> element in config.xml or with per-table setting.", ErrorCodes::TOO_MANY_PARTS);
|
||||
}
|
||||
|
||||
const size_t parts_count_in_partition = getMaxPartsCountForPartition();
|
||||
if (parts_count_in_partition < settings.parts_to_delay_insert)
|
||||
return;
|
||||
|
||||
if (parts_count_in_partition >= settings.parts_to_throw_insert)
|
||||
{
|
||||
ProfileEvents::increment(ProfileEvents::RejectedInserts);
|
||||
throw Exception("Too many parts (" + toString(parts_count_in_partition) + "). Merges are processing significantly slower than inserts.", ErrorCodes::TOO_MANY_PARTS);
|
||||
}
|
||||
|
||||
const size_t max_k = settings.parts_to_throw_insert - settings.parts_to_delay_insert; /// always > 0
|
||||
const size_t k = 1 + parts_count - settings.parts_to_delay_insert; /// from 1 to max_k
|
||||
const size_t k = 1 + parts_count_in_partition - settings.parts_to_delay_insert; /// from 1 to max_k
|
||||
const double delay_milliseconds = ::pow(settings.max_delay_to_insert * 1000, static_cast<double>(k) / max_k);
|
||||
|
||||
ProfileEvents::increment(ProfileEvents::DelayedInserts);
|
||||
@ -2197,7 +2216,7 @@ void MergeTreeData::delayInsertOrThrowIfNeeded(Poco::Event *until) const
|
||||
CurrentMetrics::Increment metric_increment(CurrentMetrics::DelayedInserts);
|
||||
|
||||
LOG_INFO(log, "Delaying inserting block by "
|
||||
<< std::fixed << std::setprecision(4) << delay_milliseconds << " ms. because there are " << parts_count << " parts");
|
||||
<< std::fixed << std::setprecision(4) << delay_milliseconds << " ms. because there are " << parts_count_in_partition << " parts");
|
||||
|
||||
if (until)
|
||||
until->tryWait(delay_milliseconds);
|
||||
@ -2207,12 +2226,19 @@ void MergeTreeData::delayInsertOrThrowIfNeeded(Poco::Event *until) const
|
||||
|
||||
void MergeTreeData::throwInsertIfNeeded() const
|
||||
{
|
||||
const size_t parts_count = getMaxPartsCountForPartition();
|
||||
|
||||
if (parts_count >= settings.parts_to_throw_insert)
|
||||
const size_t parts_count_in_total = getPartsCount();
|
||||
if (parts_count_in_total >= settings.max_parts_in_total)
|
||||
{
|
||||
ProfileEvents::increment(ProfileEvents::RejectedInserts);
|
||||
throw Exception("Too many parts (" + toString(parts_count) + "). Merges are processing significantly slower than inserts.", ErrorCodes::TOO_MANY_PARTS);
|
||||
throw Exception("Too many parts (" + toString(parts_count_in_total) + ") in all partitions in total. This indicates wrong choice of partition key. The threshold can be modified with 'max_parts_in_total' setting in <merge_tree> element in config.xml or with per-table setting.", ErrorCodes::TOO_MANY_PARTS);
|
||||
}
|
||||
|
||||
const size_t parts_count_in_partition = getMaxPartsCountForPartition();
|
||||
|
||||
if (parts_count_in_partition >= settings.parts_to_throw_insert)
|
||||
{
|
||||
ProfileEvents::increment(ProfileEvents::RejectedInserts);
|
||||
throw Exception("Too many parts (" + toString(parts_count_in_partition) + "). Merges are processing significantly slower than inserts.", ErrorCodes::TOO_MANY_PARTS);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -422,6 +422,7 @@ public:
|
||||
/// Total size of active parts in bytes.
|
||||
size_t getTotalActiveSizeInBytes() const;
|
||||
|
||||
size_t getPartsCount() const;
|
||||
size_t getMaxPartsCountForPartition() const;
|
||||
|
||||
/// Get min value of part->info.getDataVersion() for all active parts.
|
||||
|
@ -36,9 +36,10 @@ struct MergeTreeSettings : public SettingsCollection<MergeTreeSettings>
|
||||
M(SettingSeconds, temporary_directories_lifetime, 86400, "How many seconds to keep tmp_-directories.") \
|
||||
\
|
||||
/** Inserts settings. */ \
|
||||
M(SettingUInt64, parts_to_delay_insert, 150, "If table contains at least that many active parts, artificially slow down insert into table.") \
|
||||
M(SettingUInt64, parts_to_throw_insert, 300, "If more than this number active parts, throw 'Too many parts ...' exception.") \
|
||||
M(SettingUInt64, max_delay_to_insert, 1, "Max delay of inserting data into MergeTree table in seconds, if there are a lot of unmerged parts.") \
|
||||
M(SettingUInt64, parts_to_delay_insert, 150, "If table contains at least that many active parts in single partition, artificially slow down insert into table.") \
|
||||
M(SettingUInt64, parts_to_throw_insert, 300, "If more than this number active parts in single partition, throw 'Too many parts ...' exception.") \
|
||||
M(SettingUInt64, max_delay_to_insert, 1, "Max delay of inserting data into MergeTree table in seconds, if there are a lot of unmerged parts in single partition.") \
|
||||
M(SettingUInt64, max_parts_in_total, 100000, "If more than this number active parts in all partitions in total, throw 'Too many parts ...' exception.") \
|
||||
\
|
||||
/** Replication settings. */ \
|
||||
M(SettingUInt64, replicated_deduplication_window, 100, "How many last blocks of hashes should be kept in ZooKeeper (old blocks will be deleted).") \
|
||||
|
@ -0,0 +1 @@
|
||||
1
|
@ -0,0 +1,8 @@
|
||||
drop table if exists max_parts_in_total;
|
||||
create table max_parts_in_total (x UInt64) ENGINE = MergeTree PARTITION BY x ORDER BY x SETTINGS max_parts_in_total = 10;
|
||||
|
||||
INSERT INTO max_parts_in_total SELECT number FROM numbers(10);
|
||||
SELECT 1;
|
||||
INSERT INTO max_parts_in_total SELECT 123; -- { serverError 252 }
|
||||
|
||||
drop table max_parts_in_total;
|
Loading…
Reference in New Issue
Block a user