removed extra structs

This commit is contained in:
Dmitry 2020-02-16 22:46:45 +03:00
parent 2fe011a1b3
commit 6cee50ab97
7 changed files with 42 additions and 96 deletions

View File

@ -693,7 +693,8 @@ bool SelectQueryExpressionAnalyzer::appendWhere(ExpressionActionsChain & chain,
return true; return true;
} }
bool SelectQueryExpressionAnalyzer::appendGroupBy(ExpressionActionsChain & chain, bool only_types) bool SelectQueryExpressionAnalyzer::appendGroupBy(ExpressionActionsChain & chain, bool only_types, bool optimize_aggregation_in_order,
ManyExpressionActions & group_by_elements_actions)
{ {
const auto * select_query = getAggregatingQuery(); const auto * select_query = getAggregatingQuery();
@ -710,6 +711,16 @@ bool SelectQueryExpressionAnalyzer::appendGroupBy(ExpressionActionsChain & chain
getRootActions(asts[i], only_types, step.actions); getRootActions(asts[i], only_types, step.actions);
} }
if (optimize_aggregation_in_order)
{
auto all_columns = sourceWithJoinedColumns();
for (auto & child : asts)
{
group_by_elements_actions.emplace_back(std::make_shared<ExpressionActions>(all_columns, context));
getRootActions(child, only_types, group_by_elements_actions.back());
}
}
return true; return true;
} }
@ -1051,7 +1062,12 @@ ExpressionAnalysisResult::ExpressionAnalysisResult(
if (need_aggregate) if (need_aggregate)
{ {
query_analyzer.appendGroupBy(chain, only_types || !first_stage); /// TODO correct conditions
optimize_aggregation_in_order =
context.getSettingsRef().optimize_aggregation_in_order
&& storage && query.groupBy();
query_analyzer.appendGroupBy(chain, only_types || !first_stage, optimize_aggregation_in_order, group_by_elements_actions);
query_analyzer.appendAggregateFunctionsArguments(chain, only_types || !first_stage); query_analyzer.appendAggregateFunctionsArguments(chain, only_types || !first_stage);
before_aggregation = chain.getLastActions(); before_aggregation = chain.getLastActions();
@ -1064,13 +1080,13 @@ ExpressionAnalysisResult::ExpressionAnalysisResult(
} }
} }
bool has_stream_with_non_joned_rows = (before_join && before_join->getTableJoinAlgo()->hasStreamWithNonJoinedRows()); bool has_stream_with_non_joined_rows = (before_join && before_join->getTableJoinAlgo()->hasStreamWithNonJoinedRows());
optimize_read_in_order = optimize_read_in_order =
settings.optimize_read_in_order settings.optimize_read_in_order
&& storage && query.orderBy() && storage && query.orderBy()
&& !query_analyzer.hasAggregation() && !query_analyzer.hasAggregation()
&& !query.final() && !query.final()
&& !has_stream_with_non_joned_rows; && !has_stream_with_non_joined_rows;
/// If there is aggregation, we execute expressions in SELECT and ORDER BY on the initiating server, otherwise on the source servers. /// If there is aggregation, we execute expressions in SELECT and ORDER BY on the initiating server, otherwise on the source servers.
query_analyzer.appendSelect(chain, only_types || (need_aggregate ? !second_stage : !first_stage)); query_analyzer.appendSelect(chain, only_types || (need_aggregate ? !second_stage : !first_stage));

View File

@ -173,6 +173,7 @@ struct ExpressionAnalysisResult
bool remove_where_filter = false; bool remove_where_filter = false;
bool optimize_read_in_order = false; bool optimize_read_in_order = false;
bool optimize_aggregation_in_order = false;
ExpressionActionsPtr before_join; /// including JOIN ExpressionActionsPtr before_join; /// including JOIN
ExpressionActionsPtr before_where; ExpressionActionsPtr before_where;
@ -194,6 +195,7 @@ struct ExpressionAnalysisResult
ConstantFilterDescription where_constant_filter_description; ConstantFilterDescription where_constant_filter_description;
/// Actions by every element of ORDER BY /// Actions by every element of ORDER BY
ManyExpressionActions order_by_elements_actions; ManyExpressionActions order_by_elements_actions;
ManyExpressionActions group_by_elements_actions;
ExpressionAnalysisResult() = default; ExpressionAnalysisResult() = default;
@ -301,7 +303,7 @@ private:
/// Columns in `additional_required_columns` will not be removed (they can be used for e.g. sampling or FINAL modifier). /// Columns in `additional_required_columns` will not be removed (they can be used for e.g. sampling or FINAL modifier).
bool appendPrewhere(ExpressionActionsChain & chain, bool only_types, const Names & additional_required_columns); bool appendPrewhere(ExpressionActionsChain & chain, bool only_types, const Names & additional_required_columns);
bool appendWhere(ExpressionActionsChain & chain, bool only_types); bool appendWhere(ExpressionActionsChain & chain, bool only_types);
bool appendGroupBy(ExpressionActionsChain & chain, bool only_types); bool appendGroupBy(ExpressionActionsChain & chain, bool only_types, bool optimize_aggregation_in_order, ManyExpressionActions &);
void appendAggregateFunctionsArguments(ExpressionActionsChain & chain, bool only_types); void appendAggregateFunctionsArguments(ExpressionActionsChain & chain, bool only_types);
/// After aggregation: /// After aggregation:

