Fix normal projection

This commit is contained in:
Amos Bird 2021-07-07 13:01:30 +08:00
parent 0b032e852d
commit 55981cb0ae
No known key found for this signature in database
GPG Key ID: 80D430DCBECFEDB4
8 changed files with 50 additions and 22 deletions

View File

@ -285,6 +285,7 @@ InterpreterSelectQuery::InterpreterSelectQuery(
checkStackSize();
query_info.ignore_projections = options.ignore_projections;
query_info.is_projection_query = options.is_projection_query;
initSettings();
const Settings & settings = context->getSettingsRef();
@ -577,9 +578,9 @@ void InterpreterSelectQuery::buildQueryPlan(QueryPlan & query_plan)
/// We must guarantee that result structure is the same as in getSampleBlock()
///
/// But if we ignore aggregation, plan header does not match result_header.
/// But if it's a projection query, plan header does not match result_header.
/// TODO: add special stage for InterpreterSelectQuery?
if (!options.ignore_aggregation && !blocksHaveEqualStructure(query_plan.getCurrentDataStream().header, result_header))
if (!options.is_projection_query && !blocksHaveEqualStructure(query_plan.getCurrentDataStream().header, result_header))
{
auto convert_actions_dag = ActionsDAG::makeConvertingActions(
query_plan.getCurrentDataStream().header.getColumnsWithTypeAndName(),
@ -2009,7 +2010,7 @@ void InterpreterSelectQuery::executeAggregation(QueryPlan & query_plan, const Ac
expression_before_aggregation->setStepDescription("Before GROUP BY");
query_plan.addStep(std::move(expression_before_aggregation));
if (options.ignore_aggregation)
if (options.is_projection_query)
return;
const auto & header_before_aggregation = query_plan.getCurrentDataStream().header;

View File

@ -32,13 +32,14 @@ struct SelectQueryOptions
bool remove_duplicates = false;
bool ignore_quota = false;
bool ignore_limits = false;
/// This is a temporary flag to avoid adding aggregating step. Used for projections.
/// TODO: we need more stages for InterpreterSelectQuery
bool ignore_aggregation = false;
/// This flag is needed to analyze query ignoring table projections.
/// It is needed because we build another one InterpreterSelectQuery while analyzing projections.
/// It helps to avoid infinite recursion.
bool ignore_projections = false;
/// This flag is also used for projection analysis.
/// It is needed because lazy normal projections require special planning in FetchColumns stage, such as adding WHERE transform.
/// It is also used to avoid adding aggregating step when aggregate projection is chosen.
bool is_projection_query = false;
bool ignore_alias = false;
bool is_internal = false;
bool is_subquery = false; // non-subquery can also have subquery_depth > 0, e.g. insert select
@ -97,9 +98,9 @@ struct SelectQueryOptions
return *this;
}
SelectQueryOptions & ignoreAggregation(bool value = true)
SelectQueryOptions & projectionQuery(bool value = true)
{
ignore_aggregation = value;
is_projection_query = value;
return *this;
}

View File

@ -3902,7 +3902,7 @@ bool MergeTreeData::getQueryProcessingStageWithAggregateProjection(
ContextPtr query_context, const StorageMetadataPtr & metadata_snapshot, SelectQueryInfo & query_info) const
{
const auto & settings = query_context->getSettingsRef();
if (!settings.allow_experimental_projection_optimization || query_info.ignore_projections)
if (!settings.allow_experimental_projection_optimization || query_info.ignore_projections || query_info.is_projection_query)
return false;
const auto & query_ptr = query_info.query;

View File

@ -178,7 +178,6 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read(
Pipe projection_pipe;
Pipe ordinary_pipe;
const auto & given_select = query_info.query->as<const ASTSelectQuery &>();
if (!projection_parts.empty())
{
LOG_DEBUG(log, "projection required columns: {}", fmt::join(query_info.projection->required_columns, ", "));
@ -228,22 +227,28 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read(
if (!normal_parts.empty())
{
auto storage_from_base_parts_of_projection = StorageFromMergeTreeDataPart::create(std::move(normal_parts));
auto ast = query_info.projection->desc->query_ast->clone();
auto & select = ast->as<ASTSelectQuery &>();
if (given_select.where())
select.setExpression(ASTSelectQuery::Expression::WHERE, given_select.where()->clone());
if (given_select.prewhere())
select.setExpression(ASTSelectQuery::Expression::WHERE, given_select.prewhere()->clone());
// After overriding the group by clause, we finish the possible aggregations directly
if (processed_stage >= QueryProcessingStage::Enum::WithMergeableState && given_select.groupBy())
select.setExpression(ASTSelectQuery::Expression::GROUP_BY, given_select.groupBy()->clone());
auto interpreter = InterpreterSelectQuery(
ast,
query_info.query,
context,
storage_from_base_parts_of_projection,
nullptr,
SelectQueryOptions{processed_stage}.ignoreAggregation().ignoreProjections());
SelectQueryOptions{processed_stage}.projectionQuery());
QueryPlan ordinary_query_plan;
interpreter.buildQueryPlan(ordinary_query_plan);
const auto & expressions = interpreter.getAnalysisResult();
if (processed_stage == QueryProcessingStage::Enum::FetchColumns && expressions.before_where)
{
auto where_step = std::make_unique<FilterStep>(
ordinary_query_plan.getCurrentDataStream(),
expressions.before_where,
expressions.where_column_name,
expressions.remove_where_filter);
where_step->setStepDescription("WHERE");
ordinary_query_plan.addStep(std::move(where_step));
}
ordinary_pipe = QueryPipeline::getPipe(interpreter.execute().pipeline);
}

View File

@ -47,6 +47,7 @@ public:
QueryPlanOptimizationSettings::fromContext(context), BuildQueryPipelineSettings::fromContext(context));
}
bool supportsPrewhere() const override { return true; }
bool supportsIndexForIn() const override { return true; }

View File

@ -160,6 +160,7 @@ struct SelectQueryInfo
/// If not null, it means we choose a projection to execute current query.
std::optional<ProjectionCandidate> projection;
bool ignore_projections = false;
bool is_projection_query = false;
std::shared_ptr<MergeTreeDataSelectCache> merge_tree_data_select_cache;
};

View File

@ -0,0 +1,2 @@
1
1 1

View File

@ -0,0 +1,17 @@
drop table if exists t;
create table t (i int, j int) engine MergeTree order by i;
insert into t values (1, 2);
alter table t add projection x (select * order by j);
insert into t values (1, 4);
set allow_experimental_projection_optimization = 1, force_optimize_projection = 1;
select i from t prewhere j = 4;
SELECT j = 2, i FROM t PREWHERE j = 2;
drop table t;