Refactor inserts and its tails

This commit is contained in:
Ivan Lezhankin 2021-02-15 21:57:35 +03:00
parent 0854dccfde
commit f2b1708164
7 changed files with 24 additions and 91 deletions

View File

@ -152,7 +152,7 @@ public:
class ThreadFromGlobalPool
{
public:
ThreadFromGlobalPool() {}
ThreadFromGlobalPool() = default;
template <typename Function, typename... Args>
explicit ThreadFromGlobalPool(Function && func, Args &&... args)

View File

@ -1,56 +0,0 @@
#pragma once
#include <DataStreams/IBlockInputStream.h>
#include <DataStreams/copyData.h>
namespace DB
{
class IBlockOutputStream;
using BlockOutputStreamPtr = std::shared_ptr<IBlockOutputStream>;
/** An empty stream of blocks.
* But at the first read attempt, copies the data from the passed `input` to the `output`.
* This is necessary to execute the query INSERT SELECT - the query copies data, but returns nothing.
* The query could be executed without wrapping it in an empty BlockInputStream,
* but the progress of query execution and the ability to cancel the query would not work.
*/
class NullAndDoCopyBlockInputStream : public IBlockInputStream
{
public:
NullAndDoCopyBlockInputStream(const BlockInputStreamPtr & input_, BlockOutputStreamPtr output_)
: input(std::move(input_))
, output(std::move(output_))
{
children.push_back(input);
}
/// Suppress readPrefix and readSuffix, because they are called by copyData.
void readPrefix() override {}
void readSuffix() override {}
String getName() const override { return "NullAndDoCopy"; }
Block getHeader() const override { return {}; }
Block getTotals() override { return {}; }
Block getExtremes() override { return {}; }
protected:
Block readImpl() override
{
/// We do not use cancel flag here.
/// If query was cancelled, it will be processed by child streams.
/// Part of the data will be processed.
copyData(*input, *output);
return Block();
}
private:
BlockInputStreamPtr input;
BlockOutputStreamPtr output;
};
}

View File

@ -6,7 +6,6 @@
#include <DataStreams/CheckConstraintsBlockOutputStream.h>
#include <DataStreams/CountingBlockOutputStream.h>
#include <DataStreams/InputStreamFromASTInsertQuery.h>
#include <DataStreams/NullAndDoCopyBlockInputStream.h>
#include <DataStreams/NullBlockOutputStream.h>
#include <DataStreams/PushingToViewsBlockOutputStream.h>
#include <DataStreams/RemoteBlockInputStream.h>
@ -403,14 +402,16 @@ BlockIO InterpreterInsertQuery::execute()
throw Exception("Cannot insert column " + column.name + ", because it is MATERIALIZED column.", ErrorCodes::ILLEGAL_COLUMN);
}
}
else if (query.data && !query.has_tail) /// can execute without additional data
else if (query.data)
{
// res.out = std::move(out_streams.at(0));
res.in = std::make_shared<InputStreamFromASTInsertQuery>(query_ptr, nullptr, query_sample_block, context, nullptr);
res.in = std::make_shared<NullAndDoCopyBlockInputStream>(res.in, out_streams.at(0));
res.in = std::make_shared<InputStreamFromASTInsertQuery>(query_ptr, query.tail, query_sample_block, context, nullptr);
res.out = std::move(out_streams.at(0));
}
else
res.out = std::move(out_streams.at(0));
{
assert(false);
__builtin_unreachable();
}
res.pipeline.addStorageHolder(table);
if (const auto * mv = dynamic_cast<const StorageMaterializedView *>(table.get()))

View File

