Merge pull request #34632 from excitoon-favorites/optimizedprocessing

Optimized processing of ORDER BY in window functions
This commit is contained in:
Dmitry Novik 2022-06-20 20:03:26 +02:00 committed by GitHub
commit f6692c34e6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 279 additions and 12 deletions

View File

@ -406,6 +406,7 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
M(Bool, parallel_view_processing, false, "Enables pushing to attached views concurrently instead of sequentially.", 0) \
M(Bool, enable_unaligned_array_join, false, "Allow ARRAY JOIN with multiple arrays that have different sizes. When this settings is enabled, arrays will be resized to the longest one.", 0) \
M(Bool, optimize_read_in_order, true, "Enable ORDER BY optimization for reading data in corresponding order in MergeTree tables.", 0) \
M(Bool, optimize_read_in_window_order, true, "Enable ORDER BY optimization in window clause for reading data in corresponding order in MergeTree tables.", 0) \
M(Bool, optimize_aggregation_in_order, false, "Enable GROUP BY optimization for aggregating data in corresponding order in MergeTree tables.", 0) \
M(UInt64, aggregation_in_order_max_block_bytes, 50000000, "Maximal size of block in bytes accumulated during aggregation in order of primary key. Lower block size allows to parallelize more final merge stage of aggregation.", 0) \
M(UInt64, read_in_order_two_level_merge_threshold, 100, "Minimal number of parts to read to run preliminary merge step during multithread reading in order of primary key.", 0) \

View File

@ -615,7 +615,6 @@ void ExpressionAnalyzer::getRootActions(const ASTPtr & ast, bool no_makeset_for_
actions = visitor_data.getActions();
}
void ExpressionAnalyzer::getRootActionsNoMakeSet(const ASTPtr & ast, ActionsDAGPtr & actions, bool only_consts)
{
LogAST log;
@ -718,7 +717,7 @@ void ExpressionAnalyzer::makeAggregateDescriptions(ActionsDAGPtr & actions, Aggr
}
}
void makeWindowDescriptionFromAST(const Context & context,
void ExpressionAnalyzer::makeWindowDescriptionFromAST(const Context & context_,
const WindowDescriptions & existing_descriptions,
WindowDescription & desc, const IAST * ast)
{
@ -787,6 +786,10 @@ void makeWindowDescriptionFromAST(const Context & context,
desc.partition_by.push_back(SortColumnDescription(
with_alias->getColumnName(), 1 /* direction */,
1 /* nulls_direction */));
auto actions_dag = std::make_shared<ActionsDAG>(columns_after_join);
getRootActions(column_ast, false, actions_dag);
desc.partition_by_actions.push_back(std::move(actions_dag));
}
}
@ -804,6 +807,10 @@ void makeWindowDescriptionFromAST(const Context & context,
order_by_element.children.front()->getColumnName(),
order_by_element.direction,
order_by_element.nulls_direction));
auto actions_dag = std::make_shared<ActionsDAG>(columns_after_join);
getRootActions(column_ast, false, actions_dag);
desc.order_by_actions.push_back(std::move(actions_dag));
}
}
@ -830,14 +837,14 @@ void makeWindowDescriptionFromAST(const Context & context,
if (definition.frame_end_type == WindowFrame::BoundaryType::Offset)
{
auto [value, _] = evaluateConstantExpression(definition.frame_end_offset,
context.shared_from_this());
context_.shared_from_this());
desc.frame.end_offset = value;
}
if (definition.frame_begin_type == WindowFrame::BoundaryType::Offset)
{
auto [value, _] = evaluateConstantExpression(definition.frame_begin_offset,
context.shared_from_this());
context_.shared_from_this());
desc.frame.begin_offset = value;
}
}

View File

