mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-23 08:02:02 +00:00
stash
This commit is contained in:
parent
bd89fcafdb
commit
4c51329ad6
@ -1527,6 +1527,20 @@ ActionsDAG::SplitResult ActionsDAG::splitActionsBeforeArrayJoin(const NameSet &
|
||||
return res;
|
||||
}
|
||||
|
||||
/// Splits this DAG around the nodes that produce the columns named in `sort_description`.
/// Sort columns that cannot be found in the index are skipped.
/// The second part of the result receives the `project_input` flag of this DAG.
ActionsDAG::SplitResult ActionsDAG::splitActionsBySortingDescription(const SortDescription & sort_description) const
{
    std::unordered_set<const Node *> sorting_nodes;

    for (const auto & description : sort_description)
        if (const auto * found = tryFindInIndex(description.column_name))
            sorting_nodes.insert(found);

    auto parts = split(sorting_nodes);
    parts.second->project_input = project_input;
    return parts;
}
|
||||
|
||||
ActionsDAG::SplitResult ActionsDAG::splitActionsForFilter(const std::string & column_name) const
|
||||
{
|
||||
const auto * node = tryFindInIndex(column_name);
|
||||
|
@ -7,6 +7,8 @@
|
||||
|
||||
#include "config_core.h"
|
||||
|
||||
#include <Core/SortDescription.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
@ -274,6 +276,9 @@ public:
|
||||
/// Index of initial actions must contain column_name.
|
||||
SplitResult splitActionsForFilter(const std::string & column_name) const;
|
||||
|
||||
/// Splits actions by the index nodes whose names match the columns of sort_description.
/// Sort columns which are not found in the index are ignored.
/// The caller treats the first part as the calculations required before sorting
/// and the second part as the calculations which can be executed after it.
|
||||
SplitResult splitActionsBySortingDescription(const SortDescription & sort_description) const;
|
||||
|
||||
/// Create actions which may calculate part of filter using only available_inputs.
|
||||
/// If nothing may be calculated, returns nullptr.
|
||||
/// Otherwise, return actions which inputs are from available_inputs.
|
||||
|
@ -44,16 +44,19 @@ size_t tryMergeExpressions(QueryPlan::Node * parent_node, QueryPlan::Nodes &);
|
||||
/// May split FilterStep and push down only part of it.
|
||||
size_t tryPushDownFilter(QueryPlan::Node * parent_node, QueryPlan::Nodes & nodes);
|
||||
|
||||
/// Move calculations of ExpressionStep which are not needed for sorting after SortingStep.
/// May split ExpressionStep into two parts and move only one of them.
|
||||
size_t tryExecuteFunctionsAfterSorting(QueryPlan::Node * parent_node, QueryPlan::Nodes & nodes);
|
||||
|
||||
inline const auto & getOptimizations()
|
||||
{
|
||||
static const std::array<Optimization, 5> optimizations =
|
||||
{{
|
||||
static const std::array<Optimization, 6> optimizations = {{
|
||||
{tryLiftUpArrayJoin, "liftUpArrayJoin", &QueryPlanOptimizationSettings::optimize_plan},
|
||||
{tryPushDownLimit, "pushDownLimit", &QueryPlanOptimizationSettings::optimize_plan},
|
||||
{trySplitFilter, "splitFilter", &QueryPlanOptimizationSettings::optimize_plan},
|
||||
{tryMergeExpressions, "mergeExpressions", &QueryPlanOptimizationSettings::optimize_plan},
|
||||
{tryPushDownFilter, "pushDownFilter", &QueryPlanOptimizationSettings::filter_push_down},
|
||||
}};
|
||||
{tryExecuteFunctionsAfterSorting, "liftUpFunctions", &QueryPlanOptimizationSettings::optimize_plan},
|
||||
}};
|
||||
|
||||
return optimizations;
|
||||
}
|
||||
|
80
src/Processors/QueryPlan/Optimizations/liftUpFunctions.cpp
Normal file
80
src/Processors/QueryPlan/Optimizations/liftUpFunctions.cpp
Normal file
@ -0,0 +1,80 @@
|
||||
#include <Interpreters/ActionsDAG.h>
|
||||
#include <Processors/QueryPlan/ExpressionStep.h>
|
||||
#include <Processors/QueryPlan/Optimizations/Optimizations.h>
|
||||
#include <Processors/QueryPlan/SortingStep.h>
|
||||
|
||||
#include <base/logger_useful.h>
|
||||
#include <Poco/Logger.h>
|
||||
|
||||
namespace DB::QueryPlanOptimizations
|
||||
{
|
||||
|
||||
/// Exchanges the SortingStep stored in parent_node with the step of its first child and
/// then replaces the step that ended up in parent_node by a new ExpressionStep built from `actions`.
///
/// Before: Sorting (parent_node) -> Calculations (child_node) -> ...
/// After:  Expression(actions) (parent_node) -> Sorting (child_node) -> ...
///
/// Preconditions (not checked here): parent_node holds a SortingStep, parent_node has a child,
/// and that child has at least one child of its own.
void swapSortingAndUnnecessaryCalculation(QueryPlan::Node * parent_node, ActionsDAGPtr && actions)
{
    QueryPlan::Node * child_node = parent_node->children.front();

    auto & parent_step = parent_node->step;
    auto & child_step = child_node->step;

    /// Taken before the swap: it points to the step object itself,
    /// so it stays valid when the owning pointers are exchanged.
    auto * sorting_step = typeid_cast<SortingStep *>(parent_step.get());

    // Sorting -> UnnecessaryCalculations
    std::swap(parent_step, child_step);
    // UnnecessaryCalculations -> Sorting

    /// Sorting now reads from what used to be the input of the calculations.
    sorting_step->updateInputStream(child_node->children.at(0)->step->getOutputStream());

    /// The header is passed directly: the by-value parameter already makes the only copy needed.
    sorting_step->updateOutputStream(child_step->getInputStreams().at(0).header);

    /// The step previously held by the child is released here and replaced by the postponed calculations.
    parent_step = std::make_unique<ExpressionStep>(child_step->getOutputStream(), std::move(actions));
}
|
||||
|
||||
/// Tries to postpone calculations of an ExpressionStep which are not required by the SortingStep above it.
///
/// Applies only to the pattern Sorting (parent_node) -> Expression (its single child).
/// The expression is split by the sort description:
///  * if nothing can be postponed, the plan is left untouched and 0 is returned;
///  * if everything can be postponed, the two steps are swapped and 2 is returned;
///  * otherwise a new node with the required calculations is added to `nodes` below the sorting,
///    the rest is moved above it and 3 is returned.
/// NOTE(review): the non-zero results look like the number of plan nodes touched — confirm with the optimizer driver.
size_t tryExecuteFunctionsAfterSorting(QueryPlan::Node * parent_node, QueryPlan::Nodes & nodes)
{
    if (parent_node->children.size() != 1)
        return 0;

    QueryPlan::Node * child_node = parent_node->children.front();

    auto & parent_step = parent_node->step;
    auto & child_step = child_node->step;
    auto * sorting_step = typeid_cast<SortingStep *>(parent_step.get());
    auto * expression_step = typeid_cast<ExpressionStep *>(child_step.get());

    if (!sorting_step || !expression_step)
        return 0;

    const auto & sort_columns = sorting_step->getSortDescription();
    const auto & expression = expression_step->getExpression();

    /// Look the logger up once instead of once per log statement.
    auto * log = &Poco::Logger::get("Optimizer");

    /// Iterate by const reference: the descriptions are only read.
    for (const auto & sc : sort_columns)
        LOG_TRACE(log, "sort_columns: {}", sc.column_name);

    auto split_actions = expression->splitActionsBySortingDescription(sort_columns);
    LOG_TRACE(log, "source: {}", expression->dumpDAG());
    LOG_TRACE(log, "first: {}", split_actions.first->dumpDAG());
    LOG_TRACE(log, "second: {}", split_actions.second->dumpDAG());

    // No calculations can be postponed.
    if (split_actions.second->trivial())
        return 0;

    // Everything can be done after the sorting.
    if (split_actions.first->trivial())
    {
        swapSortingAndUnnecessaryCalculation(parent_node, std::move(split_actions.second));
        return 2;
    }

    // Sorting -> Expression
    auto & node = nodes.emplace_back();

    /// The new node takes over the inputs of the old expression and becomes its only child.
    node.children.swap(child_node->children);
    child_node->children.emplace_back(&node);

    node.step = std::make_unique<ExpressionStep>(node.children.at(0)->step->getOutputStream(), std::move(split_actions.first));
    // Sorting (parent_node) -> UnnecessaryCalculations (child_node) -> NecessaryCalculations (node)
    swapSortingAndUnnecessaryCalculation(parent_node, std::move(split_actions.second));
    // UnnecessaryCalculations (child_node) -> Sorting (parent_node) -> NecessaryCalculations (node)

    return 3;
}
|
||||
}
|
@ -1,11 +1,12 @@
|
||||
#include <stdexcept>
|
||||
#include <IO/Operators.h>
|
||||
#include <Processors/Merges/MergingSortedTransform.h>
|
||||
#include <Processors/QueryPlan/SortingStep.h>
|
||||
#include <QueryPipeline/QueryPipelineBuilder.h>
|
||||
#include <Processors/Transforms/FinishSortingTransform.h>
|
||||
#include <Processors/Transforms/LimitsCheckingTransform.h>
|
||||
#include <Processors/Transforms/MergeSortingTransform.h>
|
||||
#include <Processors/Transforms/PartialSortingTransform.h>
|
||||
#include <Processors/Transforms/FinishSortingTransform.h>
|
||||
#include <Processors/Merges/MergingSortedTransform.h>
|
||||
#include <Processors/Transforms/LimitsCheckingTransform.h>
|
||||
#include <IO/Operators.h>
|
||||
#include <QueryPipeline/QueryPipelineBuilder.h>
|
||||
#include <Common/JSONBuilder.h>
|
||||
|
||||
namespace DB
|
||||
@ -88,6 +89,19 @@ SortingStep::SortingStep(
|
||||
output_stream->sort_mode = DataStream::SortMode::Stream;
|
||||
}
|
||||
|
||||
void SortingStep::updateInputStream(const DataStream & input_stream)
|
||||
{
|
||||
input_streams.clear();
|
||||
input_streams.emplace_back(input_stream);
|
||||
}
|
||||
|
||||
/// Recreates the output stream of the step from its single input stream,
/// the given header and the data stream traits of the step.
/// Throws std::runtime_error if the step does not have exactly one input stream.
void SortingStep::updateOutputStream(Block result_header)
{
    if (input_streams.size() != 1)
        throw std::runtime_error{"SortingStep::updateOutputStream expects exactly one input stream"};

    output_stream = createOutputStream(input_streams.at(0), result_header, getDataStreamTraits());
}
|
||||
|
||||
void SortingStep::updateLimit(size_t limit_)
|
||||
{
|
||||
if (limit_ && (limit == 0 || limit_ < limit))
|
||||
|
@ -49,6 +49,11 @@ public:
|
||||
/// Add limit or change it to lower value.
|
||||
void updateLimit(size_t limit_);
|
||||
|
||||
/// Replace all input streams of the step with the single given stream.
void updateInputStream(const DataStream & input_stream);
|
||||
/// Recreate the output stream from the single input stream and the given header.
/// Throws if the step does not have exactly one input stream.
void updateOutputStream(Block result_header);
|
||||
|
||||
/// Sort description stored in this step (result_description).
/// Returned by reference to avoid copying it on every call; valid while the step is alive.
const SortDescription & getSortDescription() const { return result_description; }
|
||||
|
||||
private:
|
||||
|
||||
enum class Type
|
||||
|
@ -0,0 +1,4 @@
|
||||
<test>
|
||||
<query>SELECT sipHash64(number) FROM numbers(1e8) ORDER BY number LIMIT 5</query>
|
||||
<query>SELECT sipHash64(number) FROM numbers(1e8) ORDER BY number + 1 LIMIT 5</query>
|
||||
</test>
|
@ -142,3 +142,15 @@ Filter
|
||||
Filter
|
||||
2 3
|
||||
2 3
|
||||
> function calculation should be done after sorting and limit (if possible)
|
||||
> the whole Expression node could be moved after Sorting
|
||||
Expression
|
||||
Limit
|
||||
Expression
|
||||
Sorting
|
||||
> Expression should be divided into two subnodes and only one of them could be moved after Sorting
|
||||
Expression
|
||||
Limit
|
||||
Expression
|
||||
Sorting
|
||||
Expression
|
||||
|
@ -196,3 +196,13 @@ $CLICKHOUSE_CLIENT -q "
|
||||
select a, b from (
|
||||
select number + 1 as a, number + 2 as b from numbers(2) union all select number + 1 as b, number + 2 as a from numbers(2)
|
||||
) where a != 1 settings enable_optimize_predicate_expression = 0"
|
||||
|
||||
# Plan-shape check: ORDER BY uses the source column itself.
# Only the Expression/Limit/Sorting step names of the EXPLAIN output are kept for comparison.
echo "> function calculation should be done after sorting and limit (if possible)"
|
||||
echo "> the whole Expression node could be moved after Sorting"
|
||||
$CLICKHOUSE_CLIENT -q "
|
||||
explain select sipHash64(number) from numbers(100) order by number limit 5" |
|
||||
sed 's/ //g' | grep -o "^ *\(Expression\|Limit\|Sorting\)"
|
||||
# Plan-shape check: ORDER BY uses a computed key (number + 1), while sipHash64 is only selected.
echo "> Expression should be divided into two subnodes and only one of them could be moved after Sorting"
|
||||
$CLICKHOUSE_CLIENT -q "
|
||||
explain select sipHash64(number) from numbers(100) order by number + 1 limit 5" |
|
||||
sed 's/ //g' | grep -o "^ *\(Expression\|Limit\|Sorting\)"
|
||||
|
Loading…
Reference in New Issue
Block a user