Added optimize_read_in_window_order setting.

This commit is contained in:
Vladimir Chebotarev 2022-04-20 16:33:31 +03:00
parent ec22f6d539
commit 7f9557f8a3
8 changed files with 55 additions and 93 deletions

View File

@ -615,16 +615,6 @@ void ExpressionAnalyzer::getRootActions(const ASTPtr & ast, bool no_makeset_for_
actions = visitor_data.getActions();
}
const NamesAndTypesList & ExpressionAnalyzer::getColumnsAfterWindow() const
{
return columns_after_window;
}
const NamesAndTypesList & ExpressionAnalyzer::getColumnsAfterJoin() const
{
return columns_after_join;
}
void ExpressionAnalyzer::getRootActionsNoMakeSet(const ASTPtr & ast, ActionsDAGPtr & actions, bool only_consts)
{
LogAST log;
@ -705,7 +695,7 @@ void ExpressionAnalyzer::makeAggregateDescriptions(ActionsDAGPtr & actions, Aggr
}
}
void makeWindowDescriptionFromAST(const Context & context,
void ExpressionAnalyzer::makeWindowDescriptionFromAST(const Context & context_,
const WindowDescriptions & existing_descriptions,
WindowDescription & desc, const IAST * ast)
{
@ -774,6 +764,10 @@ void makeWindowDescriptionFromAST(const Context & context,
desc.partition_by.push_back(SortColumnDescription(
with_alias->getColumnName(), 1 /* direction */,
1 /* nulls_direction */));
auto actions_dag = std::make_shared<ActionsDAG>(columns_after_join);
getRootActions(column_ast, false, actions_dag);
desc.partition_by_actions.push_back(std::move(actions_dag));
}
}
@ -791,6 +785,10 @@ void makeWindowDescriptionFromAST(const Context & context,
order_by_element.children.front()->getColumnName(),
order_by_element.direction,
order_by_element.nulls_direction));
auto actions_dag = std::make_shared<ActionsDAG>(columns_after_join);
getRootActions(column_ast, false, actions_dag);
desc.partition_by_actions.push_back(std::move(actions_dag));
}
}
@ -817,14 +815,14 @@ void makeWindowDescriptionFromAST(const Context & context,
if (definition.frame_end_type == WindowFrame::BoundaryType::Offset)
{
auto [value, _] = evaluateConstantExpression(definition.frame_end_offset,
context.shared_from_this());
context_.shared_from_this());
desc.frame.end_offset = value;
}
if (definition.frame_begin_type == WindowFrame::BoundaryType::Offset)
{
auto [value, _] = evaluateConstantExpression(definition.frame_begin_offset,
context.shared_from_this());
context_.shared_from_this());
desc.frame.begin_offset = value;
}
}

View File