@ -140,6 +140,7 @@ public:
/// A list of windows for window functions.
const WindowDescriptions & windowDescriptions() const { return window_descriptions; }
void makeWindowDescriptionFromAST(const Context & context, const WindowDescriptions & existing_descriptions, WindowDescription & desc, const IAST * ast);
void makeWindowDescriptions(ActionsDAGPtr actions);
/**

View File

@ -886,7 +886,7 @@ static FillColumnDescription getWithFillDescription(const ASTOrderByElement & or
return descr;
}
static SortDescription getSortDescription(const ASTSelectQuery & query, ContextPtr context)
SortDescription InterpreterSelectQuery::getSortDescription(const ASTSelectQuery & query, ContextPtr context_)
{
SortDescription order_descr;
order_descr.reserve(query.orderBy()->children.size());
@ -900,15 +900,15 @@ static SortDescription getSortDescription(const ASTSelectQuery & query, ContextP
collator = std::make_shared<Collator>(order_by_elem.collation->as<ASTLiteral &>().value.get<String>());
if (order_by_elem.with_fill)
{
FillColumnDescription fill_desc = getWithFillDescription(order_by_elem, context);
FillColumnDescription fill_desc = getWithFillDescription(order_by_elem, context_);
order_descr.emplace_back(name, order_by_elem.direction, order_by_elem.nulls_direction, collator, true, fill_desc);
}
else
order_descr.emplace_back(name, order_by_elem.direction, order_by_elem.nulls_direction, collator);
}
order_descr.compile_sort_description = context->getSettingsRef().compile_sort_description;
order_descr.min_count_to_compile_sort_description = context->getSettingsRef().min_count_to_compile_sort_description;
order_descr.compile_sort_description = context_->getSettingsRef().compile_sort_description;
order_descr.min_count_to_compile_sort_description = context_->getSettingsRef().min_count_to_compile_sort_description;
return order_descr;
}
@ -1033,12 +1033,12 @@ static std::pair<UInt64, UInt64> getLimitLengthAndOffset(const ASTSelectQuery &
}
static UInt64 getLimitForSorting(const ASTSelectQuery & query, ContextPtr context)
UInt64 InterpreterSelectQuery::getLimitForSorting(const ASTSelectQuery & query, ContextPtr context_)
{
/// Partial sort can be done if there is LIMIT but no DISTINCT or LIMIT BY, neither ARRAY JOIN.
if (!query.distinct && !query.limitBy() && !query.limit_with_ties && !query.arrayJoinExpressionList().first && query.limitLength())
{
auto [limit_length, limit_offset] = getLimitLengthAndOffset(query, context);
auto [limit_length, limit_offset] = getLimitLengthAndOffset(query, context_);
if (limit_length > std::numeric_limits<UInt64>::max() - limit_offset)
return 0;

View File

@ -128,6 +128,9 @@ public:
/// It will set shard_num and shard_count to the client_info
void setProperClientInfo(size_t replica_num, size_t replica_count);
static SortDescription getSortDescription(const ASTSelectQuery & query, ContextPtr context);
static UInt64 getLimitForSorting(const ASTSelectQuery & query, ContextPtr context);
private:
InterpreterSelectQuery(
const ASTPtr & query_ptr_,

View File

@ -7,6 +7,7 @@
#include <DataTypes/IDataType.h>
#include <Core/Names.h>
#include <Core/Types.h>
#include <Processors/QueryPlan/FilterStep.h>
namespace DB
{
@ -90,6 +91,9 @@ struct WindowDescription
// then by ORDER BY. This field holds this combined sort order.
SortDescription full_sort_description;
std::vector<ActionsDAGPtr> partition_by_actions;
std::vector<ActionsDAGPtr> order_by_actions;
WindowFrame frame;
// The window functions that are calculated for this window.

View File

@ -48,15 +48,20 @@ size_t tryPushDownFilter(QueryPlan::Node * parent_node, QueryPlan::Nodes & nodes
/// May split ExpressionStep and lift up only a part of it.
size_t tryExecuteFunctionsAfterSorting(QueryPlan::Node * parent_node, QueryPlan::Nodes & nodes);
/// Utilize storage sorting when sorting for window functions.
/// Update information about prefix sort description in SortingStep.
size_t tryReuseStorageOrderingForWindowFunctions(QueryPlan::Node * parent_node, QueryPlan::Nodes & nodes);
inline const auto & getOptimizations()
{
static const std::array<Optimization, 6> optimizations = {{
static const std::array<Optimization, 7> optimizations = {{
{tryLiftUpArrayJoin, "liftUpArrayJoin", &QueryPlanOptimizationSettings::optimize_plan},
{tryPushDownLimit, "pushDownLimit", &QueryPlanOptimizationSettings::optimize_plan},
{trySplitFilter, "splitFilter", &QueryPlanOptimizationSettings::optimize_plan},
{tryMergeExpressions, "mergeExpressions", &QueryPlanOptimizationSettings::optimize_plan},
{tryPushDownFilter, "pushDownFilter", &QueryPlanOptimizationSettings::filter_push_down},
{tryExecuteFunctionsAfterSorting, "liftUpFunctions", &QueryPlanOptimizationSettings::optimize_plan},
{tryReuseStorageOrderingForWindowFunctions, "reuseStorageOrderingForWindowFunctions", &QueryPlanOptimizationSettings::optimize_plan}
}};
return optimizations;

View File

@ -0,0 +1,113 @@
#include <Parsers/ASTWindowDefinition.h>
#include <Processors/QueryPlan/Optimizations/Optimizations.h>
#include <Processors/QueryPlan/ITransformingStep.h>
#include <Processors/QueryPlan/AggregatingStep.h>
#include <Processors/QueryPlan/ExpressionStep.h>
#include <Processors/QueryPlan/JoinStep.h>
#include <Processors/QueryPlan/ArrayJoinStep.h>
#include <Processors/QueryPlan/CreatingSetsStep.h>
#include <Processors/QueryPlan/CubeStep.h>
#include <Processors/QueryPlan/ReadFromMergeTree.h>
#include <Processors/QueryPlan/SortingStep.h>
#include <Processors/QueryPlan/TotalsHavingStep.h>
#include <Processors/QueryPlan/DistinctStep.h>
#include <Processors/QueryPlan/UnionStep.h>
#include <Processors/QueryPlan/WindowStep.h>
#include <Interpreters/ActionsDAG.h>
#include <Interpreters/ArrayJoinAction.h>
#include <Interpreters/InterpreterSelectQuery.h>
#include <Interpreters/TableJoin.h>
#include <Common/typeid_cast.h>
#include <DataTypes/DataTypeAggregateFunction.h>
#include <Columns/IColumn.h>
namespace DB::QueryPlanOptimizations
{
size_t tryReuseStorageOrderingForWindowFunctions(QueryPlan::Node * parent_node, QueryPlan::Nodes & /*nodes*/)
{
/// Find the following sequence of steps, add InputOrderInfo and apply prefix sort description to
/// SortingStep:
/// WindowStep <- SortingStep <- [Expression] <- [SettingQuotaAndLimits] <- ReadFromMergeTree
auto * window_node = parent_node;
auto * window = typeid_cast<WindowStep *>(window_node->step.get());
if (!window)
return 0;
if (window_node->children.size() != 1)
return 0;
auto * sorting_node = window_node->children.front();
auto * sorting = typeid_cast<SortingStep *>(sorting_node->step.get());
if (!sorting)
return 0;
if (sorting_node->children.size() != 1)
return 0;
auto * possible_read_from_merge_tree_node = sorting_node->children.front();
if (typeid_cast<ExpressionStep *>(possible_read_from_merge_tree_node->step.get()))
{
if (possible_read_from_merge_tree_node->children.size() != 1)
return 0;
possible_read_from_merge_tree_node = possible_read_from_merge_tree_node->children.front();
}
auto * read_from_merge_tree = typeid_cast<ReadFromMergeTree *>(possible_read_from_merge_tree_node->step.get());
if (!read_from_merge_tree)
{
return 0;
}
auto context = read_from_merge_tree->getContext();
if (!context->getSettings().optimize_read_in_window_order)
{
return 0;
}
const auto & query_info = read_from_merge_tree->getQueryInfo();
const auto * select_query = query_info.query->as<ASTSelectQuery>();
ManyExpressionActions order_by_elements_actions;
const auto & window_desc = window->getWindowDescription();
for (const auto & actions_dag : window_desc.partition_by_actions)
{
order_by_elements_actions.emplace_back(
std::make_shared<ExpressionActions>(actions_dag, ExpressionActionsSettings::fromContext(context, CompileExpressions::yes)));
}
for (const auto & actions_dag : window_desc.order_by_actions)
{
order_by_elements_actions.emplace_back(
std::make_shared<ExpressionActions>(actions_dag, ExpressionActionsSettings::fromContext(context, CompileExpressions::yes)));
}
auto order_optimizer = std::make_shared<ReadInOrderOptimizer>(
*select_query,
order_by_elements_actions,
window->getWindowDescription().full_sort_description,
query_info.syntax_analyzer_result);
read_from_merge_tree->setQueryInfoOrderOptimizer(order_optimizer);
/// If we don't have filtration, we can pushdown limit to reading stage for optimizations.
UInt64 limit = (select_query->hasFiltration() || select_query->groupBy()) ? 0 : InterpreterSelectQuery::getLimitForSorting(*select_query, context);
auto order_info = order_optimizer->getInputOrder(
query_info.projection ? query_info.projection->desc->metadata : read_from_merge_tree->getStorageMetadata(),
context,
limit);
if (order_info)
{
read_from_merge_tree->setQueryInfoInputOrderInfo(order_info);
sorting->convertToFinishSorting(order_info->order_key_prefix_descr);
}
return 0;
}
}

