diff --git a/programs/server/Server.cpp b/programs/server/Server.cpp
index d0fc8aca5e8..cfef7f0a94a 100644
--- a/programs/server/Server.cpp
+++ b/programs/server/Server.cpp
@@ -1705,7 +1705,6 @@ try
 #endif
 
     /// Must be done after initialization of `servers`, because async_metrics will access `servers` variable from its thread.
-    async_metrics.start();
 
     {
diff --git a/src/Loggers/OwnPatternFormatter.cpp b/src/Loggers/OwnPatternFormatter.cpp
index ccf6c479b80..0c2256aaa1b 100644
--- a/src/Loggers/OwnPatternFormatter.cpp
+++ b/src/Loggers/OwnPatternFormatter.cpp
@@ -4,7 +4,6 @@
 #include 
 #include 
 #include 
-#include 
 
 #include 
diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp
index 9cca471fddb..b42d130bf62 100644
--- a/src/Storages/MergeTree/MergeTreeData.cpp
+++ b/src/Storages/MergeTree/MergeTreeData.cpp
@@ -4315,14 +4315,14 @@ std::optional MergeTreeData::getMinPartDataVersion() const
 }
 
-void MergeTreeData::delayInsertOrThrowIfNeeded(Poco::Event * until, const ContextPtr & query_context) const
+void MergeTreeData::delayInsertOrThrowIfNeeded(Poco::Event * until, const ContextPtr & query_context, bool allow_throw) const
 {
     const auto settings = getSettings();
     const auto & query_settings = query_context->getSettingsRef();
 
     const size_t parts_count_in_total = getActivePartsCount();
 
-    /// check if have too many parts in total
-    if (parts_count_in_total >= settings->max_parts_in_total)
+    /// Check if we have too many parts in total
+    if (allow_throw && parts_count_in_total >= settings->max_parts_in_total)
     {
         ProfileEvents::increment(ProfileEvents::RejectedInserts);
         throw Exception(
@@ -4338,7 +4338,7 @@ void MergeTreeData::delayInsertOrThrowIfNeeded(Poco::Event * until, const Contex
     if (settings->inactive_parts_to_throw_insert > 0 || settings->inactive_parts_to_delay_insert > 0)
         outdated_parts_count_in_partition = getMaxOutdatedPartsCountForPartition();
 
-    if (settings->inactive_parts_to_throw_insert > 0 && outdated_parts_count_in_partition >= settings->inactive_parts_to_throw_insert)
+    if (allow_throw && settings->inactive_parts_to_throw_insert > 0 && outdated_parts_count_in_partition >= settings->inactive_parts_to_throw_insert)
     {
         ProfileEvents::increment(ProfileEvents::RejectedInserts);
         throw Exception(
@@ -4362,7 +4362,7 @@ void MergeTreeData::delayInsertOrThrowIfNeeded(Poco::Event * until, const Contex
     bool parts_are_large_enough_in_average
         = settings->max_avg_part_size_for_too_many_parts && average_part_size > settings->max_avg_part_size_for_too_many_parts;
 
-    if (parts_count_in_partition >= active_parts_to_throw_insert && !parts_are_large_enough_in_average)
+    if (allow_throw && parts_count_in_partition >= active_parts_to_throw_insert && !parts_are_large_enough_in_average)
    {
         ProfileEvents::increment(ProfileEvents::RejectedInserts);
         throw Exception(
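The core of this change is that MergeTreeData::delayInsertOrThrowIfNeeded() gains an allow_throw argument, and every "too many parts" throw branch above is now gated on it, while the delay logic further down in the function is left as it was. A minimal self-contained sketch of the resulting behaviour (not the actual ClickHouse code; the helper name and the delay formula are illustrative only):

    #include <chrono>
    #include <cstddef>
    #include <stdexcept>
    #include <thread>

    /// Sketch: with allow_throw == false, crossing the throw threshold no longer rejects the
    /// INSERT; part-count pressure can only slow it down, mirroring the gating added above.
    void delay_or_throw_sketch(bool allow_throw, std::size_t parts, std::size_t parts_to_throw, std::size_t parts_to_delay)
    {
        if (allow_throw && parts >= parts_to_throw)
            throw std::runtime_error("Too many parts");    // the branch that is now gated by allow_throw
        if (parts >= parts_to_delay)                       // the delay path is unchanged by this diff
            std::this_thread::sleep_for(std::chrono::milliseconds(10 * (parts - parts_to_delay + 1)));  // illustrative formula
    }
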
diff --git a/src/Storages/MergeTree/MergeTreeData.h b/src/Storages/MergeTree/MergeTreeData.h
index b1e1e43bd0b..ebda82eeaed 100644
--- a/src/Storages/MergeTree/MergeTreeData.h
+++ b/src/Storages/MergeTree/MergeTreeData.h
@@ -557,7 +557,7 @@ public:
     /// If the table contains too many active parts, sleep for a while to give them time to merge.
     /// If until is non-null, wake up from the sleep earlier if the event happened.
     /// The decision to delay or throw is made according to settings 'parts_to_delay_insert' and 'parts_to_throw_insert'.
-    void delayInsertOrThrowIfNeeded(Poco::Event * until, const ContextPtr & query_context) const;
+    void delayInsertOrThrowIfNeeded(Poco::Event * until, const ContextPtr & query_context, bool allow_throw) const;
 
     /// If the table contains too many unfinished mutations, sleep for a while to give them time to execute.
     /// If until is non-null, wake up from the sleep earlier if the event happened.
diff --git a/src/Storages/MergeTree/MergeTreeSettings.h b/src/Storages/MergeTree/MergeTreeSettings.h
index 33aea358078..082b84be575 100644
--- a/src/Storages/MergeTree/MergeTreeSettings.h
+++ b/src/Storages/MergeTree/MergeTreeSettings.h
@@ -73,11 +73,11 @@ struct Settings;
     M(UInt64, max_delay_to_mutate_ms, 1000, "Max delay of mutating MergeTree table in milliseconds, if there are a lot of unfinished mutations", 0) \
     \
     /** Inserts settings. */ \
-    M(UInt64, parts_to_delay_insert, 150, "If table contains at least that many active parts in single partition, artificially slow down insert into table. Disabled if set to 0", 0) \
+    M(UInt64, parts_to_delay_insert, 1000, "If table contains at least that many active parts in single partition, artificially slow down insert into table. Disabled if set to 0", 0) \
     M(UInt64, inactive_parts_to_delay_insert, 0, "If table contains at least that many inactive parts in single partition, artificially slow down insert into table.", 0) \
-    M(UInt64, parts_to_throw_insert, 300, "If more than this number active parts in single partition, throw 'Too many parts ...' exception.", 0) \
+    M(UInt64, parts_to_throw_insert, 3000, "If more than this number active parts in single partition, throw 'Too many parts ...' exception.", 0) \
     M(UInt64, inactive_parts_to_throw_insert, 0, "If more than this number inactive parts in single partition, throw 'Too many inactive parts ...' exception.", 0) \
-    M(UInt64, max_avg_part_size_for_too_many_parts, 10ULL * 1024 * 1024 * 1024, "The 'too many parts' check according to 'parts_to_delay_insert' and 'parts_to_throw_insert' will be active only if the average part size (in the relevant partition) is not larger than the specified threshold. If it is larger than the specified threshold, the INSERTs will be neither delayed or rejected. This allows to have hundreds of terabytes in a single table on a single server if the parts are successfully merged to larger parts. This does not affect the thresholds on inactive parts or total parts.", 0) \
+    M(UInt64, max_avg_part_size_for_too_many_parts, 1ULL * 1024 * 1024 * 1024, "The 'too many parts' check according to 'parts_to_delay_insert' and 'parts_to_throw_insert' will be active only if the average part size (in the relevant partition) is not larger than the specified threshold. If it is larger than the specified threshold, the INSERTs will be neither delayed or rejected. This allows to have hundreds of terabytes in a single table on a single server if the parts are successfully merged to larger parts. This does not affect the thresholds on inactive parts or total parts.", 0) \
     M(UInt64, max_delay_to_insert, 1, "Max delay of inserting data into MergeTree table in seconds, if there are a lot of unmerged parts in single partition.", 0) \
     M(UInt64, min_delay_to_insert_ms, 10, "Min delay of inserting data into MergeTree table in milliseconds, if there are a lot of unmerged parts in single partition.", 0) \
     M(UInt64, max_parts_in_total, 100000, "If more than this number active parts in all partitions in total, throw 'Too many parts ...' exception.", 0) \
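Alongside the new argument, the defaults change: parts_to_delay_insert goes from 150 to 1000, parts_to_throw_insert from 300 to 3000, and max_avg_part_size_for_too_many_parts from 10 GiB down to 1 GiB. A small illustrative helper (not ClickHouse code; the constants are copied from the new defaults above) showing how the per-partition throw decision combines with the average-part-size escape hatch:

    #include <cstddef>
    #include <cstdint>

    /// Sketch: with the new defaults, an INSERT is rejected for a partition only when it already has
    /// at least 3000 active parts AND the average part size there is at most 1 GiB; if parts are
    /// larger on average, the check is skipped entirely, as the setting description states.
    bool would_reject_insert_sketch(std::size_t active_parts_in_partition, std::uint64_t average_part_size_bytes)
    {
        constexpr std::size_t parts_to_throw_insert = 3000;                        // new default (was 300)
        constexpr std::uint64_t max_avg_part_size = 1ULL * 1024 * 1024 * 1024;     // new default: 1 GiB (was 10 GiB)

        const bool parts_are_large_enough_in_average = average_part_size_bytes > max_avg_part_size;
        return active_parts_in_partition >= parts_to_throw_insert && !parts_are_large_enough_in_average;
    }
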
exception.", 0) \ diff --git a/src/Storages/MergeTree/MergeTreeSink.cpp b/src/Storages/MergeTree/MergeTreeSink.cpp index d62fe5024f4..36816904a81 100644 --- a/src/Storages/MergeTree/MergeTreeSink.cpp +++ b/src/Storages/MergeTree/MergeTreeSink.cpp @@ -45,9 +45,9 @@ MergeTreeSink::MergeTreeSink( void MergeTreeSink::onStart() { - /// Only check "too many parts" before write, + /// It's only allowed to throw "too many parts" before write, /// because interrupting long-running INSERT query in the middle is not convenient for users. - storage.delayInsertOrThrowIfNeeded(nullptr, context); + storage.delayInsertOrThrowIfNeeded(nullptr, context, true); } void MergeTreeSink::onFinish() @@ -57,6 +57,9 @@ void MergeTreeSink::onFinish() void MergeTreeSink::consume(Chunk chunk) { + if (num_blocks_processed > 0) + storage.delayInsertOrThrowIfNeeded(nullptr, context, false); + auto block = getHeader().cloneWithColumns(chunk.detachColumns()); if (!storage_snapshot->object_columns.empty()) convertDynamicColumnsToTuples(block, storage_snapshot); @@ -136,6 +139,8 @@ void MergeTreeSink::consume(Chunk chunk) finishDelayedChunk(); delayed_chunk = std::make_unique(); delayed_chunk->partitions = std::move(partitions); + + ++num_blocks_processed; } void MergeTreeSink::finishDelayedChunk() diff --git a/src/Storages/MergeTree/MergeTreeSink.h b/src/Storages/MergeTree/MergeTreeSink.h index 68f11d86a25..07ab3850df2 100644 --- a/src/Storages/MergeTree/MergeTreeSink.h +++ b/src/Storages/MergeTree/MergeTreeSink.h @@ -35,7 +35,8 @@ private: size_t max_parts_per_block; ContextPtr context; StorageSnapshotPtr storage_snapshot; - uint64_t chunk_dedup_seqnum = 0; /// input chunk ordinal number in case of dedup token + UInt64 chunk_dedup_seqnum = 0; /// input chunk ordinal number in case of dedup token + UInt64 num_blocks_processed = 0; /// We can delay processing for previous chunk and start writing a new one. struct DelayedChunk; diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp index 28dad454afe..5fbd72ccddc 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp @@ -367,6 +367,9 @@ size_t ReplicatedMergeTreeSinkImpl::checkQuorumPrecondition(const template void ReplicatedMergeTreeSinkImpl::consume(Chunk chunk) { + if (num_blocks_processed > 0) + storage.delayInsertOrThrowIfNeeded(&storage.partial_shutdown_event, context, false); + auto block = getHeader().cloneWithColumns(chunk.detachColumns()); const auto & settings = context->getSettingsRef(); @@ -512,6 +515,8 @@ void ReplicatedMergeTreeSinkImpl::consume(Chunk chunk) /// TODO: we can also delay commit if there is no MVs. if (!settings.deduplicate_blocks_in_dependent_materialized_views) finishDelayedChunk(zookeeper); + + ++num_blocks_processed; } template<> @@ -1136,9 +1141,9 @@ std::pair, bool> ReplicatedMergeTreeSinkImpl:: template void ReplicatedMergeTreeSinkImpl::onStart() { - /// Only check "too many parts" before write, + /// It's only allowed to throw "too many parts" before write, /// because interrupting long-running INSERT query in the middle is not convenient for users. 
diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp
index 28dad454afe..5fbd72ccddc 100644
--- a/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp
+++ b/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp
@@ -367,6 +367,9 @@ size_t ReplicatedMergeTreeSinkImpl::checkQuorumPrecondition(const
 template
 void ReplicatedMergeTreeSinkImpl::consume(Chunk chunk)
 {
+    if (num_blocks_processed > 0)
+        storage.delayInsertOrThrowIfNeeded(&storage.partial_shutdown_event, context, false);
+
     auto block = getHeader().cloneWithColumns(chunk.detachColumns());
 
     const auto & settings = context->getSettingsRef();
@@ -512,6 +515,8 @@ void ReplicatedMergeTreeSinkImpl::consume(Chunk chunk)
     /// TODO: we can also delay commit if there is no MVs.
     if (!settings.deduplicate_blocks_in_dependent_materialized_views)
         finishDelayedChunk(zookeeper);
+
+    ++num_blocks_processed;
 }
 
 template<>
@@ -1136,9 +1141,9 @@ std::pair, bool> ReplicatedMergeTreeSinkImpl::
 template
 void ReplicatedMergeTreeSinkImpl::onStart()
 {
-    /// Only check "too many parts" before write,
+    /// It's only allowed to throw "too many parts" before write,
     /// because interrupting long-running INSERT query in the middle is not convenient for users.
-    storage.delayInsertOrThrowIfNeeded(&storage.partial_shutdown_event, context);
+    storage.delayInsertOrThrowIfNeeded(&storage.partial_shutdown_event, context, true);
 }
 
 template
diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeSink.h b/src/Storages/MergeTree/ReplicatedMergeTreeSink.h
index 8d9e2e14129..868590efa25 100644
--- a/src/Storages/MergeTree/ReplicatedMergeTreeSink.h
+++ b/src/Storages/MergeTree/ReplicatedMergeTreeSink.h
@@ -123,6 +123,7 @@ private:
     bool quorum_parallel = false;
     const bool deduplicate = true;
     bool last_block_is_duplicate = false;
+    UInt64 num_blocks_processed = 0;
 
     using Logger = Poco::Logger;
     Poco::Logger * log;
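The replicated sink follows the same pattern as MergeTreeSink, except that storage.partial_shutdown_event is passed as the wake-up event, so an in-progress delay in onStart() or consume() can be cut short if the table is shutting down, which is the documented purpose of the `until` parameter in MergeTreeData.h above.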