@ -140,6 +140,7 @@ public:
/// A list of windows for window functions.
const WindowDescriptions & windowDescriptions() const { return window_descriptions; }
void makeWindowDescriptionFromAST(const Context & context, const WindowDescriptions & existing_descriptions, WindowDescription & desc, const IAST * ast);
void makeWindowDescriptions(ActionsDAGPtr actions);
/**
@ -181,12 +182,8 @@ protected:
ArrayJoinActionPtr addMultipleArrayJoinAction(ActionsDAGPtr & actions, bool is_left) const;
public:
void getRootActions(const ASTPtr & ast, bool no_makeset_for_subqueries, ActionsDAGPtr & actions, bool only_consts = false);
const NamesAndTypesList & getColumnsAfterWindow() const;
const NamesAndTypesList & getColumnsAfterJoin() const;
protected:
/** Similar to getRootActions but do not make sets when analyzing IN functions. It's used in
* analyzeAggregation which happens earlier than analyzing PREWHERE and WHERE. If we did, the
* prepared sets would not be applicable for MergeTree index optimization.

View File

@ -7,6 +7,7 @@
#include <DataTypes/IDataType.h>
#include <Core/Names.h>
#include <Core/Types.h>
#include <Processors/QueryPlan/FilterStep.h>
namespace DB
{
@ -90,6 +91,9 @@ struct WindowDescription
// then by ORDER BY. This field holds this combined sort order.
SortDescription full_sort_description;
std::vector<ActionsDAGPtr> partition_by_actions;
std::vector<ActionsDAGPtr> order_by_actions;
WindowFrame frame;
// The window functions that are calculated for this window.

View File

@ -48,8 +48,9 @@ size_t tryPushDownFilter(QueryPlan::Node * parent_node, QueryPlan::Nodes & nodes
/// May split ExpressionStep and lift up only a part of it.
size_t tryExecuteFunctionsAfterSorting(QueryPlan::Node * parent_node, QueryPlan::Nodes & nodes);
/// FIXME
size_t tryReuseStorageOrdering(QueryPlan::Node * parent_node, QueryPlan::Nodes & nodes);
/// Utilize storage sorting when sorting for window functions.
/// Update information about prefix sort description in SortingStep.
size_t tryReuseStorageOrderingForWindowFunctions(QueryPlan::Node * parent_node, QueryPlan::Nodes & nodes);
inline const auto & getOptimizations()
{
@ -60,7 +61,7 @@ inline const auto & getOptimizations()
{tryMergeExpressions, "mergeExpressions", &QueryPlanOptimizationSettings::optimize_plan},
{tryPushDownFilter, "pushDownFilter", &QueryPlanOptimizationSettings::filter_push_down},
{tryExecuteFunctionsAfterSorting, "liftUpFunctions", &QueryPlanOptimizationSettings::optimize_plan},
{tryReuseStorageOrdering, "reuseStorageOrdering", &QueryPlanOptimizationSettings::optimize_plan}
{tryReuseStorageOrderingForWindowFunctions, "reuseStorageOrderingForWindowFunctions", &QueryPlanOptimizationSettings::optimize_plan}
}};
return optimizations;

View File

@ -33,9 +33,11 @@ namespace DB::ErrorCodes
namespace DB::QueryPlanOptimizations
{
size_t tryReuseStorageOrdering(QueryPlan::Node * parent_node, QueryPlan::Nodes & /*nodes*/)
size_t tryReuseStorageOrderingForWindowFunctions(QueryPlan::Node * parent_node, QueryPlan::Nodes & /*nodes*/)
{
auto log = &Poco::Logger::get("tryReuseStorageOrdering()");
/// Find the following sequence of steps, add InputOrderInfo and apply prefix sort description to
/// SortingStep:
/// WindowStep <- SortingStep <- [Expression] <- [SettingQuotaAndLimits] <- ReadFromMergeTree
auto * window_node = parent_node;
auto * window = typeid_cast<WindowStep *>(window_node->step.get());
@ -77,8 +79,6 @@ size_t tryReuseStorageOrdering(QueryPlan::Node * parent_node, QueryPlan::Nodes &
return 0;
}
/// Window <- Sorting <- [Expression] <- [SettingQuotaAndLimits] <- ReadFromMergeTree
auto context = read_from_merge_tree->getContext();
if (!context->getSettings().optimize_read_in_window_order)
{
@ -90,71 +90,39 @@ size_t tryReuseStorageOrdering(QueryPlan::Node * parent_node, QueryPlan::Nodes &
auto analyzer = std::make_unique<ExpressionAnalyzer>(query_info.query, query_info.syntax_analyzer_result, context);
if (select_query->window()) // FIXME
ManyExpressionActions order_by_elements_actions;
auto & window_desc = window->getWindowDescription();
for (const auto & actions_dag : window_desc.partition_by_actions)
{
LOG_DEBUG(log, "Query has window");
ManyExpressionActions order_by_elements_actions;
auto process_children = [&](ASTPtr node)
{
for (const auto & column_ast : node->children)
{
LOG_DEBUG(log, "After join: {}", analyzer->getColumnsAfterJoin().toString());
LOG_DEBUG(log, "After window: {}", analyzer->getColumnsAfterWindow().toString());
auto actions_dag = std::make_shared<ActionsDAG>(analyzer->getColumnsAfterJoin());
analyzer->getRootActions(column_ast, false, actions_dag);
order_by_elements_actions.emplace_back(
std::make_shared<ExpressionActions>(actions_dag, ExpressionActionsSettings::fromContext(context, CompileExpressions::yes)));
}
};
for (const auto & ptr : select_query->window()->children)
{
const auto & elem = ptr->as<const ASTWindowListElement &>();
auto ast = elem.definition.get();
const auto & definition = ast->as<const ASTWindowDefinition &>();
if (definition.partition_by)
{
process_children(definition.partition_by);
}
if (definition.order_by)
{
process_children(definition.order_by);
}
}
auto order_optimizer = std::make_shared<ReadInOrderOptimizer>(
*select_query,
order_by_elements_actions,
//InterpreterSelectQuery::getSortDescription(*select_query, context),
window->getSortDescription(),
query_info.syntax_analyzer_result);
read_from_merge_tree->setQueryInfoOrderOptimizer(order_optimizer);
LOG_DEBUG(log, "Order optimizer is set");
/// If we don't have filtration, we can pushdown limit to reading stage for optimizations.
UInt64 limit = (select_query->hasFiltration() || select_query->groupBy()) ? 0 : InterpreterSelectQuery::getLimitForSorting(*select_query, context);
auto order_info = order_optimizer->getInputOrder(
query_info.projection ? query_info.projection->desc->metadata : read_from_merge_tree->getStorageMetadata(),
context,
limit);
read_from_merge_tree->setQueryInfoInputOrderInfo(order_info);
/// FIXME Window+Sorting may repeat few times.
sorting->convertToFinishSorting(order_info->order_key_prefix_descr);
LOG_DEBUG(log, "Input order info is set");
order_by_elements_actions.emplace_back(
std::make_shared<ExpressionActions>(actions_dag, ExpressionActionsSettings::fromContext(context, CompileExpressions::yes)));
}
else
for (const auto & actions_dag : window_desc.order_by_actions)
{
LOG_DEBUG(log, "Query has no window");
order_by_elements_actions.emplace_back(
std::make_shared<ExpressionActions>(actions_dag, ExpressionActionsSettings::fromContext(context, CompileExpressions::yes)));
}
auto order_optimizer = std::make_shared<ReadInOrderOptimizer>(
*select_query,
order_by_elements_actions,
window->getWindowDescription().full_sort_description,
query_info.syntax_analyzer_result);
read_from_merge_tree->setQueryInfoOrderOptimizer(order_optimizer);
/// If we don't have filtration, we can pushdown limit to reading stage for optimizations.
UInt64 limit = (select_query->hasFiltration() || select_query->groupBy()) ? 0 : InterpreterSelectQuery::getLimitForSorting(*select_query, context);
auto order_info = order_optimizer->getInputOrder(
query_info.projection ? query_info.projection->desc->metadata : read_from_merge_tree->getStorageMetadata(),
context,
limit);
read_from_merge_tree->setQueryInfoInputOrderInfo(order_info);
sorting->convertToFinishSorting(order_info->order_key_prefix_descr);
return 0;
}

View File

@ -1072,8 +1072,6 @@ void ReadFromMergeTree::initializePipeline(QueryPipelineBuilder & pipeline, cons
if (select.final())
{
LOG_DEBUG(log, "Select is final");
/// Add columns needed to calculate the sorting expression and the sign.
std::vector<String> add_columns = metadata_for_reading->getColumnsRequiredForSortingKey();
column_names_to_read.insert(column_names_to_read.end(), add_columns.begin(), add_columns.end());
@ -1093,8 +1091,6 @@ void ReadFromMergeTree::initializePipeline(QueryPipelineBuilder & pipeline, cons
}
else if ((settings.optimize_read_in_order || settings.optimize_aggregation_in_order || settings.optimize_read_in_window_order) && input_order_info)
{
LOG_DEBUG(log, "Input order info is present");
pipe = spreadMarkRangesAmongStreamsWithOrder(
std::move(result.parts_with_ranges),
column_names_to_read,
@ -1103,8 +1099,6 @@ void ReadFromMergeTree::initializePipeline(QueryPipelineBuilder & pipeline, cons
}
else
{
LOG_DEBUG(log, "Something went wrong");
pipe = spreadMarkRangesAmongStreams(
std::move(result.parts_with_ranges),
column_names_to_read);

View File

@ -138,9 +138,9 @@ void WindowStep::describeActions(JSONBuilder::JSONMap & map) const
map.add("Functions", std::move(functions_array));
}
const SortDescription & WindowStep::getSortDescription() const
const WindowDescription & WindowStep::getWindowDescription() const
{
return window_description.full_sort_description;
return window_description;
}
}

View File

@ -25,7 +25,7 @@ public:
void describeActions(JSONBuilder::JSONMap & map) const override;
void describeActions(FormatSettings & settings) const override;
const SortDescription & getSortDescription() const;
const WindowDescription & getWindowDescription() const;
private:
WindowDescription window_description;