Make InterpreterSelectQuery::analyzeExpressions static.

This commit is contained in:
Nikolai Kochetov 2019-08-06 16:00:56 +03:00
parent 9dd9553d73
commit e73ea8a131
2 changed files with 55 additions and 22 deletions

View File

@ -401,16 +401,25 @@ QueryPipeline InterpreterSelectQuery::executeWithProcessors()
} }
InterpreterSelectQuery::AnalysisResult InterpreterSelectQuery::AnalysisResult
InterpreterSelectQuery::analyzeExpressions(QueryProcessingStage::Enum from_stage, bool dry_run, const FilterInfoPtr & filter_info) InterpreterSelectQuery::analyzeExpressions(
const ASTSelectQuery & query,
const NamesAndTypesList & source_columns,
ExpressionAnalyzer & query_analyzer,
QueryProcessingStage::Enum from_stage,
QueryProcessingStage::Enum to_stage,
const Context & context,
const StoragePtr & storage,
bool dry_run,
const FilterInfoPtr & filter_info)
{ {
AnalysisResult res; AnalysisResult res;
/// Do I need to perform the first part of the pipeline - running on remote servers during distributed processing. /// Do I need to perform the first part of the pipeline - running on remote servers during distributed processing.
res.first_stage = from_stage < QueryProcessingStage::WithMergeableState res.first_stage = from_stage < QueryProcessingStage::WithMergeableState
&& options.to_stage >= QueryProcessingStage::WithMergeableState; && to_stage >= QueryProcessingStage::WithMergeableState;
/// Do I need to execute the second part of the pipeline - running on the initiating server during distributed processing. /// Do I need to execute the second part of the pipeline - running on the initiating server during distributed processing.
res.second_stage = from_stage <= QueryProcessingStage::WithMergeableState res.second_stage = from_stage <= QueryProcessingStage::WithMergeableState
&& options.to_stage > QueryProcessingStage::WithMergeableState; && to_stage > QueryProcessingStage::WithMergeableState;
/** First we compose a chain of actions and remember the necessary steps from it. /** First we compose a chain of actions and remember the necessary steps from it.
* Regardless of from_stage and to_stage, we will compose a complete sequence of actions to perform optimization and * Regardless of from_stage and to_stage, we will compose a complete sequence of actions to perform optimization and
@ -465,8 +474,6 @@ InterpreterSelectQuery::analyzeExpressions(QueryProcessingStage::Enum from_stage
{ {
ExpressionActionsChain chain(context); ExpressionActionsChain chain(context);
auto & query = getSelectQuery();
Names additional_required_columns_after_prewhere; Names additional_required_columns_after_prewhere;
if (storage && query.sample_size()) if (storage && query.sample_size())
@ -503,7 +510,7 @@ InterpreterSelectQuery::analyzeExpressions(QueryProcessingStage::Enum from_stage
chain.addStep(); chain.addStep();
} }
if (query_analyzer->appendPrewhere(chain, !res.first_stage, additional_required_columns_after_prewhere)) if (query_analyzer.appendPrewhere(chain, !res.first_stage, additional_required_columns_after_prewhere))
{ {
has_prewhere = true; has_prewhere = true;
@ -513,11 +520,11 @@ InterpreterSelectQuery::analyzeExpressions(QueryProcessingStage::Enum from_stage
chain.addStep(); chain.addStep();
} }
res.need_aggregate = query_analyzer->hasAggregation(); res.need_aggregate = query_analyzer.hasAggregation();
query_analyzer->appendArrayJoin(chain, dry_run || !res.first_stage); query_analyzer.appendArrayJoin(chain, dry_run || !res.first_stage);
if (query_analyzer->appendJoin(chain, dry_run || !res.first_stage)) if (query_analyzer.appendJoin(chain, dry_run || !res.first_stage))
{ {
res.before_join = chain.getLastActions(); res.before_join = chain.getLastActions();
if (!res.hasJoin()) if (!res.hasJoin())
@ -525,7 +532,7 @@ InterpreterSelectQuery::analyzeExpressions(QueryProcessingStage::Enum from_stage
chain.addStep(); chain.addStep();
} }
if (query_analyzer->appendWhere(chain, dry_run || !res.first_stage)) if (query_analyzer.appendWhere(chain, dry_run || !res.first_stage))
{ {
where_step_num = chain.steps.size() - 1; where_step_num = chain.steps.size() - 1;
has_where = res.has_where = true; has_where = res.has_where = true;
@ -535,13 +542,13 @@ InterpreterSelectQuery::analyzeExpressions(QueryProcessingStage::Enum from_stage
if (res.need_aggregate) if (res.need_aggregate)
{ {
query_analyzer->appendGroupBy(chain, dry_run || !res.first_stage); query_analyzer.appendGroupBy(chain, dry_run || !res.first_stage);
query_analyzer->appendAggregateFunctionsArguments(chain, dry_run || !res.first_stage); query_analyzer.appendAggregateFunctionsArguments(chain, dry_run || !res.first_stage);
res.before_aggregation = chain.getLastActions(); res.before_aggregation = chain.getLastActions();
finalizeChain(chain); finalizeChain(chain);
if (query_analyzer->appendHaving(chain, dry_run || !res.second_stage)) if (query_analyzer.appendHaving(chain, dry_run || !res.second_stage))
{ {
res.has_having = true; res.has_having = true;
res.before_having = chain.getLastActions(); res.before_having = chain.getLastActions();
@ -550,20 +557,20 @@ InterpreterSelectQuery::analyzeExpressions(QueryProcessingStage::Enum from_stage
} }
/// If there is aggregation, we execute expressions in SELECT and ORDER BY on the initiating server, otherwise on the source servers. /// If there is aggregation, we execute expressions in SELECT and ORDER BY on the initiating server, otherwise on the source servers.
query_analyzer->appendSelect(chain, dry_run || (res.need_aggregate ? !res.second_stage : !res.first_stage)); query_analyzer.appendSelect(chain, dry_run || (res.need_aggregate ? !res.second_stage : !res.first_stage));
res.selected_columns = chain.getLastStep().required_output; res.selected_columns = chain.getLastStep().required_output;
res.has_order_by = query_analyzer->appendOrderBy(chain, dry_run || (res.need_aggregate ? !res.second_stage : !res.first_stage)); res.has_order_by = query_analyzer.appendOrderBy(chain, dry_run || (res.need_aggregate ? !res.second_stage : !res.first_stage));
res.before_order_and_select = chain.getLastActions(); res.before_order_and_select = chain.getLastActions();
chain.addStep(); chain.addStep();
if (query_analyzer->appendLimitBy(chain, dry_run || !res.second_stage)) if (query_analyzer.appendLimitBy(chain, dry_run || !res.second_stage))
{ {
res.has_limit_by = true; res.has_limit_by = true;
res.before_limit_by = chain.getLastActions(); res.before_limit_by = chain.getLastActions();
chain.addStep(); chain.addStep();
} }
query_analyzer->appendProjectResult(chain); query_analyzer.appendProjectResult(chain);
res.final_projection = chain.getLastActions(); res.final_projection = chain.getLastActions();
finalizeChain(chain); finalizeChain(chain);
@ -577,7 +584,7 @@ InterpreterSelectQuery::analyzeExpressions(QueryProcessingStage::Enum from_stage
if (res.has_having) if (res.has_having)
res.before_having->prependProjectInput(); res.before_having->prependProjectInput();
res.subqueries_for_sets = query_analyzer->getSubqueriesForSets(); res.subqueries_for_sets = query_analyzer.getSubqueriesForSets();
/// Check that PREWHERE doesn't contain unusual actions. Unusual actions are that can change number of rows. /// Check that PREWHERE doesn't contain unusual actions. Unusual actions are that can change number of rows.
if (res.prewhere_info) if (res.prewhere_info)
@ -821,7 +828,16 @@ void InterpreterSelectQuery::executeImpl(TPipeline & pipeline, const BlockInputS
else else
pipeline.streams.emplace_back(std::make_shared<NullBlockInputStream>(source_header)); pipeline.streams.emplace_back(std::make_shared<NullBlockInputStream>(source_header));
expressions = analyzeExpressions(QueryProcessingStage::FetchColumns, true, filter_info); expressions = analyzeExpressions(
getSelectQuery(),
source_columns,
*query_analyzer,
QueryProcessingStage::FetchColumns,
options.to_stage,
context,
storage,
true,
filter_info);
if (storage && expressions.filter_info && expressions.prewhere_info) if (storage && expressions.filter_info && expressions.prewhere_info)
throw Exception("PREWHERE is not supported if the table is filtered by row-level security expression", ErrorCodes::ILLEGAL_PREWHERE); throw Exception("PREWHERE is not supported if the table is filtered by row-level security expression", ErrorCodes::ILLEGAL_PREWHERE);
@ -853,7 +869,16 @@ void InterpreterSelectQuery::executeImpl(TPipeline & pipeline, const BlockInputS
pipeline.streams.push_back(prepared_input); pipeline.streams.push_back(prepared_input);
} }
expressions = analyzeExpressions(from_stage, false, filter_info); expressions = analyzeExpressions(
getSelectQuery(),
source_columns,
*query_analyzer,
from_stage,
options.to_stage,
context,
storage,
false,
filter_info);
if (from_stage == QueryProcessingStage::WithMergeableState && if (from_stage == QueryProcessingStage::WithMergeableState &&
options.to_stage == QueryProcessingStage::WithMergeableState) options.to_stage == QueryProcessingStage::WithMergeableState)

View File

@ -172,8 +172,16 @@ private:
FilterInfoPtr filter_info; FilterInfoPtr filter_info;
}; };
AnalysisResult analyzeExpressions(QueryProcessingStage::Enum from_stage, bool dry_run, const FilterInfoPtr & filter_info); static AnalysisResult analyzeExpressions(
const ASTSelectQuery & query,
const NamesAndTypesList & source_columns,
ExpressionAnalyzer & query_analyzer,
QueryProcessingStage::Enum from_stage,
QueryProcessingStage::Enum to_stage,
const Context & context,
const StoragePtr & storage,
bool dry_run,
const FilterInfoPtr & filter_info);
/** From which table to read. With JOIN, the "left" table is returned. /** From which table to read. With JOIN, the "left" table is returned.
*/ */