View File

@ -982,6 +982,30 @@ MergeTreeDataSelectAnalysisResultPtr ReadFromMergeTree::selectRangesToRead(
return std::make_shared<MergeTreeDataSelectAnalysisResult>(MergeTreeDataSelectAnalysisResult{.result = std::move(result)});
}
void ReadFromMergeTree::setQueryInfoOrderOptimizer(std::shared_ptr<ReadInOrderOptimizer> order_optimizer)
{
if (query_info.projection)
{
query_info.projection->order_optimizer = order_optimizer;
}
else
{
query_info.order_optimizer = order_optimizer;
}
}
void ReadFromMergeTree::setQueryInfoInputOrderInfo(InputOrderInfoPtr order_info)
{
if (query_info.projection)
{
query_info.projection->input_order_info = order_info;
}
else
{
query_info.input_order_info = order_info;
}
}
ReadFromMergeTree::AnalysisResult ReadFromMergeTree::getAnalysisResult() const
{
auto result_ptr = analyzed_result_ptr ? analyzed_result_ptr : selectRangesToRead(prepared_parts);
@ -1065,7 +1089,7 @@ void ReadFromMergeTree::initializePipeline(QueryPipelineBuilder & pipeline, cons
column_names_to_read,
result_projection);
}
else if ((settings.optimize_read_in_order || settings.optimize_aggregation_in_order) && input_order_info)
else if ((settings.optimize_read_in_order || settings.optimize_aggregation_in_order || settings.optimize_read_in_window_order) && input_order_info)
{
pipe = spreadMarkRangesAmongStreamsWithOrder(
std::move(result.parts_with_ranges),

View File

@ -129,6 +129,13 @@ public:
bool sample_factor_column_queried,
Poco::Logger * log);
ContextPtr getContext() const { return context; }
const SelectQueryInfo & getQueryInfo() const { return query_info; }
StorageMetadataPtr getStorageMetadata() const { return metadata_for_reading; }
void setQueryInfoOrderOptimizer(std::shared_ptr<ReadInOrderOptimizer> read_in_order_optimizer);
void setQueryInfoInputOrderInfo(InputOrderInfoPtr order_info);
private:
const MergeTreeReaderSettings reader_settings;

View File

@ -112,6 +112,12 @@ void SortingStep::updateLimit(size_t limit_)
}
}
void SortingStep::convertToFinishSorting(SortDescription prefix_description_)
{
type = Type::FinishSorting;
prefix_description = std::move(prefix_description_);
}
void SortingStep::transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
{
if (type == Type::FinishSorting)

View File

@ -54,6 +54,8 @@ public:
SortDescription getSortDescription() const { return result_description; }
void convertToFinishSorting(SortDescription prefix_description);
private:
enum class Type

View File

@ -138,4 +138,9 @@ void WindowStep::describeActions(JSONBuilder::JSONMap & map) const
map.add("Functions", std::move(functions_array));
}
const WindowDescription & WindowStep::getWindowDescription() const
{
return window_description;
}
}

