Extract converting from UnionStep.

This commit is contained in:
Nikolai Kochetov 2021-03-25 12:57:14 +03:00
parent 0afede382b
commit 0e2f52518f
11 changed files with 58 additions and 32 deletions

View File

@ -156,8 +156,7 @@ void executeQuery(
for (auto & plan : plans)
input_streams.emplace_back(plan->getCurrentDataStream());
auto header = input_streams.front().header;
auto union_step = std::make_unique<UnionStep>(std::move(input_streams), header);
auto union_step = std::make_unique<UnionStep>(std::move(input_streams));
query_plan.unitePlans(std::move(union_step), std::move(plans));
}

View File

@ -250,7 +250,7 @@ BlockIO InterpreterInsertQuery::execute()
}
}
res.pipeline = QueryPipeline::unitePipelines(std::move(pipelines), {}, ExpressionActionsSettings::fromContext(context));
res.pipeline = QueryPipeline::unitePipelines(std::move(pipelines));
}
}

View File

@ -6,6 +6,7 @@
#include <Parsers/ASTSelectWithUnionQuery.h>
#include <Parsers/queryToString.h>
#include <Processors/QueryPlan/DistinctStep.h>
#include <Processors/QueryPlan/ExpressionStep.h>
#include <Processors/QueryPlan/IQueryPlanStep.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <Processors/QueryPlan/UnionStep.h>
@ -251,11 +252,23 @@ void InterpreterSelectWithUnionQuery::buildQueryPlan(QueryPlan & query_plan)
{
plans[i] = std::make_unique<QueryPlan>();
nested_interpreters[i]->buildQueryPlan(*plans[i]);
if (!blocksHaveEqualStructure(plans[i]->getCurrentDataStream().header, result_header))
{
auto actions_dag = ActionsDAG::makeConvertingActions(
plans[i]->getCurrentDataStream().header.getColumnsWithTypeAndName(),
result_header.getColumnsWithTypeAndName(),
ActionsDAG::MatchColumnsMode::Position);
auto converting_step = std::make_unique<ExpressionStep>(plans[i]->getCurrentDataStream(), std::move(actions_dag));
converting_step->setStepDescription("Conversion before UNION");
plans[i]->addStep(std::move(converting_step));
}
data_streams[i] = plans[i]->getCurrentDataStream();
}
auto max_threads = context->getSettingsRef().max_threads;
auto union_step = std::make_unique<UnionStep>(std::move(data_streams), result_header, max_threads);
auto union_step = std::make_unique<UnionStep>(std::move(data_streams), max_threads);
query_plan.unitePlans(std::move(union_step), std::move(plans));

View File

