mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-21 23:21:59 +00:00
Merge pull request #17566 from ClickHouse/relax-too-many-parts-1
Relax "Too many parts" threshold, part 1
This commit is contained in:
commit
dd78c9dc75
@ -13,10 +13,16 @@ Block MergeTreeBlockOutputStream::getHeader() const
|
||||
}
|
||||
|
||||
|
||||
void MergeTreeBlockOutputStream::writePrefix()
|
||||
{
|
||||
/// Only check "too many parts" before write,
|
||||
/// because interrupting long-running INSERT query in the middle is not convenient for users.
|
||||
storage.delayInsertOrThrowIfNeeded();
|
||||
}
|
||||
|
||||
|
||||
void MergeTreeBlockOutputStream::write(const Block & block)
|
||||
{
|
||||
storage.delayInsertOrThrowIfNeeded();
|
||||
|
||||
auto part_blocks = storage.writer.splitBlockIntoParts(block, max_parts_per_block, metadata_snapshot);
|
||||
for (auto & current_block : part_blocks)
|
||||
{
|
||||
|
@ -24,6 +24,7 @@ public:
|
||||
|
||||
Block getHeader() const override;
|
||||
void write(const Block & block) override;
|
||||
void writePrefix() override;
|
||||
|
||||
private:
|
||||
StorageMergeTree & storage;
|
||||
|
@ -2380,8 +2380,6 @@ void MergeTreeData::delayInsertOrThrowIfNeeded(Poco::Event * until) const
|
||||
}
|
||||
|
||||
const size_t parts_count_in_partition = getMaxPartsCountForPartition();
|
||||
if (parts_count_in_partition < settings->parts_to_delay_insert)
|
||||
return;
|
||||
|
||||
if (parts_count_in_partition >= settings->parts_to_throw_insert)
|
||||
{
|
||||
@ -2389,6 +2387,9 @@ void MergeTreeData::delayInsertOrThrowIfNeeded(Poco::Event * until) const
|
||||
throw Exception("Too many parts (" + toString(parts_count_in_partition) + "). Merges are processing significantly slower than inserts.", ErrorCodes::TOO_MANY_PARTS);
|
||||
}
|
||||
|
||||
if (parts_count_in_partition < settings->parts_to_delay_insert)
|
||||
return;
|
||||
|
||||
const size_t max_k = settings->parts_to_throw_insert - settings->parts_to_delay_insert; /// always > 0
|
||||
const size_t k = 1 + parts_count_in_partition - settings->parts_to_delay_insert; /// from 1 to max_k
|
||||
const double delay_milliseconds = ::pow(settings->max_delay_to_insert * 1000, static_cast<double>(k) / max_k);
|
||||
@ -2406,24 +2407,6 @@ void MergeTreeData::delayInsertOrThrowIfNeeded(Poco::Event * until) const
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(static_cast<size_t>(delay_milliseconds)));
|
||||
}
|
||||
|
||||
void MergeTreeData::throwInsertIfNeeded() const
|
||||
{
|
||||
const auto settings = getSettings();
|
||||
const size_t parts_count_in_total = getPartsCount();
|
||||
if (parts_count_in_total >= settings->max_parts_in_total)
|
||||
{
|
||||
ProfileEvents::increment(ProfileEvents::RejectedInserts);
|
||||
throw Exception("Too many parts (" + toString(parts_count_in_total) + ") in all partitions in total. This indicates wrong choice of partition key. The threshold can be modified with 'max_parts_in_total' setting in <merge_tree> element in config.xml or with per-table setting.", ErrorCodes::TOO_MANY_PARTS);
|
||||
}
|
||||
|
||||
const size_t parts_count_in_partition = getMaxPartsCountForPartition();
|
||||
|
||||
if (parts_count_in_partition >= settings->parts_to_throw_insert)
|
||||
{
|
||||
ProfileEvents::increment(ProfileEvents::RejectedInserts);
|
||||
throw Exception("Too many parts (" + toString(parts_count_in_partition) + "). Merges are processing significantly slower than inserts.", ErrorCodes::TOO_MANY_PARTS);
|
||||
}
|
||||
}
|
||||
|
||||
MergeTreeData::DataPartPtr MergeTreeData::getActiveContainingPart(
|
||||
const MergeTreePartInfo & part_info, MergeTreeData::DataPartState state, DataPartsLock & /*lock*/) const
|
||||
@ -2451,6 +2434,7 @@ MergeTreeData::DataPartPtr MergeTreeData::getActiveContainingPart(
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
|
||||
void MergeTreeData::swapActivePart(MergeTreeData::DataPartPtr part_copy)
|
||||
{
|
||||
auto lock = lockParts();
|
||||
|
@ -423,7 +423,6 @@ public:
|
||||
/// If the table contains too many active parts, sleep for a while to give them time to merge.
|
||||
/// If until is non-null, wake up from the sleep earlier if the event happened.
|
||||
void delayInsertOrThrowIfNeeded(Poco::Event * until = nullptr) const;
|
||||
void throwInsertIfNeeded() const;
|
||||
|
||||
/// Renames temporary part to a permanent part and adds it to the parts set.
|
||||
/// It is assumed that the part does not intersect with existing parts.
|
||||
|
@ -123,8 +123,6 @@ void ReplicatedMergeTreeBlockOutputStream::write(const Block & block)
|
||||
{
|
||||
last_block_is_duplicate = false;
|
||||
|
||||
storage.delayInsertOrThrowIfNeeded(&storage.partial_shutdown_event);
|
||||
|
||||
auto zookeeper = storage.getZooKeeper();
|
||||
assertSessionIsNotExpired(zookeeper);
|
||||
|
||||
@ -524,7 +522,9 @@ void ReplicatedMergeTreeBlockOutputStream::commitPart(
|
||||
|
||||
void ReplicatedMergeTreeBlockOutputStream::writePrefix()
|
||||
{
|
||||
storage.throwInsertIfNeeded();
|
||||
/// Only check "too many parts" before write,
|
||||
/// because interrupting long-running INSERT query in the middle is not convenient for users.
|
||||
storage.delayInsertOrThrowIfNeeded(&storage.partial_shutdown_event);
|
||||
}
|
||||
|
||||
|
||||
|
@ -0,0 +1 @@
|
||||
10
|
@ -0,0 +1,14 @@
|
||||
DROP TABLE IF EXISTS too_many_parts;
|
||||
CREATE TABLE too_many_parts (x UInt64) ENGINE = MergeTree ORDER BY tuple() SETTINGS parts_to_delay_insert = 5, parts_to_throw_insert = 5;
|
||||
|
||||
SYSTEM STOP MERGES too_many_parts;
|
||||
SET max_block_size = 1, min_insert_block_size_rows = 0, min_insert_block_size_bytes = 0;
|
||||
|
||||
-- exception is not thrown if threshold is exceeded when multi-block INSERT is already started.
|
||||
INSERT INTO too_many_parts SELECT * FROM numbers(10);
|
||||
SELECT count() FROM too_many_parts;
|
||||
|
||||
-- exception is thrown if threshold is exceeded on new INSERT.
|
||||
INSERT INTO too_many_parts SELECT * FROM numbers(10); -- { serverError 252 }
|
||||
|
||||
DROP TABLE too_many_parts;
|
Loading…
Reference in New Issue
Block a user