actualize ExpressionAnalyzer public interface

chertus 2020-02-10 22:55:13 +03:00
parent bd6d9a427b
commit b4ea4ca654
4 changed files with 63 additions and 105 deletions
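In short: ExpressionAnalysisResult now pulls the query, context and storage from the analyzer itself instead of taking them as constructor arguments, and the analyzer's aggregation and ORDER BY state is exposed through plain accessors rather than out-parameters. A rough before/after of the construction call, distilled from the hunks below (a sketch, not a verbatim excerpt):

    // Before: the caller passed the query, context and storage explicitly.
    analysis_result = ExpressionAnalysisResult(
        getSelectQuery(), *query_analyzer, first_stage, second_stage,
        *context, storage, options.only_analyze, filter_info, source_header);

    // After: the analyzer already knows its query, context and storage.
    analysis_result = ExpressionAnalysisResult(
        *query_analyzer, first_stage, second_stage,
        options.only_analyze, filter_info, source_header);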

View File

@ -79,9 +79,9 @@ namespace
/// Check if there is an ignore function. It's used to disable constant folding in query
/// predicates because some performance tests use the ignore function as a non-optimize guard.
bool allowEarlyConstantFolding(const ExpressionActions & actions, const Context & context)
bool allowEarlyConstantFolding(const ExpressionActions & actions, const Settings & settings)
{
if (!context.getSettingsRef().enable_early_constant_folding)
if (!settings.enable_early_constant_folding)
return false;
for (auto & action : actions.getActions())
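The hunk above narrows allowEarlyConstantFolding's dependency from the whole Context down to the Settings it actually reads; the ExpressionAnalysisResult constructor now fetches the settings once and passes them to every call. A minimal sketch of the resulting pattern, using names from the hunks further down:

    // Fetch the settings once, then pass them instead of the whole Context.
    const Settings & settings = context.getSettingsRef();
    if (allowEarlyConstantFolding(*before_where, settings))
    {
        // try to constant-fold the WHERE predicate against a sample block
    }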
@ -775,7 +775,8 @@ void SelectQueryExpressionAnalyzer::appendSelect(ExpressionActionsChain & chain,
step.required_output.push_back(child->getColumnName());
}
bool SelectQueryExpressionAnalyzer::appendOrderBy(ExpressionActionsChain & chain, bool only_types, bool optimize_read_in_order)
bool SelectQueryExpressionAnalyzer::appendOrderBy(ExpressionActionsChain & chain, bool only_types, bool optimize_read_in_order,
ManyExpressionActions & order_by_elements_actions)
{
const auto * select_query = getSelectQuery();
@ -933,25 +934,16 @@ ExpressionActionsPtr SelectQueryExpressionAnalyzer::simpleSelectActions()
return new_chain.getLastActions();
}
void SelectQueryExpressionAnalyzer::getAggregateInfo(Names & key_names, AggregateDescriptions & aggregates) const
{
for (const auto & name_and_type : aggregation_keys)
key_names.emplace_back(name_and_type.name);
aggregates = aggregate_descriptions;
}
ExpressionAnalysisResult::ExpressionAnalysisResult(const ASTSelectQuery & query,
ExpressionAnalysisResult::ExpressionAnalysisResult(
SelectQueryExpressionAnalyzer & query_analyzer,
bool first_stage_,
bool second_stage_,
const Context & context,
const StoragePtr & storage,
bool only_types,
const FilterInfoPtr & filter_info_,
const Block & source_header)
: first_stage(first_stage_)
, second_stage(second_stage_)
, need_aggregate(query_analyzer.hasAggregation())
{
/// first_stage: Do I need to perform the first part of the pipeline - running on remote servers during distributed processing.
/// second_stage: Do I need to execute the second part of the pipeline - running on the initiating server during distributed processing.
@ -961,6 +953,11 @@ ExpressionAnalysisResult::ExpressionAnalysisResult(const ASTSelectQuery & query,
* throw out unnecessary columns based on the entire query. In unnecessary parts of the query, we will not execute subqueries.
*/
const ASTSelectQuery & query = *query_analyzer.getSelectQuery();
const Context & context = query_analyzer.context;
const Settings & settings = context.getSettingsRef();
const StoragePtr & storage = query_analyzer.storage();
bool finalized = false;
size_t where_step_num = 0;
@ -979,7 +976,7 @@ ExpressionAnalysisResult::ExpressionAnalysisResult(const ASTSelectQuery & query,
ExpressionActionsChain chain(context);
Names additional_required_columns_after_prewhere;
if (storage && (query.sample_size() || context.getSettingsRef().parallel_replicas_count > 1))
if (storage && (query.sample_size() || settings.parallel_replicas_count > 1))
{
Names columns_for_sampling = storage->getColumnsRequiredForSampling();
additional_required_columns_after_prewhere.insert(additional_required_columns_after_prewhere.end(),
@ -1004,7 +1001,7 @@ ExpressionAnalysisResult::ExpressionAnalysisResult(const ASTSelectQuery & query,
prewhere_info = std::make_shared<PrewhereInfo>(
chain.steps.front().actions, query.prewhere()->getColumnName());
if (allowEarlyConstantFolding(*prewhere_info->prewhere_actions, context))
if (allowEarlyConstantFolding(*prewhere_info->prewhere_actions, settings))
{
Block before_prewhere_sample = source_header;
if (sanitizeBlock(before_prewhere_sample))
@ -1019,8 +1016,6 @@ ExpressionAnalysisResult::ExpressionAnalysisResult(const ASTSelectQuery & query,
chain.addStep();
}
need_aggregate = query_analyzer.hasAggregation();
query_analyzer.appendArrayJoin(chain, only_types || !first_stage);
if (query_analyzer.appendJoin(chain, only_types || !first_stage))
@ -1035,7 +1030,7 @@ ExpressionAnalysisResult::ExpressionAnalysisResult(const ASTSelectQuery & query,
{
where_step_num = chain.steps.size() - 1;
before_where = chain.getLastActions();
if (allowEarlyConstantFolding(*before_where, context))
if (allowEarlyConstantFolding(*before_where, settings))
{
Block before_where_sample;
if (chain.steps.size() > 1)
@ -1071,7 +1066,7 @@ ExpressionAnalysisResult::ExpressionAnalysisResult(const ASTSelectQuery & query,
bool has_stream_with_non_joned_rows = (before_join && before_join->getTableJoinAlgo()->hasStreamWithNonJoinedRows());
optimize_read_in_order =
context.getSettingsRef().optimize_read_in_order
settings.optimize_read_in_order
&& storage && query.orderBy()
&& !query_analyzer.hasAggregation()
&& !query.final()
@ -1080,7 +1075,8 @@ ExpressionAnalysisResult::ExpressionAnalysisResult(const ASTSelectQuery & query,
/// If there is aggregation, we execute expressions in SELECT and ORDER BY on the initiating server, otherwise on the source servers.
query_analyzer.appendSelect(chain, only_types || (need_aggregate ? !second_stage : !first_stage));
selected_columns = chain.getLastStep().required_output;
has_order_by = query_analyzer.appendOrderBy(chain, only_types || (need_aggregate ? !second_stage : !first_stage), optimize_read_in_order);
has_order_by = query_analyzer.appendOrderBy(chain, only_types || (need_aggregate ? !second_stage : !first_stage),
optimize_read_in_order, order_by_elements_actions);
before_order_and_select = chain.getLastActions();
chain.addStep();
@ -1099,8 +1095,6 @@ ExpressionAnalysisResult::ExpressionAnalysisResult(const ASTSelectQuery & query,
/// Before executing WHERE and HAVING, remove the extra columns from the block (mostly the aggregation keys).
removeExtraColumns();
subqueries_for_sets = query_analyzer.getSubqueriesForSets();
checkActions();
}
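Since appendOrderBy now hands back the per-element ORDER BY actions through an out-parameter, they end up stored on ExpressionAnalysisResult instead of inside the analyzer's private data. A sketch of the new data flow, assembled from this file's hunks and the interpreter changes further down:

    // In the ExpressionAnalysisResult constructor: the out-parameter fills the result's own member.
    has_order_by = query_analyzer.appendOrderBy(
        chain, only_types || (need_aggregate ? !second_stage : !first_stage),
        optimize_read_in_order, order_by_elements_actions);

    // Later, the interpreter reads the actions from the result instead of asking the analyzer.
    query_info.order_by_optimizer = std::make_shared<ReadInOrderOptimizer>(
        analysis_result.order_by_elements_actions,
        getSortDescription(query, *context),
        query_info.syntax_analyzer_result);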

View File

@ -52,9 +52,6 @@ struct ExpressionAnalyzerData
/// All new temporary tables obtained by performing the GLOBAL IN/JOIN subqueries.
Tables external_tables;
/// Actions by every element of ORDER BY
ManyExpressionActions order_by_elements_actions;
};
@ -166,6 +163,11 @@ class SelectQueryExpressionAnalyzer;
/// Result of SelectQueryExpressionAnalyzer: expressions for InterpreterSelectQuery
struct ExpressionAnalysisResult
{
/// Do I need to perform the first part of the pipeline - running on remote servers during distributed processing.
bool first_stage = false;
/// Do I need to execute the second part of the pipeline - running on the initiating server during distributed processing.
bool second_stage = false;
bool need_aggregate = false;
bool has_order_by = false;
@ -186,26 +188,19 @@ struct ExpressionAnalysisResult
/// Columns will be removed after prewhere actions execution.
Names columns_to_remove_after_prewhere;
/// Do I need to perform the first part of the pipeline - running on remote servers during distributed processing.
bool first_stage = false;
/// Do I need to execute the second part of the pipeline - running on the initiating server during distributed processing.
bool second_stage = false;
SubqueriesForSets subqueries_for_sets;
PrewhereInfoPtr prewhere_info;
FilterInfoPtr filter_info;
ConstantFilterDescription prewhere_constant_filter_description;
ConstantFilterDescription where_constant_filter_description;
/// Actions by every element of ORDER BY
ManyExpressionActions order_by_elements_actions;
ExpressionAnalysisResult() = default;
ExpressionAnalysisResult(
const ASTSelectQuery & query,
SelectQueryExpressionAnalyzer & query_analyzer,
bool first_stage,
bool second_stage,
const Context & context,
const StoragePtr & storage,
bool only_types,
const FilterInfoPtr & filter_info,
const Block & source_header);
@ -243,13 +238,13 @@ public:
bool hasAggregation() const { return has_aggregation; }
bool hasGlobalSubqueries() { return has_global_subqueries; }
/// Get a list of aggregation keys and descriptions of aggregate functions if the query contains GROUP BY.
void getAggregateInfo(Names & key_names, AggregateDescriptions & aggregates) const;
const NamesAndTypesList & aggregationKeys() const { return aggregation_keys; }
const AggregateDescriptions & aggregates() const { return aggregate_descriptions; }
/// Create Set-s that we make from IN section to use index on them.
void makeSetsForIndex(const ASTPtr & node);
const PreparedSets & getPreparedSets() const { return prepared_sets; }
const ManyExpressionActions & getOrderByActions() const { return order_by_elements_actions; }
/// Tables that will need to be sent to remote servers for distributed query processing.
const Tables & getExternalTables() const { return external_tables; }
@ -260,9 +255,6 @@ public:
/// Deletes all columns except mentioned by SELECT, arranges the remaining columns and renames them to aliases.
void appendProjectResult(ExpressionActionsChain & chain) const;
/// Create Set-s that we can from IN section to use the index on them.
void makeSetsForIndex(const ASTPtr & node);
private:
/// If non-empty, ignore all expressions not from this list.
NameSet required_result_columns;
@ -315,7 +307,7 @@ private:
/// After aggregation:
bool appendHaving(ExpressionActionsChain & chain, bool only_types);
/// appendSelect
bool appendOrderBy(ExpressionActionsChain & chain, bool only_types, bool optimize_read_in_order);
bool appendOrderBy(ExpressionActionsChain & chain, bool only_types, bool optimize_read_in_order, ManyExpressionActions &);
bool appendLimitBy(ExpressionActionsChain & chain, bool only_types);
/// appendProjectResult
};
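The net effect on the analyzer's public surface: getAggregateInfo() with its out-parameters and getOrderByActions() are gone; callers read aggregationKeys() and aggregates() directly. A before/after sketch of a typical call site (the header/keys block is illustrative, mirroring the interpreter hunks below):

    // Before: two out-parameters filled by the analyzer.
    Names key_names;
    AggregateDescriptions aggregates;
    query_analyzer->getAggregateInfo(key_names, aggregates);
    ColumnNumbers keys;
    for (const auto & name : key_names)
        keys.push_back(header.getPositionByName(name));

    // After: const accessors, no temporary name list.
    ColumnNumbers keys;
    for (const auto & key : query_analyzer->aggregationKeys())
        keys.push_back(header.getPositionByName(key.name));
    const AggregateDescriptions & aggregates = query_analyzer->aggregates();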

View File

@ -546,12 +546,9 @@ Block InterpreterSelectQuery::getSampleBlockImpl(bool try_move_to_prewhere)
&& options.to_stage > QueryProcessingStage::WithMergeableState;
analysis_result = ExpressionAnalysisResult(
getSelectQuery(),
*query_analyzer,
first_stage,
second_stage,
*context,
storage,
options.only_analyze,
filter_info,
source_header
@ -578,16 +575,12 @@ Block InterpreterSelectQuery::getSampleBlockImpl(bool try_move_to_prewhere)
auto header = analysis_result.before_aggregation->getSampleBlock();
Names key_names;
AggregateDescriptions aggregates;
query_analyzer->getAggregateInfo(key_names, aggregates);
Block res;
for (auto & key : key_names)
res.insert({nullptr, header.getByName(key).type, key});
for (auto & key : query_analyzer->aggregationKeys())
res.insert({nullptr, header.getByName(key.name).type, key.name});
for (auto & aggregate : aggregates)
for (auto & aggregate : query_analyzer->aggregates())
{
size_t arguments_size = aggregate.argument_names.size();
DataTypes argument_types(arguments_size);
@ -748,6 +741,7 @@ void InterpreterSelectQuery::executeImpl(TPipeline & pipeline, const BlockInputS
auto & query = getSelectQuery();
const Settings & settings = context->getSettingsRef();
auto & expressions = analysis_result;
auto & subqueries_for_sets = query_analyzer->getSubqueriesForSets();
if (options.only_analyze)
{
@ -953,8 +947,8 @@ void InterpreterSelectQuery::executeImpl(TPipeline & pipeline, const BlockInputS
}
// If there are no global subqueries, we can run subqueries only when we receive them on the server.
if (!query_analyzer->hasGlobalSubqueries() && !expressions.subqueries_for_sets.empty())
executeSubqueriesInSetsAndJoins(pipeline, expressions.subqueries_for_sets);
if (!query_analyzer->hasGlobalSubqueries() && !subqueries_for_sets.empty())
executeSubqueriesInSetsAndJoins(pipeline, subqueries_for_sets);
}
if (expressions.second_stage)
@ -1061,8 +1055,8 @@ void InterpreterSelectQuery::executeImpl(TPipeline & pipeline, const BlockInputS
}
}
if (query_analyzer->hasGlobalSubqueries() && !expressions.subqueries_for_sets.empty())
executeSubqueriesInSetsAndJoins(pipeline, expressions.subqueries_for_sets);
if (query_analyzer->hasGlobalSubqueries() && !subqueries_for_sets.empty())
executeSubqueriesInSetsAndJoins(pipeline, subqueries_for_sets);
}
template <typename TPipeline>
@ -1084,9 +1078,7 @@ void InterpreterSelectQuery::executeFetchColumns(
|| !query_analyzer->hasAggregation() || processing_stage != QueryProcessingStage::FetchColumns)
return {};
Names key_names;
AggregateDescriptions aggregates;
query_analyzer->getAggregateInfo(key_names, aggregates);
const AggregateDescriptions & aggregates = query_analyzer->aggregates();
if (aggregates.size() != 1)
return {};
@ -1399,7 +1391,7 @@ void InterpreterSelectQuery::executeFetchColumns(
if (analysis_result.optimize_read_in_order)
{
query_info.order_by_optimizer = std::make_shared<ReadInOrderOptimizer>(
query_analyzer->getOrderByActions(),
analysis_result.order_by_elements_actions,
getSortDescription(query, *context),
query_info.syntax_analyzer_result);
@ -1626,14 +1618,12 @@ void InterpreterSelectQuery::executeAggregation(Pipeline & pipeline, const Expre
stream = std::make_shared<ExpressionBlockInputStream>(stream, expression);
});
Names key_names;
AggregateDescriptions aggregates;
query_analyzer->getAggregateInfo(key_names, aggregates);
Block header = pipeline.firstStream()->getHeader();
ColumnNumbers keys;
for (const auto & name : key_names)
keys.push_back(header.getPositionByName(name));
for (const auto & key : query_analyzer->aggregationKeys())
keys.push_back(header.getPositionByName(key.name));
AggregateDescriptions aggregates = query_analyzer->aggregates();
for (auto & descr : aggregates)
if (descr.arguments.empty())
for (const auto & name : descr.argument_names)
@ -1692,14 +1682,12 @@ void InterpreterSelectQuery::executeAggregation(QueryPipeline & pipeline, const
return std::make_shared<ExpressionTransform>(header, expression);
});
Names key_names;
AggregateDescriptions aggregates;
query_analyzer->getAggregateInfo(key_names, aggregates);
Block header_before_aggregation = pipeline.getHeader();
ColumnNumbers keys;
for (const auto & name : key_names)
keys.push_back(header_before_aggregation.getPositionByName(name));
for (const auto & key : query_analyzer->aggregationKeys())
keys.push_back(header_before_aggregation.getPositionByName(key.name));
AggregateDescriptions aggregates = query_analyzer->aggregates();
for (auto & descr : aggregates)
if (descr.arguments.empty())
for (const auto & name : descr.argument_names)
@ -1760,15 +1748,11 @@ void InterpreterSelectQuery::executeAggregation(QueryPipeline & pipeline, const
void InterpreterSelectQuery::executeMergeAggregated(Pipeline & pipeline, bool overflow_row, bool final)
{
Names key_names;
AggregateDescriptions aggregates;
query_analyzer->getAggregateInfo(key_names, aggregates);
Block header = pipeline.firstStream()->getHeader();
ColumnNumbers keys;
for (const auto & name : key_names)
keys.push_back(header.getPositionByName(name));
for (const auto & key : query_analyzer->aggregationKeys())
keys.push_back(header.getPositionByName(key.name));
/** There are two modes of distributed aggregation.
*
@ -1787,7 +1771,7 @@ void InterpreterSelectQuery::executeMergeAggregated(Pipeline & pipeline, bool ov
const Settings & settings = context->getSettingsRef();
Aggregator::Params params(header, keys, aggregates, overflow_row, settings.max_threads);
Aggregator::Params params(header, keys, query_analyzer->aggregates(), overflow_row, settings.max_threads);
if (!settings.distributed_aggregation_memory_efficient)
{
@ -1811,15 +1795,11 @@ void InterpreterSelectQuery::executeMergeAggregated(Pipeline & pipeline, bool ov
void InterpreterSelectQuery::executeMergeAggregated(QueryPipeline & pipeline, bool overflow_row, bool final)
{
Names key_names;
AggregateDescriptions aggregates;
query_analyzer->getAggregateInfo(key_names, aggregates);
Block header_before_merge = pipeline.getHeader();
ColumnNumbers keys;
for (const auto & name : key_names)
keys.push_back(header_before_merge.getPositionByName(name));
for (const auto & key : query_analyzer->aggregationKeys())
keys.push_back(header_before_merge.getPositionByName(key.name));
/** There are two modes of distributed aggregation.
*
@ -1838,7 +1818,7 @@ void InterpreterSelectQuery::executeMergeAggregated(QueryPipeline & pipeline, bo
const Settings & settings = context->getSettingsRef();
Aggregator::Params params(header_before_merge, keys, aggregates, overflow_row, settings.max_threads);
Aggregator::Params params(header_before_merge, keys, query_analyzer->aggregates(), overflow_row, settings.max_threads);
auto transform_params = std::make_shared<AggregatingTransformParams>(params, final);
@ -1927,20 +1907,16 @@ void InterpreterSelectQuery::executeRollupOrCube(Pipeline & pipeline, Modificato
{
executeUnion(pipeline, {});
Names key_names;
AggregateDescriptions aggregates;
query_analyzer->getAggregateInfo(key_names, aggregates);
Block header = pipeline.firstStream()->getHeader();
ColumnNumbers keys;
for (const auto & name : key_names)
keys.push_back(header.getPositionByName(name));
for (const auto & key : query_analyzer->aggregationKeys())
keys.push_back(header.getPositionByName(key.name));
const Settings & settings = context->getSettingsRef();
Aggregator::Params params(header, keys, aggregates,
Aggregator::Params params(header, keys, query_analyzer->aggregates(),
false, settings.max_rows_to_group_by, settings.group_by_overflow_mode,
SettingUInt64(0), SettingUInt64(0),
settings.max_bytes_before_external_group_by, settings.empty_result_for_aggregation_by_empty_set,
@ -1956,20 +1932,16 @@ void InterpreterSelectQuery::executeRollupOrCube(QueryPipeline & pipeline, Modif
{
pipeline.resize(1);
Names key_names;
AggregateDescriptions aggregates;
query_analyzer->getAggregateInfo(key_names, aggregates);
Block header_before_transform = pipeline.getHeader();
ColumnNumbers keys;
for (const auto & name : key_names)
keys.push_back(header_before_transform.getPositionByName(name));
for (const auto & key : query_analyzer->aggregationKeys())
keys.push_back(header_before_transform.getPositionByName(key.name));
const Settings & settings = context->getSettingsRef();
Aggregator::Params params(header_before_transform, keys, aggregates,
Aggregator::Params params(header_before_transform, keys, query_analyzer->aggregates(),
false, settings.max_rows_to_group_by, settings.group_by_overflow_mode,
SettingUInt64(0), SettingUInt64(0),
settings.max_bytes_before_external_group_by, settings.empty_result_for_aggregation_by_empty_set,
@ -2566,7 +2538,7 @@ void InterpreterSelectQuery::executeExtremes(QueryPipeline & pipeline)
}
void InterpreterSelectQuery::executeSubqueriesInSetsAndJoins(Pipeline & pipeline, SubqueriesForSets & subqueries_for_sets)
void InterpreterSelectQuery::executeSubqueriesInSetsAndJoins(Pipeline & pipeline, const SubqueriesForSets & subqueries_for_sets)
{
/// Merge streams to one. Use MergeSorting if data was read in sorted order, Union otherwise.
if (query_info.input_sorting_info)
@ -2582,7 +2554,7 @@ void InterpreterSelectQuery::executeSubqueriesInSetsAndJoins(Pipeline & pipeline
pipeline.firstStream(), subqueries_for_sets, *context);
}
void InterpreterSelectQuery::executeSubqueriesInSetsAndJoins(QueryPipeline & pipeline, SubqueriesForSets & subqueries_for_sets)
void InterpreterSelectQuery::executeSubqueriesInSetsAndJoins(QueryPipeline & pipeline, const SubqueriesForSets & subqueries_for_sets)
{
if (query_info.input_sorting_info)
executeMergeSorted(pipeline, query_info.input_sorting_info->order_key_prefix_descr, 0);
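One detail in the hunks above: executeAggregation takes a local copy of query_analyzer->aggregates() because it resolves argument names to column positions in place, whereas executeMergeAggregated and executeRollupOrCube pass the analyzer's descriptions to Aggregator::Params unchanged. Roughly (the push_back line is the continuation that is cut off in the hunk, shown here as an assumption):

    // Copy, then resolve argument positions against the input header.
    AggregateDescriptions aggregates = query_analyzer->aggregates();
    for (auto & descr : aggregates)
        if (descr.arguments.empty())
            for (const auto & name : descr.argument_names)
                descr.arguments.push_back(header.getPositionByName(name));  // assumed continuation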

View File

@ -183,7 +183,7 @@ private:
void executeProjection(Pipeline & pipeline, const ExpressionActionsPtr & expression);
void executeDistinct(Pipeline & pipeline, bool before_order, Names columns);
void executeExtremes(Pipeline & pipeline);
void executeSubqueriesInSetsAndJoins(Pipeline & pipeline, std::unordered_map<String, SubqueryForSet> & subqueries_for_sets);
void executeSubqueriesInSetsAndJoins(Pipeline & pipeline, const std::unordered_map<String, SubqueryForSet> & subqueries_for_sets);
void executeMergeSorted(Pipeline & pipeline, const SortDescription & sort_description, UInt64 limit);
void executeWhere(QueryPipeline & pipeline, const ExpressionActionsPtr & expression, bool remove_fiter);
@ -201,7 +201,7 @@ private:
void executeProjection(QueryPipeline & pipeline, const ExpressionActionsPtr & expression);
void executeDistinct(QueryPipeline & pipeline, bool before_order, Names columns);
void executeExtremes(QueryPipeline & pipeline);
void executeSubqueriesInSetsAndJoins(QueryPipeline & pipeline, std::unordered_map<String, SubqueryForSet> & subqueries_for_sets);
void executeSubqueriesInSetsAndJoins(QueryPipeline & pipeline, const std::unordered_map<String, SubqueryForSet> & subqueries_for_sets);
void executeMergeSorted(QueryPipeline & pipeline, const SortDescription & sort_description, UInt64 limit);
String generateFilterActions(ExpressionActionsPtr & actions, const ASTPtr & row_policy_filter, const Names & prerequisite_columns = {}) const;
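Finally, executeSubqueriesInSetsAndJoins now takes the subquery sets by const reference: the interpreter fetches them once from query_analyzer->getSubqueriesForSets() and passes them straight through instead of going via expressions.subqueries_for_sets. The call pattern as it appears in the interpreter hunks above:

    auto & subqueries_for_sets = query_analyzer->getSubqueriesForSets();
    if (!query_analyzer->hasGlobalSubqueries() && !subqueries_for_sets.empty())
        executeSubqueriesInSetsAndJoins(pipeline, subqueries_for_sets);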