Squashing small blocks on INSERT (when passing blocks via TCP interface or when doing INSERT SELECT) [#METR-21877].

This commit is contained in:
Alexey Milovidov 2016-07-02 00:02:13 +03:00
parent eba73d8d87
commit 1a420a14e9
6 changed files with 150 additions and 2 deletions

View File

@ -266,6 +266,7 @@ add_library (dbms
include/DB/DataStreams/CSVRowOutputStream.h
include/DB/DataStreams/CSVRowInputStream.h
include/DB/DataStreams/verbosePrintString.h
include/DB/DataStreams/SquashingBlockInputStream.h
include/DB/DataTypes/IDataType.h
include/DB/DataTypes/IDataTypeDummy.h
include/DB/DataTypes/DataTypeSet.h
@ -744,6 +745,7 @@ add_library (dbms
src/DataStreams/RemoteBlockInputStream.cpp
src/DataStreams/BlockIO.cpp
src/DataStreams/verbosePrintString.cpp
src/DataStreams/SquashingBlockInputStream.cpp
src/DataTypes/DataTypeString.cpp
src/DataTypes/DataTypeFixedString.cpp

View File

@ -0,0 +1,51 @@
#pragma once
#include <DB/DataStreams/IProfilingBlockInputStream.h>
namespace DB
{
/** Glues consecutive blocks of the source stream together until the result
  * reaches a specified minimum size.
  *
  * (A source block that already has at least the specified size is not merged
  * with its neighbours, even if the neighbours are small.)
  *
  * Used to bring blocks to an adequate size for INSERT queries,
  * because such storages as Memory, StripeLog, Log, TinyLog...
  * store or compress data in blocks exactly as passed to them,
  * and blocks of small size are not efficient.
  *
  * The order of data is kept.
  */
class SquashingBlockInputStream : public IProfilingBlockInputStream
{
public:
    /// The conditions on rows and bytes are OR-ed: reaching either threshold is enough.
    /// A threshold of zero means that the corresponding condition is ignored.
    SquashingBlockInputStream(BlockInputStreamPtr & src, size_t min_block_size_rows, size_t min_block_size_bytes);

    String getName() const override { return "Squashing"; }

    /// Identifier of the form "Squashing(<ID of the source stream>)".
    String getID() const override
    {
        return "Squashing(" + children.at(0)->getID() + ")";
    }

protected:
    /// Produces the next output block; see the definition for details.
    Block readImpl() override;

private:
    /// Thresholds given to the constructor; zero disables the corresponding check.
    size_t min_block_size_rows;
    size_t min_block_size_bytes;

    /// Data collected from the source that has not been returned yet.
    Block accumulated_block;

    /// Set once the source has returned an empty block.
    bool all_read = false;

    /// Adds the rows of `block` to `accumulated_block`.
    void append(Block && block);

    /// Whether a block with the given number of rows and bytes satisfies the thresholds.
    bool isEnoughSize(size_t rows, size_t bytes) const;
};
}

View File

@ -39,6 +39,10 @@ struct Settings
M(SettingUInt64, max_block_size, DEFAULT_BLOCK_SIZE) \
/** Максимальный размер блока для вставки, если мы управляем формированием блоков для вставки. */ \
M(SettingUInt64, max_insert_block_size, DEFAULT_INSERT_BLOCK_SIZE) \
/** Squash blocks passed to INSERT query to specified size in rows, if blocks are not big enough. */ \
M(SettingUInt64, min_insert_block_size_rows, DEFAULT_INSERT_BLOCK_SIZE) \
/** Squash blocks passed to INSERT query to specified size in bytes, if blocks are not big enough. */ \
M(SettingUInt64, min_insert_block_size_bytes, (DEFAULT_INSERT_BLOCK_SIZE * 256)) \
/** Максимальное количество потоков выполнения запроса. По-умолчанию - определять автоматически. */ \
M(SettingMaxThreads, max_threads, 0) \
/** Максимальный размер буфера для чтения из файловой системы. */ \

View File

@ -0,0 +1,80 @@
#include <DB/DataStreams/SquashingBlockInputStream.h>
namespace DB
{
/// Stores the size thresholds and registers `src` as the single child stream.
/// `src` is only copied into `children`; it is not modified here.
SquashingBlockInputStream::SquashingBlockInputStream(
    BlockInputStreamPtr & src,
    size_t min_block_size_rows,
    size_t min_block_size_bytes)
    : min_block_size_rows(min_block_size_rows),
    min_block_size_bytes(min_block_size_bytes)
{
    children.push_back(src);
}
/** Returns the next squashed block, or an empty block when there is nothing more to return.
  *
  * Blocks read from the source are appended to `accumulated_block` until it satisfies
  * isEnoughSize(). A source block that is big enough on its own is never merged:
  * the data accumulated before it (possibly small) is returned first, and the big block
  * is kept in `accumulated_block` to be returned by a later call.
  *
  * When the source is exhausted, the remaining accumulated data is returned
  * (it may be an empty block), and every subsequent call returns an empty block.
  */
Block SquashingBlockInputStream::readImpl()
{
    if (all_read)
        return {};

    while (Block block = children[0]->read())
    {
        /// The block just read is already big enough.
        if (isEnoughSize(block.rowsInFirstColumn(), block.bytes()))
        {
            /// Nothing is pending: pass the block through untouched.
            if (!accumulated_block)
                return block;

            /// Return the pending data (it may be small) and keep the new block for the next call.
            accumulated_block.swap(block);
            return block;
        }

        /// The pending data is already big enough (it is a big block kept by a previous call).
        if (accumulated_block && isEnoughSize(accumulated_block.rowsInFirstColumn(), accumulated_block.bytes()))
        {
            /// Return the pending data and start accumulating from the new block.
            accumulated_block.swap(block);
            return block;
        }

        append(std::move(block));

        if (isEnoughSize(accumulated_block.rowsInFirstColumn(), accumulated_block.bytes()))
        {
            Block res;
            res.swap(accumulated_block);
            return res;
        }
    }

    all_read = true;

    /// Hand over the tail by swap instead of returning a copy, so that this stream
    /// does not keep the last block's data alive after the end of the stream.
    Block res;
    res.swap(accumulated_block);
    return res;
}
/** Adds the rows of `block` to `accumulated_block`.
  *
  * If nothing has been accumulated yet, `block` is simply moved into place.
  * Otherwise each column of `block` is appended to the column at the same position
  * of `accumulated_block`, modifying the accumulated columns in place.
  *
  * NOTE(review): columns are matched by position only; this assumes both blocks
  * have the same structure - confirm against the callers.
  */
void SquashingBlockInputStream::append(Block && block)
{
    if (accumulated_block)
    {
        const size_t num_columns = block.columns();
        const size_t num_rows = block.rowsInFirstColumn();

        for (size_t pos = 0; pos != num_columns; ++pos)
        {
            const auto & source_column = *block.unsafeGetByPosition(pos).column;
            accumulated_block.unsafeGetByPosition(pos).column->insertRangeFrom(source_column, 0, num_rows);
        }
    }
    else
        accumulated_block = std::move(block);
}
/** Tells whether a block with the given number of rows and bytes is big enough
  * to be returned without further squashing.
  *
  * The two thresholds are OR-ed, and a zero threshold is ignored.
  * If both thresholds are zero, any size is considered enough.
  */
bool SquashingBlockInputStream::isEnoughSize(size_t rows, size_t bytes) const
{
    /// Both conditions are disabled: every block is passed as is.
    if (!min_block_size_rows && !min_block_size_bytes)
        return true;

    if (min_block_size_rows && rows >= min_block_size_rows)
        return true;

    return min_block_size_bytes && bytes >= min_block_size_bytes;
}
}

View File

@ -5,6 +5,7 @@
#include <DB/DataStreams/AddingDefaultBlockOutputStream.h>
#include <DB/DataStreams/PushingToViewsBlockOutputStream.h>
#include <DB/DataStreams/NullAndDoCopyBlockInputStream.h>
#include <DB/DataStreams/SquashingBlockInputStream.h>
#include <DB/DataStreams/copyData.h>
#include <DB/Parsers/ASTInsertQuery.h>
@ -98,7 +99,12 @@ BlockIO InterpreterInsertQuery::execute()
else
{
InterpreterSelectQuery interpreter_select{query.select, context};
BlockInputStreamPtr in{interpreter_select.execute().in};
BlockInputStreamPtr in = interpreter_select.execute().in;
in = std::make_shared<SquashingBlockInputStream>(in,
context.getSettingsRef().min_insert_block_size_rows,
context.getSettingsRef().min_insert_block_size_bytes);
res.in = std::make_shared<NullAndDoCopyBlockInputStream>(in, out);
res.in_sample = interpreter_select.getSampleBlock();
}

View File

@ -18,6 +18,7 @@
#include <DB/DataStreams/AsynchronousBlockInputStream.h>
#include <DB/DataStreams/NativeBlockInputStream.h>
#include <DB/DataStreams/NativeBlockOutputStream.h>
#include <DB/DataStreams/SquashingBlockInputStream.h>
#include <DB/Interpreters/executeQuery.h>
#include <DB/Interpreters/Quota.h>
@ -616,6 +617,10 @@ void TCPHandler::initBlockInput()
state.block_in = std::make_shared<NativeBlockInputStream>(
*state.maybe_compressed_in,
client_revision);
state.block_in = std::make_shared<SquashingBlockInputStream>(state.block_in,
query_context.getSettingsRef().min_insert_block_size_rows,
query_context.getSettingsRef().min_insert_block_size_bytes);
}
}
@ -626,7 +631,7 @@ void TCPHandler::initBlockOutput()
{
if (state.compression == Protocol::Compression::Enable)
state.maybe_compressed_out = std::make_shared<CompressedWriteBuffer>(
*out, query_context.getSettings().network_compression_method);
*out, query_context.getSettingsRef().network_compression_method);
else
state.maybe_compressed_out = out;