diff --git a/dbms/src/Storages/MergeTree/MergeTreeData.cpp b/dbms/src/Storages/MergeTree/MergeTreeData.cpp index 453f680c6ba..a9397a052a6 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeData.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeData.cpp @@ -2133,6 +2133,18 @@ size_t MergeTreeData::getTotalActiveSizeInBytes() const } +size_t MergeTreeData::getPartsCount() const +{ + auto lock = lockParts(); + + size_t res = 0; + for (const auto & part [[maybe_unused]] : getDataPartsStateRange(DataPartState::Committed)) + ++res; + + return res; +} + + size_t MergeTreeData::getMaxPartsCountForPartition() const { auto lock = lockParts(); @@ -2165,7 +2177,7 @@ std::optional MergeTreeData::getMinPartDataVersion() const auto lock = lockParts(); std::optional result; - for (const DataPartPtr & part : getDataPartsStateRange(DataPartState::Committed)) + for (const auto & part : getDataPartsStateRange(DataPartState::Committed)) { if (!result || *result > part->info.getDataVersion()) result = part->info.getDataVersion(); @@ -2177,18 +2189,25 @@ std::optional MergeTreeData::getMinPartDataVersion() const void MergeTreeData::delayInsertOrThrowIfNeeded(Poco::Event *until) const { - const size_t parts_count = getMaxPartsCountForPartition(); - if (parts_count < settings.parts_to_delay_insert) - return; - - if (parts_count >= settings.parts_to_throw_insert) + const size_t parts_count_in_total = getPartsCount(); + if (parts_count_in_total >= settings.max_parts_in_total) { ProfileEvents::increment(ProfileEvents::RejectedInserts); - throw Exception("Too many parts (" + toString(parts_count) + "). Merges are processing significantly slower than inserts.", ErrorCodes::TOO_MANY_PARTS); + throw Exception("Too many parts (" + toString(parts_count_in_total) + ") in all partitions in total. This indicates wrong choice of partition key. The threshold can be modified with 'max_parts_in_total' setting in element in config.xml or with per-table setting.", ErrorCodes::TOO_MANY_PARTS); + } + + const size_t parts_count_in_partition = getMaxPartsCountForPartition(); + if (parts_count_in_partition < settings.parts_to_delay_insert) + return; + + if (parts_count_in_partition >= settings.parts_to_throw_insert) + { + ProfileEvents::increment(ProfileEvents::RejectedInserts); + throw Exception("Too many parts (" + toString(parts_count_in_partition) + "). Merges are processing significantly slower than inserts.", ErrorCodes::TOO_MANY_PARTS); } const size_t max_k = settings.parts_to_throw_insert - settings.parts_to_delay_insert; /// always > 0 - const size_t k = 1 + parts_count - settings.parts_to_delay_insert; /// from 1 to max_k + const size_t k = 1 + parts_count_in_partition - settings.parts_to_delay_insert; /// from 1 to max_k const double delay_milliseconds = ::pow(settings.max_delay_to_insert * 1000, static_cast(k) / max_k); ProfileEvents::increment(ProfileEvents::DelayedInserts); @@ -2197,7 +2216,7 @@ void MergeTreeData::delayInsertOrThrowIfNeeded(Poco::Event *until) const CurrentMetrics::Increment metric_increment(CurrentMetrics::DelayedInserts); LOG_INFO(log, "Delaying inserting block by " - << std::fixed << std::setprecision(4) << delay_milliseconds << " ms. because there are " << parts_count << " parts"); + << std::fixed << std::setprecision(4) << delay_milliseconds << " ms. because there are " << parts_count_in_partition << " parts"); if (until) until->tryWait(delay_milliseconds); @@ -2207,12 +2226,19 @@ void MergeTreeData::delayInsertOrThrowIfNeeded(Poco::Event *until) const void MergeTreeData::throwInsertIfNeeded() const { - const size_t parts_count = getMaxPartsCountForPartition(); - - if (parts_count >= settings.parts_to_throw_insert) + const size_t parts_count_in_total = getPartsCount(); + if (parts_count_in_total >= settings.max_parts_in_total) { ProfileEvents::increment(ProfileEvents::RejectedInserts); - throw Exception("Too many parts (" + toString(parts_count) + "). Merges are processing significantly slower than inserts.", ErrorCodes::TOO_MANY_PARTS); + throw Exception("Too many parts (" + toString(parts_count_in_total) + ") in all partitions in total. This indicates wrong choice of partition key. The threshold can be modified with 'max_parts_in_total' setting in element in config.xml or with per-table setting.", ErrorCodes::TOO_MANY_PARTS); + } + + const size_t parts_count_in_partition = getMaxPartsCountForPartition(); + + if (parts_count_in_partition >= settings.parts_to_throw_insert) + { + ProfileEvents::increment(ProfileEvents::RejectedInserts); + throw Exception("Too many parts (" + toString(parts_count_in_partition) + "). Merges are processing significantly slower than inserts.", ErrorCodes::TOO_MANY_PARTS); } } diff --git a/dbms/src/Storages/MergeTree/MergeTreeData.h b/dbms/src/Storages/MergeTree/MergeTreeData.h index 34ad47cbaba..4dc76d4f9c3 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeData.h +++ b/dbms/src/Storages/MergeTree/MergeTreeData.h @@ -422,6 +422,7 @@ public: /// Total size of active parts in bytes. size_t getTotalActiveSizeInBytes() const; + size_t getPartsCount() const; size_t getMaxPartsCountForPartition() const; /// Get min value of part->info.getDataVersion() for all active parts. diff --git a/dbms/src/Storages/MergeTree/MergeTreeSettings.h b/dbms/src/Storages/MergeTree/MergeTreeSettings.h index 5deaa0816b4..06bfad495d5 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeSettings.h +++ b/dbms/src/Storages/MergeTree/MergeTreeSettings.h @@ -36,9 +36,10 @@ struct MergeTreeSettings : public SettingsCollection M(SettingSeconds, temporary_directories_lifetime, 86400, "How many seconds to keep tmp_-directories.") \ \ /** Inserts settings. */ \ - M(SettingUInt64, parts_to_delay_insert, 150, "If table contains at least that many active parts, artificially slow down insert into table.") \ - M(SettingUInt64, parts_to_throw_insert, 300, "If more than this number active parts, throw 'Too many parts ...' exception.") \ - M(SettingUInt64, max_delay_to_insert, 1, "Max delay of inserting data into MergeTree table in seconds, if there are a lot of unmerged parts.") \ + M(SettingUInt64, parts_to_delay_insert, 150, "If table contains at least that many active parts in single partition, artificially slow down insert into table.") \ + M(SettingUInt64, parts_to_throw_insert, 300, "If more than this number active parts in single partition, throw 'Too many parts ...' exception.") \ + M(SettingUInt64, max_delay_to_insert, 1, "Max delay of inserting data into MergeTree table in seconds, if there are a lot of unmerged parts in single partition.") \ + M(SettingUInt64, max_parts_in_total, 100000, "If more than this number active parts in all partitions in total, throw 'Too many parts ...' exception.") \ \ /** Replication settings. */ \ M(SettingUInt64, replicated_deduplication_window, 100, "How many last blocks of hashes should be kept in ZooKeeper (old blocks will be deleted).") \ diff --git a/dbms/tests/queries/0_stateless/00940_max_parts_in_total.reference b/dbms/tests/queries/0_stateless/00940_max_parts_in_total.reference new file mode 100644 index 00000000000..d00491fd7e5 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00940_max_parts_in_total.reference @@ -0,0 +1 @@ +1 diff --git a/dbms/tests/queries/0_stateless/00940_max_parts_in_total.sql b/dbms/tests/queries/0_stateless/00940_max_parts_in_total.sql new file mode 100644 index 00000000000..b6d0e6f53ae --- /dev/null +++ b/dbms/tests/queries/0_stateless/00940_max_parts_in_total.sql @@ -0,0 +1,8 @@ +drop table if exists max_parts_in_total; +create table max_parts_in_total (x UInt64) ENGINE = MergeTree PARTITION BY x ORDER BY x SETTINGS max_parts_in_total = 10; + +INSERT INTO max_parts_in_total SELECT number FROM numbers(10); +SELECT 1; +INSERT INTO max_parts_in_total SELECT 123; -- { serverError 252 } + +drop table max_parts_in_total;