diff --git a/dbms/CMakeLists.txt b/dbms/CMakeLists.txt index 8e0694a8170..4e104898bee 100644 --- a/dbms/CMakeLists.txt +++ b/dbms/CMakeLists.txt @@ -266,6 +266,7 @@ add_library (dbms include/DB/DataStreams/CSVRowOutputStream.h include/DB/DataStreams/CSVRowInputStream.h include/DB/DataStreams/verbosePrintString.h + include/DB/DataStreams/SquashingBlockInputStream.h include/DB/DataTypes/IDataType.h include/DB/DataTypes/IDataTypeDummy.h include/DB/DataTypes/DataTypeSet.h @@ -744,6 +745,7 @@ add_library (dbms src/DataStreams/RemoteBlockInputStream.cpp src/DataStreams/BlockIO.cpp src/DataStreams/verbosePrintString.cpp + src/DataStreams/SquashingBlockInputStream.cpp src/DataTypes/DataTypeString.cpp src/DataTypes/DataTypeFixedString.cpp diff --git a/dbms/include/DB/DataStreams/SquashingBlockInputStream.h b/dbms/include/DB/DataStreams/SquashingBlockInputStream.h new file mode 100644 index 00000000000..deeff4a2aa8 --- /dev/null +++ b/dbms/include/DB/DataStreams/SquashingBlockInputStream.h @@ -0,0 +1,51 @@ +#pragma once + +#include <DB/DataStreams/IProfilingBlockInputStream.h> + + +namespace DB +{ + +/** Merging consecutive blocks of stream to specified minimum size. + * + * (But if one of the input blocks already has at least the specified size, + * then don't merge it with neighbours, even if neighbours are small.) + * + * Used to prepare blocks to adequate size for INSERT queries, + * because such storages as Memory, StripeLog, Log, TinyLog... + * store or compress data in blocks exactly as passed to it, + * and blocks of small size are not efficient. + * + * Order of data is kept. + */ +class SquashingBlockInputStream : public IProfilingBlockInputStream +{ +public: + /// Conditions on rows and bytes are OR-ed. If one of them is zero, then corresponding condition is ignored. 
+ SquashingBlockInputStream(BlockInputStreamPtr & src, size_t min_block_size_rows, size_t min_block_size_bytes); + + String getName() const override { return "Squashing"; } + + String getID() const override + { + std::stringstream res; + res << "Squashing(" << children.at(0)->getID() << ")"; + return res.str(); + } + +protected: + Block readImpl() override; + +private: + size_t min_block_size_rows; + size_t min_block_size_bytes; + + Block accumulated_block; + bool all_read = false; + + void append(Block && block); + + bool isEnoughSize(size_t rows, size_t bytes) const; +}; + +} diff --git a/dbms/include/DB/Interpreters/Settings.h b/dbms/include/DB/Interpreters/Settings.h index eb36d2e7255..178991802f5 100644 --- a/dbms/include/DB/Interpreters/Settings.h +++ b/dbms/include/DB/Interpreters/Settings.h @@ -39,6 +39,10 @@ struct Settings M(SettingUInt64, max_block_size, DEFAULT_BLOCK_SIZE) \ /** Максимальный размер блока для вставки, если мы управляем формированием блоков для вставки. */ \ M(SettingUInt64, max_insert_block_size, DEFAULT_INSERT_BLOCK_SIZE) \ + /** Squash blocks passed to INSERT query to specified size in rows, if blocks are not big enough. */ \ + M(SettingUInt64, min_insert_block_size_rows, DEFAULT_INSERT_BLOCK_SIZE) \ + /** Squash blocks passed to INSERT query to specified size in bytes, if blocks are not big enough. */ \ + M(SettingUInt64, min_insert_block_size_bytes, (DEFAULT_INSERT_BLOCK_SIZE * 256)) \ /** Максимальное количество потоков выполнения запроса. По-умолчанию - определять автоматически. */ \ M(SettingMaxThreads, max_threads, 0) \ /** Максимальный размер буфера для чтения из файловой системы. 
*/ \ diff --git a/dbms/src/DataStreams/SquashingBlockInputStream.cpp b/dbms/src/DataStreams/SquashingBlockInputStream.cpp new file mode 100644 index 00000000000..b8405be9884 --- /dev/null +++ b/dbms/src/DataStreams/SquashingBlockInputStream.cpp @@ -0,0 +1,80 @@ +#include <DB/DataStreams/SquashingBlockInputStream.h> + + +namespace DB +{ + +SquashingBlockInputStream::SquashingBlockInputStream(BlockInputStreamPtr & src, size_t min_block_size_rows, size_t min_block_size_bytes) + : min_block_size_rows(min_block_size_rows), min_block_size_bytes(min_block_size_bytes) +{ + children.emplace_back(src); +} + + +Block SquashingBlockInputStream::readImpl() +{ + if (all_read) + return {}; + + while (Block block = children[0]->read()) + { + /// Just read block is already enough. + if (isEnoughSize(block.rowsInFirstColumn(), block.bytes())) + { + /// If no accumulated data, return just read block. + if (!accumulated_block) + return block; + + /// Return accumulated data (maybe it has small size) and place new block to accumulated data. + accumulated_block.swap(block); + return block; + } + + /// Accumulated block is already enough. + if (accumulated_block && isEnoughSize(accumulated_block.rowsInFirstColumn(), accumulated_block.bytes())) + { + /// Return accumulated data and place new block to accumulated data. 
+ accumulated_block.swap(block); + return block; + } + + append(std::move(block)); + + if (isEnoughSize(accumulated_block.rowsInFirstColumn(), accumulated_block.bytes())) + { + Block res; + res.swap(accumulated_block); + return res; + } + } + + all_read = true; + return accumulated_block; +} + + +void SquashingBlockInputStream::append(Block && block) +{ + if (!accumulated_block) + { + accumulated_block = std::move(block); + return; + } + + size_t columns = block.columns(); + size_t rows = block.rowsInFirstColumn(); + + for (size_t i = 0; i < columns; ++i) + accumulated_block.unsafeGetByPosition(i).column->insertRangeFrom( + *block.unsafeGetByPosition(i).column, 0, rows); +} + + +bool SquashingBlockInputStream::isEnoughSize(size_t rows, size_t bytes) const +{ + return (!min_block_size_rows && !min_block_size_bytes) + || (min_block_size_rows && rows >= min_block_size_rows) + || (min_block_size_bytes && bytes >= min_block_size_bytes); +} + +} diff --git a/dbms/src/Interpreters/InterpreterInsertQuery.cpp b/dbms/src/Interpreters/InterpreterInsertQuery.cpp index e1d2e5417b2..896fda1f45e 100644 --- a/dbms/src/Interpreters/InterpreterInsertQuery.cpp +++ b/dbms/src/Interpreters/InterpreterInsertQuery.cpp @@ -5,6 +5,7 @@ #include #include #include +#include <DB/DataStreams/SquashingBlockInputStream.h> #include #include @@ -98,7 +99,12 @@ BlockIO InterpreterInsertQuery::execute() else { InterpreterSelectQuery interpreter_select{query.select, context}; - BlockInputStreamPtr in{interpreter_select.execute().in}; + BlockInputStreamPtr in = interpreter_select.execute().in; + + in = std::make_shared<SquashingBlockInputStream>(in, + context.getSettingsRef().min_insert_block_size_rows, + context.getSettingsRef().min_insert_block_size_bytes); + + res.in = std::make_shared<NullAndDoCopyBlockInputStream>(in, out); res.in_sample = interpreter_select.getSampleBlock(); } diff --git a/dbms/src/Server/TCPHandler.cpp b/dbms/src/Server/TCPHandler.cpp index 222341a2c04..f2c0843fd96 100644 --- a/dbms/src/Server/TCPHandler.cpp +++ b/dbms/src/Server/TCPHandler.cpp @@ -18,6 +18,7 @@ #include #include 
#include +#include <DB/DataStreams/SquashingBlockInputStream.h> #include #include @@ -616,6 +617,10 @@ void TCPHandler::initBlockInput() state.block_in = std::make_shared<NativeBlockInputStream>( *state.maybe_compressed_in, client_revision); + + state.block_in = std::make_shared<SquashingBlockInputStream>(state.block_in, + query_context.getSettingsRef().min_insert_block_size_rows, + query_context.getSettingsRef().min_insert_block_size_bytes); } } @@ -626,7 +631,7 @@ void TCPHandler::initBlockOutput() { if (state.compression == Protocol::Compression::Enable) state.maybe_compressed_out = std::make_shared<CompressedWriteBuffer>( - *out, query_context.getSettings().network_compression_method); + *out, query_context.getSettingsRef().network_compression_method); else state.maybe_compressed_out = out;