Relax "Too many parts" threshold, part 1

This commit is contained in:
Alexey Milovidov 2020-11-29 18:08:02 +03:00
parent 650d20fdeb
commit 2e6bedce91
5 changed files with 13 additions and 24 deletions

View File

@ -13,10 +13,16 @@ Block MergeTreeBlockOutputStream::getHeader() const
} }
void MergeTreeBlockOutputStream::writePrefix()
{
/// Only check "too many parts" before write,
/// because interrupting long-running INSERT query in the middle is not convenient for users.
storage.delayInsertOrThrowIfNeeded();
}
void MergeTreeBlockOutputStream::write(const Block & block) void MergeTreeBlockOutputStream::write(const Block & block)
{ {
storage.delayInsertOrThrowIfNeeded();
auto part_blocks = storage.writer.splitBlockIntoParts(block, max_parts_per_block, metadata_snapshot); auto part_blocks = storage.writer.splitBlockIntoParts(block, max_parts_per_block, metadata_snapshot);
for (auto & current_block : part_blocks) for (auto & current_block : part_blocks)
{ {

View File

@ -23,6 +23,7 @@ public:
Block getHeader() const override; Block getHeader() const override;
void write(const Block & block) override; void write(const Block & block) override;
void writePrefix() override;
private: private:
StorageMergeTree & storage; StorageMergeTree & storage;

View File

@ -2406,24 +2406,6 @@ void MergeTreeData::delayInsertOrThrowIfNeeded(Poco::Event * until) const
std::this_thread::sleep_for(std::chrono::milliseconds(static_cast<size_t>(delay_milliseconds))); std::this_thread::sleep_for(std::chrono::milliseconds(static_cast<size_t>(delay_milliseconds)));
} }
void MergeTreeData::throwInsertIfNeeded() const
{
const auto settings = getSettings();
const size_t parts_count_in_total = getPartsCount();
if (parts_count_in_total >= settings->max_parts_in_total)
{
ProfileEvents::increment(ProfileEvents::RejectedInserts);
throw Exception("Too many parts (" + toString(parts_count_in_total) + ") in all partitions in total. This indicates wrong choice of partition key. The threshold can be modified with 'max_parts_in_total' setting in <merge_tree> element in config.xml or with per-table setting.", ErrorCodes::TOO_MANY_PARTS);
}
const size_t parts_count_in_partition = getMaxPartsCountForPartition();
if (parts_count_in_partition >= settings->parts_to_throw_insert)
{
ProfileEvents::increment(ProfileEvents::RejectedInserts);
throw Exception("Too many parts (" + toString(parts_count_in_partition) + "). Merges are processing significantly slower than inserts.", ErrorCodes::TOO_MANY_PARTS);
}
}
MergeTreeData::DataPartPtr MergeTreeData::getActiveContainingPart( MergeTreeData::DataPartPtr MergeTreeData::getActiveContainingPart(
const MergeTreePartInfo & part_info, MergeTreeData::DataPartState state, DataPartsLock & /*lock*/) const const MergeTreePartInfo & part_info, MergeTreeData::DataPartState state, DataPartsLock & /*lock*/) const
@ -2451,6 +2433,7 @@ MergeTreeData::DataPartPtr MergeTreeData::getActiveContainingPart(
return nullptr; return nullptr;
} }
void MergeTreeData::swapActivePart(MergeTreeData::DataPartPtr part_copy) void MergeTreeData::swapActivePart(MergeTreeData::DataPartPtr part_copy)
{ {
auto lock = lockParts(); auto lock = lockParts();

View File

@ -422,7 +422,6 @@ public:
/// If the table contains too many active parts, sleep for a while to give them time to merge. /// If the table contains too many active parts, sleep for a while to give them time to merge.
/// If until is non-null, wake up from the sleep earlier if the event happened. /// If until is non-null, wake up from the sleep earlier if the event happened.
void delayInsertOrThrowIfNeeded(Poco::Event * until = nullptr) const; void delayInsertOrThrowIfNeeded(Poco::Event * until = nullptr) const;
void throwInsertIfNeeded() const;
/// Renames temporary part to a permanent part and adds it to the parts set. /// Renames temporary part to a permanent part and adds it to the parts set.
/// It is assumed that the part does not intersect with existing parts. /// It is assumed that the part does not intersect with existing parts.

View File

@ -121,8 +121,6 @@ void ReplicatedMergeTreeBlockOutputStream::write(const Block & block)
{ {
last_block_is_duplicate = false; last_block_is_duplicate = false;
storage.delayInsertOrThrowIfNeeded(&storage.partial_shutdown_event);
auto zookeeper = storage.getZooKeeper(); auto zookeeper = storage.getZooKeeper();
assertSessionIsNotExpired(zookeeper); assertSessionIsNotExpired(zookeeper);
@ -522,7 +520,9 @@ void ReplicatedMergeTreeBlockOutputStream::commitPart(
void ReplicatedMergeTreeBlockOutputStream::writePrefix() void ReplicatedMergeTreeBlockOutputStream::writePrefix()
{ {
storage.throwInsertIfNeeded(); /// Only check "too many parts" before write,
/// because interrupting long-running INSERT query in the middle is not convenient for users.
storage.delayInsertOrThrowIfNeeded(&storage.partial_shutdown_event);
} }