Use QueryPlan in InterpreterSelectQuery [part 2].

This commit is contained in:
Nikolai Kochetov 2020-06-18 20:45:00 +03:00
parent e0841360db
commit 45adacf0bc
15 changed files with 342 additions and 173 deletions

View File

@ -77,6 +77,7 @@
#include <ext/map.h>
#include <ext/scope_guard.h>
#include <memory>
#include <Processors/QueryPlan/ConvertingStep.h>
namespace DB
@ -462,26 +463,29 @@ Block InterpreterSelectQuery::getSampleBlock()
return result_header;
}
void InterpreterSelectQuery::buildQueryPlan(QueryPlan & query_plan)
{
executeImpl(query_plan, input, std::move(input_pipe));
/// We must guarantee that result structure is the same as in getSampleBlock()
if (!blocksHaveEqualStructure(query_plan.getCurrentDataStream().header, result_header))
{
auto converting = std::make_unique<ConvertingStep>(query_plan.getCurrentDataStream(), result_header);
query_plan.addStep(std::move(converting));
}
}
BlockIO InterpreterSelectQuery::execute()
{
BlockIO res;
QueryPlan query_plan;
executeImpl(query_plan, input, std::move(input_pipe));
buildQueryPlan(query_plan);
res.pipeline = std::move(*query_plan.buildQueryPipeline());
res.pipeline.addInterpreterContext(context);
res.pipeline.addStorageHolder(storage);
/// We must guarantee that result structure is the same as in getSampleBlock()
if (!blocksHaveEqualStructure(res.pipeline.getHeader(), result_header))
{
res.pipeline.addSimpleTransform([&](const Block & header)
{
return std::make_shared<ConvertingTransform>(header, result_header, ConvertingTransform::MatchColumnsMode::Name);
});
}
return res;
}
@ -1045,9 +1049,9 @@ void InterpreterSelectQuery::executeFetchColumns(
{std::move(column), std::make_shared<DataTypeAggregateFunction>(func, argument_types, desc.parameters), desc.column_name}};
auto istream = std::make_shared<OneBlockInputStream>(block_with_count);
ReadFromPreparedSource prepared_count(Pipe(std::make_shared<SourceFromInputStream>(istream)));
prepared_count.setStepDescription("Optimized trivial count");
prepared_count.initializePipeline(pipeline);
auto prepared_count = std::make_unique<ReadFromPreparedSource>(Pipe(std::make_shared<SourceFromInputStream>(istream)));
prepared_count->setStepDescription("Optimized trivial count");
query_plan.addStep(std::move(prepared_count));
from_stage = QueryProcessingStage::WithMergeableState;
analysis_result.first_stage = false;
return;
@ -1241,7 +1245,7 @@ void InterpreterSelectQuery::executeFetchColumns(
{
is_remote = true;
max_streams = settings.max_distributed_connections;
pipeline.setMaxThreads(max_streams);
query_plan.setMaxThreads(max_streams);
}
UInt64 max_block_size = settings.max_block_size;
@ -1266,14 +1270,14 @@ void InterpreterSelectQuery::executeFetchColumns(
{
max_block_size = std::max(UInt64(1), limit_length + limit_offset);
max_streams = 1;
pipeline.setMaxThreads(max_streams);
query_plan.setMaxThreads(max_streams);
}
if (!max_block_size)
throw Exception("Setting 'max_block_size' cannot be zero", ErrorCodes::PARAMETER_OUT_OF_BOUND);
/// Initialize the initial data streams to which the query transforms are superimposed. Table or subquery or prepared input?
if (pipeline.initialized())
if (query_plan.isInitialized())
{
/// Prepared input.
}
@ -1295,7 +1299,7 @@ void InterpreterSelectQuery::executeFetchColumns(
interpreter_subquery->ignoreWithTotals();
}
pipeline = interpreter_subquery->execute().pipeline;
interpreter_subquery->buildQueryPlan(query_plan);
}
else if (storage)
{
@ -1331,13 +1335,12 @@ void InterpreterSelectQuery::executeFetchColumns(
query_info.input_order_info = query_info.order_optimizer->getInputOrder(storage);
}
ReadFromStorageStep read_step(
auto read_step = std::make_unique<ReadFromStorageStep>(
table_lock, options, storage,
required_columns, query_info, *context, processing_stage, max_block_size, max_streams);
read_step.setStepDescription("Read from " + storage->getName());
pipeline = std::move(*read_step.updatePipeline({}));
read_step->setStepDescription("Read from " + storage->getName());
query_plan.addStep(std::move(read_step));
}
else
throw Exception("Logical error in InterpreterSelectQuery: nowhere to read", ErrorCodes::LOGICAL_ERROR);
@ -1345,32 +1348,33 @@ void InterpreterSelectQuery::executeFetchColumns(
/// Aliases in table declaration.
if (processing_stage == QueryProcessingStage::FetchColumns && alias_actions)
{
ExpressionStep table_aliases(DataStream{.header = pipeline.getHeader()}, alias_actions);
table_aliases.setStepDescription("Add table aliases");
table_aliases.transformPipeline(pipeline);
auto table_aliases = std::make_unique<ExpressionStep>(query_plan.getCurrentDataStream(), alias_actions);
table_aliases->setStepDescription("Add table aliases");
query_plan.addStep(std::move(table_aliases));
}
}
void InterpreterSelectQuery::executeWhere(QueryPipeline & pipeline, const ExpressionActionsPtr & expression, bool remove_filter)
void InterpreterSelectQuery::executeWhere(QueryPlan & query_plan, const ExpressionActionsPtr & expression, bool remove_filter)
{
FilterStep where_step(
DataStream{.header = pipeline.getHeader()},
auto where_step = std::make_unique<FilterStep>(
query_plan.getCurrentDataStream(),
expression,
getSelectQuery().where()->getColumnName(),
remove_filter);
where_step.setStepDescription("WHERE");
where_step.transformPipeline(pipeline);
where_step->setStepDescription("WHERE");
query_plan.addStep(std::move(where_step));
}
void InterpreterSelectQuery::executeAggregation(QueryPipeline & pipeline, const ExpressionActionsPtr & expression, bool overflow_row, bool final, InputOrderInfoPtr group_by_info)
void InterpreterSelectQuery::executeAggregation(QueryPlan & query_plan, const ExpressionActionsPtr & expression, bool overflow_row, bool final, InputOrderInfoPtr group_by_info)
{
ExpressionStep expression_before_aggregation(DataStream{.header = pipeline.getHeader()}, expression);
expression_before_aggregation.transformPipeline(pipeline);
auto expression_before_aggregation = std::make_unique<ExpressionStep>(query_plan.getCurrentDataStream(), expression);
expression_before_aggregation->setStepDescription("Before GROUP BY");
query_plan.addStep(std::move(expression_before_aggregation));
Block header_before_aggregation = pipeline.getHeader();
const auto & header_before_aggregation = query_plan.getCurrentDataStream().header;
ColumnNumbers keys;
for (const auto & key : query_analyzer->aggregationKeys())
keys.push_back(header_before_aggregation.getPositionByName(key.name));
@ -1412,8 +1416,8 @@ void InterpreterSelectQuery::executeAggregation(QueryPipeline & pipeline, const
bool storage_has_evenly_distributed_read = storage && storage->hasEvenlyDistributedRead();
AggregatingStep aggregating_step(
DataStream{.header = pipeline.getHeader()},
auto aggregating_step = std::make_unique<AggregatingStep>(
query_plan.getCurrentDataStream(),
std::move(transform_params),
settings.max_block_size,
merge_threads,
@ -1422,13 +1426,13 @@ void InterpreterSelectQuery::executeAggregation(QueryPipeline & pipeline, const
std::move(group_by_info),
std::move(group_by_sort_description));
aggregating_step.transformPipeline(pipeline);
query_plan.addStep(std::move(aggregating_step));
}
void InterpreterSelectQuery::executeMergeAggregated(QueryPipeline & pipeline, bool overflow_row, bool final)
void InterpreterSelectQuery::executeMergeAggregated(QueryPlan & query_plan, bool overflow_row, bool final)
{
Block header_before_merge = pipeline.getHeader();
const auto & header_before_merge = query_plan.getCurrentDataStream().header;
ColumnNumbers keys;
for (const auto & key : query_analyzer->aggregationKeys())
@ -1455,47 +1459,45 @@ void InterpreterSelectQuery::executeMergeAggregated(QueryPipeline & pipeline, bo
auto transform_params = std::make_shared<AggregatingTransformParams>(params, final);
MergingAggregatedStep merging_aggregated(
DataStream{.header = pipeline.getHeader()},
auto merging_aggregated = std::make_unique<MergingAggregatedStep>(
query_plan.getCurrentDataStream(),
std::move(transform_params),
settings.distributed_aggregation_memory_efficient,
settings.max_threads,
settings.aggregation_memory_efficient_merge_threads);
merging_aggregated.transformPipeline(pipeline);
query_plan.addStep(std::move(merging_aggregated));
}
void InterpreterSelectQuery::executeHaving(QueryPipeline & pipeline, const ExpressionActionsPtr & expression)
void InterpreterSelectQuery::executeHaving(QueryPlan & query_plan, const ExpressionActionsPtr & expression)
{
FilterStep having_step(
DataStream{.header = pipeline.getHeader()},
auto having_step = std::make_unique<FilterStep>(
query_plan.getCurrentDataStream(),
expression, getSelectQuery().having()->getColumnName(), false);
having_step.setStepDescription("HAVING");
having_step.transformPipeline(pipeline);
having_step->setStepDescription("HAVING");
query_plan.addStep(std::move(having_step));
}
void InterpreterSelectQuery::executeTotalsAndHaving(QueryPipeline & pipeline, bool has_having, const ExpressionActionsPtr & expression, bool overflow_row, bool final)
void InterpreterSelectQuery::executeTotalsAndHaving(QueryPlan & query_plan, bool has_having, const ExpressionActionsPtr & expression, bool overflow_row, bool final)
{
const Settings & settings = context->getSettingsRef();
TotalsHavingStep totals_having_step(
DataStream{.header = pipeline.getHeader()},
auto totals_having_step = std::make_unique<TotalsHavingStep>(
query_plan.getCurrentDataStream(),
overflow_row, expression,
has_having ? getSelectQuery().having()->getColumnName() : "",
settings.totals_mode, settings.totals_auto_threshold, final);
totals_having_step.transformPipeline(pipeline);
query_plan.addStep(std::move(totals_having_step));
}
void InterpreterSelectQuery::executeRollupOrCube(QueryPipeline & pipeline, Modificator modificator)
void InterpreterSelectQuery::executeRollupOrCube(QueryPlan & query_plan, Modificator modificator)
{
pipeline.resize(1);
Block header_before_transform = pipeline.getHeader();
const auto & header_before_transform = query_plan.getCurrentDataStream().header;
ColumnNumbers keys;
@ -1512,45 +1514,40 @@ void InterpreterSelectQuery::executeRollupOrCube(QueryPipeline & pipeline, Modif
auto transform_params = std::make_shared<AggregatingTransformParams>(params, true);
QueryPlanStepPtr step;
if (modificator == Modificator::ROLLUP)
{
RollupStep rollup_step(DataStream{.header = pipeline.getHeader()}, std::move(transform_params));
rollup_step.transformPipeline(pipeline);
}
step = std::make_unique<RollupStep>(query_plan.getCurrentDataStream(), std::move(transform_params));
else
{
CubeStep cube_step(DataStream{.header = pipeline.getHeader()}, std::move(transform_params));
cube_step.transformPipeline(pipeline);
}
step = std::make_unique<CubeStep>(query_plan.getCurrentDataStream(), std::move(transform_params));
query_plan.addStep(std::move(step));
}
void InterpreterSelectQuery::executeExpression(QueryPipeline & pipeline, const ExpressionActionsPtr & expression, const std::string & description)
void InterpreterSelectQuery::executeExpression(QueryPlan & query_plan, const ExpressionActionsPtr & expression, const std::string & description)
{
ExpressionStep expression_step(
DataStream{.header = pipeline.getHeader()},
expression);
auto expression_step = std::make_unique<ExpressionStep>(query_plan.getCurrentDataStream(), expression);
expression_step.setStepDescription(description);
expression_step.transformPipeline(pipeline);
expression_step->setStepDescription(description);
query_plan.addStep(std::move(expression_step));
}
void InterpreterSelectQuery::executeOrderOptimized(QueryPipeline & pipeline, InputOrderInfoPtr input_sorting_info, UInt64 limit, SortDescription & output_order_descr)
void InterpreterSelectQuery::executeOrderOptimized(QueryPlan & query_plan, InputOrderInfoPtr input_sorting_info, UInt64 limit, SortDescription & output_order_descr)
{
const Settings & settings = context->getSettingsRef();
FinishSortingStep finish_sorting_step(
DataStream{.header = pipeline.getHeader()},
auto finish_sorting_step = std::make_unique<FinishSortingStep>(
query_plan.getCurrentDataStream(),
input_sorting_info->order_key_prefix_descr,
output_order_descr,
settings.max_block_size,
limit);
finish_sorting_step.transformPipeline(pipeline);
query_plan.addStep(std::move(finish_sorting_step));
}
void InterpreterSelectQuery::executeOrder(QueryPipeline & pipeline, InputOrderInfoPtr input_sorting_info)
void InterpreterSelectQuery::executeOrder(QueryPlan & query_plan, InputOrderInfoPtr input_sorting_info)
{
auto & query = getSelectQuery();
SortDescription output_order_descr = getSortDescription(query, *context);
@ -1564,69 +1561,69 @@ void InterpreterSelectQuery::executeOrder(QueryPipeline & pipeline, InputOrderIn
* and then merge them into one sorted stream.
* At this stage we merge per-thread streams into one.
*/
executeOrderOptimized(pipeline, input_sorting_info, limit, output_order_descr);
executeOrderOptimized(query_plan, input_sorting_info, limit, output_order_descr);
return;
}
const Settings & settings = context->getSettingsRef();
PartialSortingStep partial_sorting(
DataStream{.header = pipeline.getHeader()},
auto partial_sorting = std::make_unique<PartialSortingStep>(
query_plan.getCurrentDataStream(),
output_order_descr,
limit,
SizeLimits(settings.max_rows_to_sort, settings.max_bytes_to_sort, settings.sort_overflow_mode));
partial_sorting.setStepDescription("Sort each block before ORDER BY");
partial_sorting.transformPipeline(pipeline);
partial_sorting->setStepDescription("Sort each block before ORDER BY");
query_plan.addStep(std::move(partial_sorting));
/// Merge the sorted blocks.
MergeSortingStep merge_sorting_step(
DataStream{.header = pipeline.getHeader()},
auto merge_sorting_step = std::make_unique<MergeSortingStep>(
query_plan.getCurrentDataStream(),
output_order_descr, settings.max_block_size, limit,
settings.max_bytes_before_remerge_sort / pipeline.getNumStreams(),
settings.max_bytes_before_remerge_sort,
settings.max_bytes_before_external_sort, context->getTemporaryVolume(),
settings.min_free_disk_space_for_temporary_data);
merge_sorting_step.setStepDescription("Merge sorted blocks before ORDER BY");
merge_sorting_step.transformPipeline(pipeline);
merge_sorting_step->setStepDescription("Merge sorted blocks before ORDER BY");
query_plan.addStep(std::move(merge_sorting_step));
/// If there are several streams, we merge them into one
executeMergeSorted(pipeline, output_order_descr, limit, "before ORDER BY");
executeMergeSorted(query_plan, output_order_descr, limit, "before ORDER BY");
}
void InterpreterSelectQuery::executeMergeSorted(QueryPipeline & pipeline, const std::string & description)
void InterpreterSelectQuery::executeMergeSorted(QueryPlan & query_plan, const std::string & description)
{
auto & query = getSelectQuery();
SortDescription order_descr = getSortDescription(query, *context);
UInt64 limit = getLimitForSorting(query, *context);
executeMergeSorted(pipeline, order_descr, limit, description);
executeMergeSorted(query_plan, order_descr, limit, description);
}
void InterpreterSelectQuery::executeMergeSorted(QueryPipeline & pipeline, const SortDescription & sort_description, UInt64 limit, const std::string & description)
void InterpreterSelectQuery::executeMergeSorted(QueryPlan & query_plan, const SortDescription & sort_description, UInt64 limit, const std::string & description)
{
const Settings & settings = context->getSettingsRef();
MergingSortedStep merging_sorted(
DataStream{.header = pipeline.getHeader()},
auto merging_sorted = std::make_unique<MergingSortedStep>(
query_plan.getCurrentDataStream(),
sort_description,
settings.max_block_size, limit);
merging_sorted.setStepDescription("Merge sorted streams " + description);
merging_sorted.transformPipeline(pipeline);
merging_sorted->setStepDescription("Merge sorted streams " + description);
query_plan.addStep(std::move(merging_sorted));
}
void InterpreterSelectQuery::executeProjection(QueryPipeline & pipeline, const ExpressionActionsPtr & expression)
void InterpreterSelectQuery::executeProjection(QueryPlan & query_plan, const ExpressionActionsPtr & expression)
{
ExpressionStep projection_step(DataStream{.header = pipeline.getHeader()}, expression);
projection_step.setStepDescription("Projection");
projection_step.transformPipeline(pipeline);
auto projection_step = std::make_unique<ExpressionStep>(query_plan.getCurrentDataStream(), expression);
projection_step->setStepDescription("Projection");
query_plan.addStep(std::move(projection_step));
}
void InterpreterSelectQuery::executeDistinct(QueryPipeline & pipeline, bool before_order, Names columns, bool pre_distinct)
void InterpreterSelectQuery::executeDistinct(QueryPlan & query_plan, bool before_order, Names columns, bool pre_distinct)
{
auto & query = getSelectQuery();
if (query.distinct)
@ -1642,20 +1639,20 @@ void InterpreterSelectQuery::executeDistinct(QueryPipeline & pipeline, bool befo
SizeLimits limits(settings.max_rows_in_distinct, settings.max_bytes_in_distinct, settings.distinct_overflow_mode);
DistinctStep distinct_step(
DataStream{.header = pipeline.getHeader()},
auto distinct_step = std::make_unique<DistinctStep>(
query_plan.getCurrentDataStream(),
limits, limit_for_distinct, columns, pre_distinct);
if (pre_distinct)
distinct_step.setStepDescription("Preliminary DISTINCT");
distinct_step->setStepDescription("Preliminary DISTINCT");
distinct_step.transformPipeline(pipeline);
query_plan.addStep(std::move(distinct_step));
}
}
/// Preliminary LIMIT - is used in every source, if there are several sources, before they are combined.
void InterpreterSelectQuery::executePreLimit(QueryPipeline & pipeline, bool do_not_skip_offset)
void InterpreterSelectQuery::executePreLimit(QueryPlan & query_plan, bool do_not_skip_offset)
{
auto & query = getSelectQuery();
/// If there is LIMIT
@ -1669,14 +1666,14 @@ void InterpreterSelectQuery::executePreLimit(QueryPipeline & pipeline, bool do_n
limit_offset = 0;
}
LimitStep limit(DataStream{.header = pipeline.getHeader()}, limit_length, limit_offset);
limit.setStepDescription("preliminary LIMIT");
limit.transformPipeline(pipeline);
auto limit = std::make_unique<LimitStep>(query_plan.getCurrentDataStream(), limit_length, limit_offset);
limit->setStepDescription("preliminary LIMIT");
query_plan.addStep(std::move(limit));
}
}
void InterpreterSelectQuery::executeLimitBy(QueryPipeline & pipeline)
void InterpreterSelectQuery::executeLimitBy(QueryPlan & query_plan)
{
auto & query = getSelectQuery();
if (!query.limitByLength() || !query.limitBy())
@ -1689,8 +1686,8 @@ void InterpreterSelectQuery::executeLimitBy(QueryPipeline & pipeline)
UInt64 length = getLimitUIntValue(query.limitByLength(), *context, "LIMIT");
UInt64 offset = (query.limitByOffset() ? getLimitUIntValue(query.limitByOffset(), *context, "OFFSET") : 0);
LimitByStep limit_by(DataStream{.header = pipeline.getHeader()}, length, offset, columns);
limit_by.transformPipeline(pipeline);
auto limit_by = std::make_unique<LimitByStep>(query_plan.getCurrentDataStream(), length, offset, columns);
query_plan.addStep(std::move(limit_by));
}
@ -1719,7 +1716,7 @@ namespace
}
}
void InterpreterSelectQuery::executeWithFill(QueryPipeline & pipeline)
void InterpreterSelectQuery::executeWithFill(QueryPlan & query_plan)
{
auto & query = getSelectQuery();
if (query.orderBy())
@ -1735,13 +1732,13 @@ void InterpreterSelectQuery::executeWithFill(QueryPipeline & pipeline)
if (fill_descr.empty())
return;
FillingStep filling_step(DataStream{.header = pipeline.getHeader()}, std::move(fill_descr));
filling_step.transformPipeline(pipeline);
auto filling_step = std::make_unique<FillingStep>(query_plan.getCurrentDataStream(), std::move(fill_descr));
query_plan.addStep(std::move(filling_step));
}
}
void InterpreterSelectQuery::executeLimit(QueryPipeline & pipeline)
void InterpreterSelectQuery::executeLimit(QueryPlan & query_plan)
{
auto & query = getSelectQuery();
/// If there is LIMIT
@ -1776,19 +1773,19 @@ void InterpreterSelectQuery::executeLimit(QueryPipeline & pipeline)
order_descr = getSortDescription(query, *context);
}
LimitStep limit(
DataStream{.header = pipeline.getHeader()},
auto limit = std::make_unique<LimitStep>(
query_plan.getCurrentDataStream(),
limit_length, limit_offset, always_read_till_end, query.limit_with_ties, order_descr);
if (query.limit_with_ties)
limit.setStepDescription("LIMIT WITH TIES");
limit->setStepDescription("LIMIT WITH TIES");
limit.transformPipeline(pipeline);
query_plan.addStep(std::move(limit));
}
}
void InterpreterSelectQuery::executeOffset(QueryPipeline & pipeline)
void InterpreterSelectQuery::executeOffset(QueryPlan & query_plan)
{
auto & query = getSelectQuery();
/// If there is not a LIMIT but an offset
@ -1798,35 +1795,35 @@ void InterpreterSelectQuery::executeOffset(QueryPipeline & pipeline)
UInt64 limit_offset;
std::tie(limit_length, limit_offset) = getLimitLengthAndOffset(query, *context);
OffsetsStep offsets_step(DataStream{.header = pipeline.getHeader()}, limit_offset);
offsets_step.transformPipeline(pipeline);
auto offsets_step = std::make_unique<OffsetsStep>(query_plan.getCurrentDataStream(), limit_offset);
query_plan.addStep(std::move(offsets_step));
}
}
void InterpreterSelectQuery::executeExtremes(QueryPipeline & pipeline)
void InterpreterSelectQuery::executeExtremes(QueryPlan & query_plan)
{
if (!context->getSettingsRef().extremes)
return;
ExtremesStep extremes_step(DataStream{.header = pipeline.getHeader()});
extremes_step.transformPipeline(pipeline);
auto extremes_step = std::make_unique<ExtremesStep>(query_plan.getCurrentDataStream());
query_plan.addStep(std::move(extremes_step));
}
void InterpreterSelectQuery::executeSubqueriesInSetsAndJoins(QueryPipeline & pipeline, const SubqueriesForSets & subqueries_for_sets)
void InterpreterSelectQuery::executeSubqueriesInSetsAndJoins(QueryPlan & query_plan, const SubqueriesForSets & subqueries_for_sets)
{
if (query_info.input_order_info)
executeMergeSorted(pipeline, query_info.input_order_info->order_key_prefix_descr, 0, "before creating sets for subqueries and joins");
executeMergeSorted(query_plan, query_info.input_order_info->order_key_prefix_descr, 0, "before creating sets for subqueries and joins");
const Settings & settings = context->getSettingsRef();
CreatingSetsStep creating_sets(
DataStream{.header = pipeline.getHeader()},
auto creating_sets = std::make_unique<CreatingSetsStep>(
query_plan.getCurrentDataStream(),
subqueries_for_sets,
SizeLimits(settings.max_rows_to_transfer, settings.max_bytes_to_transfer, settings.transfer_overflow_mode),
*context);
creating_sets.setStepDescription("Create sets for subqueries and joins");
creating_sets.transformPipeline(pipeline);
creating_sets->setStepDescription("Create sets for subqueries and joins");
query_plan.addStep(std::move(creating_sets));
}

View File

@ -77,6 +77,9 @@ public:
/// Execute a query. Get the stream of blocks to read.
BlockIO execute() override;
/// Builds QueryPlan for current query.
void buildQueryPlan(QueryPlan & query_plan);
bool ignoreLimits() const override { return options.ignore_limits; }
bool ignoreQuota() const override { return options.ignore_quota; }

View File

@ -6,10 +6,9 @@
#include <Columns/getLeastSuperColumn.h>
#include <Common/typeid_cast.h>
#include <Parsers/queryToString.h>
#include <Processors/Sources/NullSource.h>
#include <Processors/QueryPipeline.h>
#include <Processors/Pipe.h>
#include <Processors/QueryPlan/IQueryPlanStep.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <Processors/QueryPlan/UnionStep.h>
namespace DB
@ -173,47 +172,35 @@ Block InterpreterSelectWithUnionQuery::getSampleBlock(
return cache[key] = InterpreterSelectWithUnionQuery(query_ptr, context, SelectQueryOptions().analyze()).getSampleBlock();
}
void InterpreterSelectWithUnionQuery::buildQueryPlan(QueryPlan & query_plan)
{
size_t num_plans = nested_interpreters.size();
std::vector<QueryPlan> plans(num_plans);
DataStreams data_streams(num_plans);
for (size_t i = 0; i < num_plans; ++i)
{
nested_interpreters[i]->buildQueryPlan(plans[i]);
data_streams[i] = plans[i].getCurrentDataStream();
}
auto max_threads = context->getSettingsRef().max_threads;
auto union_step = std::make_unique<UnionStep>(std::move(data_streams), result_header, max_threads);
query_plan.unitePlans(std::move(union_step), std::move(plans));
}
BlockIO InterpreterSelectWithUnionQuery::execute()
{
BlockIO res;
QueryPipeline & main_pipeline = res.pipeline;
std::vector<QueryPipeline> pipelines;
bool has_main_pipeline = false;
Blocks headers;
headers.reserve(nested_interpreters.size());
QueryPlan query_plan;
buildQueryPlan(query_plan);
for (auto & interpreter : nested_interpreters)
{
if (!has_main_pipeline)
{
has_main_pipeline = true;
main_pipeline = interpreter->execute().pipeline;
headers.emplace_back(main_pipeline.getHeader());
}
else
{
pipelines.emplace_back(interpreter->execute().pipeline);
headers.emplace_back(pipelines.back().getHeader());
}
}
auto pipeline = query_plan.buildQueryPipeline();
if (!has_main_pipeline)
main_pipeline.init(Pipe(std::make_shared<NullSource>(getSampleBlock())));
if (!pipelines.empty())
{
auto common_header = getCommonHeaderForUnion(headers);
main_pipeline.unitePipelines(std::move(pipelines), common_header);
// nested queries can force 1 thread (due to simplicity)
// but in case of union this cannot be done.
UInt64 max_threads = context->getSettingsRef().max_threads;
main_pipeline.setMaxThreads(std::min<UInt64>(nested_interpreters.size(), max_threads));
}
main_pipeline.addInterpreterContext(context);
res.pipeline = std::move(*pipeline);
res.pipeline.addInterpreterContext(context);
return res;
}

View File

@ -5,14 +5,12 @@
#include <Interpreters/SelectQueryOptions.h>
#include <Parsers/IAST_fwd.h>
#include <Processors/QueryPipeline.h>
namespace DB
{
class Context;
class InterpreterSelectQuery;
class QueryPlan;
/** Interprets one or multiple SELECT queries inside UNION ALL chain.
*/
@ -27,6 +25,9 @@ public:
~InterpreterSelectWithUnionQuery() override;
/// Builds QueryPlan for current query.
void buildQueryPlan(QueryPlan & query_plan);
BlockIO execute() override;
bool ignoreLimits() const override { return options.ignore_limits; }

View File

@ -563,7 +563,7 @@ void QueryPipeline::setOutputFormat(ProcessorPtr output)
}
void QueryPipeline::unitePipelines(
std::vector<QueryPipeline> && pipelines, const Block & common_header)
std::vector<std::unique_ptr<QueryPipeline>> pipelines, const Block & common_header)
{
if (initialized())
{
@ -583,8 +583,9 @@ void QueryPipeline::unitePipelines(
if (totals_having_port)
totals.push_back(totals_having_port);
for (auto & pipeline : pipelines)
for (auto & pipeline_ptr : pipelines)
{
auto & pipeline = *pipeline_ptr;
pipeline.checkInitialized();
if (!pipeline.isCompleted())

View File

@ -135,7 +135,7 @@ public:
void enableQuotaForCurrentStreams();
void unitePipelines(std::vector<QueryPipeline> && pipelines, const Block & common_header);
void unitePipelines(std::vector<std::unique_ptr<QueryPipeline>> pipelines, const Block & common_header);
PipelineExecutorPtr execute();

View File

@ -0,0 +1,48 @@
#include <Processors/QueryPlan/ConvertingStep.h>
#include <Processors/QueryPipeline.h>
#include <Processors/Transforms/ConvertingTransform.h>
namespace DB
{
static ITransformingStep::DataStreamTraits getTraits()
{
return ITransformingStep::DataStreamTraits{
.preserves_distinct_columns = true
};
}
static void filterDistinctColumns(const Block & res_header, NameSet & distinct_columns)
{
if (distinct_columns.empty())
return;
NameSet new_distinct_columns;
for (const auto & column : res_header)
if (distinct_columns.count(column.name))
new_distinct_columns.insert(column.name);
distinct_columns.swap(new_distinct_columns);
}
ConvertingStep::ConvertingStep(const DataStream & input_stream_, Block result_header_)
: ITransformingStep(
input_stream_,
result_header_,
getTraits())
, result_header(std::move(result_header_))
{
/// Some columns may be removed
filterDistinctColumns(output_stream->header, output_stream->distinct_columns);
filterDistinctColumns(output_stream->header, output_stream->local_distinct_columns);
}
void ConvertingStep::transformPipeline(QueryPipeline & pipeline)
{
pipeline.addSimpleTransform([&](const Block & header)
{
return std::make_shared<ConvertingTransform>(header, result_header, ConvertingTransform::MatchColumnsMode::Name);
});
}
}

View File

@ -0,0 +1,20 @@
#pragma once
#include <Processors/QueryPlan/ITransformingStep.h>
namespace DB
{
class ConvertingStep : public ITransformingStep
{
public:
ConvertingStep(const DataStream & input_stream_, Block result_header_);
String getName() const override { return "Converting"; }
void transformPipeline(QueryPipeline & pipeline) override;
private:
Block result_header;
};
}

View File

@ -40,7 +40,7 @@ void MergeSortingStep::transformPipeline(QueryPipeline & pipeline)
return std::make_shared<MergeSortingTransform>(
header, description, max_merged_block_size, limit,
max_bytes_before_remerge,
max_bytes_before_remerge / pipeline.getNumStreams(),
max_bytes_before_external_sort,
tmp_volume,
min_free_disk_space);

View File

@ -35,6 +35,43 @@ const DataStream & QueryPlan::getCurrentDataStream() const
return root->step->getOutputStream();
}
void QueryPlan::unitePlans(QueryPlanStepPtr step, std::vector<QueryPlan> plans)
{
if (isInitialized())
throw Exception("Cannot unite plans because current QueryPlan is already initialized",
ErrorCodes::LOGICAL_ERROR);
const auto & inputs = step->getInputStreams();
size_t num_inputs = step->getInputStreams().size();
if (num_inputs != plans.size())
{
throw Exception("Cannot unite QueryPlans using " + step->getName() +
" because step has different number of inputs. "
"Has " + std::to_string(plans.size()) + " plans "
"and " + std::to_string(num_inputs) + " inputs", ErrorCodes::LOGICAL_ERROR);
}
for (size_t i = 0; i < num_inputs; ++i)
{
const auto & step_header = inputs[i].header;
const auto & plan_header = plans[i].getCurrentDataStream().header;
if (!blocksHaveEqualStructure(step_header, plan_header))
throw Exception("Cannot unite QueryPlans using " + step->getName() + " because "
"it has incompatible header with plan " + root->step->getName() + " "
"plan header: " + plan_header.dumpStructure() +
"step header: " + step_header.dumpStructure(), ErrorCodes::LOGICAL_ERROR);
}
for (auto & plan : plans)
nodes.insert(nodes.end(), plan.nodes.begin(), plan.nodes.end());
nodes.emplace_back(Node{.step = std::move(step)});
root = &nodes.back();
for (auto & plan : plans)
root->children.emplace_back(plan.root);
}
void QueryPlan::addStep(QueryPlanStepPtr step)
{
checkNotCompleted();
@ -48,6 +85,7 @@ void QueryPlan::addStep(QueryPlanStepPtr step)
"step has no inputs, but QueryPlan is already initialised", ErrorCodes::LOGICAL_ERROR);
nodes.emplace_back(Node{.step = std::move(step)});
root = &nodes.back();
return;
}
@ -100,7 +138,12 @@ QueryPipelinePtr QueryPlan::buildQueryPipeline()
size_t next_child = frame.pipelines.size();
if (next_child == frame.node->children.size())
{
bool limit_max_threads = frame.pipelines.empty();
last_pipeline = frame.node->step->updatePipeline(std::move(frame.pipelines));
if (limit_max_threads)
last_pipeline->setMaxThreads(max_threads);
stack.pop();
}
else

View File

@ -18,6 +18,7 @@ using QueryPipelinePtr = std::unique_ptr<QueryPipeline>;
class QueryPlan
{
public:
void unitePlans(QueryPlanStepPtr step, std::vector<QueryPlan> plans);
void addStep(QueryPlanStepPtr step);
bool isInitialized() const { return root != nullptr; } /// Tree is not empty
@ -26,6 +27,10 @@ public:
QueryPipelinePtr buildQueryPipeline();
/// Set upper limit for the recommend number of threads. Will be applied to the newly-created pipelines.
/// TODO: make it in a better way.
void setMaxThreads(size_t max_threads_) { max_threads = max_threads_; }
private:
struct Node
{
@ -40,6 +45,8 @@ private:
void checkInitialized() const;
void checkNotCompleted() const;
size_t max_threads = 0;
};
}

View File

@ -1,6 +1,5 @@
#pragma once
#include <Processors/QueryPlan/ITransformingStep.h>
#include <DataStreams/SizeLimits.h>
namespace DB
{

View File

@ -0,0 +1,39 @@
#include <Processors/QueryPlan/UnionStep.h>
#include <Processors/QueryPipeline.h>
#include <Processors/Sources/NullSource.h>
#include <Interpreters/Context.h>
namespace DB
{
UnionStep::UnionStep(DataStreams input_streams_, Block result_header, size_t max_threads_)
: header(std::move(result_header))
, max_threads(max_threads_)
{
input_streams = std::move(input_streams_);
/// TODO: update traits
output_stream = DataStream{.header = header};
}
QueryPipelinePtr UnionStep::updatePipeline(QueryPipelines pipelines)
{
auto pipeline = std::make_unique<QueryPipeline>();
if (pipelines.empty())
{
pipeline->init(Pipe(std::make_shared<NullSource>(output_stream->header)));
return pipeline;
}
size_t num_pipelines = pipelines.size();
pipeline->unitePipelines(std::move(pipelines), output_stream->header);
if (num_pipelines > 1)
{
// nested queries can force 1 thread (due to simplicity)
// but in case of union this cannot be done.
pipeline->setMaxThreads(std::min<UInt64>(num_pipelines, max_threads));
}
}
}

View File

@ -0,0 +1,22 @@
#pragma once
#include <Processors/QueryPlan/IQueryPlanStep.h>
namespace DB
{
class UnionStep : public IQueryPlanStep
{
public:
/// max_threads is used to limit the number of threads for result pipeline.
UnionStep(DataStreams input_streams_, Block result_header, size_t max_threads_);
String getName() const override { return "Union"; }
QueryPipelinePtr updatePipeline(QueryPipelines pipelines) override;
private:
Block header;
size_t max_threads;
};
}

View File

@ -139,6 +139,7 @@ SRCS(
Transforms/AggregatingInOrderTransform.cpp
QueryPlan/AddingDelayedStreamStep.cpp
QueryPlan/AggregatingStep.cpp
QueryPlan/ConvertingStep.cpp
QueryPlan/CreatingSetsStep.cpp
QueryPlan/CubeStep.cpp
QueryPlan/DistinctStep.cpp
@ -157,6 +158,7 @@ SRCS(
QueryPlan/MergingSortedStep.cpp
QueryPlan/OffsetsStep.cpp
QueryPlan/PartialSortingStep.cpp
QueryPlan/UnionStep.cpp
QueryPlan/ReadFromPreparedSource.cpp
QueryPlan/ReadFromStorageStep.cpp
QueryPlan/ReadNothingStep.cpp