@ -211,11 +211,14 @@ void QueryPipeline::setOutputFormat(ProcessorPtr output)
QueryPipeline QueryPipeline::unitePipelines(
std::vector<std::unique_ptr<QueryPipeline>> pipelines,
const Block & common_header,
const ExpressionActionsSettings & settings,
size_t max_threads_limit,
Processors * collected_processors)
{
if (pipelines.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot unite an empty set of pipelines");
Block common_header = pipelines.front()->getHeader();
/// Should we limit the number of threads for united pipeline. True if all pipelines have max_threads != 0.
/// If true, result max_threads will be sum(max_threads).
/// Note: it may be > than settings.max_threads, so we should apply this limit again.
@ -229,19 +232,21 @@ QueryPipeline QueryPipeline::unitePipelines(
pipeline.checkInitialized();
pipeline.pipe.collected_processors = collected_processors;
if (!pipeline.isCompleted())
{
auto actions_dag = ActionsDAG::makeConvertingActions(
pipeline.getHeader().getColumnsWithTypeAndName(),
common_header.getColumnsWithTypeAndName(),
ActionsDAG::MatchColumnsMode::Position);
auto actions = std::make_shared<ExpressionActions>(actions_dag, settings);
assertBlocksHaveEqualStructure(pipeline.getHeader(), common_header, "QueryPipeline::unitePipelines");
pipeline.addSimpleTransform([&](const Block & header)
{
return std::make_shared<ExpressionTransform>(header, actions);
});
}
// if (!pipeline.isCompleted())
// {
// auto actions_dag = ActionsDAG::makeConvertingActions(
// pipeline.getHeader().getColumnsWithTypeAndName(),
// common_header.getColumnsWithTypeAndName(),
// ActionsDAG::MatchColumnsMode::Position);
// auto actions = std::make_shared<ExpressionActions>(actions_dag, settings);
// pipeline.addSimpleTransform([&](const Block & header)
// {
// return std::make_shared<ExpressionTransform>(header, actions);
// });
// }
pipes.emplace_back(std::move(pipeline.pipe));

View File

@ -90,8 +90,6 @@ public:
/// If collector is used, it will collect only newly-added processors, but not processors from pipelines.
static QueryPipeline unitePipelines(
std::vector<std::unique_ptr<QueryPipeline>> pipelines,
const Block & common_header,
const ExpressionActionsSettings & settings,
size_t max_threads_limit = 0,
Processors * collected_processors = nullptr);

View File

@ -74,7 +74,7 @@ CreatingSetsStep::CreatingSetsStep(DataStreams input_streams_)
input_streams[i].header.dumpStructure());
}
QueryPipelinePtr CreatingSetsStep::updatePipeline(QueryPipelines pipelines, const BuildQueryPipelineSettings & settings)
QueryPipelinePtr CreatingSetsStep::updatePipeline(QueryPipelines pipelines, const BuildQueryPipelineSettings &)
{
if (pipelines.empty())
throw Exception("CreatingSetsStep cannot be created with no inputs", ErrorCodes::LOGICAL_ERROR);
@ -89,7 +89,7 @@ QueryPipelinePtr CreatingSetsStep::updatePipeline(QueryPipelines pipelines, cons
if (pipelines.size() > 1)
{
QueryPipelineProcessorsCollector collector(delayed_pipeline, this);
delayed_pipeline = QueryPipeline::unitePipelines(std::move(pipelines), Block(), settings.getActionsSettings());
delayed_pipeline = QueryPipeline::unitePipelines(std::move(pipelines));
processors = collector.detachProcessors();
}
else

View File

@ -37,7 +37,7 @@ public:
String getName() const override { return "CreatingSets"; }
QueryPipelinePtr updatePipeline(QueryPipelines pipelines, const BuildQueryPipelineSettings & settings) override;
QueryPipelinePtr updatePipeline(QueryPipelines pipelines, const BuildQueryPipelineSettings &) override;
void describePipeline(FormatSettings & settings) const override;

View File

@ -6,8 +6,20 @@
namespace DB
{
UnionStep::UnionStep(DataStreams input_streams_, Block result_header, size_t max_threads_)
: header(std::move(result_header))
/// Returns the header shared by all inputs of a union step.
///
/// The header of the first input stream is taken as the reference, and every
/// input stream's header is passed together with it to
/// assertBlocksHaveEqualStructure (labelled "UnionStep").
/// Throws a LOGICAL_ERROR exception when there are no input streams at all.
static Block checkHeaders(const DataStreams & input_streams)
{
    const bool has_inputs = !input_streams.empty();
    if (!has_inputs)
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot unite an empty set of query plan steps");

    /// The first stream defines the expected structure; the result is a copy of it.
    Block common_header = input_streams.front().header;

    for (const auto & input : input_streams)
    {
        assertBlocksHaveEqualStructure(input.header, common_header, "UnionStep");
    }

    return common_header;
}
UnionStep::UnionStep(DataStreams input_streams_, size_t max_threads_)
: header(checkHeaders(input_streams_))
, max_threads(max_threads_)
{
input_streams = std::move(input_streams_);
@ -18,7 +30,7 @@ UnionStep::UnionStep(DataStreams input_streams_, Block result_header, size_t max
output_stream = DataStream{.header = header};
}
QueryPipelinePtr UnionStep::updatePipeline(QueryPipelines pipelines, const BuildQueryPipelineSettings & settings)
QueryPipelinePtr UnionStep::updatePipeline(QueryPipelines pipelines, const BuildQueryPipelineSettings &)
{
auto pipeline = std::make_unique<QueryPipeline>();
QueryPipelineProcessorsCollector collector(*pipeline, this);
@ -30,7 +42,7 @@ QueryPipelinePtr UnionStep::updatePipeline(QueryPipelines pipelines, const Build
return pipeline;
}
*pipeline = QueryPipeline::unitePipelines(std::move(pipelines), output_stream->header, settings.getActionsSettings(), max_threads);
*pipeline = QueryPipeline::unitePipelines(std::move(pipelines), max_threads);
processors = collector.detachProcessors();
return pipeline;

View File

@ -9,11 +9,11 @@ class UnionStep : public IQueryPlanStep
{
public:
/// max_threads is used to limit the number of threads for result pipeline.
UnionStep(DataStreams input_streams_, Block result_header, size_t max_threads_ = 0);
explicit UnionStep(DataStreams input_streams_, size_t max_threads_ = 0);
String getName() const override { return "Union"; }
QueryPipelinePtr updatePipeline(QueryPipelines pipelines, const BuildQueryPipelineSettings & settings) override;
QueryPipelinePtr updatePipeline(QueryPipelines pipelines, const BuildQueryPipelineSettings &) override;
void describePipeline(FormatSettings & settings) const override;

View File

@ -1355,8 +1355,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsWithOrder(
for (const auto & plan : plans)
input_streams.emplace_back(plan->getCurrentDataStream());
const auto & common_header = plans.front()->getCurrentDataStream().header;
auto union_step = std::make_unique<UnionStep>(std::move(input_streams), common_header);
auto union_step = std::make_unique<UnionStep>(std::move(input_streams));
auto plan = std::make_unique<QueryPlan>();
plan->unitePlans(std::move(union_step), std::move(plans));

View File

@ -391,7 +391,7 @@ void StorageBuffer::read(
plans.emplace_back(std::make_unique<QueryPlan>(std::move(buffers_plan)));
query_plan = QueryPlan();
auto union_step = std::make_unique<UnionStep>(std::move(input_streams), result_header);
auto union_step = std::make_unique<UnionStep>(std::move(input_streams));
union_step->setStepDescription("Unite sources from Buffer table");
query_plan.unitePlans(std::move(union_step), std::move(plans));
}