View File

@ -679,17 +679,18 @@ static SortDescription getSortDescription(const ASTSelectQuery & query, const Co
return order_descr; return order_descr;
} }
static Names getGroupByDescription(const ASTSelectQuery & query, const Context & /*context*/) static SortDescription getSortDescriptionFromGroupBy(const ASTSelectQuery & query, const Context & /*context*/)
{ {
Names group_by_descr; SortDescription order_descr;
group_by_descr.reserve(query.groupBy()->children.size()); order_descr.reserve(query.groupBy()->children.size());
for (const auto & elem : query.groupBy()->children) for (const auto & elem : query.groupBy()->children)
{ {
String name = elem->getColumnName(); String name = elem->getColumnName();
group_by_descr.push_back(name); order_descr.emplace_back(name, 1, 1);
} }
return group_by_descr;
return order_descr;
} }
static UInt64 getLimitUIntValue(const ASTPtr & node, const Context & context) static UInt64 getLimitUIntValue(const ASTPtr & node, const Context & context)
@ -1413,11 +1414,12 @@ void InterpreterSelectQuery::executeFetchColumns(
if (analysis_result.optimize_aggregation_in_order) if (analysis_result.optimize_aggregation_in_order)
{ {
query_info.group_by_optimizer = std::make_shared<AggregateInOrderOptimizer>( query_info.group_by_optimizer = std::make_shared<ReadInOrderOptimizer>(
getGroupByDescription(query, *context), analysis_result.group_by_elements_actions,
query_info.syntax_analyzer_result); getSortDescriptionFromGroupBy(query, *context),
query_info.syntax_analyzer_result);
query_info.group_by_info = query_info.group_by_optimizer->getGroupByCommonPrefix(storage); query_info.group_by_info = query_info.group_by_optimizer->getInputOrder(storage);
} }
@ -1633,7 +1635,7 @@ void InterpreterSelectQuery::executeWhere(QueryPipeline & pipeline, const Expres
}); });
} }
void InterpreterSelectQuery::executeAggregation(Pipeline & pipeline, const ExpressionActionsPtr & expression, bool overflow_row, bool final, GroupByInfoPtr group_by_info) void InterpreterSelectQuery::executeAggregation(Pipeline & pipeline, const ExpressionActionsPtr & expression, bool overflow_row, bool final, InputSortingInfoPtr group_by_info)
{ {
pipeline.transform([&](auto & stream) pipeline.transform([&](auto & stream)
{ {
@ -1656,9 +1658,9 @@ void InterpreterSelectQuery::executeAggregation(Pipeline & pipeline, const Expre
if (group_by_info) if (group_by_info)
{ {
/// TODO optimization :) /// TODO optimization :)
// std::cerr << "\n";
// for (const auto & elem : group_by_info->order_key_prefix_descr) // for (const auto & elem : group_by_info->order_key_prefix_descr)
// std::cerr << elem << " "; // std::cerr << elem.column_name << " ";
// std::cerr << "\n"; // std::cerr << "\n";
} }
@ -1706,7 +1708,7 @@ void InterpreterSelectQuery::executeAggregation(Pipeline & pipeline, const Expre
} }
void InterpreterSelectQuery::executeAggregation(QueryPipeline & pipeline, const ExpressionActionsPtr & expression, bool overflow_row, bool final, GroupByInfoPtr /*group_by_info*/) void InterpreterSelectQuery::executeAggregation(QueryPipeline & pipeline, const ExpressionActionsPtr & expression, bool overflow_row, bool final, InputSortingInfoPtr /*group_by_info*/)
{ {
pipeline.addSimpleTransform([&](const Block & header) pipeline.addSimpleTransform([&](const Block & header)
{ {

View File

@ -168,7 +168,7 @@ private:
QueryPipeline & save_context_and_storage); QueryPipeline & save_context_and_storage);
void executeWhere(Pipeline & pipeline, const ExpressionActionsPtr & expression, bool remove_filter); void executeWhere(Pipeline & pipeline, const ExpressionActionsPtr & expression, bool remove_filter);
void executeAggregation(Pipeline & pipeline, const ExpressionActionsPtr & expression, bool overflow_row, bool final, GroupByInfoPtr group_by_info); void executeAggregation(Pipeline & pipeline, const ExpressionActionsPtr & expression, bool overflow_row, bool final, InputSortingInfoPtr sorting_info);
void executeMergeAggregated(Pipeline & pipeline, bool overflow_row, bool final); void executeMergeAggregated(Pipeline & pipeline, bool overflow_row, bool final);
void executeTotalsAndHaving(Pipeline & pipeline, bool has_having, const ExpressionActionsPtr & expression, bool overflow_row, bool final); void executeTotalsAndHaving(Pipeline & pipeline, bool has_having, const ExpressionActionsPtr & expression, bool overflow_row, bool final);
void executeHaving(Pipeline & pipeline, const ExpressionActionsPtr & expression); void executeHaving(Pipeline & pipeline, const ExpressionActionsPtr & expression);
@ -187,7 +187,7 @@ private:
void executeMergeSorted(Pipeline & pipeline, const SortDescription & sort_description, UInt64 limit); void executeMergeSorted(Pipeline & pipeline, const SortDescription & sort_description, UInt64 limit);
void executeWhere(QueryPipeline & pipeline, const ExpressionActionsPtr & expression, bool remove_fiter); void executeWhere(QueryPipeline & pipeline, const ExpressionActionsPtr & expression, bool remove_fiter);
void executeAggregation(QueryPipeline & pipeline, const ExpressionActionsPtr & expression, bool overflow_row, bool final, GroupByInfoPtr group_by_info); void executeAggregation(QueryPipeline & pipeline, const ExpressionActionsPtr & expression, bool overflow_row, bool final, InputSortingInfoPtr sorting_info);
void executeMergeAggregated(QueryPipeline & pipeline, bool overflow_row, bool final); void executeMergeAggregated(QueryPipeline & pipeline, bool overflow_row, bool final);
void executeTotalsAndHaving(QueryPipeline & pipeline, bool has_having, const ExpressionActionsPtr & expression, bool overflow_row, bool final); void executeTotalsAndHaving(QueryPipeline & pipeline, bool has_having, const ExpressionActionsPtr & expression, bool overflow_row, bool final);
void executeHaving(QueryPipeline & pipeline, const ExpressionActionsPtr & expression); void executeHaving(QueryPipeline & pipeline, const ExpressionActionsPtr & expression);

View File

@ -110,45 +110,4 @@ InputSortingInfoPtr ReadInOrderOptimizer::getInputOrder(const StoragePtr & stora
return std::make_shared<InputSortingInfo>(std::move(order_key_prefix_descr), read_direction); return std::make_shared<InputSortingInfo>(std::move(order_key_prefix_descr), read_direction);
} }
/// Stores the GROUP BY key names and collects the set of column names
/// at which getGroupByCommonPrefix() stops matching:
/// every key of `syntax_result->array_join_result_to_source`.
AggregateInOrderOptimizer::AggregateInOrderOptimizer(
    const Names & group_by_description_,
    const SyntaxAnalyzerResultPtr & syntax_result)
    : group_by_description(group_by_description_)
{
    /// Not sure yet but let it be
    const auto & array_join_sources = syntax_result->array_join_result_to_source;
    for (auto it = array_join_sources.begin(); it != array_join_sources.end(); ++it)
        forbidden_columns.insert(it->first);
}
/// Returns the leading GROUP BY keys that coincide, position by position,
/// with the sorting key columns of `storage`.
/// Matching stops at the first forbidden column or the first mismatch.
/// Returns an empty pointer when `storage` is not a MergeTreeData, has no
/// sorting key, or not even the first key matches.
GroupByInfoPtr AggregateInOrderOptimizer::getGroupByCommonPrefix(const StoragePtr & storage) const
{
    const auto * merge_tree = dynamic_cast<const MergeTreeData *>(storage.get());
    const bool has_sorting_key = merge_tree && merge_tree->hasSortingKey();
    if (!has_sorting_key)
        return {};

    const auto & sorting_key_columns = merge_tree->getSortingKeyColumns();
    const size_t max_prefix_size = std::min(group_by_description.size(), sorting_key_columns.size());

    Names matched_keys;
    size_t pos = 0;
    while (pos < max_prefix_size)
    {
        const auto & key_name = group_by_description[pos];

        /// TODO injective functions
        if (forbidden_columns.count(key_name) || !(key_name == sorting_key_columns[pos]))
            break;

        matched_keys.push_back(key_name);
        ++pos;
    }

    if (matched_keys.empty())
        return {};

    return std::make_shared<GroupByInfo>(std::move(matched_keys));
}
} }

View File

@ -29,25 +29,4 @@ private:
SortDescription required_sort_description; SortDescription required_sort_description;
}; };
/** Helper class that analyzes the MergeTree order key
 * and the required GROUP BY description to get their
 * common prefix, which is needed for
 * performing reading in order of PK.
 */
class AggregateInOrderOptimizer
{
public:
/// `group_by_description` - names of the GROUP BY keys, in the order supplied by the caller.
/// `syntax_result` - the keys of its `array_join_result_to_source` become the forbidden columns.
AggregateInOrderOptimizer(
const Names & group_by_description,
const SyntaxAnalyzerResultPtr & syntax_result);
/// Returns the leading GROUP BY keys that match the sorting key columns of `storage`
/// position by position, or an empty pointer if `storage` is not a MergeTreeData,
/// has no sorting key, or no key matches.
GroupByInfoPtr getGroupByCommonPrefix(const StoragePtr & storage) const;
private:
/// Column names taken from the keys of `array_join_result_to_source`;
/// prefix matching stops at the first GROUP BY key found in this set.
NameSet forbidden_columns;
/// Names of the GROUP BY keys, copied from the constructor argument.
Names group_by_description;
};
} }

View File

@ -52,18 +52,9 @@ struct InputSortingInfo
bool operator !=(const InputSortingInfo & other) const { return !(*this == other); } bool operator !=(const InputSortingInfo & other) const { return !(*this == other); }
}; };
struct GroupByInfo
{
Names order_key_prefix_descr;
GroupByInfo(const Names & order_key_prefix_descr_)
: order_key_prefix_descr(order_key_prefix_descr_) {}
};
using PrewhereInfoPtr = std::shared_ptr<PrewhereInfo>; using PrewhereInfoPtr = std::shared_ptr<PrewhereInfo>;
using FilterInfoPtr = std::shared_ptr<FilterInfo>; using FilterInfoPtr = std::shared_ptr<FilterInfo>;
using InputSortingInfoPtr = std::shared_ptr<const InputSortingInfo>; using InputSortingInfoPtr = std::shared_ptr<const InputSortingInfo>;
using GroupByInfoPtr = std::shared_ptr<GroupByInfo>;
struct SyntaxAnalyzerResult; struct SyntaxAnalyzerResult;
using SyntaxAnalyzerResultPtr = std::shared_ptr<const SyntaxAnalyzerResult>; using SyntaxAnalyzerResultPtr = std::shared_ptr<const SyntaxAnalyzerResult>;
@ -71,8 +62,6 @@ using SyntaxAnalyzerResultPtr = std::shared_ptr<const SyntaxAnalyzerResult>;
class ReadInOrderOptimizer; class ReadInOrderOptimizer;
using ReadInOrderOptimizerPtr = std::shared_ptr<const ReadInOrderOptimizer>; using ReadInOrderOptimizerPtr = std::shared_ptr<const ReadInOrderOptimizer>;
class AggregateInOrderOptimizer;
using AggregateInOrderOptimizerPtr = std::shared_ptr<const AggregateInOrderOptimizer>;
/** Query along with some additional data, /** Query along with some additional data,
* that can be used during query processing * that can be used during query processing
@ -87,13 +76,12 @@ struct SelectQueryInfo
PrewhereInfoPtr prewhere_info; PrewhereInfoPtr prewhere_info;
ReadInOrderOptimizerPtr order_by_optimizer; ReadInOrderOptimizerPtr order_by_optimizer;
ReadInOrderOptimizerPtr group_by_optimizer;
AggregateInOrderOptimizerPtr group_by_optimizer;
/// We can modify it while reading from storage /// We can modify it while reading from storage
mutable InputSortingInfoPtr input_sorting_info; mutable InputSortingInfoPtr input_sorting_info;
mutable InputSortingInfoPtr group_by_info;
GroupByInfoPtr group_by_info;
/// Prepared sets are used for indices by storage engine. /// Prepared sets are used for indices by storage engine.
/// Example: x IN (1, 2, 3) /// Example: x IN (1, 2, 3)
PreparedSets sets; PreparedSets sets;