Merge pull request #35623 from nickitat/function_calculation_after_sorting_and_limit

Functions calculation after sorting
This commit is contained in:
Nikolai Kochetov 2022-04-05 12:09:56 +02:00 committed by GitHub
commit 4479b68980
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 192 additions and 44 deletions

View File

@ -1527,6 +1527,21 @@ ActionsDAG::SplitResult ActionsDAG::splitActionsBeforeArrayJoin(const NameSet &
return res; return res;
} }
/// Splits the DAG into two parts around the sorting columns.
/// The first part holds everything needed to calculate `sort_columns`, the second part holds the rest.
/// Every name from `sort_columns` has to be present in the index, otherwise LOGICAL_ERROR is thrown.
/// The `project_input` flag of this DAG is carried over to the second part.
ActionsDAG::SplitResult ActionsDAG::splitActionsBySortingDescription(const NameSet & sort_columns) const
{
    std::unordered_set<const Node *> nodes_for_sorting;

    for (const auto & column_name : sort_columns)
    {
        const auto * found = tryFindInIndex(column_name);

        /// A sorting column that is missing from the index is a bug in the caller.
        if (!found)
            throw Exception(
                ErrorCodes::LOGICAL_ERROR, "Sorting column {} wasn't found in the ActionsDAG's index. DAG:\n{}", column_name, dumpDAG());

        nodes_for_sorting.insert(found);
    }

    auto parts = split(nodes_for_sorting);
    parts.second->project_input = project_input;
    return parts;
}
ActionsDAG::SplitResult ActionsDAG::splitActionsForFilter(const std::string & column_name) const ActionsDAG::SplitResult ActionsDAG::splitActionsForFilter(const std::string & column_name) const
{ {
const auto * node = tryFindInIndex(column_name); const auto * node = tryFindInIndex(column_name);

View File

@ -274,6 +274,10 @@ public:
/// Index of initial actions must contain column_name. /// Index of initial actions must contain column_name.
SplitResult splitActionsForFilter(const std::string & column_name) const; SplitResult splitActionsForFilter(const std::string & column_name) const;
/// Splits actions into two parts. The first part contains all the calculations required to calculate sort_columns.
/// The second contains the rest.
/// Index of initial actions must contain every name from sort_columns, otherwise a LOGICAL_ERROR exception is thrown.
SplitResult splitActionsBySortingDescription(const NameSet & sort_columns) const;
/// Create actions which may calculate part of filter using only available_inputs. /// Create actions which may calculate part of filter using only available_inputs.
/// If nothing may be calculated, returns nullptr. /// If nothing may be calculated, returns nullptr.
/// Otherwise, return actions which inputs are from available_inputs. /// Otherwise, return actions which inputs are from available_inputs.

View File

@ -44,16 +44,20 @@ size_t tryMergeExpressions(QueryPlan::Node * parent_node, QueryPlan::Nodes &);
/// May split FilterStep and push down only part of it. /// May split FilterStep and push down only part of it.
size_t tryPushDownFilter(QueryPlan::Node * parent_node, QueryPlan::Nodes & nodes); size_t tryPushDownFilter(QueryPlan::Node * parent_node, QueryPlan::Nodes & nodes);
/// Move ExpressionStep after SortingStep if possible.
/// May split ExpressionStep and lift up only a part of it:
/// the calculations required for the sorting columns stay below the SortingStep, the rest is lifted above it.
/// Returns 0 if the plan was left unchanged.
size_t tryExecuteFunctionsAfterSorting(QueryPlan::Node * parent_node, QueryPlan::Nodes & nodes);
inline const auto & getOptimizations() inline const auto & getOptimizations()
{ {
static const std::array<Optimization, 5> optimizations = static const std::array<Optimization, 6> optimizations = {{
{{
{tryLiftUpArrayJoin, "liftUpArrayJoin", &QueryPlanOptimizationSettings::optimize_plan}, {tryLiftUpArrayJoin, "liftUpArrayJoin", &QueryPlanOptimizationSettings::optimize_plan},
{tryPushDownLimit, "pushDownLimit", &QueryPlanOptimizationSettings::optimize_plan}, {tryPushDownLimit, "pushDownLimit", &QueryPlanOptimizationSettings::optimize_plan},
{trySplitFilter, "splitFilter", &QueryPlanOptimizationSettings::optimize_plan}, {trySplitFilter, "splitFilter", &QueryPlanOptimizationSettings::optimize_plan},
{tryMergeExpressions, "mergeExpressions", &QueryPlanOptimizationSettings::optimize_plan}, {tryMergeExpressions, "mergeExpressions", &QueryPlanOptimizationSettings::optimize_plan},
{tryPushDownFilter, "pushDownFilter", &QueryPlanOptimizationSettings::filter_push_down}, {tryPushDownFilter, "pushDownFilter", &QueryPlanOptimizationSettings::filter_push_down},
}}; {tryExecuteFunctionsAfterSorting, "liftUpFunctions", &QueryPlanOptimizationSettings::optimize_plan},
}};
return optimizations; return optimizations;
} }

View File

@ -0,0 +1,77 @@
#include <Interpreters/ActionsDAG.h>
#include <Processors/QueryPlan/ExpressionStep.h>
#include <Processors/QueryPlan/Optimizations/Optimizations.h>
#include <Processors/QueryPlan/SortingStep.h>
#include <Common/Exception.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
}
namespace
{
/// Returns the output stream of the only child of `node`.
/// Throws LOGICAL_ERROR if the node does not have exactly one child.
const DB::DataStream & getChildOutputStream(DB::QueryPlan::Node & node)
{
    const auto & children = node.children;

    if (children.size() == 1)
        return children.front()->step->getOutputStream();

    throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Node \"{}\" is expected to have only one child.", node.step->getName());
}
}
namespace DB::QueryPlanOptimizations
{
/// Rewrites the plan fragment `Sorting -> Expression` so that only the calculations which the
/// sorting columns depend on are executed before the sorting; the remaining calculations are
/// moved into a new ExpressionStep placed above the SortingStep.
///
/// parent_node - candidate node; the rewrite applies only if it holds a SortingStep and its single child holds an ExpressionStep.
/// nodes - node storage; one new node is appended to it when the rewrite happens.
///
/// Returns 0 if the plan was not changed and 3 after a successful rewrite.
/// NOTE(review): 3 presumably reports how many levels of the plan were updated - confirm against the optimizer driver.
size_t tryExecuteFunctionsAfterSorting(QueryPlan::Node * parent_node, QueryPlan::Nodes & nodes)
{
if (parent_node->children.size() != 1)
return 0;
QueryPlan::Node * child_node = parent_node->children.front();
// parent_step and child_step are references to the step slots of the two nodes,
// so after the std::swap below they refer to the exchanged steps.
auto & parent_step = parent_node->step;
auto & child_step = child_node->step;
// The raw pointers keep pointing to the same step objects regardless of the swap.
auto * sorting_step = typeid_cast<SortingStep *>(parent_step.get());
auto * expression_step = typeid_cast<ExpressionStep *>(child_step.get());
if (!sorting_step || !expression_step)
return 0;
// Collect the names of the columns the sorting is done by.
NameSet sort_columns;
for (const auto & col : sorting_step->getSortDescription())
sort_columns.insert(col.column_name);
// Split the expression: the first part calculates the sorting columns, the second one is the rest.
auto [needed_for_sorting, unneeded_for_sorting] = expression_step->getExpression()->splitActionsBySortingDescription(sort_columns);
// No calculations can be postponed.
if (unneeded_for_sorting->trivial())
return 0;
// Sorting (parent_node) -> Expression (child_node)
// Insert a new node below child_node; it takes over child_node's children and
// gets the calculations needed for sorting, with the description of the original expression.
auto & node_with_needed = nodes.emplace_back();
std::swap(node_with_needed.children, child_node->children);
child_node->children = {&node_with_needed};
node_with_needed.step = std::make_unique<ExpressionStep>(getChildOutputStream(node_with_needed), std::move(needed_for_sorting));
node_with_needed.step->setStepDescription(child_step->getStepDescription());
// Sorting (parent_node) -> the original Expression for now (child_node) -> NeededCalculations (node_with_needed)
std::swap(parent_step, child_step);
// the original Expression for now (parent_node) -> Sorting (child_node) -> NeededCalculations (node_with_needed)
// The sorting step now reads from node_with_needed; its output header is set equal to its new input header.
sorting_step->updateInputStream(getChildOutputStream(*child_node));
auto input_header = sorting_step->getInputStreams().at(0).header;
sorting_step->updateOutputStream(std::move(input_header));
// Remember the description of the original expression before its step is replaced.
auto description = parent_step->getStepDescription();
// Replace the original expression with the postponed calculations, fed by the output of the sorting step.
parent_step = std::make_unique<DB::ExpressionStep>(child_step->getOutputStream(), std::move(unneeded_for_sorting));
parent_step->setStepDescription(description + " [lifted up part]");
// UnneededCalculations (parent_node) -> Sorting (child_node) -> NeededCalculations (node_with_needed)
return 3;
}
}

