Implement "max_partitions_per_insert_block" setting #4700

This commit is contained in:
Alexey Milovidov 2019-03-29 12:33:39 +03:00
parent f95dd23110
commit 8cb1619607
9 changed files with 26 additions and 16 deletions

View File

@ -308,6 +308,8 @@ struct Settings
M(SettingBool, allow_experimental_data_skipping_indices, false, "If it is set to true, data skipping indices can be used in CREATE TABLE/ALTER TABLE queries.") \
\
M(SettingBool, allow_hyperscan, true, "Allow functions that use Hyperscan library. Disable to avoid potentially long compilation times and excessive resource usage.") \
\
M(SettingBool, max_partitions_per_insert_block, 100, "Limit maximum number of partitions in single INSERTed block. Zero means unlimited. Throw exception if the block contains too many partitions. This setting is a safety threshold, because using large number of partitions is a common misconception.") \
#define DECLARE(TYPE, NAME, DEFAULT, DESCRIPTION) \
TYPE NAME {DEFAULT};

View File

@ -16,7 +16,7 @@ void MergeTreeBlockOutputStream::write(const Block & block)
{
storage.data.delayInsertOrThrowIfNeeded();
auto part_blocks = storage.writer.splitBlockIntoParts(block);
auto part_blocks = storage.writer.splitBlockIntoParts(block, max_parts_per_block);
for (auto & current_block : part_blocks)
{
Stopwatch watch;

View File

@ -13,14 +13,15 @@ class StorageMergeTree;
class MergeTreeBlockOutputStream : public IBlockOutputStream
{
public:
MergeTreeBlockOutputStream(StorageMergeTree & storage_)
: storage(storage_) {}
MergeTreeBlockOutputStream(StorageMergeTree & storage_, size_t max_parts_per_block)
: storage(storage_), max_parts_per_block(max_parts_per_block) {}
Block getHeader() const override;
void write(const Block & block) override;
private:
StorageMergeTree & storage;
size_t max_parts_per_block;
};
}

View File

@ -4,6 +4,7 @@
#include <Common/HashTable/HashMap.h>
#include <Interpreters/AggregationCommon.h>
#include <IO/HashingWriteBuffer.h>
#include <IO/WriteHelpers.h>
#include <Poco/File.h>
@ -30,7 +31,8 @@ namespace
void buildScatterSelector(
const ColumnRawPtrs & columns,
PODArray<size_t> & partition_num_to_first_row,
IColumn::Selector & selector)
IColumn::Selector & selector,
size_t max_parts)
{
/// Use generic hashed variant since partitioning is unlikely to be a bottleneck.
using Data = HashMap<UInt128, size_t, UInt128TrivialHash>;
@ -47,6 +49,9 @@ void buildScatterSelector(
if (inserted)
{
if (max_parts && partitions_count >= max_parts)
throw Exception("Too many partitions for single INSERT block (more than " + toString(max_parts) + "). The limit is controlled by 'max_partitions_per_insert_block' setting. Large number of partitions is a common misconception. It will lead to severe negative performance impact, including slow server startup, slow INSERT queries and slow SELECT queries. Recommended total number of partitions for a table is under 1000..10000. Please note, that partitioning is not intended to speed up SELECT queries (ORDER BY key is sufficient to make range queries fast). Partitions are intended for data manipulation (DROP PARTITION, etc).");
partition_num_to_first_row.push_back(i);
it->getSecond() = partitions_count;
@ -67,7 +72,7 @@ void buildScatterSelector(
}
BlocksWithPartition MergeTreeDataWriter::splitBlockIntoParts(const Block & block)
BlocksWithPartition MergeTreeDataWriter::splitBlockIntoParts(const Block & block, size_t max_parts)
{
BlocksWithPartition result;
if (!block || !block.rows())
@ -92,7 +97,7 @@ BlocksWithPartition MergeTreeDataWriter::splitBlockIntoParts(const Block & block
PODArray<size_t> partition_num_to_first_row;
IColumn::Selector selector;
buildScatterSelector(partition_columns, partition_num_to_first_row, selector);
buildScatterSelector(partition_columns, partition_num_to_first_row, selector, max_parts);
size_t partitions_count = partition_num_to_first_row.size();
result.reserve(partitions_count);

View File

@ -41,7 +41,7 @@ public:
* (split rows by partition)
* Works deterministically: if same block was passed, function will return same result in same order.
*/
BlocksWithPartition splitBlockIntoParts(const Block & block);
BlocksWithPartition splitBlockIntoParts(const Block & block, size_t max_parts);
/** All rows must correspond to same partition.
* Returns part with unique name starting with 'tmp_', yet not added to MergeTreeData.

View File

@ -33,8 +33,8 @@ namespace ErrorCodes
ReplicatedMergeTreeBlockOutputStream::ReplicatedMergeTreeBlockOutputStream(
StorageReplicatedMergeTree & storage_, size_t quorum_, size_t quorum_timeout_ms_, bool deduplicate_)
: storage(storage_), quorum(quorum_), quorum_timeout_ms(quorum_timeout_ms_), deduplicate(deduplicate_),
StorageReplicatedMergeTree & storage_, size_t quorum_, size_t quorum_timeout_ms_, size_t max_parts_per_block, bool deduplicate_)
: storage(storage_), quorum(quorum_), quorum_timeout_ms(quorum_timeout_ms_), max_parts_per_block(max_parts_per_block), deduplicate(deduplicate_),
log(&Logger::get(storage.data.getLogName() + " (Replicated OutputStream)"))
{
/// The quorum value `1` has the same meaning as if it is disabled.
@ -122,7 +122,7 @@ void ReplicatedMergeTreeBlockOutputStream::write(const Block & block)
if (quorum)
checkQuorumPrecondition(zookeeper);
auto part_blocks = storage.writer.splitBlockIntoParts(block);
auto part_blocks = storage.writer.splitBlockIntoParts(block, max_parts_per_block);
for (auto & current_block : part_blocks)
{

View File

@ -22,8 +22,9 @@ class StorageReplicatedMergeTree;
class ReplicatedMergeTreeBlockOutputStream : public IBlockOutputStream
{
public:
ReplicatedMergeTreeBlockOutputStream(StorageReplicatedMergeTree & storage_, size_t quorum_, size_t quorum_timeout_ms_,
bool deduplicate_);
ReplicatedMergeTreeBlockOutputStream(StorageReplicatedMergeTree & storage_,
size_t quorum_, size_t quorum_timeout_ms_, size_t max_parts_per_block,
bool deduplicate_);
Block getHeader() const override;
void writePrefix() override;
@ -56,6 +57,7 @@ private:
StorageReplicatedMergeTree & storage;
size_t quorum;
size_t quorum_timeout_ms;
size_t max_parts_per_block;
bool deduplicate = true;
bool last_block_is_duplicate = false;

View File

@ -126,9 +126,9 @@ BlockInputStreams StorageMergeTree::read(
return reader.read(column_names, query_info, context, max_block_size, num_streams);
}
BlockOutputStreamPtr StorageMergeTree::write(const ASTPtr & /*query*/, const Context & /*context*/)
BlockOutputStreamPtr StorageMergeTree::write(const ASTPtr & /*query*/, const Context & context)
{
return std::make_shared<MergeTreeBlockOutputStream>(*this);
return std::make_shared<MergeTreeBlockOutputStream>(*this, context.getSettingsRef().max_partitions_per_insert_block);
}
void StorageMergeTree::checkTableCanBeDropped() const

View File

@ -2959,7 +2959,7 @@ BlockOutputStreamPtr StorageReplicatedMergeTree::write(const ASTPtr & /*query*/,
bool deduplicate = data.settings.replicated_deduplication_window != 0 && settings.insert_deduplicate;
return std::make_shared<ReplicatedMergeTreeBlockOutputStream>(*this,
settings.insert_quorum, settings.insert_quorum_timeout.totalMilliseconds(), deduplicate);
settings.insert_quorum, settings.insert_quorum_timeout.totalMilliseconds(), deduplicate, settings.max_partitions_per_insert_block);
}
@ -3577,7 +3577,7 @@ void StorageReplicatedMergeTree::attachPartition(const ASTPtr & partition, bool
loaded_parts.push_back(data.loadPartAndFixMetadata(source_dir + part));
}
ReplicatedMergeTreeBlockOutputStream output(*this, 0, 0, false); /// TODO Allow to use quorum here.
ReplicatedMergeTreeBlockOutputStream output(*this, 0, 0, false, 0); /// TODO Allow to use quorum here.
for (auto & part : loaded_parts)
{
String old_name = part->name;