Merge pull request #4845 from yandex/max_partitions_per_insert_block

Implement "max_partitions_per_insert_block" setting
This commit is contained in:
alexey-milovidov 2019-03-29 16:20:46 +03:00 committed by GitHub
commit d10b256a8a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 44 additions and 17 deletions

View File

@ -308,6 +308,8 @@ struct Settings
M(SettingBool, allow_experimental_data_skipping_indices, false, "If it is set to true, data skipping indices can be used in CREATE TABLE/ALTER TABLE queries.") \
\
M(SettingBool, allow_hyperscan, true, "Allow functions that use Hyperscan library. Disable to avoid potentially long compilation times and excessive resource usage.") \
\
M(SettingBool, max_partitions_per_insert_block, 100, "Limit maximum number of partitions in single INSERTed block. Zero means unlimited. Throw exception if the block contains too many partitions. This setting is a safety threshold, because using large number of partitions is a common misconception.") \
#define DECLARE(TYPE, NAME, DEFAULT, DESCRIPTION) \
TYPE NAME {DEFAULT};

View File

@ -16,7 +16,7 @@ void MergeTreeBlockOutputStream::write(const Block & block)
{
storage.data.delayInsertOrThrowIfNeeded();
auto part_blocks = storage.writer.splitBlockIntoParts(block);
auto part_blocks = storage.writer.splitBlockIntoParts(block, max_parts_per_block);
for (auto & current_block : part_blocks)
{
Stopwatch watch;

View File

@ -13,14 +13,15 @@ class StorageMergeTree;
class MergeTreeBlockOutputStream : public IBlockOutputStream
{
public:
MergeTreeBlockOutputStream(StorageMergeTree & storage_)
: storage(storage_) {}
MergeTreeBlockOutputStream(StorageMergeTree & storage_, size_t max_parts_per_block)
: storage(storage_), max_parts_per_block(max_parts_per_block) {}
Block getHeader() const override;
void write(const Block & block) override;
private:
StorageMergeTree & storage;
size_t max_parts_per_block;
};
}

View File

@ -1,9 +1,10 @@
#include <Storages/MergeTree/MergeTreeDataWriter.h>
#include <Storages/MergeTree/MergedBlockOutputStream.h>
#include <Common/escapeForFileName.h>
#include <Common/HashTable/HashMap.h>
#include <Common/Exception.h>
#include <Interpreters/AggregationCommon.h>
#include <IO/HashingWriteBuffer.h>
#include <IO/WriteHelpers.h>
#include <Poco/File.h>
@ -22,6 +23,7 @@ namespace DB
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int TOO_MANY_PARTS;
}
namespace
@ -30,7 +32,8 @@ namespace
void buildScatterSelector(
const ColumnRawPtrs & columns,
PODArray<size_t> & partition_num_to_first_row,
IColumn::Selector & selector)
IColumn::Selector & selector,
size_t max_parts)
{
/// Use generic hashed variant since partitioning is unlikely to be a bottleneck.
using Data = HashMap<UInt128, size_t, UInt128TrivialHash>;
@ -47,6 +50,9 @@ void buildScatterSelector(
if (inserted)
{
if (max_parts && partitions_count >= max_parts)
throw Exception("Too many partitions for single INSERT block (more than " + toString(max_parts) + "). The limit is controlled by 'max_partitions_per_insert_block' setting. Large number of partitions is a common misconception. It will lead to severe negative performance impact, including slow server startup, slow INSERT queries and slow SELECT queries. Recommended total number of partitions for a table is under 1000..10000. Please note, that partitioning is not intended to speed up SELECT queries (ORDER BY key is sufficient to make range queries fast). Partitions are intended for data manipulation (DROP PARTITION, etc).", ErrorCodes::TOO_MANY_PARTS);
partition_num_to_first_row.push_back(i);
it->getSecond() = partitions_count;
@ -67,7 +73,7 @@ void buildScatterSelector(
}
BlocksWithPartition MergeTreeDataWriter::splitBlockIntoParts(const Block & block)
BlocksWithPartition MergeTreeDataWriter::splitBlockIntoParts(const Block & block, size_t max_parts)
{
BlocksWithPartition result;
if (!block || !block.rows())
@ -92,7 +98,7 @@ BlocksWithPartition MergeTreeDataWriter::splitBlockIntoParts(const Block & block
PODArray<size_t> partition_num_to_first_row;
IColumn::Selector selector;
buildScatterSelector(partition_columns, partition_num_to_first_row, selector);
buildScatterSelector(partition_columns, partition_num_to_first_row, selector, max_parts);
size_t partitions_count = partition_num_to_first_row.size();
result.reserve(partitions_count);

View File

@ -41,7 +41,7 @@ public:
* (split rows by partition)
* Works deterministically: if same block was passed, function will return same result in same order.
*/
BlocksWithPartition splitBlockIntoParts(const Block & block);
BlocksWithPartition splitBlockIntoParts(const Block & block, size_t max_parts);
/** All rows must correspond to same partition.
* Returns part with unique name starting with 'tmp_', yet not added to MergeTreeData.

View File

@ -33,8 +33,8 @@ namespace ErrorCodes
ReplicatedMergeTreeBlockOutputStream::ReplicatedMergeTreeBlockOutputStream(
StorageReplicatedMergeTree & storage_, size_t quorum_, size_t quorum_timeout_ms_, bool deduplicate_)
: storage(storage_), quorum(quorum_), quorum_timeout_ms(quorum_timeout_ms_), deduplicate(deduplicate_),
StorageReplicatedMergeTree & storage_, size_t quorum_, size_t quorum_timeout_ms_, size_t max_parts_per_block, bool deduplicate_)
: storage(storage_), quorum(quorum_), quorum_timeout_ms(quorum_timeout_ms_), max_parts_per_block(max_parts_per_block), deduplicate(deduplicate_),
log(&Logger::get(storage.data.getLogName() + " (Replicated OutputStream)"))
{
/// The quorum value `1` has the same meaning as if it is disabled.
@ -122,7 +122,7 @@ void ReplicatedMergeTreeBlockOutputStream::write(const Block & block)
if (quorum)
checkQuorumPrecondition(zookeeper);
auto part_blocks = storage.writer.splitBlockIntoParts(block);
auto part_blocks = storage.writer.splitBlockIntoParts(block, max_parts_per_block);
for (auto & current_block : part_blocks)
{

View File

@ -22,8 +22,9 @@ class StorageReplicatedMergeTree;
class ReplicatedMergeTreeBlockOutputStream : public IBlockOutputStream
{
public:
ReplicatedMergeTreeBlockOutputStream(StorageReplicatedMergeTree & storage_, size_t quorum_, size_t quorum_timeout_ms_,
bool deduplicate_);
ReplicatedMergeTreeBlockOutputStream(StorageReplicatedMergeTree & storage_,
size_t quorum_, size_t quorum_timeout_ms_, size_t max_parts_per_block,
bool deduplicate_);
Block getHeader() const override;
void writePrefix() override;
@ -56,6 +57,7 @@ private:
StorageReplicatedMergeTree & storage;
size_t quorum;
size_t quorum_timeout_ms;
size_t max_parts_per_block;
bool deduplicate = true;
bool last_block_is_duplicate = false;

View File

@ -126,9 +126,9 @@ BlockInputStreams StorageMergeTree::read(
return reader.read(column_names, query_info, context, max_block_size, num_streams);
}
BlockOutputStreamPtr StorageMergeTree::write(const ASTPtr & /*query*/, const Context & /*context*/)
BlockOutputStreamPtr StorageMergeTree::write(const ASTPtr & /*query*/, const Context & context)
{
return std::make_shared<MergeTreeBlockOutputStream>(*this);
return std::make_shared<MergeTreeBlockOutputStream>(*this, context.getSettingsRef().max_partitions_per_insert_block);
}
void StorageMergeTree::checkTableCanBeDropped() const

View File

@ -2959,7 +2959,7 @@ BlockOutputStreamPtr StorageReplicatedMergeTree::write(const ASTPtr & /*query*/,
bool deduplicate = data.settings.replicated_deduplication_window != 0 && settings.insert_deduplicate;
return std::make_shared<ReplicatedMergeTreeBlockOutputStream>(*this,
settings.insert_quorum, settings.insert_quorum_timeout.totalMilliseconds(), deduplicate);
settings.insert_quorum, settings.insert_quorum_timeout.totalMilliseconds(), settings.max_partitions_per_insert_block, deduplicate);
}
@ -3577,7 +3577,7 @@ void StorageReplicatedMergeTree::attachPartition(const ASTPtr & partition, bool
loaded_parts.push_back(data.loadPartAndFixMetadata(source_dir + part));
}
ReplicatedMergeTreeBlockOutputStream output(*this, 0, 0, false); /// TODO Allow to use quorum here.
ReplicatedMergeTreeBlockOutputStream output(*this, 0, 0, 0, false); /// TODO Allow to use quorum here.
for (auto & part : loaded_parts)
{
String old_name = part->name;

View File

@ -0,0 +1,14 @@
DROP TABLE IF EXISTS test.partitions;
CREATE TABLE test.partitions (x UInt64) ENGINE = MergeTree ORDER BY x PARTITION BY x;
INSERT INTO test.partitions SELECT * FROM system.numbers LIMIT 100;
SELECT count() FROM system.parts WHERE database = 'test' AND table = 'partitions';
INSERT INTO test.partitions SELECT * FROM system.numbers LIMIT 100;
SELECT count() FROM system.parts WHERE database = 'test' AND table = 'partitions';
SET max_partitions_per_insert_block = 1;
INSERT INTO test.partitions SELECT * FROM system.numbers LIMIT 1;
INSERT INTO test.partitions SELECT * FROM system.numbers LIMIT 2; -- { serverError 252 }
DROP TABLE test.partitions;