View File

@ -25,6 +25,8 @@ public:
void describeActions(JSONBuilder::JSONMap & map) const override;
void describeActions(FormatSettings & settings) const override;
const WindowDescription & getWindowDescription() const;
private:
WindowDescription window_description;
std::vector<WindowFunctionDescription> window_functions;

View File

@ -0,0 +1,12 @@
Partial sorting plan
optimize_read_in_window_order=0
Sort description: n ASC, x ASC
optimize_read_in_window_order=1
Prefix sort description: n ASC
Result sort description: n ASC, x ASC
No sorting plan
optimize_read_in_window_order=0
Sort description: n ASC, x ASC
optimize_read_in_window_order=1
Prefix sort description: n ASC, x ASC
Result sort description: n ASC, x ASC

View File

@ -0,0 +1,36 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
name=test_01655_plan_optimizations_optimize_read_in_window_order
$CLICKHOUSE_CLIENT -q "drop table if exists ${name}"
$CLICKHOUSE_CLIENT -q "drop table if exists ${name}_n"
$CLICKHOUSE_CLIENT -q "drop table if exists ${name}_n_x"
$CLICKHOUSE_CLIENT -q "create table ${name} engine=MergeTree order by tuple() as select toInt64((sin(number)+2)*65535)%10 as n, number as x from numbers_mt(100000)"
$CLICKHOUSE_CLIENT -q "create table ${name}_n engine=MergeTree order by n as select * from ${name} order by n"
$CLICKHOUSE_CLIENT -q "create table ${name}_n_x engine=MergeTree order by (n, x) as select * from ${name} order by n, x"
$CLICKHOUSE_CLIENT -q "optimize table ${name}_n final"
$CLICKHOUSE_CLIENT -q "optimize table ${name}_n_x final"
echo 'Partial sorting plan'
echo ' optimize_read_in_window_order=0'
$CLICKHOUSE_CLIENT -q "explain plan actions=1, description=1 select n, sum(x) OVER (ORDER BY n, x ROWS BETWEEN 100 PRECEDING AND CURRENT ROW) from ${name}_n SETTINGS optimize_read_in_window_order=0" | grep -i "sort description"
echo ' optimize_read_in_window_order=1'
$CLICKHOUSE_CLIENT -q "explain plan actions=1, description=1 select n, sum(x) OVER (ORDER BY n, x ROWS BETWEEN 100 PRECEDING AND CURRENT ROW) from ${name}_n SETTINGS optimize_read_in_window_order=1" | grep -i "sort description"
echo 'No sorting plan'
echo ' optimize_read_in_window_order=0'
$CLICKHOUSE_CLIENT -q "explain plan actions=1, description=1 select n, sum(x) OVER (ORDER BY n, x ROWS BETWEEN 100 PRECEDING AND CURRENT ROW) from ${name}_n_x SETTINGS optimize_read_in_window_order=0" | grep -i "sort description"
echo ' optimize_read_in_window_order=1'
$CLICKHOUSE_CLIENT -q "explain plan actions=1, description=1 select n, sum(x) OVER (ORDER BY n, x ROWS BETWEEN 100 PRECEDING AND CURRENT ROW) from ${name}_n_x SETTINGS optimize_read_in_window_order=1" | grep -i "sort description"
$CLICKHOUSE_CLIENT -q "drop table ${name}"
$CLICKHOUSE_CLIENT -q "drop table ${name}_n"
$CLICKHOUSE_CLIENT -q "drop table ${name}_n_x"

