Parallel INSERT in INSERT SELECT query

Vxider 2019-12-12 18:49:15 +08:00
parent f914e01d88
commit 2a94832ef6
8 changed files with 169 additions and 46 deletions

View File

@@ -52,6 +52,7 @@ struct Settings : public SettingsCollection<Settings>
     M(SettingUInt64, max_insert_block_size, DEFAULT_INSERT_BLOCK_SIZE, "The maximum block size for insertion, if we control the creation of blocks for insertion.", 0) \
     M(SettingUInt64, min_insert_block_size_rows, DEFAULT_INSERT_BLOCK_SIZE, "Squash blocks passed to INSERT query to specified size in rows, if blocks are not big enough.", 0) \
     M(SettingUInt64, min_insert_block_size_bytes, (DEFAULT_INSERT_BLOCK_SIZE * 256), "Squash blocks passed to INSERT query to specified size in bytes, if blocks are not big enough.", 0) \
+    M(SettingMaxThreads, max_insert_threads, 0, "The maximum number of threads to execute the INSERT SELECT query. By default, it is determined automatically.", 0) \
     M(SettingMaxThreads, max_threads, 0, "The maximum number of threads to execute the request. By default, it is determined automatically.", 0) \
     M(SettingMaxThreads, max_alter_threads, 0, "The maximum number of threads to execute the ALTER requests. By default, it is determined automatically.", 0) \
     M(SettingUInt64, max_read_buffer_size, DBMS_DEFAULT_BUFFER_SIZE, "The maximum size of the buffer to read from the filesystem.", 0) \
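The new setting feeds the sink-count computation in InterpreterInsertQuery later in this diff: the interpreter builds min(max_insert_threads, number of SELECT streams) insert pipelines. A minimal runnable sketch of that sizing rule (chooseOutStreamsSize is a hypothetical name, not part of the patch):

    #include <algorithm>
    #include <cstddef>
    #include <iostream>

    /// Hypothetical helper mirroring the rule used later in this diff:
    /// never build more insert sinks than there are SELECT streams,
    /// capped by max_insert_threads.
    size_t chooseOutStreamsSize(size_t max_insert_threads, size_t select_streams)
    {
        return std::min(max_insert_threads, select_streams);
    }

    int main()
    {
        std::cout << chooseOutStreamsSize(16, 4) << '\n';   /// 4: bounded by SELECT streams
        std::cout << chooseOutStreamsSize(2, 8) << '\n';    /// 2: bounded by the setting
    }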

View File

@@ -12,5 +12,6 @@ class IBlockOutputStream;
 using BlockInputStreamPtr = std::shared_ptr<IBlockInputStream>;
 using BlockInputStreams = std::vector<BlockInputStreamPtr>;
 using BlockOutputStreamPtr = std::shared_ptr<IBlockOutputStream>;
+using BlockOutputStreams = std::vector<BlockOutputStreamPtr>;

 }

View File

@@ -21,9 +21,19 @@ class NullAndDoCopyBlockInputStream : public IBlockInputStream
 {
 public:
     NullAndDoCopyBlockInputStream(const BlockInputStreamPtr & input_, BlockOutputStreamPtr output_)
-        : input(input_), output(output_)
     {
-        children.push_back(input_);
+        input_streams.push_back(input_);
+        output_streams.push_back(output_);
+
+        for (auto & input_stream : input_streams)
+            children.push_back(input_stream);
     }
+
+    NullAndDoCopyBlockInputStream(const BlockInputStreams & input_, BlockOutputStreams & output_)
+        : input_streams(input_), output_streams(output_)
+    {
+        for (auto & input_stream : input_)
+            children.push_back(input_stream);
+    }

     /// Suppress readPrefix and readSuffix, because they are called by copyData.
@@ -39,13 +49,16 @@ public:
 protected:
     Block readImpl() override
     {
-        copyData(*input, *output);
+        if (input_streams.size() == 1 && output_streams.size() == 1)
+            copyData(*input_streams.at(0), *output_streams.at(0));
+        else
+            copyData(input_streams, output_streams);
         return Block();
     }

 private:
-    BlockInputStreamPtr input;
-    BlockOutputStreamPtr output;
+    BlockInputStreams input_streams;
+    BlockOutputStreams output_streams;
 };

 }
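Usage sketch (a fragment, assuming in_streams and out_streams were prepared by surrounding code such as the InterpreterInsertQuery changes below): the stream performs the whole copy on its first read() and signals end-of-data with an empty block, so the query pipeline can treat the copy as a source that yields nothing.

    /// Fragment: drive a many-to-many copy by "reading" the wrapper once.
    BlockInputStreamPtr copier = std::make_shared<NullAndDoCopyBlockInputStream>(in_streams, out_streams);
    copier->read();   /// runs copyData over all streams, then returns an empty Block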

View File

@@ -1,6 +1,8 @@
+#include <thread>
 #include <DataStreams/IBlockInputStream.h>
 #include <DataStreams/IBlockOutputStream.h>
 #include <DataStreams/copyData.h>
+#include <Common/ConcurrentBoundedQueue.h>

 namespace DB
@@ -51,6 +53,73 @@ void copyDataImpl(IBlockInputStream & from, IBlockOutputStream & to, TCancelCall
 inline void doNothing(const Block &) {}

+void copyDataImpl(BlockInputStreams & froms, BlockOutputStreams & tos)
+{
+    if (froms.size() == tos.size())
+    {
+        std::vector<std::thread> threads(froms.size());
+        for (size_t i = 0; i < froms.size(); i++)
+        {
+            threads[i] = std::thread(
+                [&](BlockInputStreamPtr from, BlockOutputStreamPtr to) {
+                    from->readPrefix();
+                    to->writePrefix();
+                    while (Block block = from->read())
+                    {
+                        to->write(block);
+                    }
+                    from->readSuffix();
+                    to->writeSuffix();
+                }, froms.at(i), tos.at(i));
+        }
+        for (auto & thread : threads)
+            thread.join();
+    }
+    else
+    {
+        ConcurrentBoundedQueue<Block> queue(froms.size());
+        std::thread from_threads([&]() {
+            std::vector<std::thread> _from_threads;
+            for (auto & _from : froms)
+            {
+                _from_threads.emplace_back([&](BlockInputStreamPtr from) {
+                    from->readPrefix();
+                    while (Block block = from->read())
+                    {
+                        queue.push(block);
+                    }
+                    from->readSuffix();
+                }, _from);
+            }
+            for (auto & thread : _from_threads)
+                thread.join();
+            for (size_t i = 0; i < tos.size(); i++)
+                queue.push({});
+        });
+        std::vector<std::thread> _to_threads;
+        for (auto & _to : tos)
+        {
+            _to_threads.emplace_back([&](BlockOutputStreamPtr to) {
+                to->writePrefix();
+                Block block;
+                while (true)
+                {
+                    queue.pop(block);
+                    if (!block)
+                        break;
+                    to->write(block);
+                }
+                to->writeSuffix();
+            }, _to);
+        }
+        from_threads.join();
+        for (auto & thread : _to_threads)
+            thread.join();
+    }
+}

 void copyData(IBlockInputStream & from, IBlockOutputStream & to, std::atomic<bool> * is_cancelled)
 {
     auto is_cancelled_pred = [is_cancelled] ()
@@ -61,6 +130,10 @@ void copyData(IBlockInputStream & from, IBlockOutputStream & to, std::atomic<bool> * is_cancelled)
     copyDataImpl(from, to, is_cancelled_pred, doNothing);
 }

+void copyData(BlockInputStreams & froms, BlockOutputStreams & tos)
+{
+    copyDataImpl(froms, tos);
+}
+
 void copyData(IBlockInputStream & from, IBlockOutputStream & to, const std::function<bool()> & is_cancelled)
 {
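The new overload uses two strategies: when source and sink counts match, each pair gets a dedicated thread; otherwise all readers feed one shared queue that the writers drain, and termination is signalled by pushing one empty sentinel block per writer after every reader has joined. A self-contained sketch of that sentinel protocol using only the standard library (std::optional<int> stands in for Block, a plain mutex-guarded queue for ConcurrentBoundedQueue; illustrative, not ClickHouse code):

    #include <condition_variable>
    #include <mutex>
    #include <optional>
    #include <queue>
    #include <thread>
    #include <vector>

    int main()
    {
        std::queue<std::optional<int>> queue;   /// std::nullopt plays the role of the empty sentinel Block
        std::mutex mutex;
        std::condition_variable cv;
        const size_t n_producers = 3, n_consumers = 2;

        std::vector<std::thread> consumers;
        for (size_t c = 0; c < n_consumers; ++c)
            consumers.emplace_back([&]
            {
                while (true)
                {
                    std::unique_lock lock(mutex);
                    cv.wait(lock, [&] { return !queue.empty(); });
                    std::optional<int> item = std::move(queue.front());
                    queue.pop();
                    if (!item)
                        break;              /// sentinel received: this consumer is done
                    /// ... "write" *item here ...
                }
            });

        std::vector<std::thread> producers;
        for (size_t p = 0; p < n_producers; ++p)
            producers.emplace_back([&, p]
            {
                for (int i = 0; i < 5; ++i)
                {
                    std::lock_guard lock(mutex);
                    queue.push(int(p) * 100 + i);
                    cv.notify_one();
                }
            });

        for (auto & t : producers)
            t.join();
        {
            /// Exactly one sentinel per consumer, pushed only after all producers finished.
            std::lock_guard lock(mutex);
            for (size_t c = 0; c < n_consumers; ++c)
                queue.push(std::nullopt);
            cv.notify_all();
        }
        for (auto & t : consumers)
            t.join();
    }

Pushing exactly tos.size() sentinels, only after every reader has joined, guarantees each writer sees all data and wakes up exactly once with an empty block, so no writer blocks forever on an empty queue.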

View File

@@ -16,6 +16,8 @@ class Block;
  */
 void copyData(IBlockInputStream & from, IBlockOutputStream & to, std::atomic<bool> * is_cancelled = nullptr);

+void copyData(BlockInputStreams & froms, BlockOutputStreams & tos);
+
 void copyData(IBlockInputStream & from, IBlockOutputStream & to, const std::function<bool()> & is_cancelled);

 void copyData(IBlockInputStream & from, IBlockOutputStream & to, const std::function<bool()> & is_cancelled,

View File

@@ -103,56 +103,84 @@ BlockIO InterpreterInsertQuery::execute()
     auto table_lock = table->lockStructureForShare(true, context.getInitialQueryId());

-    /// We create a pipeline of several streams, into which we will write data.
-    BlockOutputStreamPtr out;
-
-    /// NOTE: we explicitly ignore bound materialized views when inserting into Kafka Storage.
-    /// Otherwise we'll get duplicates when MV reads same rows again from Kafka.
-    if (table->noPushingToViews() && !no_destination)
-        out = table->write(query_ptr, context);
-    else
-        out = std::make_shared<PushingToViewsBlockOutputStream>(query.database, query.table, table, context, query_ptr, no_destination);
-
-    /// Do not squash blocks if it is a sync INSERT into Distributed, since it leads to double buffering on the client and server side.
-    /// Client-side buffering might cause excessive timeouts (especially in case of big blocks).
-    if (!(context.getSettingsRef().insert_distributed_sync && table->isRemote()) && !no_squash)
-    {
-        out = std::make_shared<SquashingBlockOutputStream>(
-            out, out->getHeader(), context.getSettingsRef().min_insert_block_size_rows, context.getSettingsRef().min_insert_block_size_bytes);
-    }
-
-    auto query_sample_block = getSampleBlock(query, table);
-
-    /// Actually we don't know the structure of input blocks from the query/table,
-    /// because some clients break the insertion protocol (columns != header).
-    out = std::make_shared<AddingDefaultBlockOutputStream>(
-        out, query_sample_block, out->getHeader(), table->getColumns().getDefaults(), context);
-
-    if (const auto & constraints = table->getConstraints(); !constraints.empty())
-        out = std::make_shared<CheckConstraintsBlockOutputStream>(query.table,
-            out, query_sample_block, table->getConstraints(), context);
-
-    auto out_wrapper = std::make_shared<CountingBlockOutputStream>(out);
-    out_wrapper->setProcessListElement(context.getProcessListElement());
-    out = std::move(out_wrapper);
-
     BlockIO res;

-    /// What type of query: INSERT or INSERT SELECT?
+    BlockInputStreams in_streams;
+    size_t out_streams_size = 1;
     if (query.select)
     {
         /// Passing 1 as subquery_depth will disable limiting size of intermediate result.
         InterpreterSelectWithUnionQuery interpreter_select{query.select, context, SelectQueryOptions(QueryProcessingStage::Complete, 1)};

-        /// BlockIO may hold StoragePtrs to temporary tables
-        res = interpreter_select.execute();
-        res.out = nullptr;
+        if (table->supportsParallelInsert())
+        {
+            in_streams = interpreter_select.executeWithMultipleStreams(res.pipeline);
+            const Settings & settings = context.getSettingsRef();
+            out_streams_size = std::min(size_t(settings.max_insert_threads), in_streams.size());
+        }
+        else
+            in_streams.emplace_back(interpreter_select.execute().in);
+    }
+
+    BlockOutputStreams out_streams;
+    auto query_sample_block = getSampleBlock(query, table);
+    while (out_streams_size-- > 0)
+    {
+        /// We create a pipeline of several streams, into which we will write data.
+        BlockOutputStreamPtr out;
+
+        /// NOTE: we explicitly ignore bound materialized views when inserting into Kafka Storage.
+        /// Otherwise we'll get duplicates when MV reads same rows again from Kafka.
+        if (table->noPushingToViews() && !no_destination)
+            out = table->write(query_ptr, context);
+        else
+            out = std::make_shared<PushingToViewsBlockOutputStream>(query.database, query.table, table, context, query_ptr, no_destination);
+
+        /// Do not squash blocks if it is a sync INSERT into Distributed, since it leads to double buffering on the client and server side.
+        /// Client-side buffering might cause excessive timeouts (especially in case of big blocks).
+        if (!(context.getSettingsRef().insert_distributed_sync && table->isRemote()) && !no_squash)
+        {
+            out = std::make_shared<SquashingBlockOutputStream>(
+                out, out->getHeader(), context.getSettingsRef().min_insert_block_size_rows, context.getSettingsRef().min_insert_block_size_bytes);
+        }
+
+        /// Actually we don't know the structure of input blocks from the query/table,
+        /// because some clients break the insertion protocol (columns != header).
+        out = std::make_shared<AddingDefaultBlockOutputStream>(
+            out, query_sample_block, out->getHeader(), table->getColumns().getDefaults(), context);
+
+        if (const auto & constraints = table->getConstraints(); !constraints.empty())
+            out = std::make_shared<CheckConstraintsBlockOutputStream>(query.table,
+                out, query_sample_block, table->getConstraints(), context);
+
+        auto out_wrapper = std::make_shared<CountingBlockOutputStream>(out);
+        out_wrapper->setProcessListElement(context.getProcessListElement());
+        out = std::move(out_wrapper);
+        out_streams.emplace_back(std::move(out));
+    }
+
+    /// What type of query: INSERT or INSERT SELECT?
+    if (query.select)
+    {
+        for (auto & in_stream : in_streams)
+        {
+            in_stream = std::make_shared<ConvertingBlockInputStream>(
+                context, in_stream, out_streams.at(0)->getHeader(), ConvertingBlockInputStream::MatchColumnsMode::Position);
+        }
+
+        Block in_header = in_streams.at(0)->getHeader();
+        if (in_streams.size() > 1)
+        {
+            for (size_t i = 1; i < in_streams.size(); ++i)
+                assertBlocksHaveEqualStructure(in_streams[i]->getHeader(), in_header, "INSERT SELECT");
+        }
+
+        res.in = std::make_shared<NullAndDoCopyBlockInputStream>(in_streams, out_streams);
-        res.in = std::make_shared<ConvertingBlockInputStream>(context, res.in, out->getHeader(), ConvertingBlockInputStream::MatchColumnsMode::Position);
-        res.in = std::make_shared<NullAndDoCopyBlockInputStream>(res.in, out);

         if (!allow_materialized)
         {
-            Block in_header = res.in->getHeader();
             for (const auto & column : table->getColumns())
                 if (column.default_desc.kind == ColumnDefaultKind::Materialized && in_header.has(column.name))
                     throw Exception("Cannot insert column " + column.name + ", because it is MATERIALIZED column.", ErrorCodes::ILLEGAL_COLUMN);
@@ -160,12 +188,12 @@ BlockIO InterpreterInsertQuery::execute()
     }
     else if (query.data && !query.has_tail) /// can execute without additional data
     {
+        res.out = std::move(out_streams.at(0));
         res.in = std::make_shared<InputStreamFromASTInsertQuery>(query_ptr, nullptr, query_sample_block, context, nullptr);
-        res.in = std::make_shared<NullAndDoCopyBlockInputStream>(res.in, out);
     }
     else
-        res.out = std::move(out);
+        res.out = std::move(out_streams.at(0));

     res.pipeline.addStorageHolder(table);

     return res;
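Putting the pieces together: the interpreter now builds the SELECT side first, sizes the sink side as shown, and hands both arrays to NullAndDoCopyBlockInputStream, which calls the new copyData overload. A runnable toy model of that flow using integer stand-ins for pipelines (illustrative only, mirrors the shapes in the diff but is not ClickHouse code):

    #include <algorithm>
    #include <cstddef>
    #include <iostream>
    #include <vector>

    int main()
    {
        const size_t select_streams = 8;      /// what executeWithMultipleStreams returned
        const size_t max_insert_threads = 4;  /// the new setting

        /// Same sizing rule as the diff: never more sinks than sources.
        size_t out_streams_size = std::min(max_insert_threads, select_streams);

        std::vector<int> out_streams;
        while (out_streams_size-- > 0)        /// same loop shape as the diff
            out_streams.push_back(0);         /// stand-in for one sink pipeline

        /// 8 sources and 4 sinks differ, so the queue-based branch of
        /// copyDataImpl(froms, tos) above would be taken; equal counts
        /// would pair each source with its own sink thread instead.
        std::cout << select_streams << " sources -> " << out_streams.size() << " sinks\n";
    }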

View File

@@ -104,6 +104,9 @@ public:
     /// Returns true if the storage replicates SELECT, INSERT and ALTER commands among replicas.
     virtual bool supportsReplication() const { return false; }

+    /// Returns true if the storage supports parallel insert.
+    virtual bool supportsParallelInsert() const { return false; }
+
     /// Returns true if the storage supports deduplication of inserted data blocks.
     virtual bool supportsDeduplication() const { return false; }
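The flag defaults to false, so each engine opts in explicitly (StorageMergeTree does so below). A self-contained sketch of the mechanism, with StorageBase standing in for IStorage (hypothetical names, not part of this commit):

    #include <iostream>

    /// Minimal stand-in for IStorage, just enough to show the override mechanism.
    struct StorageBase
    {
        virtual ~StorageBase() = default;
        virtual bool supportsParallelInsert() const { return false; }  /// default, as in IStorage
    };

    /// Hypothetical engine opting into the parallel INSERT SELECT path.
    struct StorageExample : StorageBase
    {
        bool supportsParallelInsert() const override { return true; }
    };

    int main()
    {
        StorageExample storage;
        const StorageBase & base = storage;
        std::cout << std::boolalpha << base.supportsParallelInsert() << '\n';  /// true
    }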

View File

@@ -35,6 +35,8 @@ public:
     std::string getTableName() const override { return table_name; }
     std::string getDatabaseName() const override { return database_name; }

+    bool supportsParallelInsert() const override { return true; }
+
     bool supportsIndexForIn() const override { return true; }

     Pipes readWithProcessors(