View File

@ -1,11 +1,12 @@
#include <stdexcept>
#include <IO/Operators.h>
#include <Processors/Merges/MergingSortedTransform.h>
#include <Processors/QueryPlan/SortingStep.h> #include <Processors/QueryPlan/SortingStep.h>
#include <QueryPipeline/QueryPipelineBuilder.h> #include <Processors/Transforms/FinishSortingTransform.h>
#include <Processors/Transforms/LimitsCheckingTransform.h>
#include <Processors/Transforms/MergeSortingTransform.h> #include <Processors/Transforms/MergeSortingTransform.h>
#include <Processors/Transforms/PartialSortingTransform.h> #include <Processors/Transforms/PartialSortingTransform.h>
#include <Processors/Transforms/FinishSortingTransform.h> #include <QueryPipeline/QueryPipelineBuilder.h>
#include <Processors/Merges/MergingSortedTransform.h>
#include <Processors/Transforms/LimitsCheckingTransform.h>
#include <IO/Operators.h>
#include <Common/JSONBuilder.h> #include <Common/JSONBuilder.h>
namespace DB namespace DB
@ -88,6 +89,18 @@ SortingStep::SortingStep(
output_stream->sort_mode = DataStream::SortMode::Stream; output_stream->sort_mode = DataStream::SortMode::Stream;
} }
/// Replaces all current input streams of the step with the single given stream.
/// The output stream is not touched here; see updateOutputStream.
void SortingStep::updateInputStream(DataStream input_stream)
{
    input_streams.clear();
    input_streams.push_back(std::move(input_stream));
}
/// Rebuilds the output stream from the first input stream, the given header and the traits of the step,
/// then updates the distinct columns of the new output stream according to its header.
void SortingStep::updateOutputStream(Block result_header)
{
    const auto & source_stream = input_streams.at(0);
    output_stream = createOutputStream(source_stream, std::move(result_header), getDataStreamTraits());

    auto & rebuilt_stream = *output_stream;
    updateDistinctColumns(rebuilt_stream.header, rebuilt_stream.distinct_columns);
}
void SortingStep::updateLimit(size_t limit_) void SortingStep::updateLimit(size_t limit_)
{ {
if (limit_ && (limit == 0 || limit_ < limit)) if (limit_ && (limit == 0 || limit_ < limit))

View File

@ -49,6 +49,11 @@ public:
/// Add limit or change it to lower value. /// Add limit or change it to lower value.
void updateLimit(size_t limit_); void updateLimit(size_t limit_);
/// Replace the input streams of the step with the single given stream.
/// Does not recalculate the output stream, call updateOutputStream for that.
void updateInputStream(DataStream input_stream);
/// Recreate the output stream from the first input stream and the given header.
void updateOutputStream(Block result_header);
/// Returns a copy of result_description.
SortDescription getSortDescription() const { return result_description; }
private: private:
enum class Type enum class Type

View File

@ -0,0 +1,4 @@
<test>
<query>SELECT sipHash64(number) FROM numbers(1e8) ORDER BY number LIMIT 5</query>
<query>SELECT sipHash64(number) FROM numbers(1e8) ORDER BY number + 1 LIMIT 5</query>
</test>

View File

@ -35,10 +35,11 @@ Expression (Projection)
ReadFromMergeTree (default.test_table) ReadFromMergeTree (default.test_table)
Expression (Projection) Expression (Projection)
Limit (preliminary LIMIT (without OFFSET)) Limit (preliminary LIMIT (without OFFSET))
Sorting Expression (Before ORDER BY [lifted up part])
Expression (Before ORDER BY) Sorting
SettingQuotaAndLimits (Set limits and quota after reading from storage) Expression (Before ORDER BY)
ReadFromMergeTree (default.test_table) SettingQuotaAndLimits (Set limits and quota after reading from storage)
ReadFromMergeTree (default.test_table)
optimize_aggregation_in_order optimize_aggregation_in_order
Expression ((Projection + Before ORDER BY)) Expression ((Projection + Before ORDER BY))
Aggregating Aggregating

View File

@ -925,10 +925,11 @@ Expression ((Projection + Before ORDER BY))
Window (Window step for window \'ORDER BY o ASC, number ASC\') Window (Window step for window \'ORDER BY o ASC, number ASC\')
Sorting (Sorting for window \'ORDER BY o ASC, number ASC\') Sorting (Sorting for window \'ORDER BY o ASC, number ASC\')
Window (Window step for window \'ORDER BY number ASC\') Window (Window step for window \'ORDER BY number ASC\')
Sorting (Sorting for window \'ORDER BY number ASC\') Expression ((Before window functions + (Projection + Before ORDER BY)) [lifted up part])
Expression ((Before window functions + (Projection + Before ORDER BY))) Sorting (Sorting for window \'ORDER BY number ASC\')
SettingQuotaAndLimits (Set limits and quota after reading from storage) Expression ((Before window functions + (Projection + Before ORDER BY)))
ReadFromStorage (SystemNumbers) SettingQuotaAndLimits (Set limits and quota after reading from storage)
ReadFromStorage (SystemNumbers)
-- A test case for the sort comparator found by fuzzer. -- A test case for the sort comparator found by fuzzer.
SELECT SELECT
max(number) OVER (ORDER BY number DESC NULLS FIRST), max(number) OVER (ORDER BY number DESC NULLS FIRST),

View File

@ -10,8 +10,8 @@ set max_block_size=40960;
-- MergeSortingTransform: Re-merging intermediate ORDER BY data (20 blocks with 819200 rows) to save memory consumption -- MergeSortingTransform: Re-merging intermediate ORDER BY data (20 blocks with 819200 rows) to save memory consumption
-- MergeSortingTransform: Memory usage is lowered from 186.25 MiB to 95.00 MiB -- MergeSortingTransform: Memory usage is lowered from 186.25 MiB to 95.00 MiB
-- MergeSortingTransform: Re-merging is not useful (memory usage was not lowered by remerge_sort_lowered_memory_bytes_ratio=2.0) -- MergeSortingTransform: Re-merging is not useful (memory usage was not lowered by remerge_sort_lowered_memory_bytes_ratio=2.0)
select number k, repeat(toString(number), 11) v1, repeat(toString(number), 12) v2 from numbers(3e6) order by k limit 400e3 format Null; -- { serverError 241 } select number k, repeat(toString(number), 11) v1, repeat(toString(number), 12) v2 from numbers(3e6) order by v1, v2 limit 400e3 format Null; -- { serverError 241 }
select number k, repeat(toString(number), 11) v1, repeat(toString(number), 12) v2 from numbers(3e6) order by k limit 400e3 settings remerge_sort_lowered_memory_bytes_ratio=2. format Null; -- { serverError 241 } select number k, repeat(toString(number), 11) v1, repeat(toString(number), 12) v2 from numbers(3e6) order by v1, v2 limit 400e3 settings remerge_sort_lowered_memory_bytes_ratio=2. format Null; -- { serverError 241 }
-- remerge_sort_lowered_memory_bytes_ratio 1.9 is good (need at least 1.91/0.98=1.94) -- remerge_sort_lowered_memory_bytes_ratio 1.9 is good (need at least 1.91/0.98=1.94)
-- MergeSortingTransform: Re-merging intermediate ORDER BY data (20 blocks with 819200 rows) to save memory consumption -- MergeSortingTransform: Re-merging intermediate ORDER BY data (20 blocks with 819200 rows) to save memory consumption

View File

@ -142,3 +142,12 @@ Filter
Filter Filter
2 3 2 3
2 3 2 3
> function calculation should be done after sorting and limit (if possible)
> Expression should be divided into two subexpressions and only one of them should be moved after Sorting
Expression (Before ORDER BY [lifted up part])
FUNCTION sipHash64
Sorting
Expression (Before ORDER BY)
FUNCTION plus
> this query should be executed without throwing an exception
0

View File

@ -196,3 +196,12 @@ $CLICKHOUSE_CLIENT -q "
select a, b from ( select a, b from (
select number + 1 as a, number + 2 as b from numbers(2) union all select number + 1 as b, number + 2 as a from numbers(2) select number + 1 as a, number + 2 as b from numbers(2) union all select number + 1 as b, number + 2 as a from numbers(2)
) where a != 1 settings enable_optimize_predicate_expression = 0" ) where a != 1 settings enable_optimize_predicate_expression = 0"
# The "Before ORDER BY" expression is expected to be split in two:
# sipHash64 (not needed for the sorting key) goes above Sorting, plus (the sorting key itself) stays below it.
# sed strips the leading indentation, grep keeps only the step names and the function names.
echo "> function calculation should be done after sorting and limit (if possible)"
echo "> Expression should be divided into two subexpressions and only one of them should be moved after Sorting"
$CLICKHOUSE_CLIENT -q "
explain actions = 1 select number as n, sipHash64(n) from numbers(100) order by number + 1 limit 5" |
sed 's/^ *//g' | grep -o "^ *\(Expression (Before ORDER BY.*)\|Sorting\|FUNCTION \w\+\)"
# The query is expected to print 0 and not to fail,
# presumably because throwIf is calculated after sorting and limit and never sees the row with number = 5.
echo "> this query should be executed without throwing an exception"
$CLICKHOUSE_CLIENT -q "
select throwIf(number = 5) from (select * from numbers(10)) order by number limit 1"

View File

@ -7,13 +7,15 @@
ExpressionTransform ExpressionTransform
(Limit) (Limit)
Limit Limit
(Sorting) (Expression)
MergingSortedTransform 2 → 1 ExpressionTransform
(Expression) (Sorting)
ExpressionTransform × 2 MergingSortedTransform 2 → 1
(SettingQuotaAndLimits) (Expression)
(ReadFromMergeTree) ExpressionTransform × 2
MergeTreeInOrder × 2 0 → 1 (SettingQuotaAndLimits)
(ReadFromMergeTree)
MergeTreeInOrder × 2 0 → 1
2020-10-01 9 2020-10-01 9
2020-10-01 9 2020-10-01 9
2020-10-01 9 2020-10-01 9
@ -23,16 +25,18 @@ ExpressionTransform
ExpressionTransform ExpressionTransform
(Limit) (Limit)
Limit Limit
(Sorting) (Expression)
MergingSortedTransform 2 → 1 ExpressionTransform
(Expression) (Sorting)
ExpressionTransform × 2 MergingSortedTransform 2 → 1
(SettingQuotaAndLimits) (Expression)
(ReadFromMergeTree) ExpressionTransform × 2
ReverseTransform (SettingQuotaAndLimits)
MergeTreeReverse 0 → 1 (ReadFromMergeTree)
ReverseTransform ReverseTransform
MergeTreeReverse 0 → 1 MergeTreeReverse 0 → 1
ReverseTransform
MergeTreeReverse 0 → 1
2020-10-01 9 2020-10-01 9
2020-10-01 9 2020-10-01 9
2020-10-01 9 2020-10-01 9
@ -42,15 +46,17 @@ ExpressionTransform
ExpressionTransform ExpressionTransform
(Limit) (Limit)
Limit Limit
(Sorting) (Expression)
FinishSortingTransform ExpressionTransform
PartialSortingTransform (Sorting)
MergingSortedTransform 2 → 1 FinishSortingTransform
(Expression) PartialSortingTransform
ExpressionTransform × 2 MergingSortedTransform 2 → 1
(SettingQuotaAndLimits) (Expression)
(ReadFromMergeTree) ExpressionTransform × 2
MergeTreeInOrder × 2 0 → 1 (SettingQuotaAndLimits)
(ReadFromMergeTree)
MergeTreeInOrder × 2 0 → 1
2020-10-11 0 2020-10-11 0
2020-10-11 0 2020-10-11 0
2020-10-11 0 2020-10-11 0