Relax "too many parts" further

Alexey Milovidov 2023-06-11 17:51:54 +02:00
parent eb698a7cdd
commit 598501011f
9 changed files with 26 additions and 16 deletions

View File

@@ -1705,7 +1705,6 @@ try
#endif
/// Must be done after initialization of `servers`, because async_metrics will access `servers` variable from its thread.
async_metrics.start();
{

View File

@@ -4,7 +4,6 @@
#include <IO/WriteBufferFromString.h>
#include <IO/WriteHelpers.h>
#include <Common/HashTable/Hash.h>
#include <Interpreters/InternalTextLogsQueue.h>
#include <base/terminalColors.h>

View File

@@ -4315,14 +4315,14 @@ std::optional<Int64> MergeTreeData::getMinPartDataVersion() const
 }
 
 
-void MergeTreeData::delayInsertOrThrowIfNeeded(Poco::Event * until, const ContextPtr & query_context) const
+void MergeTreeData::delayInsertOrThrowIfNeeded(Poco::Event * until, const ContextPtr & query_context, bool allow_throw) const
 {
     const auto settings = getSettings();
     const auto & query_settings = query_context->getSettingsRef();
     const size_t parts_count_in_total = getActivePartsCount();
 
-    /// check if have too many parts in total
-    if (parts_count_in_total >= settings->max_parts_in_total)
+    /// Check if we have too many parts in total
+    if (allow_throw && parts_count_in_total >= settings->max_parts_in_total)
     {
         ProfileEvents::increment(ProfileEvents::RejectedInserts);
         throw Exception(
@@ -4338,7 +4338,7 @@ void MergeTreeData::delayInsertOrThrowIfNeeded(Poco::Event * until, const Contex
     if (settings->inactive_parts_to_throw_insert > 0 || settings->inactive_parts_to_delay_insert > 0)
         outdated_parts_count_in_partition = getMaxOutdatedPartsCountForPartition();
 
-    if (settings->inactive_parts_to_throw_insert > 0 && outdated_parts_count_in_partition >= settings->inactive_parts_to_throw_insert)
+    if (allow_throw && settings->inactive_parts_to_throw_insert > 0 && outdated_parts_count_in_partition >= settings->inactive_parts_to_throw_insert)
     {
         ProfileEvents::increment(ProfileEvents::RejectedInserts);
         throw Exception(
@@ -4362,7 +4362,7 @@ void MergeTreeData::delayInsertOrThrowIfNeeded(Poco::Event * until, const Contex
     bool parts_are_large_enough_in_average
         = settings->max_avg_part_size_for_too_many_parts && average_part_size > settings->max_avg_part_size_for_too_many_parts;
 
-    if (parts_count_in_partition >= active_parts_to_throw_insert && !parts_are_large_enough_in_average)
+    if (allow_throw && parts_count_in_partition >= active_parts_to_throw_insert && !parts_are_large_enough_in_average)
     {
         ProfileEvents::increment(ProfileEvents::RejectedInserts);
         throw Exception(
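Taken together, the three hunks above change the contract of delayInsertOrThrowIfNeeded(): every throwing branch is now gated on the new allow_throw argument, while the delaying logic further down in the function keeps running for every caller. A minimal standalone sketch of the resulting control flow; the names, thresholds, and the linear back-off formula below are illustrative only, not ClickHouse's actual implementation:

#include <algorithm>
#include <chrono>
#include <cstddef>
#include <stdexcept>
#include <thread>

struct Thresholds
{
    size_t parts_to_delay_insert = 1000;   /// new default from this commit
    size_t parts_to_throw_insert = 3000;   /// new default from this commit
};

void delayOrThrowSketch(size_t parts_count, const Thresholds & t, bool allow_throw)
{
    /// Rejection is permitted only when the caller opts in (first block of an INSERT).
    if (allow_throw && parts_count >= t.parts_to_throw_insert)
        throw std::runtime_error("Too many parts");

    /// Delaying, by contrast, is always available: back off harder the closer
    /// the partition gets to the throw threshold.
    if (parts_count >= t.parts_to_delay_insert)
    {
        double overload = double(parts_count - t.parts_to_delay_insert)
            / double(t.parts_to_throw_insert - t.parts_to_delay_insert);
        std::this_thread::sleep_for(
            std::chrono::milliseconds(static_cast<int>(1000 * std::min(overload, 1.0))));
    }
}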

View File

@@ -557,7 +557,7 @@ public:
     /// If the table contains too many active parts, sleep for a while to give them time to merge.
     /// If until is non-null, wake up from the sleep earlier if the event happened.
     /// The decision to delay or throw is made according to settings 'parts_to_delay_insert' and 'parts_to_throw_insert'.
-    void delayInsertOrThrowIfNeeded(Poco::Event * until, const ContextPtr & query_context) const;
+    void delayInsertOrThrowIfNeeded(Poco::Event * until, const ContextPtr & query_context, bool allow_throw) const;
 
     /// If the table contains too many unfinished mutations, sleep for a while to give them time to execute.
     /// If until is non-null, wake up from the sleep earlier if the event happened.

View File

@@ -73,11 +73,11 @@ struct Settings;
     M(UInt64, max_delay_to_mutate_ms, 1000, "Max delay of mutating MergeTree table in milliseconds, if there are a lot of unfinished mutations", 0) \
     \
     /** Inserts settings. */ \
-    M(UInt64, parts_to_delay_insert, 150, "If table contains at least that many active parts in single partition, artificially slow down insert into table. Disabled if set to 0", 0) \
+    M(UInt64, parts_to_delay_insert, 1000, "If table contains at least that many active parts in single partition, artificially slow down insert into table. Disabled if set to 0", 0) \
     M(UInt64, inactive_parts_to_delay_insert, 0, "If table contains at least that many inactive parts in single partition, artificially slow down insert into table.", 0) \
-    M(UInt64, parts_to_throw_insert, 300, "If more than this number active parts in single partition, throw 'Too many parts ...' exception.", 0) \
+    M(UInt64, parts_to_throw_insert, 3000, "If more than this number active parts in single partition, throw 'Too many parts ...' exception.", 0) \
     M(UInt64, inactive_parts_to_throw_insert, 0, "If more than this number inactive parts in single partition, throw 'Too many inactive parts ...' exception.", 0) \
-    M(UInt64, max_avg_part_size_for_too_many_parts, 10ULL * 1024 * 1024 * 1024, "The 'too many parts' check according to 'parts_to_delay_insert' and 'parts_to_throw_insert' will be active only if the average part size (in the relevant partition) is not larger than the specified threshold. If it is larger than the specified threshold, the INSERTs will be neither delayed or rejected. This allows to have hundreds of terabytes in a single table on a single server if the parts are successfully merged to larger parts. This does not affect the thresholds on inactive parts or total parts.", 0) \
+    M(UInt64, max_avg_part_size_for_too_many_parts, 1ULL * 1024 * 1024 * 1024, "The 'too many parts' check according to 'parts_to_delay_insert' and 'parts_to_throw_insert' will be active only if the average part size (in the relevant partition) is not larger than the specified threshold. If it is larger than the specified threshold, the INSERTs will be neither delayed or rejected. This allows to have hundreds of terabytes in a single table on a single server if the parts are successfully merged to larger parts. This does not affect the thresholds on inactive parts or total parts.", 0) \
     M(UInt64, max_delay_to_insert, 1, "Max delay of inserting data into MergeTree table in seconds, if there are a lot of unmerged parts in single partition.", 0) \
     M(UInt64, min_delay_to_insert_ms, 10, "Min delay of inserting data into MergeTree table in milliseconds, if there are a lot of unmerged parts in single partition.", 0) \
     M(UInt64, max_parts_in_total, 100000, "If more than this number active parts in all partitions in total, throw 'Too many parts ...' exception.", 0) \
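The defaults move in two directions at once: the per-partition thresholds are relaxed (parts_to_delay_insert 150 -> 1000, parts_to_throw_insert 300 -> 3000), while the average-part-size cutoff that disables the check entirely is tightened from 10 GiB to 1 GiB. A self-contained sketch of how that cutoff behaves, following the setting's description; the helper name is hypothetical:

#include <cstdint>

/// New default from this commit: 1 GiB.
constexpr uint64_t max_avg_part_size_for_too_many_parts = 1ULL * 1024 * 1024 * 1024;

/// The "too many parts" delay/throw logic applies only while parts in the
/// partition are small on average; once merges have produced large parts,
/// inserts are neither delayed nor rejected.
bool tooManyPartsCheckApplies(uint64_t total_bytes_in_partition, uint64_t parts_count_in_partition)
{
    if (parts_count_in_partition == 0)
        return true;  /// no parts yet, so the check trivially passes anyway
    const uint64_t average_part_size = total_bytes_in_partition / parts_count_in_partition;
    return average_part_size <= max_avg_part_size_for_too_many_parts;
}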

View File

@@ -45,9 +45,9 @@ MergeTreeSink::MergeTreeSink(
 
 void MergeTreeSink::onStart()
 {
-    /// Only check "too many parts" before write,
+    /// It's only allowed to throw "too many parts" before write,
     /// because interrupting long-running INSERT query in the middle is not convenient for users.
-    storage.delayInsertOrThrowIfNeeded(nullptr, context);
+    storage.delayInsertOrThrowIfNeeded(nullptr, context, true);
 }
 
 void MergeTreeSink::onFinish()
@@ -57,6 +57,9 @@ void MergeTreeSink::onFinish()
 
 void MergeTreeSink::consume(Chunk chunk)
 {
+    if (num_blocks_processed > 0)
+        storage.delayInsertOrThrowIfNeeded(nullptr, context, false);
+
     auto block = getHeader().cloneWithColumns(chunk.detachColumns());
     if (!storage_snapshot->object_columns.empty())
         convertDynamicColumnsToTuples(block, storage_snapshot);
@@ -136,6 +139,8 @@ void MergeTreeSink::consume(Chunk chunk)
     finishDelayedChunk();
     delayed_chunk = std::make_unique<MergeTreeSink::DelayedChunk>();
     delayed_chunk->partitions = std::move(partitions);
+
+    ++num_blocks_processed;
 }
 
 void MergeTreeSink::finishDelayedChunk()
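This pair of hunks is the heart of the relaxation on the sink side: onStart() may still reject the whole INSERT with "Too many parts", but once the first block has been written, consume() only requests a delay (allow_throw = false), so a long-running INSERT is slowed down rather than killed halfway. A condensed sketch of the pattern, with hypothetical names standing in for the real sink:

#include <cstdint>

class SinkSketch
{
public:
    /// Before any block is written, rejection is still allowed.
    void onStart() { delayOrThrow(/* allow_throw = */ true); }

    void consume(/* Chunk chunk */)
    {
        /// From the second block on: slow down if needed, never reject.
        if (num_blocks_processed > 0)
            delayOrThrow(/* allow_throw = */ false);

        /// ... partition and write the block ...

        ++num_blocks_processed;
    }

private:
    /// Stand-in for storage.delayInsertOrThrowIfNeeded() in the real sinks.
    void delayOrThrow(bool /* allow_throw */) {}

    uint64_t num_blocks_processed = 0;
};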

View File

@@ -35,7 +35,8 @@ private:
     size_t max_parts_per_block;
     ContextPtr context;
     StorageSnapshotPtr storage_snapshot;
-    uint64_t chunk_dedup_seqnum = 0; /// input chunk ordinal number in case of dedup token
+    UInt64 chunk_dedup_seqnum = 0; /// input chunk ordinal number in case of dedup token
+    UInt64 num_blocks_processed = 0;
 
     /// We can delay processing for previous chunk and start writing a new one.
     struct DelayedChunk;

View File

@@ -367,6 +367,9 @@ size_t ReplicatedMergeTreeSinkImpl<async_insert>::checkQuorumPrecondition(const
 template<bool async_insert>
 void ReplicatedMergeTreeSinkImpl<async_insert>::consume(Chunk chunk)
 {
+    if (num_blocks_processed > 0)
+        storage.delayInsertOrThrowIfNeeded(&storage.partial_shutdown_event, context, false);
+
     auto block = getHeader().cloneWithColumns(chunk.detachColumns());
 
     const auto & settings = context->getSettingsRef();
@@ -512,6 +515,8 @@ void ReplicatedMergeTreeSinkImpl<async_insert>::consume(Chunk chunk)
     /// TODO: we can also delay commit if there is no MVs.
     if (!settings.deduplicate_blocks_in_dependent_materialized_views)
         finishDelayedChunk(zookeeper);
+
+    ++num_blocks_processed;
 }
 
 template<>
@@ -1136,9 +1141,9 @@ std::pair<std::vector<String>, bool> ReplicatedMergeTreeSinkImpl<async_insert>::
 template<bool async_insert>
 void ReplicatedMergeTreeSinkImpl<async_insert>::onStart()
 {
-    /// Only check "too many parts" before write,
+    /// It's only allowed to throw "too many parts" before write,
     /// because interrupting long-running INSERT query in the middle is not convenient for users.
-    storage.delayInsertOrThrowIfNeeded(&storage.partial_shutdown_event, context);
+    storage.delayInsertOrThrowIfNeeded(&storage.partial_shutdown_event, context, true);
 }
 
 template<bool async_insert>
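Unlike MergeTreeSink, which passes nullptr, the replicated sink hands &storage.partial_shutdown_event to delayInsertOrThrowIfNeeded(), so (per the comment in MergeTreeData.h above) the artificial sleep can end early when the event fires, e.g. on table shutdown. A minimal sketch of such an interruptible wait, assuming Poco is available; whether the real implementation uses exactly this call is not shown in the diff:

#include <Poco/Event.h>
#include <chrono>
#include <thread>

/// Sleep for `ms` milliseconds, waking immediately if `until` is signalled first.
void interruptibleDelay(Poco::Event * until, long ms)
{
    if (until)
        until->tryWait(ms);  /// returns early if the event fired
    else
        std::this_thread::sleep_for(std::chrono::milliseconds(ms));
}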

View File

@@ -123,6 +123,7 @@ private:
     bool quorum_parallel = false;
     const bool deduplicate = true;
     bool last_block_is_duplicate = false;
+    UInt64 num_blocks_processed = 0;
 
     using Logger = Poco::Logger;
     Poco::Logger * log;