Small refactoring.

Nikolai Kochetov 2021-09-15 22:35:48 +03:00
parent b997214620
commit e616732743
58 changed files with 897 additions and 624 deletions
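The recurring pattern across the hunks below: the two-step QueryPipelineBuilder setup (default-construct, then init from a Pipe) is replaced by constructing a QueryPipeline from the Pipe directly, and the pulling executors now take QueryPipeline instead of the builder. A minimal before/after sketch, assuming a Pipe built as in the hunks (e.g. from FormatFactory::instance().getInput(...)); the pipe and block handling here is illustrative, not taken verbatim from the commit:

#include <Processors/QueryPipeline.h>
#include <Processors/Executors/PullingPipelineExecutor.h>

/// Before this commit:
///     QueryPipelineBuilder pipeline;
///     pipeline.init(std::move(pipe));
///
/// After: construct the pipeline from the Pipe directly and pull from it.
QueryPipeline pipeline(std::move(pipe));
PullingPipelineExecutor executor(pipeline);

Block block;
while (executor.pull(block))
{
    /// consume `block` (e.g. write it to the output stream)
}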

View File

@ -2034,8 +2034,7 @@ private:
});
}
QueryPipelineBuilder pipeline;
pipeline.init(std::move(pipe));
QueryPipeline pipeline(std::move(pipe));
PullingAsyncPipelineExecutor executor(pipeline);
Block block;

View File

@ -1162,8 +1162,7 @@ try
Pipe pipe(FormatFactory::instance().getInput(input_format, file_in, header, context, max_block_size));
QueryPipelineBuilder pipeline;
pipeline.init(std::move(pipe));
QueryPipeline pipeline(std::move(pipe));
PullingPipelineExecutor executor(pipeline);
Block block;
@ -1200,8 +1199,7 @@ try
});
}
QueryPipelineBuilder pipeline;
pipeline.init(std::move(pipe));
QueryPipeline pipeline(std::move(pipe));
BlockOutputStreamPtr output = context->getOutputStreamParallelIfPossible(output_format, file_out, header);

View File

@ -14,6 +14,7 @@
#include <Interpreters/InterpreterCreateUserQuery.h>
#include <Interpreters/InterpreterShowGrantsQuery.h>
#include <Common/quoteString.h>
#include <common/logger_useful.h>
#include <Poco/JSON/JSON.h>
#include <Poco/JSON/Object.h>
#include <Poco/JSON/Stringifier.h>

View File

@ -10,22 +10,6 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
BlockInputStreamPtr BlockIO::getInputStream()
{
if (!out.empty())
throw Exception("Cannot get input stream from BlockIO because output stream is not empty",
ErrorCodes::LOGICAL_ERROR);
if (in)
return in;
if (pipeline.initialized())
return std::make_shared<PipelineExecutingBlockInputStream>(std::move(pipeline));
throw Exception("Cannot get input stream from BlockIO because query pipeline was not initialized",
ErrorCodes::LOGICAL_ERROR);
}
void BlockIO::reset()
{
/** process_list_entry should be destroyed after in, after out and after pipeline,
@ -38,8 +22,6 @@ void BlockIO::reset()
*/
/// TODO simplify it all
out.reset();
in.reset();
if (process_list_entry)
process_list_entry->get().releaseQueryStreams();
pipeline.reset();
@ -57,8 +39,6 @@ BlockIO & BlockIO::operator= (BlockIO && rhs)
reset();
process_list_entry = std::move(rhs.process_list_entry);
in = std::move(rhs.in);
out = std::move(rhs.out);
pipeline = std::move(rhs.pipeline);
finish_callback = std::move(rhs.finish_callback);

View File

@ -1,11 +1,7 @@
#pragma once
#include <DataStreams/IBlockStream_fwd.h>
#include <functional>
#include <Processors/QueryPipelineBuilder.h>
#include <Processors/Chain.h>
#include <Processors/QueryPipeline.h>
namespace DB
@ -26,14 +22,11 @@ struct BlockIO
std::shared_ptr<ProcessListEntry> process_list_entry;
Chain out;
BlockInputStreamPtr in;
QueryPipelineBuilder pipeline;
QueryPipeline pipeline;
/// Callbacks for query logging could be set here.
std::function<void(IBlockInputStream *, QueryPipelineBuilder *)> finish_callback;
std::function<void()> exception_callback;
std::function<void(QueryPipeline &)> finish_callback;
std::function<void()> exception_callback;
/// When it is true, don't bother sending any non-empty blocks to the out stream
bool null_format = false;
@ -43,11 +36,7 @@ struct BlockIO
{
if (finish_callback)
{
QueryPipelineBuilder * pipeline_ptr = nullptr;
if (pipeline.initialized())
pipeline_ptr = &pipeline;
finish_callback(in.get(), pipeline_ptr);
finish_callback(pipeline);
}
}
@ -57,9 +46,6 @@ struct BlockIO
exception_callback();
}
/// Returns in or converts pipeline to stream. Throws if out is not empty.
BlockInputStreamPtr getInputStream();
private:
void reset();
};
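With the in/out members and getInputStream() removed, BlockIO carries a single QueryPipeline and callers branch on its kind. A hedged sketch of the consumption pattern implied by the hunks below; pulling()/pushing() and both executors appear in this diff (start/push in the SystemLog hunk), while finish(), process and block_to_insert are assumed placeholders:

BlockIO io = interpreter->execute();

if (io.pipeline.pulling())
{
    /// SELECT-like result: read blocks until the pipeline is exhausted.
    PullingPipelineExecutor executor(io.pipeline);
    Block block;
    while (executor.pull(block))
        process(block);                 /// placeholder consumer
}
else if (io.pipeline.pushing())
{
    /// INSERT-like pipeline: feed blocks into it.
    PushingPipelineExecutor executor(io.pipeline);
    executor.start();
    executor.push(block_to_insert);     /// placeholder data
    executor.finish();                  /// assumed finalization call
}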

View File

@ -387,30 +387,22 @@ Chain buildPushingToViewsDrain(
query = dependent_metadata_snapshot->getSelectQuery().inner_query;
target_name = inner_table_id.getFullTableName();
std::unique_ptr<ASTInsertQuery> insert = std::make_unique<ASTInsertQuery>();
insert->table_id = inner_table_id;
/// Get list of columns we get from select query.
auto header = InterpreterSelectQuery(query, select_context, SelectQueryOptions().analyze())
.getSampleBlock();
/// Insert only columns returned by select.
auto list = std::make_shared<ASTExpressionList>();
Names insert_columns;
const auto & inner_table_columns = inner_metadata_snapshot->getColumns();
for (const auto & column : header)
{
/// But skip columns which storage doesn't have.
if (inner_table_columns.hasPhysical(column.name))
list->children.emplace_back(std::make_shared<ASTIdentifier>(column.name));
insert_columns.emplace_back(column.name);
}
insert->columns = std::move(list);
ASTPtr insert_query_ptr(insert.release());
InterpreterInsertQuery interpreter(insert_query_ptr, insert_context, false, false, false, view_runtime_data);
BlockIO io = interpreter.execute();
io.out.attachResources(QueryPipelineBuilder::getPipe(std::move(io.pipeline)).detachResources());
out = std::move(io.out);
InterpreterInsertQuery interpreter(nullptr, insert_context, false, false, false);
out = interpreter.buildChain(inner_table, inner_metadata_snapshot, insert_columns, view_runtime_data);
}
else if (auto * live_view = dynamic_cast<StorageLiveView *>(dependent_table.get()))
{
@ -547,34 +539,35 @@ static void process(Block & block, ViewRuntimeData & view, const StorageID & sou
/// - These objects live inside query pipeline (DataStreams) and the reference become dangling.
InterpreterSelectQuery select(view.query, local_context, SelectQueryOptions());
auto io = select.execute();
io.pipeline.resize(1);
auto pipeline = select.buildQueryPipeline();
pipeline.resize(1);
/// Squashing is needed here because the materialized view query can generate a lot of blocks
/// even when only one block is inserted into the parent table (e.g. if the query is a GROUP BY
/// and two-level aggregation is triggered).
io.pipeline.addTransform(std::make_shared<SquashingChunksTransform>(
io.pipeline.getHeader(),
pipeline.addTransform(std::make_shared<SquashingChunksTransform>(
pipeline.getHeader(),
context->getSettingsRef().min_insert_block_size_rows,
context->getSettingsRef().min_insert_block_size_bytes));
auto converting = ActionsDAG::makeConvertingActions(
io.pipeline.getHeader().getColumnsWithTypeAndName(),
pipeline.getHeader().getColumnsWithTypeAndName(),
view.sample_block.getColumnsWithTypeAndName(),
ActionsDAG::MatchColumnsMode::Name);
io.pipeline.addTransform(std::make_shared<ExpressionTransform>(
io.pipeline.getHeader(),
pipeline.addTransform(std::make_shared<ExpressionTransform>(
pipeline.getHeader(),
std::make_shared<ExpressionActions>(std::move(converting))));
io.pipeline.setProgressCallback([context](const Progress & progress)
pipeline.setProgressCallback([context](const Progress & progress)
{
CurrentThread::updateProgressIn(progress);
if (auto callback = context->getProgressCallback())
callback(progress);
});
PullingPipelineExecutor executor(io.pipeline);
auto query_pipeline = QueryPipelineBuilder::getPipeline(std::move(pipeline));
PullingPipelineExecutor executor(query_pipeline);
if (!executor.pull(block))
{
block.clear();

View File

@ -1,5 +1,8 @@
#include <Interpreters/IInterpreterUnionOrSelectQuery.h>
#include <Interpreters/QueryLog.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <Processors/QueryPlan/BuildQueryPipelineSettings.h>
#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
namespace DB
{
@ -9,4 +12,15 @@ void IInterpreterUnionOrSelectQuery::extendQueryLogElemImpl(QueryLogElement & el
elem.query_kind = "Select";
}
QueryPipelineBuilder IInterpreterUnionOrSelectQuery::buildQueryPipeline()
{
QueryPlan query_plan;
buildQueryPlan(query_plan);
return std::move(*query_plan.buildQueryPipeline(
QueryPlanOptimizationSettings::fromContext(context), BuildQueryPipelineSettings::fromContext(context)));
}
}
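The new IInterpreterUnionOrSelectQuery::buildQueryPipeline() lets callers keep composing on a QueryPipelineBuilder and convert it to a QueryPipeline only at the end via QueryPipelineBuilder::getPipeline. A sketch of that calling pattern, lifted from the InterpreterInsertQuery and InterpreterSelect* hunks in this commit (query.select and context stand for whatever the caller has in scope):

/// Caller-side composition (e.g. INSERT ... SELECT): get the builder,
/// keep adding transforms, then convert once everything is attached.
InterpreterSelectWithUnionQuery interpreter_select{
    query.select, context, SelectQueryOptions(QueryProcessingStage::Complete, 1)};

QueryPipelineBuilder builder = interpreter_select.buildQueryPipeline();
builder.dropTotalsAndExtremes();
builder.resize(1);

BlockIO res;
res.pipeline = QueryPipelineBuilder::getPipeline(std::move(builder));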

View File

@ -28,6 +28,7 @@ public:
}
virtual void buildQueryPlan(QueryPlan & query_plan) = 0;
QueryPipelineBuilder buildQueryPipeline();
virtual void ignoreWithTotals() = 0;

View File

@ -121,7 +121,7 @@ BlockIO InterpreterAlterQuery::execute()
table->checkAlterPartitionIsPossible(partition_commands, metadata_snapshot, getContext()->getSettingsRef());
auto partition_commands_pipe = table->alterPartition(metadata_snapshot, partition_commands, getContext());
if (!partition_commands_pipe.empty())
res.pipeline.init(std::move(partition_commands_pipe));
res.pipeline = QueryPipeline(std::move(partition_commands_pipe));
}
if (!live_view_commands.empty())

View File

@ -3,7 +3,7 @@
#include <Access/AccessFlags.h>
#include <Storages/IStorage.h>
#include <Parsers/ASTCheckQuery.h>
#include <DataStreams/OneBlockInputStream.h>
#include <Processors/Sources/SourceFromSingleChunk.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeString.h>
#include <Columns/ColumnsNumber.h>
@ -72,7 +72,7 @@ BlockIO InterpreterCheckQuery::execute()
}
BlockIO res;
res.in = std::make_shared<OneBlockInputStream>(block);
res.pipeline = QueryPipeline(std::make_shared<SourceFromSingleChunk>(std::move(block)));
return res;
}

View File

@ -1,5 +1,5 @@
#include <Storages/IStorage.h>
#include <DataStreams/OneBlockInputStream.h>
#include <Processors/Sources/SourceFromSingleChunk.h>
#include <DataStreams/BlockIO.h>
#include <DataTypes/DataTypeString.h>
#include <Parsers/queryToString.h>
@ -60,7 +60,7 @@ Block InterpreterDescribeQuery::getSampleBlock()
}
BlockInputStreamPtr InterpreterDescribeQuery::executeImpl()
QueryPipeline InterpreterDescribeQuery::executeImpl()
{
ColumnsDescription columns;
@ -119,7 +119,7 @@ BlockInputStreamPtr InterpreterDescribeQuery::executeImpl()
res_columns[6]->insertDefault();
}
return std::make_shared<OneBlockInputStream>(sample_block.cloneWithColumns(std::move(res_columns)));
return QueryPipeline(std::make_shared<SourceFromSingleChunk>(sample_block.cloneWithColumns(std::move(res_columns))));
}
}

View File

@ -21,7 +21,7 @@ public:
private:
ASTPtr query_ptr;
BlockInputStreamPtr executeImpl();
QueryPipeline executeImpl();
};

View File

@ -1,6 +1,6 @@
#include <Storages/IStorage.h>
#include <Parsers/TablePropertiesQueriesASTs.h>
#include <DataStreams/OneBlockInputStream.h>
#include <Processors/Sources/SourceFromSingleChunk.h>
#include <DataStreams/BlockIO.h>
#include <DataStreams/copyData.h>
#include <DataTypes/DataTypesNumber.h>
@ -21,7 +21,7 @@ namespace ErrorCodes
BlockIO InterpreterExistsQuery::execute()
{
BlockIO res;
res.in = executeImpl();
res.pipeline = executeImpl();
return res;
}
@ -35,7 +35,7 @@ Block InterpreterExistsQuery::getSampleBlock()
}
BlockInputStreamPtr InterpreterExistsQuery::executeImpl()
QueryPipeline InterpreterExistsQuery::executeImpl()
{
ASTQueryWithTableAndOutput * exists_query;
bool result = false;
@ -76,10 +76,10 @@ BlockInputStreamPtr InterpreterExistsQuery::executeImpl()
result = DatabaseCatalog::instance().isDictionaryExist({database, exists_query->table});
}
return std::make_shared<OneBlockInputStream>(Block{{
return QueryPipeline(std::make_shared<SourceFromSingleChunk>(Block{{
ColumnUInt8::create(1, result),
std::make_shared<DataTypeUInt8>(),
"result" }});
"result" }}));
}
}

View File

@ -21,7 +21,7 @@ public:
private:
ASTPtr query_ptr;
BlockInputStreamPtr executeImpl();
QueryPipeline executeImpl();
};

View File

@ -1,7 +1,7 @@
#include <Interpreters/InterpreterExplainQuery.h>
#include <DataStreams/BlockIO.h>
#include <DataStreams/OneBlockInputStream.h>
#include <Processors/Sources/SourceFromSingleChunk.h>
#include <DataTypes/DataTypeString.h>
#include <Interpreters/InDepthNodeVisitor.h>
#include <Interpreters/InterpreterSelectWithUnionQuery.h>
@ -73,7 +73,7 @@ namespace
BlockIO InterpreterExplainQuery::execute()
{
BlockIO res;
res.in = executeImpl();
res.pipeline = executeImpl();
return res;
}
@ -240,7 +240,7 @@ ExplainSettings<Settings> checkAndGetSettings(const ASTPtr & ast_settings)
}
BlockInputStreamPtr InterpreterExplainQuery::executeImpl()
QueryPipeline InterpreterExplainQuery::executeImpl()
{
const auto & ast = query->as<const ASTExplainQuery &>();
@ -335,17 +335,7 @@ BlockInputStreamPtr InterpreterExplainQuery::executeImpl()
{
InterpreterInsertQuery insert(ast.getExplainedQuery(), getContext());
auto io = insert.execute();
if (io.pipeline.initialized())
{
auto pipe = QueryPipelineBuilder::getPipe(std::move(io.pipeline));
const auto & processors = pipe.getProcessors();
printPipeline(processors, buf);
}
else
{
const auto & processors = io.out.getProcessors();
printPipeline(processors, buf);
}
printPipeline(io.pipeline.getProcessors(), buf);
}
else
throw Exception("Only SELECT and INSERT is supported for EXPLAIN PIPELINE query", ErrorCodes::INCORRECT_QUERY);
@ -377,7 +367,7 @@ BlockInputStreamPtr InterpreterExplainQuery::executeImpl()
fillColumn(*res_columns[0], buf.str());
}
return std::make_shared<OneBlockInputStream>(sample_block.cloneWithColumns(std::move(res_columns)));
return QueryPipeline(std::make_shared<SourceFromSingleChunk>(sample_block.cloneWithColumns(std::move(res_columns))));
}
}

View File

@ -15,12 +15,12 @@ public:
BlockIO execute() override;
static Block getSampleBlock(const ASTExplainQuery::ExplainKind kind);
static Block getSampleBlock(ASTExplainQuery::ExplainKind kind);
private:
ASTPtr query;
BlockInputStreamPtr executeImpl();
QueryPipeline executeImpl();
};

View File

@ -44,14 +44,12 @@ namespace ErrorCodes
}
InterpreterInsertQuery::InterpreterInsertQuery(
const ASTPtr & query_ptr_, ContextPtr context_, bool allow_materialized_, bool no_squash_, bool no_destination_,
ExceptionKeepingTransformRuntimeDataPtr runtime_data_)
const ASTPtr & query_ptr_, ContextPtr context_, bool allow_materialized_, bool no_squash_, bool no_destination_)
: WithContext(context_)
, query_ptr(query_ptr_)
, allow_materialized(allow_materialized_)
, no_squash(no_squash_)
, no_destination(no_destination_)
, runtime_data(std::move(runtime_data_))
{
checkStackSize();
}
@ -75,26 +73,37 @@ Block InterpreterInsertQuery::getSampleBlock(
const StoragePtr & table,
const StorageMetadataPtr & metadata_snapshot) const
{
Block table_sample_non_materialized = metadata_snapshot->getSampleBlockNonMaterialized();
/// If the query does not include information about columns
if (!query.columns)
{
if (no_destination)
return metadata_snapshot->getSampleBlockWithVirtuals(table->getVirtuals());
else
return table_sample_non_materialized;
return metadata_snapshot->getSampleBlockNonMaterialized();
}
Block table_sample = metadata_snapshot->getSampleBlock();
const auto columns_ast = processColumnTransformers(getContext()->getCurrentDatabase(), table, metadata_snapshot, query.columns);
/// Form the block based on the column names from the query
Block res;
Names names;
const auto columns_ast = processColumnTransformers(getContext()->getCurrentDatabase(), table, metadata_snapshot, query.columns);
for (const auto & identifier : columns_ast->children)
{
std::string current_name = identifier->getColumnName();
names.emplace_back(std::move(current_name));
}
return getSampleBlock(names, table, metadata_snapshot);
}
Block InterpreterInsertQuery::getSampleBlock(
const Names & names,
const StoragePtr & table,
const StorageMetadataPtr & metadata_snapshot) const
{
Block table_sample = metadata_snapshot->getSampleBlock();
Block table_sample_non_materialized = metadata_snapshot->getSampleBlockNonMaterialized();
Block res;
for (const auto & current_name : names)
{
/// The table does not have a column with that name
if (!table_sample.has(current_name))
throw Exception("No such column " + current_name + " in table " + table->getStorageID().getNameForLogs(),
@ -149,13 +158,93 @@ static bool isTrivialSelect(const ASTPtr & select)
return false;
};
Chain InterpreterInsertQuery::buildChain(
const StoragePtr & table,
const StorageMetadataPtr & metadata_snapshot,
const Names & columns,
ExceptionKeepingTransformRuntimeDataPtr runtime_data)
{
return buildChainImpl(table, metadata_snapshot, getSampleBlock(columns, table, metadata_snapshot), std::move(runtime_data));
}
Chain InterpreterInsertQuery::buildChainImpl(
const StoragePtr & table,
const StorageMetadataPtr & metadata_snapshot,
const Block & query_sample_block,
ExceptionKeepingTransformRuntimeDataPtr runtime_data)
{
auto context = getContext();
const ASTInsertQuery * query = nullptr;
if (query_ptr)
query = query_ptr->as<ASTInsertQuery>();
const Settings & settings = context->getSettingsRef();
bool null_as_default = query && query->select && context->getSettingsRef().insert_null_as_default;
/// We create a pipeline of several streams, into which we will write data.
Chain out;
/// NOTE: we explicitly ignore bound materialized views when inserting into Kafka Storage.
/// Otherwise we'll get duplicates when MV reads same rows again from Kafka.
if (table->noPushingToViews() && !no_destination)
{
auto sink = table->write(query_ptr, metadata_snapshot, context);
sink->setRuntimeData(runtime_data);
out.addSource(std::move(sink));
}
else
{
out = buildPushingToViewsDrain(table, metadata_snapshot, context, query_ptr, no_destination, runtime_data);
}
/// Note that we wrap transforms one on top of another, so we write them in reverse of data processing order.
/// Checking constraints. It must be done after calculation of all defaults, so we can check them on calculated columns.
if (const auto & constraints = metadata_snapshot->getConstraints(); !constraints.empty())
out.addSource(std::make_shared<CheckConstraintsTransform>(
table->getStorageID(), out.getInputHeader(), metadata_snapshot->getConstraints(), context));
auto adding_missing_defaults_dag = addMissingDefaults(
query_sample_block,
out.getInputHeader().getNamesAndTypesList(),
metadata_snapshot->getColumns(),
context,
null_as_default);
auto adding_missing_defaults_actions = std::make_shared<ExpressionActions>(adding_missing_defaults_dag);
/// Actually we don't know structure of input blocks from query/table,
/// because some clients break insertion protocol (columns != header)
out.addSource(std::make_shared<ConvertingTransform>(query_sample_block, adding_missing_defaults_actions));
/// It's important to squash blocks as early as possible (before other transforms),
/// because other transforms may work inefficient if block size is small.
/// Do not squash blocks if it is a sync INSERT into Distributed, since it lead to double bufferization on client and server side.
/// Client-side bufferization might cause excessive timeouts (especially in case of big blocks).
if (!(settings.insert_distributed_sync && table->isRemote()) && !no_squash && !(query && query->watch))
{
bool table_prefers_large_blocks = table->prefersLargeBlocks();
out.addSource(std::make_shared<SquashingChunksTransform>(
out.getInputHeader(),
table_prefers_large_blocks ? settings.min_insert_block_size_rows : settings.max_block_size,
table_prefers_large_blocks ? settings.min_insert_block_size_bytes : 0));
}
auto counting = std::make_shared<CountingTransform>(out.getInputHeader(), runtime_data ? runtime_data->thread_status : nullptr);
counting->setProcessListElement(context->getProcessListElement());
out.addSource(std::move(counting));
return out;
}
BlockIO InterpreterInsertQuery::execute()
{
const Settings & settings = getContext()->getSettingsRef();
auto & query = query_ptr->as<ASTInsertQuery &>();
BlockIO res;
QueryPipelineBuilder pipeline;
StoragePtr table = getTable(query);
if (query.partition_by && !table->supportsPartitionBy())
@ -175,7 +264,7 @@ BlockIO InterpreterInsertQuery::execute()
// Distributed INSERT SELECT
if (auto maybe_pipeline = table->distributedWrite(query, getContext()))
{
res.pipeline = std::move(*maybe_pipeline);
pipeline = std::move(*maybe_pipeline);
is_distributed_insert_select = true;
}
}
@ -184,6 +273,7 @@ BlockIO InterpreterInsertQuery::execute()
if (!is_distributed_insert_select || query.watch)
{
size_t out_streams_size = 1;
if (query.select)
{
bool is_trivial_insert_select = false;
@ -227,27 +317,27 @@ BlockIO InterpreterInsertQuery::execute()
InterpreterSelectWithUnionQuery interpreter_select{
query.select, new_context, SelectQueryOptions(QueryProcessingStage::Complete, 1)};
res = interpreter_select.execute();
pipeline = interpreter_select.buildQueryPipeline();
}
else
{
/// Passing 1 as subquery_depth will disable limiting size of intermediate result.
InterpreterSelectWithUnionQuery interpreter_select{
query.select, getContext(), SelectQueryOptions(QueryProcessingStage::Complete, 1)};
res = interpreter_select.execute();
pipeline = interpreter_select.buildQueryPipeline();
}
res.pipeline.dropTotalsAndExtremes();
pipeline.dropTotalsAndExtremes();
if (table->supportsParallelInsert() && settings.max_insert_threads > 1)
out_streams_size = std::min(size_t(settings.max_insert_threads), res.pipeline.getNumStreams());
out_streams_size = std::min(size_t(settings.max_insert_threads), pipeline.getNumStreams());
res.pipeline.resize(out_streams_size);
pipeline.resize(out_streams_size);
/// Allow to insert Nullable into non-Nullable columns, NULL values will be added as defaults values.
if (getContext()->getSettingsRef().insert_null_as_default)
{
const auto & input_columns = res.pipeline.getHeader().getColumnsWithTypeAndName();
const auto & input_columns = pipeline.getHeader().getColumnsWithTypeAndName();
const auto & query_columns = query_sample_block.getColumnsWithTypeAndName();
const auto & output_columns = metadata_snapshot->getColumns();
@ -266,103 +356,56 @@ BlockIO InterpreterInsertQuery::execute()
else if (query.watch)
{
InterpreterWatchQuery interpreter_watch{ query.watch, getContext() };
res = interpreter_watch.execute();
pipeline = interpreter_watch.execute();
}
for (size_t i = 0; i < out_streams_size; i++)
{
/// We create a pipeline of several streams, into which we will write data.
Chain out;
/// NOTE: we explicitly ignore bound materialized views when inserting into Kafka Storage.
/// Otherwise we'll get duplicates when MV reads same rows again from Kafka.
if (table->noPushingToViews() && !no_destination)
{
auto sink = table->write(query_ptr, metadata_snapshot, getContext());
sink->setRuntimeData(runtime_data);
out.addSource(std::move(sink));
}
else
{
out = buildPushingToViewsDrain(table, metadata_snapshot, getContext(), query_ptr, no_destination, runtime_data);
}
/// Note that we wrap transforms one on top of another, so we write them in reverse of data processing order.
/// Checking constraints. It must be done after calculation of all defaults, so we can check them on calculated columns.
if (const auto & constraints = metadata_snapshot->getConstraints(); !constraints.empty())
out.addSource(std::make_shared<CheckConstraintsTransform>(
query.table_id, out.getInputHeader(), metadata_snapshot->getConstraints(), getContext()));
bool null_as_default = query.select && getContext()->getSettingsRef().insert_null_as_default;
auto adding_missing_defaults_dag = addMissingDefaults(
query_sample_block,
out.getInputHeader().getNamesAndTypesList(),
metadata_snapshot->getColumns(),
getContext(),
null_as_default);
auto adding_missing_defaults_actions = std::make_shared<ExpressionActions>(adding_missing_defaults_dag);
/// Actually we don't know structure of input blocks from query/table,
/// because some clients break insertion protocol (columns != header)
out.addSource(std::make_shared<ConvertingTransform>(query_sample_block, adding_missing_defaults_actions));
/// It's important to squash blocks as early as possible (before other transforms),
/// because other transforms may work inefficient if block size is small.
/// Do not squash blocks if it is a sync INSERT into Distributed, since it lead to double bufferization on client and server side.
/// Client-side bufferization might cause excessive timeouts (especially in case of big blocks).
if (!(settings.insert_distributed_sync && table->isRemote()) && !no_squash && !query.watch)
{
bool table_prefers_large_blocks = table->prefersLargeBlocks();
out.addSource(std::make_shared<SquashingChunksTransform>(
out.getInputHeader(),
table_prefers_large_blocks ? settings.min_insert_block_size_rows : settings.max_block_size,
table_prefers_large_blocks ? settings.min_insert_block_size_bytes : 0));
}
auto counting = std::make_shared<CountingTransform>(out.getInputHeader(), runtime_data ? runtime_data->thread_status : nullptr);
counting->setProcessListElement(getContext()->getProcessListElement());
out.addSource(std::move(counting));
auto out = buildChainImpl(table, metadata_snapshot, query_sample_block);
out_chains.emplace_back(std::move(out));
}
}
pipeline.addStorageHolder(table);
if (const auto * mv = dynamic_cast<const StorageMaterializedView *>(table.get()))
{
if (auto inner_table = mv->tryGetTargetTable())
pipeline.addStorageHolder(inner_table);
}
BlockIO res;
/// What type of query: INSERT or INSERT SELECT or INSERT WATCH?
if (is_distributed_insert_select)
{
/// Pipeline was already built.
res.pipeline = QueryPipelineBuilder::getPipeline(std::move(pipeline));
}
else if (query.select || query.watch)
{
const auto & header = out_chains.at(0).getInputHeader();
auto actions_dag = ActionsDAG::makeConvertingActions(
res.pipeline.getHeader().getColumnsWithTypeAndName(),
pipeline.getHeader().getColumnsWithTypeAndName(),
header.getColumnsWithTypeAndName(),
ActionsDAG::MatchColumnsMode::Position);
auto actions = std::make_shared<ExpressionActions>(actions_dag, ExpressionActionsSettings::fromContext(getContext(), CompileExpressions::yes));
res.pipeline.addSimpleTransform([&](const Block & in_header) -> ProcessorPtr
pipeline.addSimpleTransform([&](const Block & in_header) -> ProcessorPtr
{
return std::make_shared<ExpressionTransform>(in_header, actions);
});
auto num_select_threads = res.pipeline.getNumThreads();
auto num_select_threads = pipeline.getNumThreads();
res.pipeline.addChains(std::move(out_chains));
pipeline.addChains(std::move(out_chains));
res.pipeline.setSinks([&](const Block & cur_header, QueryPipelineBuilder::StreamType) -> ProcessorPtr
pipeline.setSinks([&](const Block & cur_header, QueryPipelineBuilder::StreamType) -> ProcessorPtr
{
return std::make_shared<ExceptionHandlingSink>(cur_header);
});
/// Don't use more threads for insert then for select to reduce memory consumption.
if (!settings.parallel_view_processing && res.pipeline.getNumThreads() > num_select_threads)
res.pipeline.setMaxThreads(num_select_threads);
if (!settings.parallel_view_processing && pipeline.getNumThreads() > num_select_threads)
pipeline.setMaxThreads(num_select_threads);
if (!allow_materialized)
{
@ -370,34 +413,22 @@ BlockIO InterpreterInsertQuery::execute()
if (column.default_desc.kind == ColumnDefaultKind::Materialized && header.has(column.name))
throw Exception("Cannot insert column " + column.name + ", because it is MATERIALIZED column.", ErrorCodes::ILLEGAL_COLUMN);
}
}
else if (query.data && !query.has_tail) /// can execute without additional data
{
auto pipe = getSourceFromFromASTInsertQuery(query_ptr, nullptr, query_sample_block, getContext(), nullptr);
res.pipeline.init(std::move(pipe));
res.pipeline.resize(1);
res.pipeline.addChains(std::move(out_chains));
res.pipeline.setSinks([&](const Block & cur_header, Pipe::StreamType)
{
return std::make_shared<ExceptionHandlingSink>(cur_header);
});
res.pipeline = QueryPipelineBuilder::getPipeline(std::move(pipeline));
}
else
{
res.out = std::move(out_chains.at(0));
res.out.setNumThreads(std::min<size_t>(res.out.getNumThreads(), settings.max_threads));
}
res.pipeline = QueryPipeline(std::move(out_chains.at(0)));
res.pipeline.setNumThreads(std::min<size_t>(res.pipeline.getNumThreads(), settings.max_threads));
res.pipeline.addStorageHolder(table);
if (const auto * mv = dynamic_cast<const StorageMaterializedView *>(table.get()))
{
if (auto inner_table = mv->tryGetTargetTable())
res.pipeline.addStorageHolder(inner_table);
if (query.data && !query.has_tail)
{
/// can execute without additional data
auto pipe = getSourceFromFromASTInsertQuery(query_ptr, nullptr, query_sample_block, getContext(), nullptr);
res.pipeline.complete(std::move(pipe));
}
}
if (!res.out.empty())
res.out.attachResources(QueryPipelineBuilder::getPipe(std::move(res.pipeline)).detachResources());
return res;
}

View File

@ -12,6 +12,8 @@ namespace DB
struct ExceptionKeepingTransformRuntimeData;
using ExceptionKeepingTransformRuntimeDataPtr = std::shared_ptr<ExceptionKeepingTransformRuntimeData>;
class Chain;
/** Interprets the INSERT query.
*/
class InterpreterInsertQuery : public IInterpreter, WithContext
@ -22,8 +24,7 @@ public:
ContextPtr context_,
bool allow_materialized_ = false,
bool no_squash_ = false,
bool no_destination_ = false,
ExceptionKeepingTransformRuntimeDataPtr runtime_data = nullptr);
bool no_destination_ = false);
/** Prepare a request for execution. Return block streams
* - the stream into which you can write data to execute the query, if INSERT;
@ -31,20 +32,33 @@ public:
* Or nothing if the request INSERT SELECT (self-sufficient query - does not accept the input data, does not return the result).
*/
BlockIO execute() override;
Chain buildChain();
StorageID getDatabaseTable() const;
Chain buildChain(
const StoragePtr & table,
const StorageMetadataPtr & metadata_snapshot,
const Names & columns,
ExceptionKeepingTransformRuntimeDataPtr runtime_data = nullptr);
void extendQueryLogElemImpl(QueryLogElement & elem, const ASTPtr & ast, ContextPtr context_) const override;
private:
StoragePtr getTable(ASTInsertQuery & query);
Block getSampleBlock(const ASTInsertQuery & query, const StoragePtr & table, const StorageMetadataPtr & metadata_snapshot) const;
Block getSampleBlock(const Names & names, const StoragePtr & table, const StorageMetadataPtr & metadata_snapshot) const;
ASTPtr query_ptr;
const bool allow_materialized;
const bool no_squash;
const bool no_destination;
ExceptionKeepingTransformRuntimeDataPtr runtime_data;
Chain buildChainImpl(
const StoragePtr & table,
const StorageMetadataPtr & metadata_snapshot,
const Block & query_sample_block,
ExceptionKeepingTransformRuntimeDataPtr runtime_data = nullptr);
};
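The runtime_data constructor parameter moves out of the constructor and into the new buildChain() entry point, so callers that only need a sink chain (such as the materialized-view drain) no longer have to execute a synthetic INSERT AST. The usage, as it appears in the buildPushingToViewsDrain hunk above:

/// Build only the insertion chain for the inner table of a materialized view;
/// insert_columns is the list of columns produced by the view's SELECT.
InterpreterInsertQuery interpreter(nullptr, insert_context, false, false, false);
Chain out = interpreter.buildChain(inner_table, inner_metadata_snapshot, insert_columns, view_runtime_data);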

View File

@ -16,7 +16,9 @@
#include <DataTypes/DataTypeString.h>
#include <Columns/ColumnsNumber.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataStreams/OneBlockInputStream.h>
#include <Processors/Sources/SourceFromSingleChunk.h>
#include <Processors/Sources/SourceWithProgress.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Storages/IStorage.h>
#include <Common/quoteString.h>
#include <thread>
@ -121,15 +123,15 @@ static QueryDescriptors extractQueriesExceptMeAndCheckAccess(const Block & proce
}
class SyncKillQueryInputStream : public IBlockInputStream
class SyncKillQuerySource : public SourceWithProgress
{
public:
SyncKillQueryInputStream(ProcessList & process_list_, QueryDescriptors && processes_to_stop_, Block && processes_block_,
SyncKillQuerySource(ProcessList & process_list_, QueryDescriptors && processes_to_stop_, Block && processes_block_,
const Block & res_sample_block_)
: process_list(process_list_),
processes_to_stop(std::move(processes_to_stop_)),
processes_block(std::move(processes_block_)),
res_sample_block(res_sample_block_)
: SourceWithProgress(res_sample_block_)
, process_list(process_list_)
, processes_to_stop(std::move(processes_to_stop_))
, processes_block(std::move(processes_block_))
{
addTotalRowsApprox(processes_to_stop.size());
}
@ -139,14 +141,12 @@ public:
return "SynchronousQueryKiller";
}
Block getHeader() const override { return res_sample_block; }
Block readImpl() override
Chunk generate() override
{
size_t num_result_queries = processes_to_stop.size();
if (num_processed_queries >= num_result_queries)
return Block();
return {};
MutableColumns columns = res_sample_block.cloneEmptyColumns();
@ -179,7 +179,8 @@ public:
/// Don't produce empty block
} while (columns.empty() || columns[0]->empty());
return res_sample_block.cloneWithColumns(std::move(columns));
size_t num_rows = columns.empty() ? 0 : columns.front()->size();
return Chunk(std::move(columns), num_rows);
}
ProcessList & process_list;
@ -221,12 +222,12 @@ BlockIO InterpreterKillQueryQuery::execute()
insertResultRow(query_desc.source_num, code, processes_block, header, res_columns);
}
res_io.in = std::make_shared<OneBlockInputStream>(header.cloneWithColumns(std::move(res_columns)));
res_io.pipeline = QueryPipeline(std::make_shared<SourceFromSingleChunk>(header.cloneWithColumns(std::move(res_columns))));
}
else
{
res_io.in = std::make_shared<SyncKillQueryInputStream>(
process_list, std::move(queries_to_stop), std::move(processes_block), header);
res_io.pipeline = QueryPipeline(std::make_shared<SyncKillQuerySource>(
process_list, std::move(queries_to_stop), std::move(processes_block), header));
}
break;
@ -286,7 +287,7 @@ BlockIO InterpreterKillQueryQuery::execute()
"Not allowed to kill mutation. To execute this query it's necessary to have the grant " + required_access_rights.toString(),
ErrorCodes::ACCESS_DENIED);
res_io.in = std::make_shared<OneBlockInputStream>(header.cloneWithColumns(std::move(res_columns)));
res_io.pipeline = QueryPipeline(Pipe(std::make_shared<SourceFromSingleChunk>(header.cloneWithColumns(std::move(res_columns)))));
break;
}
@ -302,10 +303,12 @@ Block InterpreterKillQueryQuery::getSelectResult(const String & columns, const S
if (where_expression)
select_query += " WHERE " + queryToString(where_expression);
auto stream = executeQuery(select_query, getContext(), true).getInputStream();
Block res = stream->read();
auto io = executeQuery(select_query, getContext(), true);
PullingPipelineExecutor executor(io.pipeline);
Block res;
executor.pull(res);

Block next_block;
bool need_another_read = executor.pull(next_block) && next_block;

if (res && need_another_read)
throw Exception("Expected one block from input stream", ErrorCodes::LOGICAL_ERROR);
return res;

View File

@ -133,8 +133,10 @@ BlockIO InterpreterSelectIntersectExceptQuery::execute()
QueryPlanOptimizationSettings::fromContext(context),
BuildQueryPipelineSettings::fromContext(context));
res.pipeline = std::move(*pipeline);
res.pipeline.addInterpreterContext(context);
pipeline->addInterpreterContext(context);
res.pipeline = QueryPipelineBuilder::getPipeline(std::move(*query_plan.buildQueryPipeline(
QueryPlanOptimizationSettings::fromContext(context), BuildQueryPipelineSettings::fromContext(context))));
return res;
}

View File

@ -603,8 +603,8 @@ BlockIO InterpreterSelectQuery::execute()
buildQueryPlan(query_plan);
res.pipeline = std::move(*query_plan.buildQueryPipeline(
QueryPlanOptimizationSettings::fromContext(context), BuildQueryPipelineSettings::fromContext(context)));
res.pipeline = QueryPipelineBuilder::getPipeline(std::move(*query_plan.buildQueryPipeline(
QueryPlanOptimizationSettings::fromContext(context), BuildQueryPipelineSettings::fromContext(context))));
return res;
}

View File

@ -320,13 +320,13 @@ BlockIO InterpreterSelectWithUnionQuery::execute()
QueryPlan query_plan;
buildQueryPlan(query_plan);
auto pipeline = query_plan.buildQueryPipeline(
auto pipeline_builder = query_plan.buildQueryPipeline(
QueryPlanOptimizationSettings::fromContext(context),
BuildQueryPipelineSettings::fromContext(context));
res.pipeline = std::move(*pipeline);
res.pipeline.addInterpreterContext(context);
pipeline_builder->addInterpreterContext(context);
res.pipeline = QueryPipelineBuilder::getPipeline(std::move(*pipeline_builder));
return res;
}

View File

@ -5,7 +5,7 @@
#include <Interpreters/InterpreterShowCreateAccessEntityQuery.h>
#include <Interpreters/InterpreterShowGrantsQuery.h>
#include <Columns/ColumnString.h>
#include <DataStreams/OneBlockInputStream.h>
#include <Processors/Sources/SourceFromSingleChunk.h>
#include <DataTypes/DataTypeString.h>
#include <Access/AccessFlags.h>
#include <Access/AccessControlManager.h>
@ -22,12 +22,12 @@ using EntityType = IAccessEntity::Type;
BlockIO InterpreterShowAccessQuery::execute()
{
BlockIO res;
res.in = executeImpl();
res.pipeline = executeImpl();
return res;
}
BlockInputStreamPtr InterpreterShowAccessQuery::executeImpl() const
QueryPipeline InterpreterShowAccessQuery::executeImpl() const
{
/// Build a create query.
ASTs queries = getCreateAndGrantQueries();
@ -43,7 +43,7 @@ BlockInputStreamPtr InterpreterShowAccessQuery::executeImpl() const
}
String desc = "ACCESS";
return std::make_shared<OneBlockInputStream>(Block{{std::move(column), std::make_shared<DataTypeString>(), desc}});
return QueryPipeline(std::make_shared<SourceFromSingleChunk>(Block{{std::move(column), std::make_shared<DataTypeString>(), desc}}));
}

View File

@ -23,7 +23,7 @@ public:
bool ignoreLimits() const override { return true; }
private:
BlockInputStreamPtr executeImpl() const;
QueryPipeline executeImpl() const;
ASTs getCreateAndGrantQueries() const;
std::vector<AccessEntityPtr> getEntities() const;

View File

@ -20,7 +20,7 @@
#include <Access/Role.h>
#include <Access/SettingsProfile.h>
#include <Columns/ColumnString.h>
#include <DataStreams/OneBlockInputStream.h>
#include <Processors/Sources/SourceFromSingleChunk.h>
#include <DataTypes/DataTypeString.h>
#include <Common/StringUtils/StringUtils.h>
#include <Core/Defines.h>
@ -241,12 +241,12 @@ InterpreterShowCreateAccessEntityQuery::InterpreterShowCreateAccessEntityQuery(c
BlockIO InterpreterShowCreateAccessEntityQuery::execute()
{
BlockIO res;
res.in = executeImpl();
res.pipeline = executeImpl();
return res;
}
BlockInputStreamPtr InterpreterShowCreateAccessEntityQuery::executeImpl()
QueryPipeline InterpreterShowCreateAccessEntityQuery::executeImpl()
{
/// Build a create queries.
ASTs create_queries = getCreateQueries();
@ -270,7 +270,7 @@ BlockInputStreamPtr InterpreterShowCreateAccessEntityQuery::executeImpl()
if (startsWith(desc, prefix))
desc = desc.substr(prefix.length()); /// `desc` always starts with "SHOW ", so we can trim this prefix.
return std::make_shared<OneBlockInputStream>(Block{{std::move(column), std::make_shared<DataTypeString>(), desc}});
return QueryPipeline(std::make_shared<SourceFromSingleChunk>(Block{{std::move(column), std::make_shared<DataTypeString>(), desc}}));
}

View File

@ -30,7 +30,7 @@ public:
static ASTPtr getAttachQuery(const IAccessEntity & entity);
private:
BlockInputStreamPtr executeImpl();
QueryPipeline executeImpl();
std::vector<AccessEntityPtr> getEntities() const;
ASTs getCreateQueries() const;
AccessRightsElements getRequiredAccess() const;

View File

@ -1,7 +1,7 @@
#include <Storages/IStorage.h>
#include <Parsers/TablePropertiesQueriesASTs.h>
#include <Parsers/formatAST.h>
#include <DataStreams/OneBlockInputStream.h>
#include <Processors/Sources/SourceFromSingleChunk.h>
#include <DataStreams/BlockIO.h>
#include <DataStreams/copyData.h>
#include <DataTypes/DataTypesNumber.h>
@ -26,7 +26,7 @@ namespace ErrorCodes
BlockIO InterpreterShowCreateQuery::execute()
{
BlockIO res;
res.in = executeImpl();
res.pipeline = executeImpl();
return res;
}
@ -40,7 +40,7 @@ Block InterpreterShowCreateQuery::getSampleBlock()
}
BlockInputStreamPtr InterpreterShowCreateQuery::executeImpl()
QueryPipeline InterpreterShowCreateQuery::executeImpl()
{
ASTPtr create_query;
ASTQueryWithTableAndOutput * show_query;
@ -100,10 +100,10 @@ BlockInputStreamPtr InterpreterShowCreateQuery::executeImpl()
MutableColumnPtr column = ColumnString::create();
column->insert(res);
return std::make_shared<OneBlockInputStream>(Block{{
return QueryPipeline(std::make_shared<SourceFromSingleChunk>(Block{{
std::move(column),
std::make_shared<DataTypeString>(),
"statement"}});
"statement"}}));
}
}

View File

@ -21,7 +21,7 @@ public:
private:
ASTPtr query_ptr;
BlockInputStreamPtr executeImpl();
QueryPipeline executeImpl();
};

View File

@ -5,7 +5,7 @@
#include <Parsers/formatAST.h>
#include <Interpreters/Context.h>
#include <Columns/ColumnString.h>
#include <DataStreams/OneBlockInputStream.h>
#include <Processors/Sources/SourceFromSingleChunk.h>
#include <DataTypes/DataTypeString.h>
#include <Access/AccessControlManager.h>
#include <Access/User.h>
@ -100,12 +100,12 @@ namespace
BlockIO InterpreterShowGrantsQuery::execute()
{
BlockIO res;
res.in = executeImpl();
res.pipeline = executeImpl();
return res;
}
BlockInputStreamPtr InterpreterShowGrantsQuery::executeImpl()
QueryPipeline InterpreterShowGrantsQuery::executeImpl()
{
/// Build a create query.
ASTs grant_queries = getGrantQueries();
@ -129,7 +129,7 @@ BlockInputStreamPtr InterpreterShowGrantsQuery::executeImpl()
if (desc.starts_with(prefix))
desc = desc.substr(prefix.length()); /// `desc` always starts with "SHOW ", so we can trim this prefix.
return std::make_shared<OneBlockInputStream>(Block{{std::move(column), std::make_shared<DataTypeString>(), desc}});
return QueryPipeline(std::make_shared<SourceFromSingleChunk>(Block{{std::move(column), std::make_shared<DataTypeString>(), desc}}));
}

View File

@ -27,7 +27,7 @@ public:
bool ignoreLimits() const override { return true; }
private:
BlockInputStreamPtr executeImpl();
QueryPipeline executeImpl();
ASTs getGrantQueries() const;
std::vector<AccessEntityPtr> getEntities() const;

View File

@ -85,7 +85,7 @@ BlockIO InterpreterWatchQuery::execute()
pipe.setQuota(getContext()->getQuota());
}
res.pipeline.init(std::move(pipe));
res.pipeline = QueryPipeline(std::move(pipe));
return res;
}

View File

@ -332,17 +332,6 @@ bool QueryStatus::streamsAreReleased()
return query_streams_status == QueryStreamsStatus::Released;
}
bool QueryStatus::tryGetQueryStreams(BlockInputStreamPtr & in) const
{
std::lock_guard lock(query_streams_mutex);
if (query_streams_status != QueryStreamsStatus::Initialized)
return false;
in = query_stream_in;
return true;
}
CancellationCode QueryStatus::cancelQuery(bool kill)
{
/// Streams are destroyed, and ProcessListElement will be deleted from ProcessList soon. We need wait a little bit

View File

@ -105,11 +105,6 @@ protected:
mutable std::mutex query_streams_mutex;
/// Streams with query results, point to BlockIO from executeQuery()
/// This declaration is compatible with notes about BlockIO::process_list_entry:
/// there are no cyclic dependencies: BlockIO::in,out point to objects inside ProcessListElement (not whole object)
BlockInputStreamPtr query_stream_in;
/// Array of PipelineExecutors to be cancelled when a cancelQuery is received
std::vector<PipelineExecutor *> executors;
@ -181,9 +176,6 @@ public:
/// It means that ProcessListEntry still exists, but stream was already destroyed
bool streamsAreReleased();
/// Get query in/out pointers from BlockIO
bool tryGetQueryStreams(BlockInputStreamPtr & in) const;
CancellationCode cancelQuery(bool kill);
bool isKilled() const { return is_killed; }

View File

@ -489,7 +489,7 @@ void SystemLog<LogElement>::flushImpl(const std::vector<LogElement> & to_flush,
InterpreterInsertQuery interpreter(query_ptr, insert_context);
BlockIO io = interpreter.execute();
PushingPipelineExecutor executor(io.out);
PushingPipelineExecutor executor(io.pipeline);
executor.start();
executor.push(block);

View File

@ -55,6 +55,7 @@
#include <Processors/Transforms/MaterializingTransform.h>
#include <Processors/Formats/IOutputFormat.h>
#include <Processors/Sinks/ExceptionHandlingSink.h>
#include <Processors/Executors/CompletedPipelineExecutor.h>
#include <random>
@ -587,8 +588,7 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
res = interpreter->execute();
}
QueryPipelineBuilder & pipeline = res.pipeline;
bool use_processors = pipeline.initialized();
QueryPipeline & pipeline = res.pipeline;
if (const auto * insert_interpreter = typeid_cast<const InterpreterInsertQuery *>(&*interpreter))
{
@ -604,54 +604,23 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
if ((*process_list_entry)->isKilled())
throw Exception("Query '" + (*process_list_entry)->getInfo().client_info.current_query_id + "' is killed in pending state",
ErrorCodes::QUERY_WAS_CANCELLED);
else if (!use_processors)
(*process_list_entry)->setQueryStreams(res);
}
/// Hold element of process list till end of query execution.
res.process_list_entry = process_list_entry;
if (use_processors)
if (pipeline.pulling())
{
/// Limits on the result, the quota on the result, and also callback for progress.
/// Limits apply only to the final result.
pipeline.setProgressCallback(context->getProgressCallback());
pipeline.setProcessListElement(context->getProcessListElement());
if (stage == QueryProcessingStage::Complete && !pipeline.isCompleted())
{
pipeline.resize(1);
pipeline.addSimpleTransform([&](const Block & header)
{
auto transform = std::make_shared<LimitsCheckingTransform>(header, limits);
transform->setQuota(quota);
return transform;
});
}
if (stage == QueryProcessingStage::Complete)
pipeline.setLimitsAndQuota(limits, quota);
}
else
else if (pipeline.pushing())
{
/// Limits on the result, the quota on the result, and also callback for progress.
/// Limits apply only to the final result.
if (res.in)
{
res.in->setProgressCallback(context->getProgressCallback());
res.in->setProcessListElement(context->getProcessListElement());
if (stage == QueryProcessingStage::Complete)
{
if (!interpreter->ignoreQuota())
res.in->setQuota(quota);
if (!interpreter->ignoreLimits())
res.in->setLimits(limits);
}
}
if (!res.out.empty())
{
if (auto * counting = dynamic_cast<CountingTransform *>(&res.out.getSource()))
{
counting->setProcessListElement(context->getProcessListElement());
}
}
pipeline.setProcessListElement(context->getProcessListElement());
}
/// Everything related to query log.
@ -678,15 +647,12 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
/// Log into system table start of query execution, if need.
if (log_queries)
{
if (use_processors)
{
const auto & info = context->getQueryAccessInfo();
elem.query_databases = info.databases;
elem.query_tables = info.tables;
elem.query_columns = info.columns;
elem.query_projections = info.projections;
elem.query_views = info.views;
}
const auto & info = context->getQueryAccessInfo();
elem.query_databases = info.databases;
elem.query_tables = info.tables;
elem.query_columns = info.columns;
elem.query_projections = info.projections;
elem.query_views = info.views;
interpreter->extendQueryLogElem(elem, ast, context, query_database, query_table);
@ -759,7 +725,7 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
log_queries_min_query_duration_ms = settings.log_queries_min_query_duration_ms.totalMilliseconds(),
status_info_to_query_log
]
(IBlockInputStream * stream_in, QueryPipelineBuilder * query_pipeline) mutable
(QueryPipeline & query_pipeline) mutable
{
QueryStatus * process_list_elem = context->getProcessListElement();
@ -787,21 +753,9 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
if (progress_callback)
progress_callback(Progress(WriteProgress(info.written_rows, info.written_bytes)));
if (stream_in)
else if (query_pipeline.pulling())
{
const BlockStreamProfileInfo & stream_in_info = stream_in->getProfileInfo();
/// NOTE: INSERT SELECT query contains zero metrics
elem.result_rows = stream_in_info.rows;
elem.result_bytes = stream_in_info.bytes;
}
else if (query_pipeline)
{
if (const auto * output_format = query_pipeline->getOutputFormat())
{
elem.result_rows = output_format->getResultRows();
elem.result_bytes = output_format->getResultBytes();
}
query_pipeline.tryGetResultRowsAndBytes(elem.result_rows, elem.result_bytes);
}
else /// will be used only for ordinary INSERT queries
{
@ -966,12 +920,10 @@ BlockIO executeQuery(
bool may_have_embedded_data,
bool allow_processors)
{
BlockIO res = executeQuery(query, context, internal, stage, may_have_embedded_data);
if (!allow_processors)
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Flag allow_processors is deprecated for executeQuery");
if (!allow_processors && res.pipeline.initialized())
res.in = res.getInputStream();
return res;
return executeQuery(query, context, internal, stage, may_have_embedded_data);
}
@ -1028,70 +980,12 @@ void executeQuery(
try
{
if (!streams.out.empty())
if (pipeline.pushing())
{
auto pipe = getSourceFromFromASTInsertQuery(ast, &istr, streams.out.getInputHeader(), context, nullptr);
pipeline.init(std::move(pipe));
pipeline.resize(1);
pipeline.addChain(std::move(streams.out));
pipeline.setSinks([&](const Block & header, Pipe::StreamType)
{
return std::make_shared<ExceptionHandlingSink>(header);
});
auto executor = pipeline.execute();
executor->execute(pipeline.getNumThreads());
auto pipe = getSourceFromFromASTInsertQuery(ast, &istr, pipeline.getHeader(), context, nullptr);
pipeline.complete(std::move(pipe));
}
else if (streams.in)
{
const auto * ast_query_with_output = dynamic_cast<const ASTQueryWithOutput *>(ast.get());
WriteBuffer * out_buf = &ostr;
std::unique_ptr<WriteBuffer> compressed_buffer;
if (ast_query_with_output && ast_query_with_output->out_file)
{
if (!allow_into_outfile)
throw Exception("INTO OUTFILE is not allowed", ErrorCodes::INTO_OUTFILE_NOT_ALLOWED);
const auto & out_file = ast_query_with_output->out_file->as<ASTLiteral &>().value.safeGet<std::string>();
compressed_buffer = wrapWriteBufferWithCompressionMethod(
std::make_unique<WriteBufferFromFile>(out_file, DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_EXCL | O_CREAT),
chooseCompressionMethod(out_file, ""),
/* compression level = */ 3
);
}
String format_name = ast_query_with_output && (ast_query_with_output->format != nullptr)
? getIdentifierName(ast_query_with_output->format)
: context->getDefaultFormat();
auto out = FormatFactory::instance().getOutputStreamParallelIfPossible(
format_name,
compressed_buffer ? *compressed_buffer : *out_buf,
streams.in->getHeader(),
context,
{},
output_format_settings);
/// Save previous progress callback if any. TODO Do it more conveniently.
auto previous_progress_callback = context->getProgressCallback();
/// NOTE Progress callback takes shared ownership of 'out'.
streams.in->setProgressCallback([out, previous_progress_callback] (const Progress & progress)
{
if (previous_progress_callback)
previous_progress_callback(progress);
out->onProgress(progress);
});
if (set_result_details)
set_result_details(
context->getClientInfo().current_query_id, out->getContentType(), format_name, DateLUT::instance().getTimeZone());
copyData(*streams.in, *out, [](){ return false; }, [&out](const Block &) { out->flush(); });
}
else if (pipeline.initialized())
else if (pipeline.pulling())
{
const ASTQueryWithOutput * ast_query_with_output = dynamic_cast<const ASTQueryWithOutput *>(ast.get());
@ -1111,55 +1005,45 @@ void executeQuery(
}
String format_name = ast_query_with_output && (ast_query_with_output->format != nullptr)
? getIdentifierName(ast_query_with_output->format)
: context->getDefaultFormat();
? getIdentifierName(ast_query_with_output->format)
: context->getDefaultFormat();
if (!pipeline.isCompleted())
auto out = FormatFactory::instance().getOutputFormatParallelIfPossible(
format_name,
compressed_buffer ? *compressed_buffer : *out_buf,
pipeline.getHeader(),
context,
{},
output_format_settings);
out->setAutoFlush();
/// Save previous progress callback if any. TODO Do it more conveniently.
auto previous_progress_callback = context->getProgressCallback();
/// NOTE Progress callback takes shared ownership of 'out'.
pipeline.setProgressCallback([out, previous_progress_callback] (const Progress & progress)
{
pipeline.addSimpleTransform([](const Block & header)
{
return std::make_shared<MaterializingTransform>(header);
});
if (previous_progress_callback)
previous_progress_callback(progress);
out->onProgress(progress);
});
auto out = FormatFactory::instance().getOutputFormatParallelIfPossible(
format_name,
compressed_buffer ? *compressed_buffer : *out_buf,
pipeline.getHeader(),
context,
{},
output_format_settings);
out->setBeforeFinalizeCallback(before_finalize_callback);
out->setAutoFlush();
if (set_result_details)
set_result_details(
context->getClientInfo().current_query_id, out->getContentType(), format_name, DateLUT::instance().getTimeZone());
/// Save previous progress callback if any. TODO Do it more conveniently.
auto previous_progress_callback = context->getProgressCallback();
/// NOTE Progress callback takes shared ownership of 'out'.
pipeline.setProgressCallback([out, previous_progress_callback] (const Progress & progress)
{
if (previous_progress_callback)
previous_progress_callback(progress);
out->onProgress(progress);
});
out->setBeforeFinalizeCallback(before_finalize_callback);
if (set_result_details)
set_result_details(
context->getClientInfo().current_query_id, out->getContentType(), format_name, DateLUT::instance().getTimeZone());
pipeline.setOutputFormat(std::move(out));
}
else
{
pipeline.setProgressCallback(context->getProgressCallback());
}
{
auto executor = pipeline.execute();
executor->execute(pipeline.getNumThreads());
}
pipeline.complete(std::move(out));
}
else
{
pipeline.setProgressCallback(context->getProgressCallback());
}
CompletedPipelineExecutor executor(pipeline);
executor.execute();
}
catch (...)
{
@ -1174,20 +1058,15 @@ void executeTrivialBlockIO(BlockIO & streams, ContextPtr context)
{
try
{
if (!streams.out.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Query stream requires input, but no input buffer provided, it's a bug");
if (streams.in)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Query stream requires output, but no output buffer provided, it's a bug");
if (!streams.pipeline.initialized())
return;
if (!streams.pipeline.isCompleted())
if (!streams.pipeline.completed())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Query pipeline requires output, but no output buffer provided, it's a bug");
streams.pipeline.setProgressCallback(context->getProgressCallback());
auto executor = streams.pipeline.execute();
executor->execute(streams.pipeline.getNumThreads());
CompletedPipelineExecutor executor(streams.pipeline);
executor.execute();
}
catch (...)
{

View File

@ -0,0 +1,16 @@
#include <Processors/Executors/CompletedPipelineExecutor.h>
#include <Processors/Executors/PipelineExecutor.h>
#include <Processors/QueryPipeline.h>
namespace DB
{
CompletedPipelineExecutor::CompletedPipelineExecutor(QueryPipeline & pipeline_) : pipeline(pipeline_) {}
void CompletedPipelineExecutor::execute()
{
PipelineExecutor executor(pipeline.processors);
executor.execute(pipeline.getNumThreads());
}
}

View File

@ -0,0 +1,19 @@
#pragma once
namespace DB
{
class QueryPipeline;
class CompletedPipelineExecutor
{
public:
explicit CompletedPipelineExecutor(QueryPipeline & pipeline_);
void execute();
private:
QueryPipeline & pipeline;
};
}
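CompletedPipelineExecutor covers the remaining case: a pipeline whose input and output are already attached (completed), which previously went through pipeline.execute(). A short usage sketch matching the executeTrivialBlockIO hunk above; streams and context stand for the caller's BlockIO and ContextPtr:

/// Run a fully-wired pipeline to completion (no pulling or pushing side left).
if (streams.pipeline.completed())
{
    streams.pipeline.setProgressCallback(context->getProgressCallback());
    CompletedPipelineExecutor executor(streams.pipeline);
    executor.execute();
}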

View File

@ -2,7 +2,8 @@
#include <Processors/Executors/PipelineExecutor.h>
#include <Processors/Formats/LazyOutputFormat.h>
#include <Processors/Transforms/AggregatingTransform.h>
#include <Processors/QueryPipelineBuilder.h>
#include <Processors/Sources/NullSource.h>
#include <Processors/QueryPipeline.h>
#include <Common/setThreadName.h>
#include <common/scope_guard_safe.h>
@ -10,6 +11,11 @@
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
struct PullingAsyncPipelineExecutor::Data
{
PipelineExecutorPtr executor;
@ -36,13 +42,13 @@ struct PullingAsyncPipelineExecutor::Data
}
};
PullingAsyncPipelineExecutor::PullingAsyncPipelineExecutor(QueryPipelineBuilder & pipeline_) : pipeline(pipeline_)
PullingAsyncPipelineExecutor::PullingAsyncPipelineExecutor(QueryPipeline & pipeline_) : pipeline(pipeline_)
{
if (!pipeline.isCompleted())
{
lazy_format = std::make_shared<LazyOutputFormat>(pipeline.getHeader());
pipeline.setOutputFormat(lazy_format);
}
if (!pipeline.pulling())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Pipeline for PullingAsyncPipelineExecutor must be pulling");
lazy_format = std::make_shared<LazyOutputFormat>(pipeline.output->getHeader());
pipeline.complete(lazy_format);
}
PullingAsyncPipelineExecutor::~PullingAsyncPipelineExecutor()
@ -59,8 +65,7 @@ PullingAsyncPipelineExecutor::~PullingAsyncPipelineExecutor()
const Block & PullingAsyncPipelineExecutor::getHeader() const
{
return lazy_format ? lazy_format->getPort(IOutputFormat::PortKind::Main).getHeader()
: pipeline.getHeader(); /// Empty.
return pipeline.output->getHeader();
}
static void threadFunction(PullingAsyncPipelineExecutor::Data & data, ThreadGroupStatusPtr thread_group, size_t num_threads)
@ -99,7 +104,7 @@ bool PullingAsyncPipelineExecutor::pull(Chunk & chunk, uint64_t milliseconds)
if (!data)
{
data = std::make_unique<Data>();
data->executor = pipeline.execute();
data->executor = std::make_shared<PipelineExecutor>(pipeline.processors, pipeline.process_list_element);
data->lazy_format = lazy_format.get();
auto func = [&, thread_group = CurrentThread::getGroup()]()

View File

@ -4,7 +4,7 @@
namespace DB
{
class QueryPipelineBuilder;
class QueryPipeline;
class Block;
class Chunk;
class LazyOutputFormat;
@ -20,7 +20,7 @@ struct BlockStreamProfileInfo;
class PullingAsyncPipelineExecutor
{
public:
explicit PullingAsyncPipelineExecutor(QueryPipelineBuilder & pipeline_);
explicit PullingAsyncPipelineExecutor(QueryPipeline & pipeline_);
~PullingAsyncPipelineExecutor();
/// Get structure of returned block or chunk.
@ -50,7 +50,7 @@ public:
struct Data;
private:
QueryPipelineBuilder & pipeline;
QueryPipeline & pipeline;
std::shared_ptr<LazyOutputFormat> lazy_format;
std::unique_ptr<Data> data;
};

View File

@ -1,15 +1,25 @@
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Processors/Executors/PipelineExecutor.h>
#include <Processors/Formats/PullingOutputFormat.h>
#include <Processors/QueryPipelineBuilder.h>
#include <Processors/QueryPipeline.h>
#include <Processors/Transforms/AggregatingTransform.h>
#include <Processors/Sources/NullSource.h>
namespace DB
{
PullingPipelineExecutor::PullingPipelineExecutor(QueryPipelineBuilder & pipeline_) : pipeline(pipeline_)
namespace ErrorCodes
{
pulling_format = std::make_shared<PullingOutputFormat>(pipeline.getHeader(), has_data_flag);
pipeline.setOutputFormat(pulling_format);
extern const int LOGICAL_ERROR;
}
PullingPipelineExecutor::PullingPipelineExecutor(QueryPipeline & pipeline_) : pipeline(pipeline_)
{
if (!pipeline.pulling())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Pipeline for PullingPipelineExecutor must be pulling");
pulling_format = std::make_shared<PullingOutputFormat>(pipeline.output->getHeader(), has_data_flag);
pipeline.complete(pulling_format);
}
PullingPipelineExecutor::~PullingPipelineExecutor()
@ -26,13 +36,13 @@ PullingPipelineExecutor::~PullingPipelineExecutor()
const Block & PullingPipelineExecutor::getHeader() const
{
return pulling_format->getPort(IOutputFormat::PortKind::Main).getHeader();
return pipeline.output->getHeader();
}
bool PullingPipelineExecutor::pull(Chunk & chunk)
{
if (!executor)
executor = pipeline.execute();
executor = std::make_shared<PipelineExecutor>(pipeline.processors, pipeline.process_list_element);
if (!executor->executeStep(&has_data_flag))
return false;

View File

@ -7,7 +7,7 @@ namespace DB
class Block;
class Chunk;
class QueryPipelineBuilder;
class QueryPipeline;
class PipelineExecutor;
class PullingOutputFormat;
struct BlockStreamProfileInfo;
@ -23,7 +23,7 @@ using PipelineExecutorPtr = std::shared_ptr<PipelineExecutor>;
class PullingPipelineExecutor
{
public:
explicit PullingPipelineExecutor(QueryPipelineBuilder & pipeline_);
explicit PullingPipelineExecutor(QueryPipeline & pipeline_);
~PullingPipelineExecutor();
/// Get structure of returned block or chunk.
@ -50,7 +50,7 @@ public:
private:
std::atomic_bool has_data_flag = false;
QueryPipelineBuilder & pipeline;
QueryPipeline & pipeline;
std::shared_ptr<PullingOutputFormat> pulling_format;
PipelineExecutorPtr executor;
};
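A sketch of the corresponding pulling call site, assuming a ready Pipe (for example, built from an input format source); only interfaces shown in this patch are used:

#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Processors/QueryPipeline.h>
#include <Processors/Pipe.h>
#include <Processors/Chunk.h>

namespace DB
{

/// Drain a pulling pipeline built directly from a Pipe.
void drainPipe(Pipe pipe)
{
    QueryPipeline pipeline(std::move(pipe));     /// pulling() is now true
    PullingPipelineExecutor executor(pipeline);  /// completes it with PullingOutputFormat
    Chunk chunk;
    while (executor.pull(chunk))
    {
        /// Process chunk.getColumns() / chunk.getNumRows() here.
    }
}

}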

View File

@ -1,7 +1,7 @@
#include <Processors/Executors/PushingAsyncPipelineExecutor.h>
#include <Processors/Executors/PipelineExecutor.h>
#include <Processors/ISource.h>
#include <Processors/Chain.h>
#include <Processors/QueryPipeline.h>
#include <Processors/Sinks/ExceptionHandlingSink.h>
#include <iostream>
@ -127,19 +127,14 @@ static void threadFunction(PushingAsyncPipelineExecutor::Data & data, ThreadGrou
}
PushingAsyncPipelineExecutor::PushingAsyncPipelineExecutor(Chain & chain_) : chain(chain_)
PushingAsyncPipelineExecutor::PushingAsyncPipelineExecutor(QueryPipeline & pipeline_) : pipeline(pipeline_)
{
pushing_source = std::make_shared<PushingAsyncSource>(chain.getInputHeader());
auto sink = std::make_shared<ExceptionHandlingSink>(chain.getOutputHeader());
connect(pushing_source->getPort(), chain.getInputPort());
connect(chain.getOutputPort(), sink->getPort());
if (!pipeline.pushing())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Pipeline for PushingAsyncPipelineExecutor must be pushing");
processors = std::make_unique<Processors>();
processors->reserve(chain.getProcessors().size() + 2);
for (const auto & processor : chain.getProcessors())
processors->push_back(processor);
processors->push_back(pushing_source);
processors->push_back(std::move(sink));
pushing_source = std::make_shared<PushingAsyncSource>(pipeline.input->getHeader());
connect(pushing_source->getPort(), *pipeline.input);
pipeline.processors.emplace_back(pushing_source);
}
PushingAsyncPipelineExecutor::~PushingAsyncPipelineExecutor()
@ -168,12 +163,12 @@ void PushingAsyncPipelineExecutor::start()
started = true;
data = std::make_unique<Data>();
data->executor = std::make_shared<PipelineExecutor>(*processors);
data->executor = std::make_shared<PipelineExecutor>(pipeline.processors);
data->source = pushing_source.get();
auto func = [&, thread_group = CurrentThread::getGroup()]()
{
threadFunction(*data, thread_group, chain.getNumThreads());
threadFunction(*data, thread_group, pipeline.getNumThreads());
};
data->thread = ThreadFromGlobalPool(std::move(func));

View File

@ -7,7 +7,7 @@ namespace DB
class Block;
class Chunk;
class Chain;
class QueryPipeline;
class PushingAsyncSource;
class PipelineExecutor;
@ -28,7 +28,7 @@ using Processors = std::vector<ProcessorPtr>;
class PushingAsyncPipelineExecutor
{
public:
explicit PushingAsyncPipelineExecutor(Chain & chain);
explicit PushingAsyncPipelineExecutor(QueryPipeline & pipeline_);
~PushingAsyncPipelineExecutor();
/// Get structure of returned block or chunk.
@ -47,10 +47,9 @@ public:
struct Data;
private:
Chain & chain;
QueryPipeline & pipeline;
std::shared_ptr<PushingAsyncSource> pushing_source;
std::unique_ptr<Processors> processors;
bool started = false;
bool finished = false;

View File

@ -1,8 +1,7 @@
#include <Processors/Executors/PushingPipelineExecutor.h>
#include <Processors/Executors/PipelineExecutor.h>
#include <Processors/ISource.h>
#include <Processors/Chain.h>
#include <Processors/Sinks/ExceptionHandlingSink.h>
#include <Processors/QueryPipeline.h>
#include <iostream>
@ -52,19 +51,14 @@ private:
};
PushingPipelineExecutor::PushingPipelineExecutor(Chain & chain_) : chain(chain_)
PushingPipelineExecutor::PushingPipelineExecutor(QueryPipeline & pipeline_) : pipeline(pipeline_)
{
pushing_source = std::make_shared<PushingSource>(chain.getInputHeader(), need_data_flag);
auto sink = std::make_shared<ExceptionHandlingSink>(chain.getOutputHeader());
connect(pushing_source->getPort(), chain.getInputPort());
connect(chain.getOutputPort(), sink->getPort());
if (!pipeline.pushing())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Pipeline for PushingPipelineExecutor must be pushing");
processors = std::make_unique<Processors>();
processors->reserve(chain.getProcessors().size() + 2);
for (const auto & processor : chain.getProcessors())
processors->push_back(processor);
processors->push_back(pushing_source);
processors->push_back(std::move(sink));
pushing_source = std::make_shared<PushingSource>(pipeline.input->getHeader(), need_data_flag);
connect(pushing_source->getPort(), *pipeline.input);
pipeline.processors.emplace_back(pushing_source);
}
PushingPipelineExecutor::~PushingPipelineExecutor()
@ -91,7 +85,7 @@ void PushingPipelineExecutor::start()
return;
started = true;
executor = std::make_shared<PipelineExecutor>(*processors);
executor = std::make_shared<PipelineExecutor>(pipeline.processors);
if (!executor->executeStep(&need_data_flag))
throw Exception(ErrorCodes::LOGICAL_ERROR,

View File

@ -7,7 +7,7 @@ namespace DB
class Block;
class Chunk;
class Chain;
class QueryPipeline;
class PushingSource;
class PipelineExecutor;
@ -28,7 +28,7 @@ using Processors = std::vector<ProcessorPtr>;
class PushingPipelineExecutor
{
public:
explicit PushingPipelineExecutor(Chain & chain);
explicit PushingPipelineExecutor(QueryPipeline & pipeline_);
~PushingPipelineExecutor();
/// Get structure of returned block or chunk.
@ -45,11 +45,10 @@ public:
void cancel();
private:
Chain & chain;
QueryPipeline & pipeline;
std::atomic_bool need_data_flag = false;
std::shared_ptr<PushingSource> pushing_source;
std::unique_ptr<Processors> processors;
PipelineExecutorPtr executor;
bool started = false;
bool finished = false;
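And the pushing counterpart, sketched under the assumption that the executor's pre-existing push()/finish() methods are unchanged by this patch:

#include <Processors/Executors/PushingPipelineExecutor.h>
#include <Processors/QueryPipeline.h>
#include <Processors/Chain.h>
#include <Core/Block.h>
#include <vector>

namespace DB
{

/// Feed already prepared blocks into an insert chain wrapped in a pushing pipeline.
void pushBlocks(Chain chain, const std::vector<Block> & blocks)
{
    QueryPipeline pipeline(std::move(chain));  /// pushing() is now true
    PushingPipelineExecutor executor(pipeline);
    executor.start();
    for (const auto & block : blocks)
        executor.push(block);                  /// push(Block) assumed from the existing executor API
    executor.finish();
}

}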

View File

@ -82,6 +82,8 @@ public:
virtual void doWritePrefix() {}
virtual void doWriteSuffix() { finalize(); }
virtual bool expectMaterializedColumns() const { return true; }
void setTotals(const Block & totals) { consumeTotals(Chunk(totals.getColumns(), totals.rows())); }
void setExtremes(const Block & extremes) { consumeExtremes(Chunk(extremes.getColumns(), extremes.rows())); }

View File

@ -44,6 +44,8 @@ public:
queue.emplace(Chunk());
}
bool expectMaterializedColumns() const override { return false; }
protected:
void consume(Chunk chunk) override
{

View File

@ -24,6 +24,8 @@ public:
void setRowsBeforeLimit(size_t rows_before_limit) override;
bool expectMaterializedColumns() const override { return false; }
protected:
void consume(Chunk chunk) override;
void consumeTotals(Chunk chunk) override { totals = std::move(chunk); }

View File

@ -144,6 +144,7 @@ private:
void setOutputFormat(ProcessorPtr output);
friend class QueryPipelineBuilder;
friend class QueryPipeline;
};
}

View File

@ -0,0 +1,313 @@
#include <Processors/QueryPipeline.h>
#include <Processors/IProcessor.h>
#include <Processors/Pipe.h>
#include <Processors/Chain.h>
#include <Processors/Sources/SourceWithProgress.h>
#include <Processors/Sources/NullSource.h>
#include <Processors/Sinks/ExceptionHandlingSink.h>
#include <Processors/Sinks/SinkToStorage.h>
#include <Processors/Formats/IOutputFormat.h>
#include <DataStreams/CountingBlockOutputStream.h>
#include <Processors/Transforms/LimitsCheckingTransform.h>
#include <Processors/Transforms/MaterializingTransform.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
QueryPipeline::QueryPipeline() = default;
QueryPipeline::QueryPipeline(QueryPipeline &&) = default;
QueryPipeline & QueryPipeline::operator=(QueryPipeline &&) = default;
QueryPipeline::~QueryPipeline() = default;
static void checkInput(const InputPort & input, const ProcessorPtr & processor)
{
if (!input.isConnected())
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Cannot create QueryPipeline because {} has an unconnected input port",
processor->getName());
}
static void checkOutput(const OutputPort & output, const ProcessorPtr & processor)
{
if (!output.isConnected())
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Cannot create QueryPipeline because {} has an unconnected output port",
processor->getName());
}
QueryPipeline::QueryPipeline(
PipelineResourcesHolder resources_,
Processors processors_)
: resources(std::move(resources_))
, processors(std::move(processors_))
{
for (const auto & processor : processors)
{
for (const auto & in : processor->getInputs())
checkInput(in, processor);
for (const auto & out : processor->getOutputs())
checkOutput(out, processor);
}
}
QueryPipeline::QueryPipeline(
PipelineResourcesHolder resources_,
Processors processors_,
InputPort * input_)
: resources(std::move(resources_))
, processors(std::move(processors_))
, input(input_)
{
if (!input || input->isConnected())
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Cannot create pushing QueryPipeline because its input port is connected or null");
bool found_input = false;
for (const auto & processor : processors)
{
for (const auto & in : processor->getInputs())
{
if (&in == input)
found_input = true;
else
checkInput(in, processor);
}
for (const auto & out : processor->getOutputs())
checkOutput(out, processor);
}
if (!found_input)
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Cannot create pushing QueryPipeline because its input port does not belong to any processor");
}
QueryPipeline::QueryPipeline(
PipelineResourcesHolder resources_,
Processors processors_,
OutputPort * output_,
OutputPort * totals_,
OutputPort * extremes_)
: resources(std::move(resources_))
, processors(std::move(processors_))
, output(output_)
, totals(totals_)
, extremes(extremes_)
{
if (!output || output->isConnected())
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Cannot create pulling QueryPipeline because its output port is connected or null");
if (totals && totals->isConnected())
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Cannot create pulling QueryPipeline because its totals port is connected");
if (extremes && extremes->isConnected())
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Cannot create pulling QueryPipeline because its extremes port is connected");
bool found_output = false;
bool found_totals = false;
bool found_extremes = false;
for (const auto & processor : processors)
{
for (const auto & in : processor->getInputs())
checkInput(in, processor);
for (const auto & out : processor->getOutputs())
{
if (&out == output)
found_output = true;
else if (totals && &out == totals)
found_totals = true;
else if (extremes && &out == extremes)
found_extremes = true;
else
checkOutput(out, processor);
}
}
if (!found_output)
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Cannot create pulling QueryPipeline because its output port does not belong to any processor");
if (totals && !found_totals)
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Cannot create pulling QueryPipeline because its totals port does not belong to any processor");
if (extremes && !found_extremes)
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Cannot create pulling QueryPipeline because its extremes port does not belong to any processor");
}
QueryPipeline::QueryPipeline(std::shared_ptr<ISource> source) : QueryPipeline(Pipe(std::move(source))) {}
QueryPipeline::QueryPipeline(Pipe pipe)
: QueryPipeline(std::move(pipe.holder), std::move(pipe.processors), pipe.getOutputPort(0), pipe.getTotalsPort(), pipe.getExtremesPort())
{
}
QueryPipeline::QueryPipeline(Chain chain)
: resources(chain.detachResources())
, input(&chain.getInputPort())
, num_threads(chain.getNumThreads())
{
processors.reserve(chain.getProcessors().size() + 1);
for (auto processor : chain.getProcessors())
processors.emplace_back(std::move(processor));
auto sink = std::make_shared<ExceptionHandlingSink>(chain.getOutputPort().getHeader());
connect(chain.getOutputPort(), sink->getPort());
processors.emplace_back(std::move(sink));
}
QueryPipeline::QueryPipeline(std::shared_ptr<SinkToStorage> sink) : QueryPipeline(Chain(std::move(sink))) {}
void QueryPipeline::complete(Pipe pipe)
{
if (!pushing())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Pipeline must be pushing to be completed with pipe");
pipe.resize(1);
resources = pipe.detachResources();
pipe.dropExtremes();
pipe.dropTotals();
connect(*pipe.getOutputPort(0), *input);
input = nullptr;
auto pipe_processors = Pipe::detachProcessors(std::move(pipe));
processors.insert(processors.end(), pipe_processors.begin(), pipe_processors.end());
}
void QueryPipeline::complete(std::shared_ptr<IOutputFormat> format)
{
if (!pulling())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Pipeline must be pulling to be completed with output format");
if (format->expectMaterializedColumns())
{
auto materializing = std::make_shared<MaterializingTransform>(output->getHeader());
connect(*output, materializing->getInputPort());
output = &materializing->getOutputPort();
processors.emplace_back(std::move(materializing));
}
auto & format_main = format->getPort(IOutputFormat::PortKind::Main);
auto & format_totals = format->getPort(IOutputFormat::PortKind::Totals);
auto & format_extremes = format->getPort(IOutputFormat::PortKind::Extremes);
if (!totals)
{
auto source = std::make_shared<NullSource>(format_totals.getHeader());
totals = &source->getPort();
processors.emplace_back(std::move(source));
}
if (!extremes)
{
auto source = std::make_shared<NullSource>(format_extremes.getHeader());
extremes = &source->getPort();
processors.emplace_back(std::move(source));
}
processors.emplace_back(std::move(format));
connect(*output, format_main);
connect(*totals, format_totals);
connect(*extremes, format_extremes);
output = nullptr;
totals = nullptr;
extremes = nullptr;
}
Block QueryPipeline::getHeader() const
{
if (input)
return input->getHeader();
else if (output)
return output->getHeader();
else
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Header is available only for pushing or pulling QueryPipeline");
}
void QueryPipeline::setProgressCallback(const ProgressCallback & callback)
{
for (auto & processor : processors)
{
if (auto * source = dynamic_cast<ISourceWithProgress *>(processor.get()))
source->setProgressCallback(callback);
}
}
void QueryPipeline::setProcessListElement(QueryStatus * elem)
{
process_list_element = elem;
if (pulling())
{
for (auto & processor : processors)
{
if (auto * source = dynamic_cast<ISourceWithProgress *>(processor.get()))
source->setProcessListElement(elem);
}
}
else if (pushing())
{
if (auto * counting = dynamic_cast<CountingTransform *>(&input->getProcessor()))
{
counting->setProcessListElement(elem);
}
}
}
void QueryPipeline::setLimitsAndQuota(const StreamLocalLimits & limits, std::shared_ptr<const EnabledQuota> quota)
{
if (!pulling())
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"It is possible to set limits and quota only for a pulling QueryPipeline");
auto transform = std::make_shared<LimitsCheckingTransform>(output->getHeader(), limits);
transform->setQuota(quota);
connect(*output, transform->getInputPort());
output = &transform->getOutputPort();
processors.emplace_back(std::move(transform));
}
bool QueryPipeline::tryGetResultRowsAndBytes(size_t & result_rows, size_t & result_bytes) const
{
if (!output || !output->isConnected())
return false;
const auto * format = typeid_cast<const IOutputFormat *>(&output->getInputPort().getProcessor());
if (!format)
return false;
result_rows = format->getResultRows();
result_bytes = format->getResultBytes();
return true;
}
void QueryPipeline::reset()
{
QueryPipeline to_remove = std::move(*this);
*this = QueryPipeline();
}
}

View File

@ -0,0 +1,117 @@
#pragma once
#include <Processors/PipelineResourcesHolder.h>
namespace DB
{
class InputPort;
class OutputPort;
class IProcessor;
using ProcessorPtr = std::shared_ptr<IProcessor>;
using Processors = std::vector<ProcessorPtr>;
class QueryStatus;
struct Progress;
using ProgressCallback = std::function<void(const Progress & progress)>;
struct StreamLocalLimits;
class EnabledQuota;
class Block;
class Pipe;
class Chain;
class IOutputFormat;
class SinkToStorage;
class ISource;
class QueryPipeline
{
public:
QueryPipeline();
QueryPipeline(QueryPipeline &&);
QueryPipeline(const QueryPipeline &) = delete;
QueryPipeline & operator=(QueryPipeline &&);
QueryPipeline & operator=(const QueryPipeline &) = delete;
~QueryPipeline();
/// pulling
explicit QueryPipeline(Pipe pipe);
explicit QueryPipeline(std::shared_ptr<ISource> source);
/// pushing
explicit QueryPipeline(Chain chain);
explicit QueryPipeline(std::shared_ptr<SinkToStorage> sink);
/// completed
QueryPipeline(
PipelineResourcesHolder resources_,
Processors processors_);
/// pushing
QueryPipeline(
PipelineResourcesHolder resources_,
Processors processors_,
InputPort * input_);
/// pulling
QueryPipeline(
PipelineResourcesHolder resources_,
Processors processors_,
OutputPort * output_,
OutputPort * totals_ = nullptr,
OutputPort * extremes_ = nullptr);
/// Exactly one of the following is true.
bool initialized() const { return !processors.empty(); }
/// Use PullingPipelineExecutor or PullingAsyncPipelineExecutor.
bool pulling() const { return output != nullptr; }
/// Use PushingPipelineExecutor or PushingAsyncPipelineExecutor.
bool pushing() const { return input != nullptr; }
/// Use PipelineExecutor. Call execute() to build one.
bool completed() const { return !pulling() && !pushing(); }
/// Only for pushing.
void complete(Pipe pipe);
/// Only for pulling.
void complete(std::shared_ptr<IOutputFormat> format);
/// Only for pushing and pulling.
Block getHeader() const;
size_t getNumThreads() const { return num_threads; }
void setNumThreads(size_t num_threads_) { num_threads = num_threads_; }
void setProcessListElement(QueryStatus * elem);
void setProgressCallback(const ProgressCallback & callback);
void setLimitsAndQuota(const StreamLocalLimits & limits, std::shared_ptr<const EnabledQuota> quota);
bool tryGetResultRowsAndBytes(size_t & result_rows, size_t & result_bytes) const;
const Processors & getProcessors() const { return processors; }
void reset();
private:
PipelineResourcesHolder resources;
Processors processors;
InputPort * input = nullptr;
OutputPort * output = nullptr;
OutputPort * totals = nullptr;
OutputPort * extremes = nullptr;
QueryStatus * process_list_element = nullptr;
size_t num_threads = 0;
friend class PushingPipelineExecutor;
friend class PullingPipelineExecutor;
friend class PushingAsyncPipelineExecutor;
friend class PullingAsyncPipelineExecutor;
friend class CompletedPipelineExecutor;
};
}
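To illustrate how the three states compose in practice (mirroring the replication-handler call sites later in this diff), here is a hedged sketch; the function name and arguments are illustrative only:

#include <Processors/QueryPipeline.h>
#include <Processors/Pipe.h>
#include <Processors/Chain.h>
#include <Processors/Executors/CompletedPipelineExecutor.h>

namespace DB
{

/// A pushing pipeline (built from a Chain) is completed with a Pipe that supplies its input;
/// a pulling pipeline (built from a Pipe) is completed with an IOutputFormat.
/// Either way the result is a completed pipeline that CompletedPipelineExecutor can run.
void runInsertFromPipe(Chain insert_chain, Pipe source)
{
    QueryPipeline pipeline(std::move(insert_chain));  /// pushing()
    pipeline.complete(std::move(source));             /// completed()
    CompletedPipelineExecutor executor(pipeline);
    executor.execute();
}

}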

View File

@ -538,6 +538,13 @@ PipelineExecutorPtr QueryPipelineBuilder::execute()
return std::make_shared<PipelineExecutor>(pipe.processors, process_list_element);
}
QueryPipeline QueryPipelineBuilder::getPipeline(QueryPipelineBuilder builder)
{
QueryPipeline res(std::move(builder.pipe));
res.setNumThreads(builder.getNumThreads());
return res;
}
void QueryPipelineBuilder::setCollectedProcessors(Processors * processors)
{
pipe.collected_processors = processors;
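A typical consumer of getPipeline() after this refactoring, sketched; the builder is assumed to come from query interpretation, and the helper function is illustrative only:

#include <Processors/QueryPipelineBuilder.h>
#include <Processors/Executors/PullingAsyncPipelineExecutor.h>
#include <Processors/Chunk.h>

namespace DB
{

/// Convert a finished builder into a pulling QueryPipeline and drain it asynchronously.
void drainBuilder(QueryPipelineBuilder builder)
{
    QueryPipeline pipeline = QueryPipelineBuilder::getPipeline(std::move(builder));
    PullingAsyncPipelineExecutor executor(pipeline);
    Chunk chunk;
    while (executor.pull(chunk, /* milliseconds = */ 100))
    {
        if (chunk.getNumRows() > 0)
        {
            /// Handle the chunk; an empty chunk just means the timeout elapsed.
        }
    }
}

}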

View File

@ -4,6 +4,7 @@
#include <Processors/Executors/PipelineExecutor.h>
#include <Processors/IProcessor.h>
#include <Processors/Pipe.h>
#include <Processors/QueryPipeline.h>
#include <Storages/IStorage_fwd.h>
#include <Storages/TableLockHolder.h>
@ -155,6 +156,7 @@ public:
/// Convert query pipeline to pipe.
static Pipe getPipe(QueryPipelineBuilder pipeline) { return std::move(pipeline.pipe); }
static QueryPipeline getPipeline(QueryPipelineBuilder builder);
private:

View File

@ -38,6 +38,7 @@
#include <Processors/Executors/PullingAsyncPipelineExecutor.h>
#include <Processors/Executors/PushingPipelineExecutor.h>
#include <Processors/Executors/PushingAsyncPipelineExecutor.h>
#include <Processors/Executors/CompletedPipelineExecutor.h>
#include <Processors/Sinks/SinkToStorage.h>
#include "Core/Protocol.h"
@ -295,7 +296,7 @@ void TCPHandler::runImpl()
after_check_cancelled.restart();
after_send_progress.restart();
if (!state.io.out.empty())
if (state.io.pipeline.pushing())
{
state.need_receive_data_for_insert = true;
processInsertQuery();
@ -303,13 +304,13 @@ void TCPHandler::runImpl()
else if (state.need_receive_data_for_input) // It implies pipeline execution
{
/// It is special case for input(), all works for reading data from client will be done in callbacks.
auto executor = state.io.pipeline.execute();
executor->execute(state.io.pipeline.getNumThreads());
CompletedPipelineExecutor executor(state.io.pipeline);
executor.execute();
}
else if (state.io.pipeline.initialized())
else if (state.io.pipeline.pulling())
processOrdinaryQueryWithProcessors();
else if (state.io.in)
processOrdinaryQuery();
else
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected QueryPipeline state.");
state.io.onFinish();
@ -544,7 +545,7 @@ void TCPHandler::skipData()
void TCPHandler::processInsertQuery()
{
size_t num_threads = state.io.out.getNumThreads();
size_t num_threads = state.io.pipeline.getNumThreads();
auto send_table_columns = [&]()
{
@ -565,7 +566,7 @@ void TCPHandler::processInsertQuery()
if (num_threads > 1)
{
PushingAsyncPipelineExecutor executor(state.io.out);
PushingAsyncPipelineExecutor executor(state.io.pipeline);
/** Made above the rest of the lines, so that in case of `writePrefix` function throws an exception,
* client receive exception before sending data.
*/
@ -585,7 +586,7 @@ void TCPHandler::processInsertQuery()
}
else
{
PushingPipelineExecutor executor(state.io.out);
PushingPipelineExecutor executor(state.io.pipeline);
executor.start();
send_table_columns();
@ -602,77 +603,6 @@ void TCPHandler::processInsertQuery()
}
void TCPHandler::processOrdinaryQuery()
{
OpenTelemetrySpanHolder span(__PRETTY_FUNCTION__);
/// Pull query execution result, if exists, and send it to network.
if (state.io.in)
{
if (query_context->getSettingsRef().allow_experimental_query_deduplication)
sendPartUUIDs();
/// This allows the client to prepare output format
if (Block header = state.io.in->getHeader())
sendData(header);
/// Use of async mode here enables reporting progress and monitoring client cancelling the query
AsynchronousBlockInputStream async_in(state.io.in);
async_in.readPrefix();
while (true)
{
if (isQueryCancelled())
{
async_in.cancel(false);
break;
}
if (after_send_progress.elapsed() / 1000 >= interactive_delay)
{
/// Some time passed.
after_send_progress.restart();
sendProgress();
}
sendLogs();
if (async_in.poll(interactive_delay / 1000))
{
const auto block = async_in.read();
if (!block)
break;
if (!state.io.null_format)
sendData(block);
}
}
async_in.readSuffix();
/** When the data has run out, we send the profiling data and totals up to the terminating empty block,
* so that this information can be used in the suffix output of stream.
* If the request has been interrupted, then sendTotals and other methods should not be called,
* because we have not read all the data.
*/
if (!isQueryCancelled())
{
sendTotals(state.io.in->getTotals());
sendExtremes(state.io.in->getExtremes());
sendProfileInfo(state.io.in->getProfileInfo());
sendProgress();
}
if (state.is_connection_closed)
return;
sendData({});
}
sendProgress();
}
void TCPHandler::processOrdinaryQueryWithProcessors()
{
auto & pipeline = state.io.pipeline;

View File

@ -617,7 +617,7 @@ bool StorageKafka::streamToViews()
streams.reserve(stream_count);
for (size_t i = 0; i < stream_count; ++i)
{
auto stream = std::make_shared<KafkaBlockInputStream>(*this, metadata_snapshot, kafka_context, block_io.out.getInputHeader().getNames(), log, block_size, false);
auto stream = std::make_shared<KafkaBlockInputStream>(*this, metadata_snapshot, kafka_context, block_io.pipeline.getHeader().getNames(), log, block_size, false);
streams.emplace_back(stream);
// Limit read batch to maximum block size to allow DDL
@ -642,7 +642,7 @@ bool StorageKafka::streamToViews()
// It will be cancelled on underlying layer (kafka buffer)
size_t rows = 0;
PushingPipelineExecutor executor(block_io.out);
PushingPipelineExecutor executor(block_io.pipeline);
in->readPrefix();
executor.start();

View File

@ -10,6 +10,7 @@
#include <Interpreters/InterpreterInsertQuery.h>
#include <Processors/Sources/SourceFromSingleChunk.h>
#include <Processors/Sinks/ExceptionHandlingSink.h>
#include <Processors/Executors/CompletedPipelineExecutor.h>
namespace DB
{
@ -485,21 +486,15 @@ void MaterializedPostgreSQLConsumer::syncTables()
insert->columns = buffer.columnsAST;
InterpreterInsertQuery interpreter(insert, insert_context, true);
auto block_io = interpreter.execute();
auto io = interpreter.execute();
auto input = std::make_shared<SourceFromSingleChunk>(
result_rows.cloneEmpty(), Chunk(result_rows.getColumns(), result_rows.rows()));
assertBlocksHaveEqualStructure(input->getPort().getHeader(), block_io.out.getInputHeader(), "postgresql replica table sync");
QueryPipelineBuilder pipeline;
pipeline.init(Pipe(std::move(input)));
pipeline.addChain(std::move(block_io.out));
pipeline.setSinks([&](const Block & header, Pipe::StreamType)
{
return std::make_shared<ExceptionHandlingSink>(header);
});
assertBlocksHaveEqualStructure(input->getPort().getHeader(), io.pipeline.getHeader(), "postgresql replica table sync");
io.pipeline.complete(Pipe(std::move(input)));
auto executor = pipeline.execute();
executor->execute(1);
CompletedPipelineExecutor executor(io.pipeline);
executor.execute();
buffer.columns = buffer.description.sample_block.cloneEmptyColumns();
}

View File

@ -1,8 +1,8 @@
#include "PostgreSQLReplicationHandler.h"
#include <DataStreams/PostgreSQLSource.h>
#include <Processors/QueryPipelineBuilder.h>
#include <Processors/Sinks/ExceptionHandlingSink.h>
#include <Processors/Executors/CompletedPipelineExecutor.h>
#include <Databases/PostgreSQL/fetchPostgreSQLTableStructure.h>
#include <Storages/PostgreSQL/StorageMaterializedPostgreSQL.h>
#include <Interpreters/InterpreterDropQuery.h>
@ -239,29 +239,21 @@ StoragePtr PostgreSQLReplicationHandler::loadFromSnapshot(String & snapshot_name
materialized_storage->createNestedIfNeeded(fetchTableStructure(*tx, table_name));
auto nested_storage = materialized_storage->getNested();
auto insert = std::make_shared<ASTInsertQuery>();
insert->table_id = nested_storage->getStorageID();
auto insert_context = materialized_storage->getNestedTableContext();
InterpreterInsertQuery interpreter(insert, insert_context);
auto block_io = interpreter.execute();
InterpreterInsertQuery interpreter(nullptr, insert_context);
auto chain = interpreter.buildChain(nested_storage, nested_storage->getInMemoryMetadataPtr(), {});
const StorageInMemoryMetadata & storage_metadata = nested_storage->getInMemoryMetadata();
auto sample_block = storage_metadata.getSampleBlockNonMaterialized();
auto input = std::make_unique<PostgreSQLTransactionSource<pqxx::ReplicationTransaction>>(tx, query_str, sample_block, DEFAULT_BLOCK_SIZE);
QueryPipelineBuilder pipeline;
pipeline.init(Pipe(std::move(input)));
assertBlocksHaveEqualStructure(pipeline.getHeader(), block_io.out.getInputHeader(), "postgresql replica load from snapshot");
pipeline.addChain(std::move(block_io.out));
pipeline.setSinks([&](const Block & header, Pipe::StreamType)
{
return std::make_shared<ExceptionHandlingSink>(header);
});
assertBlocksHaveEqualStructure(input->getPort().getHeader(), chain.getInputHeader(), "postgresql replica load from snapshot");
QueryPipeline pipeline(std::move(chain));
pipeline.complete(Pipe(std::move(input)));
auto executor = pipeline.execute();
executor->execute(1);
CompletedPipelineExecutor executor(pipeline);
executor.execute();
nested_storage = materialized_storage->prepare();
auto nested_table_id = nested_storage->getStorageID();

View File

@ -919,11 +919,11 @@ bool StorageRabbitMQ::streamToViews()
insert->table_id = table_id;
// Only insert into dependent views and expect that input blocks contain virtual columns
InterpreterInsertQuery interpreter(insert, rabbitmq_context, false, true, true);
auto block_io = interpreter.execute();
InterpreterInsertQuery interpreter(nullptr, rabbitmq_context, false, true, true);
auto chain = interpreter.buildChain(table, table->getInMemoryMetadataPtr(), {});
auto metadata_snapshot = getInMemoryMetadataPtr();
auto column_names = block_io.out.getInputHeader().getNames();
auto column_names = chain.getInputHeader().getNames();
auto sample_block = metadata_snapshot->getSampleBlockForColumns(column_names, getVirtuals(), getStorageID());
auto block_size = getMaxBlockSize();
@ -963,7 +963,8 @@ bool StorageRabbitMQ::streamToViews()
looping_task->activateAndSchedule();
}
PushingPipelineExecutor executor(block_io.out);
QueryPipeline pipeline(std::move(chain));
PushingPipelineExecutor executor(pipeline);
executor.start();
in->readPrefix();
while (auto block = in->read())