View File

@ -0,0 +1,35 @@
#!/usr/bin/env bash
# Tags: long
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
name=test_01655_plan_optimizations_optimize_read_in_window_order_long
max_memory_usage=20000000
$CLICKHOUSE_CLIENT -q "drop table if exists ${name}"
$CLICKHOUSE_CLIENT -q "drop table if exists ${name}_n"
$CLICKHOUSE_CLIENT -q "drop table if exists ${name}_n_x"
$CLICKHOUSE_CLIENT -q "create table ${name} engine=MergeTree order by tuple() as select toInt64((sin(number)+2)*65535)%500 as n, number as x from numbers_mt(5000000)"
$CLICKHOUSE_CLIENT -q "create table ${name}_n engine=MergeTree order by n as select * from ${name} order by n"
$CLICKHOUSE_CLIENT -q "create table ${name}_n_x engine=MergeTree order by (n, x) as select * from ${name} order by n, x"
$CLICKHOUSE_CLIENT -q "optimize table ${name}_n final"
$CLICKHOUSE_CLIENT -q "optimize table ${name}_n_x final"
$CLICKHOUSE_CLIENT -q "select n, sum(x) OVER (ORDER BY n, x ROWS BETWEEN 100 PRECEDING AND CURRENT ROW) from ${name}_n SETTINGS optimize_read_in_window_order=0, max_memory_usage=$max_memory_usage, max_threads=1 format Null" 2>&1 | grep -F -q "MEMORY_LIMIT_EXCEEDED" && echo 'OK' || echo 'FAIL'
$CLICKHOUSE_CLIENT -q "select n, sum(x) OVER (ORDER BY n, x ROWS BETWEEN 100 PRECEDING AND CURRENT ROW) from ${name}_n SETTINGS optimize_read_in_window_order=1, max_memory_usage=$max_memory_usage, max_threads=1 format Null"
$CLICKHOUSE_CLIENT -q "select n, sum(x) OVER (ORDER BY n, x ROWS BETWEEN 100 PRECEDING AND CURRENT ROW) from ${name}_n_x SETTINGS optimize_read_in_window_order=0, max_memory_usage=$max_memory_usage, max_threads=1 format Null" 2>&1 | grep -F -q "MEMORY_LIMIT_EXCEEDED" && echo 'OK' || echo 'FAIL'
$CLICKHOUSE_CLIENT -q "select n, sum(x) OVER (ORDER BY n, x ROWS BETWEEN 100 PRECEDING AND CURRENT ROW) from ${name}_n_x SETTINGS optimize_read_in_window_order=1, max_memory_usage=$max_memory_usage, max_threads=1 format Null"
$CLICKHOUSE_CLIENT -q "select n, sum(x) OVER (PARTITION BY n ORDER BY x ROWS BETWEEN 100 PRECEDING AND CURRENT ROW) from ${name}_n_x SETTINGS optimize_read_in_window_order=0, max_memory_usage=$max_memory_usage, max_threads=1 format Null" 2>&1 | grep -F -q "MEMORY_LIMIT_EXCEEDED" && echo 'OK' || echo 'FAIL'
$CLICKHOUSE_CLIENT -q "select n, sum(x) OVER (PARTITION BY n ORDER BY x ROWS BETWEEN 100 PRECEDING AND CURRENT ROW) from ${name}_n_x SETTINGS optimize_read_in_window_order=1, max_memory_usage=$max_memory_usage, max_threads=1 format Null"
$CLICKHOUSE_CLIENT -q "select n, sum(x) OVER (PARTITION BY n+x%2 ORDER BY n, x ROWS BETWEEN 100 PRECEDING AND CURRENT ROW) from ${name}_n_x SETTINGS optimize_read_in_window_order=1, max_memory_usage=$max_memory_usage, max_threads=1 format Null" 2>&1 | grep -F -q "MEMORY_LIMIT_EXCEEDED" && echo 'OK' || echo 'FAIL'
$CLICKHOUSE_CLIENT -q "drop table ${name}"
$CLICKHOUSE_CLIENT -q "drop table ${name}_n"
$CLICKHOUSE_CLIENT -q "drop table ${name}_n_x"