Another try.

This commit is contained in:
Nikolai Kochetov 2022-07-14 18:36:50 +00:00
parent dc20b85078
commit 3c9d4c537a
2 changed files with 43 additions and 36 deletions

View File

@ -1253,7 +1253,7 @@ JoinPtr SelectQueryExpressionAnalyzer::makeJoin(
}
ActionsDAGPtr SelectQueryExpressionAnalyzer::appendPrewhere(
ExpressionActionsChain & chain, bool only_types, const Names & additional_required_columns)
ExpressionActionsChain & chain, bool only_types)
{
const auto * select_query = getSelectQuery();
if (!select_query->prewhere())
@ -1290,14 +1290,6 @@ ActionsDAGPtr SelectQueryExpressionAnalyzer::appendPrewhere(
NameSet required_source_columns(required_columns.begin(), required_columns.end());
required_source_columns.insert(first_action_names.begin(), first_action_names.end());
/// Add required columns to required output in order not to remove them after prewhere execution.
/// TODO: add sampling and final execution to common chain.
for (const auto & column : additional_required_columns)
{
if (required_source_columns.contains(column))
step.addRequiredOutput(column);
}
auto names = step.actions()->getNames();
NameSet name_set(names.begin(), names.end());
@ -1844,12 +1836,28 @@ ExpressionAnalysisResult::ExpressionAnalysisResult(
const Settings & settings = context->getSettingsRef();
const ConstStoragePtr & storage = query_analyzer.storage();
Names additional_required_columns_after_prewhere;
ssize_t prewhere_step_num = -1;
ssize_t where_step_num = -1;
ssize_t having_step_num = -1;
auto finalize_chain = [&](ExpressionActionsChain & chain)
{
if (prewhere_step_num >= 0)
{
ExpressionActionsChain::Step & step = *chain.steps.at(prewhere_step_num);
auto required_columns = prewhere_info->prewhere_actions->getRequiredColumnsNames();
NameSet required_source_columns(required_columns.begin(), required_columns.end());
/// Add required columns to required output in order not to remove them after prewhere execution.
/// TODO: add sampling and final execution to common chain.
for (const auto & column : additional_required_columns_after_prewhere)
{
if (required_source_columns.contains(column))
step.addRequiredOutput(column);
}
}
chain.finalize();
finalize(chain, prewhere_step_num, where_step_num, having_step_num, query);
@ -1858,25 +1866,7 @@ ExpressionAnalysisResult::ExpressionAnalysisResult(
};
{
bool join_allow_read_in_order = true;
if (hasJoin())
{
/// It may look strange, but read_in_order is supported for HashJoin and is not supported for MergeJoin.
join_has_delayed_stream = query_analyzer.analyzedJoin().needStreamWithNonJoinedRows();
join_allow_read_in_order = typeid_cast<HashJoin *>(join.get()) && !join_has_delayed_stream;
}
optimize_read_in_order =
settings.optimize_read_in_order
&& storage
&& query.orderBy()
&& !query_analyzer.hasAggregation()
&& !query_analyzer.hasWindow()
&& !query.final()
&& join_allow_read_in_order;
ExpressionActionsChain chain(context);
Names additional_required_columns_after_prewhere;
if (storage && (query.sampleSize() || settings.parallel_replicas_count > 1))
{
@ -1892,20 +1882,13 @@ ExpressionAnalysisResult::ExpressionAnalysisResult(
columns_for_final.begin(), columns_for_final.end());
}
if (storage && optimize_read_in_order)
{
Names columns_for_sorting_key = metadata_snapshot->getColumnsRequiredForSortingKey();
additional_required_columns_after_prewhere.insert(additional_required_columns_after_prewhere.end(),
columns_for_sorting_key.begin(), columns_for_sorting_key.end());
}
if (storage && filter_info_)
{
filter_info = filter_info_;
filter_info->do_remove_column = true;
}
if (auto actions = query_analyzer.appendPrewhere(chain, !first_stage, additional_required_columns_after_prewhere))
if (auto actions = query_analyzer.appendPrewhere(chain, !first_stage))
{
/// Prewhere is always the first one.
prewhere_step_num = 0;
@ -1983,6 +1966,30 @@ ExpressionAnalysisResult::ExpressionAnalysisResult(
}
}
bool join_allow_read_in_order = true;
if (hasJoin())
{
/// It may look strange, but read_in_order is supported for HashJoin and is not supported for MergeJoin.
join_has_delayed_stream = query_analyzer.analyzedJoin().needStreamWithNonJoinedRows();
join_allow_read_in_order = typeid_cast<HashJoin *>(join.get()) && !join_has_delayed_stream;
}
optimize_read_in_order =
settings.optimize_read_in_order
&& storage
&& query.orderBy()
&& !query_analyzer.hasAggregation()
&& !query_analyzer.hasWindow()
&& !query.final()
&& join_allow_read_in_order;
if (storage && optimize_read_in_order)
{
Names columns_for_sorting_key = metadata_snapshot->getColumnsRequiredForSortingKey();
additional_required_columns_after_prewhere.insert(additional_required_columns_after_prewhere.end(),
columns_for_sorting_key.begin(), columns_for_sorting_key.end());
}
/// If there is aggregation, we execute expressions in SELECT and ORDER BY on the initiating server, otherwise on the source servers.
query_analyzer.appendSelect(chain, only_types || (need_aggregate ? !second_stage : !first_stage));

View File

@ -403,7 +403,7 @@ private:
/// remove_filter is set in ExpressionActionsChain::finalize();
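/// Columns that must not be removed after prewhere (they can be used for e.g. sampling or the FINAL modifier) were formerly passed as `additional_required_columns`; with the two-argument form, the caller adds them to the prewhere step's required output before finalizing the chain.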
ActionsDAGPtr appendPrewhere(ExpressionActionsChain & chain, bool only_types, const Names & additional_required_columns);
ActionsDAGPtr appendPrewhere(ExpressionActionsChain & chain, bool only_types);
bool appendWhere(ExpressionActionsChain & chain, bool only_types);
bool appendGroupBy(ExpressionActionsChain & chain, bool only_types, bool optimize_aggregation_in_order, ManyExpressionActions &);
void appendAggregateFunctionsArguments(ExpressionActionsChain & chain, bool only_types);