From 8cb161960794f60e648dffaaaafd79a20f5fc5ed Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Fri, 29 Mar 2019 12:33:39 +0300 Subject: [PATCH] Implement "max_partitions_per_insert_block" setting #4700 --- dbms/src/Core/Settings.h | 2 ++ .../Storages/MergeTree/MergeTreeBlockOutputStream.cpp | 2 +- .../Storages/MergeTree/MergeTreeBlockOutputStream.h | 5 +++-- dbms/src/Storages/MergeTree/MergeTreeDataWriter.cpp | 11 ++++++++--- dbms/src/Storages/MergeTree/MergeTreeDataWriter.h | 2 +- .../ReplicatedMergeTreeBlockOutputStream.cpp | 6 +++--- .../MergeTree/ReplicatedMergeTreeBlockOutputStream.h | 6 ++++-- dbms/src/Storages/StorageMergeTree.cpp | 4 ++-- dbms/src/Storages/StorageReplicatedMergeTree.cpp | 4 ++-- 9 files changed, 26 insertions(+), 16 deletions(-) diff --git a/dbms/src/Core/Settings.h b/dbms/src/Core/Settings.h index de7858d227f..f677b8a7079 100644 --- a/dbms/src/Core/Settings.h +++ b/dbms/src/Core/Settings.h @@ -308,6 +308,8 @@ struct Settings M(SettingBool, allow_experimental_data_skipping_indices, false, "If it is set to true, data skipping indices can be used in CREATE TABLE/ALTER TABLE queries.") \ \ M(SettingBool, allow_hyperscan, true, "Allow functions that use Hyperscan library. Disable to avoid potentially long compilation times and excessive resource usage.") \ + \ + M(SettingBool, max_partitions_per_insert_block, 100, "Limit maximum number of partitions in single INSERTed block. Zero means unlimited. Throw exception if the block contains too many partitions. This setting is a safety threshold, because using large number of partitions is a common misconception.") \ #define DECLARE(TYPE, NAME, DEFAULT, DESCRIPTION) \ TYPE NAME {DEFAULT}; diff --git a/dbms/src/Storages/MergeTree/MergeTreeBlockOutputStream.cpp b/dbms/src/Storages/MergeTree/MergeTreeBlockOutputStream.cpp index b3c2336bb2f..a1aa53a529b 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeBlockOutputStream.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeBlockOutputStream.cpp @@ -16,7 +16,7 @@ void MergeTreeBlockOutputStream::write(const Block & block) { storage.data.delayInsertOrThrowIfNeeded(); - auto part_blocks = storage.writer.splitBlockIntoParts(block); + auto part_blocks = storage.writer.splitBlockIntoParts(block, max_parts_per_block); for (auto & current_block : part_blocks) { Stopwatch watch; diff --git a/dbms/src/Storages/MergeTree/MergeTreeBlockOutputStream.h b/dbms/src/Storages/MergeTree/MergeTreeBlockOutputStream.h index 64243b6e7bf..ace24b474f1 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeBlockOutputStream.h +++ b/dbms/src/Storages/MergeTree/MergeTreeBlockOutputStream.h @@ -13,14 +13,15 @@ class StorageMergeTree; class MergeTreeBlockOutputStream : public IBlockOutputStream { public: - MergeTreeBlockOutputStream(StorageMergeTree & storage_) - : storage(storage_) {} + MergeTreeBlockOutputStream(StorageMergeTree & storage_, size_t max_parts_per_block) + : storage(storage_), max_parts_per_block(max_parts_per_block) {} Block getHeader() const override; void write(const Block & block) override; private: StorageMergeTree & storage; + size_t max_parts_per_block; }; } diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataWriter.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataWriter.cpp index bd293c224a0..281e25a69f0 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataWriter.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataWriter.cpp @@ -4,6 +4,7 @@ #include #include #include +#include #include @@ -30,7 +31,8 @@ namespace void buildScatterSelector( const ColumnRawPtrs & columns, PODArray & partition_num_to_first_row, - IColumn::Selector & selector) + IColumn::Selector & selector, + size_t max_parts) { /// Use generic hashed variant since partitioning is unlikely to be a bottleneck. using Data = HashMap; @@ -47,6 +49,9 @@ void buildScatterSelector( if (inserted) { + if (max_parts && partitions_count >= max_parts) + throw Exception("Too many partitions for single INSERT block (more than " + toString(max_parts) + "). The limit is controlled by 'max_partitions_per_insert_block' setting. Large number of partitions is a common misconception. It will lead to severe negative performance impact, including slow server startup, slow INSERT queries and slow SELECT queries. Recommended total number of partitions for a table is under 1000..10000. Please note, that partitioning is not intended to speed up SELECT queries (ORDER BY key is sufficient to make range queries fast). Partitions are intended for data manipulation (DROP PARTITION, etc)."); + partition_num_to_first_row.push_back(i); it->getSecond() = partitions_count; @@ -67,7 +72,7 @@ void buildScatterSelector( } -BlocksWithPartition MergeTreeDataWriter::splitBlockIntoParts(const Block & block) +BlocksWithPartition MergeTreeDataWriter::splitBlockIntoParts(const Block & block, size_t max_parts) { BlocksWithPartition result; if (!block || !block.rows()) @@ -92,7 +97,7 @@ BlocksWithPartition MergeTreeDataWriter::splitBlockIntoParts(const Block & block PODArray partition_num_to_first_row; IColumn::Selector selector; - buildScatterSelector(partition_columns, partition_num_to_first_row, selector); + buildScatterSelector(partition_columns, partition_num_to_first_row, selector, max_parts); size_t partitions_count = partition_num_to_first_row.size(); result.reserve(partitions_count); diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataWriter.h b/dbms/src/Storages/MergeTree/MergeTreeDataWriter.h index 95371021939..c2878145a50 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataWriter.h +++ b/dbms/src/Storages/MergeTree/MergeTreeDataWriter.h @@ -41,7 +41,7 @@ public: * (split rows by partition) * Works deterministically: if same block was passed, function will return same result in same order. */ - BlocksWithPartition splitBlockIntoParts(const Block & block); + BlocksWithPartition splitBlockIntoParts(const Block & block, size_t max_parts); /** All rows must correspond to same partition. * Returns part with unique name starting with 'tmp_', yet not added to MergeTreeData. diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.cpp index 518731051a0..83194a9eaa1 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.cpp +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.cpp @@ -33,8 +33,8 @@ namespace ErrorCodes ReplicatedMergeTreeBlockOutputStream::ReplicatedMergeTreeBlockOutputStream( - StorageReplicatedMergeTree & storage_, size_t quorum_, size_t quorum_timeout_ms_, bool deduplicate_) - : storage(storage_), quorum(quorum_), quorum_timeout_ms(quorum_timeout_ms_), deduplicate(deduplicate_), + StorageReplicatedMergeTree & storage_, size_t quorum_, size_t quorum_timeout_ms_, size_t max_parts_per_block, bool deduplicate_) + : storage(storage_), quorum(quorum_), quorum_timeout_ms(quorum_timeout_ms_), max_parts_per_block(max_parts_per_block), deduplicate(deduplicate_), log(&Logger::get(storage.data.getLogName() + " (Replicated OutputStream)")) { /// The quorum value `1` has the same meaning as if it is disabled. @@ -122,7 +122,7 @@ void ReplicatedMergeTreeBlockOutputStream::write(const Block & block) if (quorum) checkQuorumPrecondition(zookeeper); - auto part_blocks = storage.writer.splitBlockIntoParts(block); + auto part_blocks = storage.writer.splitBlockIntoParts(block, max_parts_per_block); for (auto & current_block : part_blocks) { diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h index f6ad819c4fb..e20a36a9440 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h @@ -22,8 +22,9 @@ class StorageReplicatedMergeTree; class ReplicatedMergeTreeBlockOutputStream : public IBlockOutputStream { public: - ReplicatedMergeTreeBlockOutputStream(StorageReplicatedMergeTree & storage_, size_t quorum_, size_t quorum_timeout_ms_, - bool deduplicate_); + ReplicatedMergeTreeBlockOutputStream(StorageReplicatedMergeTree & storage_, + size_t quorum_, size_t quorum_timeout_ms_, size_t max_parts_per_block, + bool deduplicate_); Block getHeader() const override; void writePrefix() override; @@ -56,6 +57,7 @@ private: StorageReplicatedMergeTree & storage; size_t quorum; size_t quorum_timeout_ms; + size_t max_parts_per_block; bool deduplicate = true; bool last_block_is_duplicate = false; diff --git a/dbms/src/Storages/StorageMergeTree.cpp b/dbms/src/Storages/StorageMergeTree.cpp index fd356e75e8f..6e0142d3b57 100644 --- a/dbms/src/Storages/StorageMergeTree.cpp +++ b/dbms/src/Storages/StorageMergeTree.cpp @@ -126,9 +126,9 @@ BlockInputStreams StorageMergeTree::read( return reader.read(column_names, query_info, context, max_block_size, num_streams); } -BlockOutputStreamPtr StorageMergeTree::write(const ASTPtr & /*query*/, const Context & /*context*/) +BlockOutputStreamPtr StorageMergeTree::write(const ASTPtr & /*query*/, const Context & context) { - return std::make_shared(*this); + return std::make_shared(*this, context.getSettingsRef().max_partitions_per_insert_block); } void StorageMergeTree::checkTableCanBeDropped() const diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp index fd1859ff6e4..4067ceb40d3 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp +++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp @@ -2959,7 +2959,7 @@ BlockOutputStreamPtr StorageReplicatedMergeTree::write(const ASTPtr & /*query*/, bool deduplicate = data.settings.replicated_deduplication_window != 0 && settings.insert_deduplicate; return std::make_shared(*this, - settings.insert_quorum, settings.insert_quorum_timeout.totalMilliseconds(), deduplicate); + settings.insert_quorum, settings.insert_quorum_timeout.totalMilliseconds(), deduplicate, settings.max_partitions_per_insert_block); } @@ -3577,7 +3577,7 @@ void StorageReplicatedMergeTree::attachPartition(const ASTPtr & partition, bool loaded_parts.push_back(data.loadPartAndFixMetadata(source_dir + part)); } - ReplicatedMergeTreeBlockOutputStream output(*this, 0, 0, false); /// TODO Allow to use quorum here. + ReplicatedMergeTreeBlockOutputStream output(*this, 0, 0, false, 0); /// TODO Allow to use quorum here. for (auto & part : loaded_parts) { String old_name = part->name;