Use QueryPlan for SubqueryForSet.

This commit is contained in:
Nikolai Kochetov 2020-09-15 16:25:14 +03:00
parent 1ba67ea8a1
commit 4c783f19ee
10 changed files with 150 additions and 35 deletions

View File

@ -900,10 +900,11 @@ SetPtr ActionsMatcher::makeSet(const ASTFunction & node, Data & data, bool no_su
* in the subquery_for_set object, this subquery is set as source and the temporary table _data1 as the table.
* - this function shows the expression IN_data1.
*/
if (subquery_for_set.source.empty() && data.no_storage_or_local)
if (!subquery_for_set.source && data.no_storage_or_local)
{
auto interpreter = interpretSubquery(right_in_operand, data.context, data.subquery_depth, {});
subquery_for_set.source = QueryPipeline::getPipe(interpreter->execute().pipeline);
subquery_for_set.source = std::make_unique<QueryPlan>();
interpreter->buildQueryPlan(*subquery_for_set.source);
}
subquery_for_set.set = set;

View File

@ -582,7 +582,7 @@ JoinPtr SelectQueryExpressionAnalyzer::makeTableJoin(const ASTTablesInSelectQuer
ExpressionActionsPtr joined_block_actions = createJoinedBlockActions(context, analyzedJoin());
Names original_right_columns;
if (subquery_for_join.source.empty())
if (!subquery_for_join.source)
{
NamesWithAliases required_columns_with_aliases = analyzedJoin().getRequiredColumns(
joined_block_actions->getSampleBlock(), joined_block_actions->getRequiredColumns());

View File

@ -135,7 +135,8 @@ public:
ast = database_and_table_name;
external_tables[external_table_name] = external_storage_holder;
subqueries_for_sets[external_table_name].source = QueryPipeline::getPipe(interpreter->execute().pipeline);
subqueries_for_sets[external_table_name].source = std::make_unique<QueryPlan>();
interpreter->buildQueryPlan(*subqueries_for_sets[external_table_name].source);
subqueries_for_sets[external_table_name].table = external_storage;
/** NOTE If it was written IN tmp_table - the existing temporary (but not external) table,

View File

@ -1862,14 +1862,38 @@ void InterpreterSelectQuery::executeSubqueriesInSetsAndJoins(QueryPlan & query_p
const Settings & settings = context->getSettingsRef();
auto creating_sets = std::make_unique<CreatingSetsStep>(
query_plan.getCurrentDataStream(),
std::move(subqueries_for_sets),
SizeLimits(settings.max_rows_to_transfer, settings.max_bytes_to_transfer, settings.transfer_overflow_mode),
*context);
if (subqueries_for_sets.empty())
return;
creating_sets->setStepDescription("Create sets for subqueries and joins");
query_plan.addStep(std::move(creating_sets));
SizeLimits limits(settings.max_rows_to_transfer, settings.max_bytes_to_transfer, settings.transfer_overflow_mode);
std::vector<QueryPlan> plans;
DataStreams input_streams;
input_streams.emplace_back(query_plan.getCurrentDataStream());
for (auto & [description, set] : subqueries_for_sets)
{
auto plan = std::move(set.source);
std::string type = (set.join != nullptr) ? "JOIN"
: "subquery";
auto creating_set = std::make_unique<CreatingSetStep>(
plan->getCurrentDataStream(),
query_plan.getCurrentDataStream().header,
std::move(description),
std::move(set),
limits,
*context);
creating_set->setStepDescription("Create set for " + type);
plan->addStep(std::move(creating_set));
input_streams.emplace_back(plan->getCurrentDataStream());
plans.emplace_back(std::move(*plan));
}
auto creating_sets = std::make_unique<CreatingSetsStep>(std::move(input_streams));
creating_sets->setStepDescription("Create sets before main query execution");
query_plan.unitePlans(std::move(creating_sets), std::move(plans));
}

View File

@ -12,9 +12,10 @@ void SubqueryForSet::makeSource(std::shared_ptr<InterpreterSelectWithUnionQuery>
NamesWithAliases && joined_block_aliases_)
{
joined_block_aliases = std::move(joined_block_aliases_);
source = QueryPipeline::getPipe(interpreter->execute().pipeline);
source = std::make_unique<QueryPlan>();
interpreter->buildQueryPlan(*source);
sample_block = source.getHeader();
sample_block = interpreter->getSampleBlock();
renameColumns(sample_block);
}

View File

@ -5,7 +5,6 @@
#include <Parsers/IAST.h>
#include <Interpreters/IJoin.h>
#include <Interpreters/PreparedSets.h>
#include <Processors/Pipe.h>
namespace DB
@ -14,12 +13,13 @@ namespace DB
class InterpreterSelectWithUnionQuery;
class ExpressionActions;
using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;
class QueryPlan;
/// Information on what to do when executing a subquery in the [GLOBAL] IN/JOIN section.
struct SubqueryForSet
{
/// The source is obtained using the InterpreterSelectQuery subquery.
Pipe source;
std::unique_ptr<QueryPlan> source;
/// If set, build it from result.
SetPtr set;

View File

@ -204,7 +204,7 @@ void QueryPipeline::addCreatingSetsTransform(SubqueriesForSets subqueries_for_se
for (auto & subquery : subqueries_for_sets)
{
if (!subquery.second.source.empty())
if (subquery.second.source)
{
auto & source = sources.emplace_back(std::move(subquery.second.source));
if (source.numOutputPorts() > 1)
@ -315,6 +315,20 @@ QueryPipeline QueryPipeline::unitePipelines(
return pipeline;
}
void QueryPipeline::addDelayedPipeline(QueryPipeline pipeline)
{
pipeline.resize(1);
auto * collected_processors = pipe.collected_processors;
Pipes pipes;
pipes.emplace_back(QueryPipeline::getPipe(std::move(pipeline)));
pipes.emplace_back(std::move(pipe));
pipe = Pipe::unitePipes(std::move(pipes), collected_processors);
pipe.addTransform(std::make_shared<ConcatProcessor>(getHeader(), 2));
}
void QueryPipeline::setProgressCallback(const ProgressCallback & callback)
{
for (auto & processor : pipe.processors)

View File

@ -87,6 +87,8 @@ public:
size_t max_threads_limit = 0,
Processors * collected_processors = nullptr);
void addDelayedPipeline(QueryPipeline);
PipelineExecutorPtr execute();
size_t getNumStreams() const { return pipe.numOutputPorts(); }

View File

@ -22,37 +22,91 @@ static ITransformingStep::Traits getTraits()
};
}
CreatingSetsStep::CreatingSetsStep(
CreatingSetStep::CreatingSetStep(
const DataStream & input_stream_,
SubqueriesForSets subqueries_for_sets_,
Block header,
String description_,
SubqueryForSet subquery_for_set_,
SizeLimits network_transfer_limits_,
const Context & context_)
: ITransformingStep(input_stream_, input_stream_.header, getTraits())
, subqueries_for_sets(std::move(subqueries_for_sets_))
: ITransformingStep(input_stream_, header, getTraits())
, description(std::move(description_))
, subquery_for_set(std::move(subquery_for_set_))
, network_transfer_limits(std::move(network_transfer_limits_))
, context(context_)
{
}
void CreatingSetsStep::transformPipeline(QueryPipeline & pipeline)
void CreatingSetStep::transformPipeline(QueryPipeline & pipeline)
{
pipeline.addCreatingSetsTransform(std::move(subqueries_for_sets), network_transfer_limits, context);
pipeline.resize(1);
pipeline.addTransform(
std::make_shared<CreatingSetsTransform>(
pipeline.getHeader(),
getOutputStream().header,
std::move(subquery_for_set),
network_transfer_limits,
context));
}
void CreatingSetsStep::describeActions(FormatSettings & settings) const
void CreatingSetStep::describeActions(FormatSettings & settings) const
{
String prefix(settings.offset, ' ');
for (const auto & set : subqueries_for_sets)
settings.out << prefix;
if (subquery_for_set.set)
settings.out << "Set: ";
else if (subquery_for_set.join)
settings.out << "Join: ";
settings.out << description << '\n';
}
CreatingSetsStep::CreatingSetsStep(DataStreams input_streams_)
{
if (input_streams_.empty())
throw Exception("CreatingSetsStep cannot be created with no inputs", ErrorCodes::LOGICAL_ERROR);
input_streams = std::move(input_streams_);
output_stream = input_streams.front();
for (size_t i = 1; i < input_streams.size(); ++i)
assertBlocksHaveEqualStructure(output_stream->header, input_streams[i].header, "CreatingSets");
}
QueryPipelinePtr CreatingSetsStep::updatePipeline(QueryPipelines pipelines)
{
if (pipelines.empty())
throw Exception("CreatingSetsStep cannot be created with no inputs", ErrorCodes::LOGICAL_ERROR);
auto main_pipeline = std::move(pipelines.front());
if (pipelines.size() == 1)
return main_pipeline;
std::swap(pipelines.front(), pipelines.back());
pipelines.pop_back();
QueryPipeline delayed_pipeline;
if (pipelines.size() > 1)
{
settings.out << prefix;
if (set.second.set)
settings.out << "Set: ";
else if (set.second.join)
settings.out << "Join: ";
settings.out << set.first << '\n';
QueryPipelineProcessorsCollector collector(delayed_pipeline, this);
delayed_pipeline = QueryPipeline::unitePipelines(std::move(pipelines), output_stream->header);
processors = collector.detachProcessors();
}
else
delayed_pipeline = std::move(*pipelines.front());
QueryPipelineProcessorsCollector collector(*main_pipeline, this);
main_pipeline->addDelayedPipeline(std::move(delayed_pipeline));
auto added_processors = collector.detachProcessors();
processors.insert(processors.end(), added_processors.begin(), added_processors.end());
return main_pipeline;
}
void CreatingSetsStep::describePipeline(FormatSettings & settings) const
{
IQueryPlanStep::describePipeline(processors, settings);
}
}

View File

@ -7,12 +7,14 @@ namespace DB
{
/// Creates sets for subqueries and JOIN. See CreatingSetsTransform.
class CreatingSetsStep : public ITransformingStep
class CreatingSetStep : public ITransformingStep
{
public:
CreatingSetsStep(
CreatingSetStep(
const DataStream & input_stream_,
SubqueriesForSets subqueries_for_sets_,
Block header,
String description_,
SubqueryForSet subquery_for_set_,
SizeLimits network_transfer_limits_,
const Context & context_);
@ -23,9 +25,25 @@ public:
void describeActions(FormatSettings & settings) const override;
private:
SubqueriesForSets subqueries_for_sets;
String description;
SubqueryForSet subquery_for_set;
SizeLimits network_transfer_limits;
const Context & context;
};
class CreatingSetsStep : public IQueryPlanStep
{
public:
CreatingSetsStep(DataStreams input_streams_);
String getName() const override { return "CreatingSets"; }
QueryPipelinePtr updatePipeline(QueryPipelines pipelines) override;
void describePipeline(FormatSettings & settings) const override;
private:
Processors processors;
};
}