Merge pull request #17566 from ClickHouse/relax-too-many-parts-1

Relax "Too many parts" threshold, part 1
This commit is contained in:
alexey-milovidov 2020-12-17 20:44:44 +03:00 committed by GitHub
commit dd78c9dc75
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 31 additions and 26 deletions

View File

@ -13,10 +13,16 @@ Block MergeTreeBlockOutputStream::getHeader() const
}
void MergeTreeBlockOutputStream::writePrefix()
{
/// Only check "too many parts" before write,
/// because interrupting long-running INSERT query in the middle is not convenient for users.
storage.delayInsertOrThrowIfNeeded();
}
void MergeTreeBlockOutputStream::write(const Block & block)
{
storage.delayInsertOrThrowIfNeeded();
auto part_blocks = storage.writer.splitBlockIntoParts(block, max_parts_per_block, metadata_snapshot);
for (auto & current_block : part_blocks)
{

View File

@ -24,6 +24,7 @@ public:
Block getHeader() const override;
void write(const Block & block) override;
void writePrefix() override;
private:
StorageMergeTree & storage;

View File

@ -2380,8 +2380,6 @@ void MergeTreeData::delayInsertOrThrowIfNeeded(Poco::Event * until) const
}
const size_t parts_count_in_partition = getMaxPartsCountForPartition();
if (parts_count_in_partition < settings->parts_to_delay_insert)
return;
if (parts_count_in_partition >= settings->parts_to_throw_insert)
{
@ -2389,6 +2387,9 @@ void MergeTreeData::delayInsertOrThrowIfNeeded(Poco::Event * until) const
throw Exception("Too many parts (" + toString(parts_count_in_partition) + "). Merges are processing significantly slower than inserts.", ErrorCodes::TOO_MANY_PARTS);
}
if (parts_count_in_partition < settings->parts_to_delay_insert)
return;
const size_t max_k = settings->parts_to_throw_insert - settings->parts_to_delay_insert; /// always > 0
const size_t k = 1 + parts_count_in_partition - settings->parts_to_delay_insert; /// from 1 to max_k
const double delay_milliseconds = ::pow(settings->max_delay_to_insert * 1000, static_cast<double>(k) / max_k);
@ -2406,24 +2407,6 @@ void MergeTreeData::delayInsertOrThrowIfNeeded(Poco::Event * until) const
std::this_thread::sleep_for(std::chrono::milliseconds(static_cast<size_t>(delay_milliseconds)));
}
void MergeTreeData::throwInsertIfNeeded() const
{
const auto settings = getSettings();
const size_t parts_count_in_total = getPartsCount();
if (parts_count_in_total >= settings->max_parts_in_total)
{
ProfileEvents::increment(ProfileEvents::RejectedInserts);
throw Exception("Too many parts (" + toString(parts_count_in_total) + ") in all partitions in total. This indicates wrong choice of partition key. The threshold can be modified with 'max_parts_in_total' setting in <merge_tree> element in config.xml or with per-table setting.", ErrorCodes::TOO_MANY_PARTS);
}
const size_t parts_count_in_partition = getMaxPartsCountForPartition();
if (parts_count_in_partition >= settings->parts_to_throw_insert)
{
ProfileEvents::increment(ProfileEvents::RejectedInserts);
throw Exception("Too many parts (" + toString(parts_count_in_partition) + "). Merges are processing significantly slower than inserts.", ErrorCodes::TOO_MANY_PARTS);
}
}
MergeTreeData::DataPartPtr MergeTreeData::getActiveContainingPart(
const MergeTreePartInfo & part_info, MergeTreeData::DataPartState state, DataPartsLock & /*lock*/) const
@ -2451,6 +2434,7 @@ MergeTreeData::DataPartPtr MergeTreeData::getActiveContainingPart(
return nullptr;
}
void MergeTreeData::swapActivePart(MergeTreeData::DataPartPtr part_copy)
{
auto lock = lockParts();

View File

@ -423,7 +423,6 @@ public:
/// If the table contains too many active parts, sleep for a while to give them time to merge.
/// If until is non-null, wake up from the sleep earlier if the event happened.
void delayInsertOrThrowIfNeeded(Poco::Event * until = nullptr) const;
void throwInsertIfNeeded() const;
/// Renames temporary part to a permanent part and adds it to the parts set.
/// It is assumed that the part does not intersect with existing parts.

View File

@ -123,8 +123,6 @@ void ReplicatedMergeTreeBlockOutputStream::write(const Block & block)
{
last_block_is_duplicate = false;
storage.delayInsertOrThrowIfNeeded(&storage.partial_shutdown_event);
auto zookeeper = storage.getZooKeeper();
assertSessionIsNotExpired(zookeeper);
@ -524,7 +522,9 @@ void ReplicatedMergeTreeBlockOutputStream::commitPart(
void ReplicatedMergeTreeBlockOutputStream::writePrefix()
{
storage.throwInsertIfNeeded();
/// Only check "too many parts" before write,
/// because interrupting long-running INSERT query in the middle is not convenient for users.
storage.delayInsertOrThrowIfNeeded(&storage.partial_shutdown_event);
}

View File

@ -0,0 +1,14 @@
DROP TABLE IF EXISTS too_many_parts;
CREATE TABLE too_many_parts (x UInt64) ENGINE = MergeTree ORDER BY tuple() SETTINGS parts_to_delay_insert = 5, parts_to_throw_insert = 5;
SYSTEM STOP MERGES too_many_parts;
SET max_block_size = 1, min_insert_block_size_rows = 0, min_insert_block_size_bytes = 0;
-- exception is not thrown if threshold is exceeded when multi-block INSERT is already started.
INSERT INTO too_many_parts SELECT * FROM numbers(10);
SELECT count() FROM too_many_parts;
-- exception is thrown if threshold is exceeded on new INSERT.
INSERT INTO too_many_parts SELECT * FROM numbers(10); -- { serverError 252 }
DROP TABLE too_many_parts;