@ -338,7 +338,6 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
Context & context,
bool internal,
QueryProcessingStage::Enum stage,
bool has_query_tail,
ReadBuffer * istr)
{
const auto current_time = std::chrono::system_clock::now();
@ -421,7 +420,7 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
if (insert_query && insert_query->data)
{
query_end = insert_query->data;
insert_query->has_tail = has_query_tail;
insert_query->tail = istr;
}
else
{
@ -896,13 +895,11 @@ BlockIO executeQuery(
const String & query,
Context & context,
bool internal,
QueryProcessingStage::Enum stage,
bool may_have_embedded_data)
QueryProcessingStage::Enum stage)
{
ASTPtr ast;
BlockIO streams;
std::tie(ast, streams) = executeQueryImpl(query.data(), query.data() + query.size(), context,
internal, stage, !may_have_embedded_data, nullptr);
std::tie(ast, streams) = executeQueryImpl(query.data(), query.data() + query.size(), context, internal, stage, nullptr);
if (const auto * ast_query_with_output = dynamic_cast<const ASTQueryWithOutput *>(ast.get()))
{
@ -922,10 +919,9 @@ BlockIO executeQuery(
Context & context,
bool internal,
QueryProcessingStage::Enum stage,
bool may_have_embedded_data,
bool allow_processors)
{
BlockIO res = executeQuery(query, context, internal, stage, may_have_embedded_data);
BlockIO res = executeQuery(query, context, internal, stage);
if (!allow_processors && res.pipeline.initialized())
res.in = res.getInputStream();
@ -945,22 +941,16 @@ void executeQuery(
const char * begin;
const char * end;
/// If 'istr' is empty now, fetch next data into buffer.
if (!istr.hasPendingData())
istr.next();
istr.nextIfAtEnd();
size_t max_query_size = context.getSettingsRef().max_query_size;
bool may_have_tail;
if (istr.buffer().end() - istr.position() > static_cast<ssize_t>(max_query_size))
{
/// If remaining buffer space in 'istr' is enough to parse query up to 'max_query_size' bytes, then parse inplace.
begin = istr.position();
end = istr.buffer().end();
istr.position() += end - begin;
/// Actually we don't know will query has additional data or not.
/// But we can't check istr.eof(), because begin and end pointers will become invalid
may_have_tail = true;
}
else
{
@ -973,24 +963,23 @@ void executeQuery(
begin = parse_buf.data();
end = begin + parse_buf.size();
/// Can check stream for eof, because we have copied data
may_have_tail = !istr.eof();
}
ASTPtr ast;
BlockIO streams;
std::tie(ast, streams) = executeQueryImpl(begin, end, context, false, QueryProcessingStage::Complete, may_have_tail, &istr);
std::tie(ast, streams) = executeQueryImpl(begin, end, context, false, QueryProcessingStage::Complete, &istr);
auto & pipeline = streams.pipeline;
assert(streams.in || pipeline.initialized());
try
{
if (streams.out)
{
InputStreamFromASTInsertQuery in(ast, &istr, streams.out->getHeader(), context, nullptr);
copyData(in, *streams.out);
copyData(*streams.in, *streams.out);
}
else if (streams.in)
else if (!pipeline.initialized())
{
const auto * ast_query_with_output = dynamic_cast<const ASTQueryWithOutput *>(ast.get());
@ -1028,7 +1017,7 @@ void executeQuery(
copyData(*streams.in, *out, [](){ return false; }, [&out](const Block &) { out->flush(); });
}
else if (pipeline.initialized())
else
{
const ASTQueryWithOutput * ast_query_with_output = dynamic_cast<const ASTQueryWithOutput *>(ast.get());

View File

@ -41,8 +41,7 @@ BlockIO executeQuery(
const String & query, /// Query text without INSERT data. The latter must be written to BlockIO::out.
Context & context, /// DB, tables, data types, storage engines, functions, aggregate functions...
bool internal = false, /// If true, this query is caused by another query and thus needn't be registered in the ProcessList.
QueryProcessingStage::Enum stage = QueryProcessingStage::Complete, /// To which stage the query must be executed.
bool may_have_embedded_data = false /// If insert query may have embedded data
QueryProcessingStage::Enum stage = QueryProcessingStage::Complete /// To which stage the query must be executed.
);
/// Old interface with allow_processors flag. For compatibility.
@ -51,7 +50,6 @@ BlockIO executeQuery(
Context & context,
bool internal,
QueryProcessingStage::Enum stage,
bool may_have_embedded_data,
bool allow_processors /// If can use processors pipeline
);

View File

@ -6,6 +6,7 @@
namespace DB
{
class ReadBuffer;
/** INSERT query
*/
@ -24,8 +25,8 @@ public:
const char * data = nullptr;
const char * end = nullptr;
/// Query has additional data, which will be sent later
bool has_tail = false;
/// Query may have additional data if buffer is not nullptr
ReadBuffer * tail;
/// Try to find table function input() in SELECT part
void tryFindInputFunction(ASTPtr & input_function) const;

View File

@ -775,7 +775,7 @@ namespace
query_end = insert_query->data;
}
String query(begin, query_end);
io = ::DB::executeQuery(query, *query_context, false, QueryProcessingStage::Complete, true, true);
io = ::DB::executeQuery(query, *query_context, false, QueryProcessingStage::Complete, true);
}
void Call::processInput()