mirror of https://github.com/ClickHouse/ClickHouse.git
Merge pull request #20341 from ClickHouse/filter-push-down
Filter push down
commit 976dbe8077
@@ -101,8 +101,8 @@ endif()
list (APPEND clickhouse_common_io_sources ${CONFIG_BUILD})
list (APPEND clickhouse_common_io_headers ${CONFIG_VERSION} ${CONFIG_COMMON})

-list (APPEND dbms_sources Functions/IFunction.cpp Functions/FunctionFactory.cpp Functions/FunctionHelpers.cpp Functions/extractTimeZoneFromFunctionArguments.cpp Functions/replicate.cpp)
-list (APPEND dbms_headers Functions/IFunctionImpl.h Functions/FunctionFactory.h Functions/FunctionHelpers.h Functions/extractTimeZoneFromFunctionArguments.h Functions/replicate.h)
+list (APPEND dbms_sources Functions/IFunction.cpp Functions/FunctionFactory.cpp Functions/FunctionHelpers.cpp Functions/extractTimeZoneFromFunctionArguments.cpp Functions/replicate.cpp Functions/FunctionsLogical.cpp)
+list (APPEND dbms_headers Functions/IFunctionImpl.h Functions/FunctionFactory.h Functions/FunctionHelpers.h Functions/extractTimeZoneFromFunctionArguments.h Functions/replicate.h Functions/FunctionsLogical.h)

list (APPEND dbms_sources
    AggregateFunctions/AggregateFunctionFactory.cpp
@@ -538,12 +538,13 @@
    M(569, MULTIPLE_COLUMNS_SERIALIZED_TO_SAME_PROTOBUF_FIELD) \
    M(570, DATA_TYPE_INCOMPATIBLE_WITH_PROTOBUF_FIELD) \
    M(571, DATABASE_REPLICATION_FAILED) \
+    M(572, TOO_MANY_QUERY_PLAN_OPTIMIZATIONS) \
    \
    M(999, KEEPER_EXCEPTION) \
    M(1000, POCO_EXCEPTION) \
    M(1001, STD_EXCEPTION) \
    M(1002, UNKNOWN_EXCEPTION) \
-    M(1003, INVALID_SHARD_ID)
+    M(1003, INVALID_SHARD_ID) \

    /* See END */
@@ -437,6 +437,7 @@ class IColumn;
    M(UnionMode, union_default_mode, UnionMode::Unspecified, "Set default Union Mode in SelectWithUnion query. Possible values: empty string, 'ALL', 'DISTINCT'. If empty, query without Union Mode will throw exception.", 0) \
    M(Bool, optimize_aggregators_of_group_by_keys, true, "Eliminates min/max/any/anyLast aggregators of GROUP BY keys in SELECT section", 0) \
    M(Bool, optimize_group_by_function_keys, true, "Eliminates functions of other keys in GROUP BY section", 0) \
+    M(UInt64, query_plan_max_optimizations_to_apply, 10000, "Limit the total number of optimizations applied to query plan. If zero, ignored. If limit reached, throw exception", 0) \

    // End of COMMON_SETTINGS
    // Please add settings related to formats into the FORMAT_FACTORY_SETTINGS below.
@@ -6,6 +6,7 @@
#include <Functions/IFunctionAdaptors.h>
#include <Functions/FunctionsConversion.h>
#include <Functions/materialize.h>
+#include <Functions/FunctionsLogical.h>
#include <Interpreters/Context.h>
#include <Interpreters/ExpressionJIT.h>
#include <IO/WriteBufferFromString.h>
@@ -364,7 +365,7 @@ void ActionsDAG::removeUnusedActions(const std::vector<Node *> & required_nodes)
    removeUnusedActions();
}

-void ActionsDAG::removeUnusedActions()
+void ActionsDAG::removeUnusedActions(bool allow_remove_inputs)
{
    std::unordered_set<const Node *> visited_nodes;
    std::stack<Node *> stack;
@@ -388,6 +389,9 @@ void ActionsDAG::removeUnusedActions()
            visited_nodes.insert(&node);
            stack.push(&node);
        }
+
+        if (node.type == ActionType::INPUT && !allow_remove_inputs)
+            visited_nodes.insert(&node);
    }

    while (!stack.empty())
@@ -516,6 +520,11 @@ bool ActionsDAG::removeUnusedResult(const std::string & column_name)
        if (col == child)
            return false;

+    /// Do not remove input if it was mentioned in index several times.
+    for (const auto * node : index)
+        if (col == node)
+            return false;
+
    /// Remove from nodes and inputs.
    for (auto jt = nodes.begin(); jt != nodes.end(); ++jt)
    {
@@ -1203,4 +1212,340 @@ ActionsDAG::SplitResult ActionsDAG::splitActionsForFilter(const std::string & column_name) const
    return split(split_nodes);
}

namespace
{

struct ConjunctionNodes
{
    std::vector<ActionsDAG::Node *> allowed;
    std::vector<ActionsDAG::Node *> rejected;
};

/// Take a node whose result is a predicate.
/// Assuming the predicate is a conjunction (possibly trivial).
/// Find the separate conjunction nodes. Split the nodes into allowed and rejected sets.
/// An allowed predicate is a predicate which can be calculated using only nodes from the allowed_nodes set.
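///
/// For example, for the predicate `and(notEquals(y, 2), notEquals(x, 0))` with
/// allowed_nodes containing only the input `y`, the conjunct `notEquals(y, 2)` goes to
/// `allowed` and `notEquals(x, 0)` goes to `rejected`; this is exactly the split
/// exercised by the ARRAY JOIN test added below.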
ConjunctionNodes getConjunctionNodes(ActionsDAG::Node * predicate, std::unordered_set<const ActionsDAG::Node *> allowed_nodes)
{
    ConjunctionNodes conjunction;
    std::unordered_set<ActionsDAG::Node *> allowed;
    std::unordered_set<ActionsDAG::Node *> rejected;

    struct Frame
    {
        ActionsDAG::Node * node;
        bool is_predicate = false;
        size_t next_child_to_visit = 0;
        size_t num_allowed_children = 0;
    };

    std::stack<Frame> stack;
    std::unordered_set<ActionsDAG::Node *> visited_nodes;

    stack.push(Frame{.node = predicate, .is_predicate = true});
    visited_nodes.insert(predicate);
    while (!stack.empty())
    {
        auto & cur = stack.top();
        bool is_conjunction = cur.is_predicate
                              && cur.node->type == ActionsDAG::ActionType::FUNCTION
                              && cur.node->function_base->getName() == "and";

        /// At first, visit all children.
        while (cur.next_child_to_visit < cur.node->children.size())
        {
            auto * child = cur.node->children[cur.next_child_to_visit];

            if (visited_nodes.count(child) == 0)
            {
                visited_nodes.insert(child);
                stack.push({.node = child, .is_predicate = is_conjunction});
                break;
            }

            if (allowed_nodes.contains(child))
                ++cur.num_allowed_children;
            ++cur.next_child_to_visit;
        }

        if (cur.next_child_to_visit == cur.node->children.size())
        {
            if (cur.num_allowed_children == cur.node->children.size())
            {
                if (cur.node->type != ActionsDAG::ActionType::ARRAY_JOIN && cur.node->type != ActionsDAG::ActionType::INPUT)
                    allowed_nodes.emplace(cur.node);
            }
            else if (is_conjunction)
            {
                for (auto * child : cur.node->children)
                {
                    if (allowed_nodes.count(child))
                    {
                        if (allowed.insert(child).second)
                            conjunction.allowed.push_back(child);
                    }
                }
            }
            else if (cur.is_predicate)
            {
                if (rejected.insert(cur.node).second)
                    conjunction.rejected.push_back(cur.node);
            }

            stack.pop();
        }
    }

    if (conjunction.allowed.empty())
    {
        /// If nothing was added to the conjunction, check if it is trivial.
        if (allowed_nodes.count(predicate))
            conjunction.allowed.push_back(predicate);
    }

    return conjunction;
}

ColumnsWithTypeAndName prepareFunctionArguments(const std::vector<ActionsDAG::Node *> nodes)
{
    ColumnsWithTypeAndName arguments;
    arguments.reserve(nodes.size());

    for (const auto * child : nodes)
    {
        ColumnWithTypeAndName argument;
        argument.column = child->column;
        argument.type = child->result_type;
        argument.name = child->result_name;

        arguments.emplace_back(std::move(argument));
    }

    return arguments;
}

}

/// Create actions which calculate the conjunction of selected nodes.
/// Assume conjunction nodes are predicates (and may be used as arguments of function AND).
///
/// Result actions add a single column with the conjunction result (it is always last in the index).
/// No other columns are added or removed.
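///
/// For example, cloning the single conjunct `notEquals(y, 0)` gives a DAG with the input
/// `y` and the filter column `notEquals(y, 0)` last in the index; for several conjuncts
/// the added column is `and(...)` over the cloned predicates.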
ActionsDAGPtr ActionsDAG::cloneActionsForConjunction(std::vector<Node *> conjunction)
{
    if (conjunction.empty())
        return nullptr;

    auto actions = cloneEmpty();
    actions->settings.project_input = false;

    FunctionOverloadResolverPtr func_builder_and =
        std::make_shared<FunctionOverloadResolverAdaptor>(
            std::make_unique<DefaultOverloadResolver>(
                std::make_shared<FunctionAnd>()));

    std::unordered_map<const ActionsDAG::Node *, ActionsDAG::Node *> nodes_mapping;

    struct Frame
    {
        const ActionsDAG::Node * node;
        size_t next_child_to_visit = 0;
    };

    std::stack<Frame> stack;

    /// DFS. Clone actions.
    for (const auto * predicate : conjunction)
    {
        if (nodes_mapping.count(predicate))
            continue;

        stack.push({.node = predicate});
        while (!stack.empty())
        {
            auto & cur = stack.top();
            /// At first, visit all children.
            while (cur.next_child_to_visit < cur.node->children.size())
            {
                auto * child = cur.node->children[cur.next_child_to_visit];

                if (nodes_mapping.count(child) == 0)
                {
                    stack.push({.node = child});
                    break;
                }

                ++cur.next_child_to_visit;
            }

            if (cur.next_child_to_visit == cur.node->children.size())
            {
                auto & node = actions->nodes.emplace_back(*cur.node);
                nodes_mapping[cur.node] = &node;

                for (auto & child : node.children)
                    child = nodes_mapping[child];

                if (node.type == ActionType::INPUT)
                {
                    actions->inputs.emplace_back(&node);
                    actions->index.insert(&node);
                }

                stack.pop();
            }
        }
    }

    Node * result_predicate = nodes_mapping[*conjunction.begin()];

    if (conjunction.size() > 1)
    {
        std::vector<Node *> args;
        args.reserve(conjunction.size());
        for (const auto * predicate : conjunction)
            args.emplace_back(nodes_mapping[predicate]);

        result_predicate = &actions->addFunction(func_builder_and, args, {}, true, false);
    }

    actions->index.insert(result_predicate);
    return actions;
}

ActionsDAGPtr ActionsDAG::splitActionsForFilter(const std::string & filter_name, bool can_remove_filter, const Names & available_inputs)
{
    Node * predicate;

    {
        auto it = index.begin();
        for (; it != index.end(); ++it)
            if ((*it)->result_name == filter_name)
                break;

        if (it == index.end())
            throw Exception(ErrorCodes::LOGICAL_ERROR,
                            "Index for ActionsDAG does not contain filter column name {}. DAG:\n{}",
                            filter_name, dumpDAG());

        predicate = *it;
    }

    std::unordered_set<const Node *> allowed_nodes;

    /// Get input nodes from available_inputs names.
    {
        std::unordered_map<std::string_view, std::list<const Node *>> inputs_map;
        for (const auto & input : inputs)
            inputs_map[input->result_name].emplace_back(input);

        for (const auto & name : available_inputs)
        {
            auto & inputs_list = inputs_map[name];
            if (inputs_list.empty())
                continue;

            allowed_nodes.emplace(inputs_list.front());
            inputs_list.pop_front();
        }
    }

    auto conjunction = getConjunctionNodes(predicate, allowed_nodes);
    auto actions = cloneActionsForConjunction(conjunction.allowed);
    if (!actions)
        return nullptr;

    /// Now, when the actions are created, update the current DAG.

    if (conjunction.rejected.empty())
    {
        /// The whole predicate was split.
        if (can_remove_filter)
        {
            /// If the filter column is not needed, remove it from the index.
            for (auto i = index.begin(); i != index.end(); ++i)
            {
                if (*i == predicate)
                {
                    index.remove(i);
                    break;
                }
            }
        }
        else
        {
            /// Replace the predicate result with constant 1.
            Node node;
            node.type = ActionType::COLUMN;
            node.result_name = std::move(predicate->result_name);
            node.result_type = std::move(predicate->result_type);
            node.column = node.result_type->createColumnConst(0, 1);
            *predicate = std::move(node);
        }

        removeUnusedActions(false);
    }
    else
    {
        /// The predicate is a conjunction where both the allowed and rejected sets are not empty.
        /// Replace this node with the conjunction of rejected predicates.

        std::vector<Node *> new_children(conjunction.rejected.begin(), conjunction.rejected.end());

        if (new_children.size() == 1)
        {
            /// The rejected set has only one predicate.
            if (new_children.front()->result_type->equals(*predicate->result_type))
            {
                /// If its type is the same, just add an alias.
                Node node;
                node.type = ActionType::ALIAS;
                node.result_name = predicate->result_name;
                node.result_type = predicate->result_type;
                node.children.swap(new_children);
                *predicate = std::move(node);
            }
            else
            {
                /// If the type is different, cast the column.
                /// This case is possible because AND can use any numeric type as an argument.
                Node node;
                node.type = ActionType::COLUMN;
                node.result_name = predicate->result_type->getName();
                node.column = DataTypeString().createColumnConst(0, node.result_name);
                node.result_type = std::make_shared<DataTypeString>();

                auto * right_arg = &nodes.emplace_back(std::move(node));
                auto * left_arg = new_children.front();

                predicate->children = {left_arg, right_arg};
                auto arguments = prepareFunctionArguments(predicate->children);

                FunctionOverloadResolverPtr func_builder_cast =
                    std::make_shared<FunctionOverloadResolverAdaptor>(
                        CastOverloadResolver<CastType::nonAccurate>::createImpl(false));

                predicate->function_builder = func_builder_cast;
                predicate->function_base = predicate->function_builder->build(arguments);
                predicate->function = predicate->function_base->prepare(arguments);
            }
        }
        else
        {
            /// The predicate is function AND, which still has more than one argument.
            /// Just update the children and rebuild it.
            predicate->children.swap(new_children);
            auto arguments = prepareFunctionArguments(predicate->children);

            predicate->function_base = predicate->function_builder->build(arguments);
            predicate->function = predicate->function_base->prepare(arguments);
        }

        removeUnusedActions(false);
    }

    return actions;
}

}
@@ -152,6 +152,9 @@ public:
        }
    };

+    /// NOTE: std::list is an implementation detail.
+    /// It allows adding and removing nodes in place without reallocation.
+    /// Raw pointers to nodes remain valid.
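    /// (For example, cloneActionsForConjunction above stores raw Node pointers in a map
    /// while emplacing cloned nodes into `nodes`; std::list keeps those pointers stable.)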
    using Nodes = std::list<Node>;
    using Inputs = std::vector<Node *>;

@@ -278,6 +281,13 @@ public:
    /// Index of initial actions must contain column_name.
    SplitResult splitActionsForFilter(const std::string & column_name) const;

+    /// Create actions which may calculate part of the filter using only available_inputs.
+    /// If nothing may be calculated, returns nullptr.
+    /// Otherwise, returns actions whose inputs are from available_inputs.
+    /// The returned actions add a single column which may be used for the filter.
+    /// Also, replaces some nodes of the current inputs with constant 1 in case they are filtered.
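    ///
    /// For example, for the filter `and(notEquals(y, 0), notEquals(s, 4))` with
    /// available_inputs = {y}, the returned actions calculate `notEquals(y, 0)`,
    /// while this DAG keeps only `notEquals(s, 4)` (aliased to the original filter name),
    /// as in the aggregation test cases added below.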
+    ActionsDAGPtr splitActionsForFilter(const std::string & filter_name, bool can_remove_filter, const Names & available_inputs);

private:
    Node & addNode(Node node, bool can_replace = false, bool add_to_index = true);
    Node & getNode(const std::string & name);

@@ -302,10 +312,12 @@ private:
    }

    void removeUnusedActions(const std::vector<Node *> & required_nodes);
-    void removeUnusedActions();
+    void removeUnusedActions(bool allow_remove_inputs = true);
    void addAliases(const NamesWithAliases & aliases, std::vector<Node *> & result_nodes);

    void compileFunctions();

+    ActionsDAGPtr cloneActionsForConjunction(std::vector<Node *> conjunction);
};
@@ -284,7 +284,7 @@ void SelectStreamFactory::createForShard(
        if (try_results.empty() || local_delay < max_remote_delay)
        {
            auto plan = createLocalPlan(modified_query_ast, header, context, stage);
-            return QueryPipeline::getPipe(std::move(*plan->buildQueryPipeline()));
+            return QueryPipeline::getPipe(std::move(*plan->buildQueryPipeline(QueryPlanOptimizationSettings(context.getSettingsRef()))));
        }
        else
        {
@@ -117,7 +117,7 @@ struct QueryPlanSettings
{
    QueryPlan::ExplainPlanOptions query_plan_options;

-    /// Apply query plan optimisations.
+    /// Apply query plan optimizations.
    bool optimize = true;

    constexpr static char name[] = "PLAN";
|
||||
interpreter.buildQueryPlan(plan);
|
||||
|
||||
if (settings.optimize)
|
||||
plan.optimize();
|
||||
plan.optimize(QueryPlanOptimizationSettings(context.getSettingsRef()));
|
||||
|
||||
plan.explainPlan(buf, settings.query_plan_options);
|
||||
}
|
||||
@@ -265,7 +265,7 @@ BlockInputStreamPtr InterpreterExplainQuery::executeImpl()

        InterpreterSelectWithUnionQuery interpreter(ast.getExplainedQuery(), context, SelectQueryOptions());
        interpreter.buildQueryPlan(plan);
-        auto pipeline = plan.buildQueryPipeline();
+        auto pipeline = plan.buildQueryPipeline(QueryPlanOptimizationSettings(context.getSettingsRef()));

        if (settings.graph)
        {
@@ -548,7 +548,7 @@ BlockIO InterpreterSelectQuery::execute()

    buildQueryPlan(query_plan);

-    res.pipeline = std::move(*query_plan.buildQueryPipeline());
+    res.pipeline = std::move(*query_plan.buildQueryPipeline(QueryPlanOptimizationSettings(context->getSettingsRef())));
    return res;
}
@@ -413,7 +413,7 @@ BlockIO InterpreterSelectWithUnionQuery::execute()
    QueryPlan query_plan;
    buildQueryPlan(query_plan);

-    auto pipeline = query_plan.buildQueryPipeline();
+    auto pipeline = query_plan.buildQueryPipeline(QueryPlanOptimizationSettings(context->getSettingsRef()));

    res.pipeline = std::move(*pipeline);
    res.pipeline.addInterpreterContext(context);
@@ -756,7 +756,7 @@ QueryPipelinePtr MutationsInterpreter::addStreamsForLaterStages(const std::vecto
        }
    }

-    auto pipeline = plan.buildQueryPipeline();
+    auto pipeline = plan.buildQueryPipeline(QueryPlanOptimizationSettings(context.getSettingsRef()));
    pipeline->addSimpleTransform([&](const Block & header)
    {
        return std::make_shared<MaterializingTransform>(header);
@@ -32,6 +32,8 @@ public:
    void describeActions(FormatSettings &) const override;
    void describePipeline(FormatSettings & settings) const override;

+    const Aggregator::Params & getParams() const { return params; }
+
private:
    Aggregator::Params params;
    bool final;
@@ -34,7 +34,7 @@ private:
class CreatingSetsStep : public IQueryPlanStep
{
public:
-    CreatingSetsStep(DataStreams input_streams_);
+    explicit CreatingSetsStep(DataStreams input_streams_);

    String getName() const override { return "CreatingSets"; }
@@ -43,4 +43,9 @@ void CubeStep::transformPipeline(QueryPipeline & pipeline)
    });
}

+const Aggregator::Params & CubeStep::getParams() const
+{
+    return params->params;
+}
+
}
@@ -1,6 +1,7 @@
#pragma once
#include <Processors/QueryPlan/ITransformingStep.h>
#include <DataStreams/SizeLimits.h>
+#include <Interpreters/Aggregator.h>

namespace DB
{

@@ -18,6 +19,7 @@ public:

    void transformPipeline(QueryPipeline & pipeline) override;

+    const Aggregator::Params & getParams() const;
private:
    AggregatingTransformParamsPtr params;
};
@@ -17,6 +17,8 @@ public:

    void describeActions(FormatSettings & settings) const override;

+    const SortDescription & getSortDescription() const { return sort_description; }
+
private:
    SortDescription sort_description;
};
@@ -1,39 +0,0 @@
#include <Processors/QueryPlan/MaterializingStep.h>
#include <Processors/QueryPipeline.h>
#include <Processors/Transforms/MaterializingTransform.h>

#include <DataStreams/materializeBlock.h>

namespace DB
{

static ITransformingStep::Traits getTraits()
{
    return ITransformingStep::Traits
    {
        {
            .preserves_distinct_columns = true,
            .returns_single_stream = false,
            .preserves_number_of_streams = true,
            .preserves_sorting = true,
        },
        {
            .preserves_number_of_rows = true,
        }
    };
}

MaterializingStep::MaterializingStep(const DataStream & input_stream_)
    : ITransformingStep(input_stream_, materializeBlock(input_stream_.header), getTraits())
{
}

void MaterializingStep::transformPipeline(QueryPipeline & pipeline)
{
    pipeline.addSimpleTransform([&](const Block & header)
    {
        return std::make_shared<MaterializingTransform>(header);
    });
}

}
@@ -1,18 +0,0 @@
#pragma once
#include <Processors/QueryPlan/ITransformingStep.h>

namespace DB
{

/// Materialize constants. See MaterializingTransform.
class MaterializingStep : public ITransformingStep
{
public:
    explicit MaterializingStep(const DataStream & input_stream_);

    String getName() const override { return "Materializing"; }

    void transformPipeline(QueryPipeline & pipeline) override;
};

}
@@ -9,7 +9,7 @@ namespace QueryPlanOptimizations
{

/// This is the main function which optimizes the whole QueryPlan tree.
-void optimizeTree(QueryPlan::Node & root, QueryPlan::Nodes & nodes);
+void optimizeTree(const QueryPlanOptimizationSettings & settings, QueryPlan::Node & root, QueryPlan::Nodes & nodes);

/// Optimization is a function applied to QueryPlan::Node.
/// It can read and update subtree of specified node.
@@ -38,14 +38,19 @@ size_t trySplitFilter(QueryPlan::Node * node, QueryPlan::Nodes & nodes);
/// Replace chain `FilterStep -> ExpressionStep` to single FilterStep
size_t tryMergeExpressions(QueryPlan::Node * parent_node, QueryPlan::Nodes &);

+/// Move FilterStep down if possible.
+/// May split FilterStep and push down only part of it.
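/// For example, `Filter(and(notEquals(y, 0), notEquals(s, 4))) -> Aggregating(keys: y)`
/// becomes `Filter(notEquals(s, 4)) -> Aggregating -> Filter(notEquals(y, 0))`,
/// because `notEquals(y, 0)` depends only on the aggregation key.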
+size_t tryPushDownFilter(QueryPlan::Node * parent_node, QueryPlan::Nodes & nodes);

inline const auto & getOptimizations()
{
-    static const std::array<Optimization, 4> optimizations =
+    static const std::array<Optimization, 5> optimizations =
    {{
        {tryLiftUpArrayJoin, "liftUpArrayJoin"},
        {tryPushDownLimit, "pushDownLimit"},
        {trySplitFilter, "splitFilter"},
        {tryMergeExpressions, "mergeExpressions"},
+        {tryPushDownFilter, "pushDownFilter"},
    }};

    return optimizations;
@@ -0,0 +1,12 @@
#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
#include <Core/Settings.h>

namespace DB
{

QueryPlanOptimizationSettings::QueryPlanOptimizationSettings(const Settings & settings)
{
    max_optimizations_to_apply = settings.query_plan_max_optimizations_to_apply;
}

}
@@ -0,0 +1,20 @@
#pragma once

#include <cstddef>

namespace DB
{

struct Settings;

struct QueryPlanOptimizationSettings
{
    QueryPlanOptimizationSettings() = delete;
    explicit QueryPlanOptimizationSettings(const Settings & settings);

    /// If not zero, throw if too many optimizations were applied to the query plan.
    /// It helps to avoid an infinite optimization loop.
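    /// For example, with `SET query_plan_max_optimizations_to_apply = 1` a query whose
    /// plan needs several rewrites fails with TOO_MANY_QUERY_PLAN_OPTIMIZATIONS
    /// (exercised by the updated test script below).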
    size_t max_optimizations_to_apply = 0;
};

}
src/Processors/QueryPlan/Optimizations/filterPushDown.cpp (new file, 204 lines)
@@ -0,0 +1,204 @@
#include <Processors/QueryPlan/Optimizations/Optimizations.h>
#include <Processors/QueryPlan/ITransformingStep.h>
#include <Processors/QueryPlan/FilterStep.h>
#include <Processors/QueryPlan/AggregatingStep.h>
#include <Processors/QueryPlan/ExpressionStep.h>
#include <Processors/QueryPlan/ArrayJoinStep.h>
#include <Processors/QueryPlan/CubeStep.h>
#include <Processors/QueryPlan/FinishSortingStep.h>
#include <Processors/QueryPlan/MergeSortingStep.h>
#include <Processors/QueryPlan/MergingSortedStep.h>
#include <Processors/QueryPlan/PartialSortingStep.h>
#include <Processors/QueryPlan/TotalsHavingStep.h>
#include <Processors/QueryPlan/DistinctStep.h>
#include <Interpreters/ActionsDAG.h>
#include <Interpreters/ArrayJoinAction.h>
#include <Common/typeid_cast.h>
#include <DataTypes/DataTypeAggregateFunction.h>

#include <Columns/IColumn.h>

namespace DB::ErrorCodes
{
    extern const int LOGICAL_ERROR;
}

namespace DB::QueryPlanOptimizations
{

static size_t tryAddNewFilterStep(
    QueryPlan::Node * parent_node,
    QueryPlan::Nodes & nodes,
    const Names & allowed_inputs)
{
    QueryPlan::Node * child_node = parent_node->children.front();

    auto & parent = parent_node->step;
    auto & child = child_node->step;

    auto * filter = static_cast<FilterStep *>(parent.get());
    const auto & expression = filter->getExpression();
    const auto & filter_column_name = filter->getFilterColumnName();
    bool removes_filter = filter->removesFilterColumn();

    // std::cerr << "Filter: \n" << expression->dumpDAG() << std::endl;

    auto split_filter = expression->splitActionsForFilter(filter_column_name, removes_filter, allowed_inputs);
    if (!split_filter)
        return 0;

    // std::cerr << "===============\n" << expression->dumpDAG() << std::endl;
    // std::cerr << "---------------\n" << split_filter->dumpDAG() << std::endl;

    const auto & index = expression->getIndex();
    auto it = index.begin();
    for (; it != index.end(); ++it)
        if ((*it)->result_name == filter_column_name)
            break;

    const bool found_filter_column = it != expression->getIndex().end();

    if (!found_filter_column && !removes_filter)
        throw Exception(ErrorCodes::LOGICAL_ERROR,
                        "Filter column {} was removed from ActionsDAG but it is needed in result. DAG:\n{}",
                        filter_column_name, expression->dumpDAG());

    /// The filter column was replaced with a constant.
    const bool filter_is_constant = found_filter_column && (*it)->column && isColumnConst(*(*it)->column);

    if (!found_filter_column || filter_is_constant)
        /// This means that all predicates of the filter were pushed down.
        /// Replace the current actions with an expression, as we don't need to filter anything.
        parent = std::make_unique<ExpressionStep>(child->getOutputStream(), expression);

    /// Add new Filter step before Aggregating.
    /// Expression/Filter -> Aggregating -> Something
    auto & node = nodes.emplace_back();
    node.children.swap(child_node->children);
    child_node->children.emplace_back(&node);
    /// Expression/Filter -> Aggregating -> Filter -> Something

    /// The new filter column is added to the end.
    auto split_filter_column_name = (*split_filter->getIndex().rbegin())->result_name;
    node.step = std::make_unique<FilterStep>(
        node.children.at(0)->step->getOutputStream(),
        std::move(split_filter), std::move(split_filter_column_name), true);

    return 3;
}

static Names getAggregatinKeys(const Aggregator::Params & params)
{
    Names keys;
    keys.reserve(params.keys.size());
    for (auto pos : params.keys)
        keys.push_back(params.src_header.getByPosition(pos).name);

    return keys;
}

size_t tryPushDownFilter(QueryPlan::Node * parent_node, QueryPlan::Nodes & nodes)
{
    if (parent_node->children.size() != 1)
        return 0;

    QueryPlan::Node * child_node = parent_node->children.front();

    auto & parent = parent_node->step;
    auto & child = child_node->step;
    auto * filter = typeid_cast<FilterStep *>(parent.get());

    if (!filter)
        return 0;

    if (filter->getExpression()->hasStatefulFunctions())
        return 0;

    if (auto * aggregating = typeid_cast<AggregatingStep *>(child.get()))
    {
        const auto & params = aggregating->getParams();
        Names keys = getAggregatinKeys(params);
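        /// For example, for `select sum(x) as s, y from ... group by y`, keys == {"y"},
        /// so only the part of the filter that depends on `y` alone may be calculated
        /// below the Aggregating step.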
        if (auto updated_steps = tryAddNewFilterStep(parent_node, nodes, keys))
            return updated_steps;
    }

    if (auto * totals_having = typeid_cast<TotalsHavingStep *>(child.get()))
    {
        /// If the totals step has a HAVING expression, skip it for now.
        /// TODO:
        /// We can merge the HAVING expression with the current filter.
        /// Also, we can push down the part of HAVING which depends only on aggregation keys.
        if (totals_having->getActions())
            return 0;

        Names keys;
        const auto & header = totals_having->getInputStreams().front().header;
        for (const auto & column : header)
            if (typeid_cast<const DataTypeAggregateFunction *>(column.type.get()) == nullptr)
                keys.push_back(column.name);

        /// NOTE: this optimization changes the TOTALS value. Example:
        ///   `select * from (select y, sum(x) from (
        ///        select number as x, number % 4 as y from numbers(10)
        ///    ) group by y with totals) where y != 2`
        /// The optimization will replace the totals row `y, sum(x)` from `(0, 45)` to `(0, 37)`.
        /// It is expected to be ok, because the AST optimization `enable_optimize_predicate_expression = 1` also breaks it.
        if (auto updated_steps = tryAddNewFilterStep(parent_node, nodes, keys))
            return updated_steps;
    }

    if (auto * array_join = typeid_cast<ArrayJoinStep *>(child.get()))
    {
        const auto & array_join_actions = array_join->arrayJoin();
        const auto & keys = array_join_actions->columns;
        const auto & array_join_header = array_join->getInputStreams().front().header;

        Names allowed_inputs;
        for (const auto & column : array_join_header)
            if (keys.count(column.name) == 0)
                allowed_inputs.push_back(column.name);

        // for (const auto & name : allowed_inputs)
        //     std::cerr << name << std::endl;

        if (auto updated_steps = tryAddNewFilterStep(parent_node, nodes, allowed_inputs))
            return updated_steps;
    }

    if (auto * distinct = typeid_cast<DistinctStep *>(child.get()))
    {
        Names allowed_inputs = distinct->getOutputStream().header.getNames();
        if (auto updated_steps = tryAddNewFilterStep(parent_node, nodes, allowed_inputs))
            return updated_steps;
    }

    /// TODO.
    /// We can filter earlier if the expression does not depend on WITH FILL columns.
    /// But we cannot just push down the condition, because other columns may be filled with defaults.
    ///
    /// It is possible to filter columns before and after WITH FILL, but such a change is not idempotent.
    /// So, applying this to the pair (Filter -> Filling) several times will create several similar filters.
    // if (auto * filling = typeid_cast<FillingStep *>(child.get()))
    // {
    // }

    /// Same reason for Cube.
    // if (auto * cube = typeid_cast<CubeStep *>(child.get()))
    // {
    // }

    if (typeid_cast<PartialSortingStep *>(child.get())
        || typeid_cast<MergeSortingStep *>(child.get())
        || typeid_cast<MergingSortedStep *>(child.get())
        || typeid_cast<FinishSortingStep *>(child.get()))
    {
        Names allowed_inputs = child->getOutputStream().header.getNames();
        if (auto updated_steps = tryAddNewFilterStep(parent_node, nodes, allowed_inputs))
            return updated_steps;
    }

    return 0;
}

}
@@ -1,10 +1,20 @@
#include <Processors/QueryPlan/Optimizations/Optimizations.h>
+#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
+#include <Common/Exception.h>
#include <stack>

-namespace DB::QueryPlanOptimizations
+namespace DB
{

-void optimizeTree(QueryPlan::Node & root, QueryPlan::Nodes & nodes)
+namespace ErrorCodes
+{
+    extern const int TOO_MANY_QUERY_PLAN_OPTIMIZATIONS;
+}
+
+namespace QueryPlanOptimizations
+{
+
+void optimizeTree(const QueryPlanOptimizationSettings & settings, QueryPlan::Node & root, QueryPlan::Nodes & nodes)
{
    const auto & optimizations = getOptimizations();

@@ -23,6 +33,9 @@ void optimizeTree(QueryPlan::Node & root, QueryPlan::Nodes & nodes)
    std::stack<Frame> stack;
    stack.push(Frame{.node = &root});

+    size_t max_optimizations_to_apply = settings.max_optimizations_to_apply;
+    size_t total_applied_optimizations = 0;
+
    while (!stack.empty())
    {
        auto & frame = stack.top();

@@ -54,8 +67,15 @@ void optimizeTree(QueryPlan::Node & root, QueryPlan::Nodes & nodes)
            if (!optimization.apply)
                continue;

+            if (max_optimizations_to_apply && max_optimizations_to_apply < total_applied_optimizations)
+                throw Exception(ErrorCodes::TOO_MANY_QUERY_PLAN_OPTIMIZATIONS,
+                                "Too many optimizations applied to query plan. Current limit {}",
+                                max_optimizations_to_apply);
+
            /// Try to apply optimization.
            auto update_depth = optimization.apply(frame.node, nodes);
+            if (update_depth)
+                ++total_applied_optimizations;
            max_update_depth = std::max<size_t>(max_update_depth, update_depth);
        }

@@ -73,3 +93,4 @@ void optimizeTree(QueryPlan::Node & root, QueryPlan::Nodes & nodes)
}

}
+}
@@ -130,10 +130,10 @@ void QueryPlan::addStep(QueryPlanStepPtr step)
            " input expected", ErrorCodes::LOGICAL_ERROR);
}

-QueryPipelinePtr QueryPlan::buildQueryPipeline()
+QueryPipelinePtr QueryPlan::buildQueryPipeline(const QueryPlanOptimizationSettings & optimization_settings)
{
    checkInitialized();
-    optimize();
+    optimize(optimization_settings);

    struct Frame
    {

@@ -177,7 +177,7 @@ QueryPipelinePtr QueryPlan::buildQueryPipeline()
    return last_pipeline;
}

-Pipe QueryPlan::convertToPipe()
+Pipe QueryPlan::convertToPipe(const QueryPlanOptimizationSettings & optimization_settings)
{
    if (!isInitialized())
        return {};

@@ -185,7 +185,7 @@ Pipe QueryPlan::convertToPipe()
    if (isCompleted())
        throw Exception("Cannot convert completed QueryPlan to Pipe", ErrorCodes::LOGICAL_ERROR);

-    return QueryPipeline::getPipe(std::move(*buildQueryPipeline()));
+    return QueryPipeline::getPipe(std::move(*buildQueryPipeline(optimization_settings)));
}

void QueryPlan::addInterpreterContext(std::shared_ptr<Context> context)

@@ -333,9 +333,9 @@ void QueryPlan::explainPipeline(WriteBuffer & buffer, const ExplainPipelineOptio
    }
}

-void QueryPlan::optimize()
+void QueryPlan::optimize(const QueryPlanOptimizationSettings & optimization_settings)
{
-    QueryPlanOptimizations::optimizeTree(*root, nodes);
+    QueryPlanOptimizations::optimizeTree(optimization_settings, *root, nodes);
}

}
@@ -5,6 +5,7 @@
#include <set>

#include <Core/Names.h>
+#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>

namespace DB
{

@@ -27,7 +28,7 @@ class Pipe;

/// A tree of query steps.
/// The goal of QueryPlan is to build QueryPipeline.
-/// QueryPlan let delay pipeline creation which is helpful for pipeline-level optimisations.
+/// QueryPlan lets us delay pipeline creation, which is helpful for pipeline-level optimizations.
class QueryPlan
{
public:

@@ -43,12 +44,12 @@ public:
    bool isCompleted() const; /// Tree is not empty and root hasOutputStream()
    const DataStream & getCurrentDataStream() const; /// Checks that (isInitialized() && !isCompleted())

-    void optimize();
+    void optimize(const QueryPlanOptimizationSettings & optimization_settings);

-    QueryPipelinePtr buildQueryPipeline();
+    QueryPipelinePtr buildQueryPipeline(const QueryPlanOptimizationSettings & optimization_settings);

    /// If initialized, build pipeline and convert to pipe. Otherwise, return empty pipe.
-    Pipe convertToPipe();
+    Pipe convertToPipe(const QueryPlanOptimizationSettings & optimization_settings);

    struct ExplainPlanOptions
    {
@@ -28,6 +28,8 @@ public:

    void describeActions(FormatSettings & settings) const override;

+    const ActionsDAGPtr & getActions() const { return actions_dag; }
+
private:
    bool overflow_row;
    ActionsDAGPtr actions_dag;
@@ -108,12 +108,13 @@ SRCS(
    QueryPlan/ITransformingStep.cpp
    QueryPlan/LimitByStep.cpp
    QueryPlan/LimitStep.cpp
-    QueryPlan/MaterializingStep.cpp
    QueryPlan/MergeSortingStep.cpp
    QueryPlan/MergingAggregatedStep.cpp
    QueryPlan/MergingFinal.cpp
    QueryPlan/MergingSortedStep.cpp
    QueryPlan/OffsetStep.cpp
+    QueryPlan/Optimizations/QueryPlanOptimizationSettings.cpp
+    QueryPlan/Optimizations/filterPushDown.cpp
    QueryPlan/Optimizations/liftUpArrayJoin.cpp
    QueryPlan/Optimizations/limitPushDown.cpp
    QueryPlan/Optimizations/mergeExpressions.cpp
@@ -33,7 +33,7 @@ public:
            std::move(*MergeTreeDataSelectExecutor(part->storage)
                .readFromParts({part}, column_names, metadata_snapshot, query_info, context, max_block_size, num_streams));

-        return query_plan.convertToPipe();
+        return query_plan.convertToPipe(QueryPlanOptimizationSettings(context.getSettingsRef()));
    }
@@ -166,7 +166,7 @@ Pipe StorageBuffer::read(
{
    QueryPlan plan;
    read(plan, column_names, metadata_snapshot, query_info, context, processed_stage, max_block_size, num_streams);
-    return plan.convertToPipe();
+    return plan.convertToPipe(QueryPlanOptimizationSettings(context.getSettingsRef()));
}

void StorageBuffer::read(
@@ -501,7 +501,7 @@ Pipe StorageDistributed::read(
{
    QueryPlan plan;
    read(plan, column_names, metadata_snapshot, query_info, context, processed_stage, max_block_size, num_streams);
-    return plan.convertToPipe();
+    return plan.convertToPipe(QueryPlanOptimizationSettings(context.getSettingsRef()));
}

void StorageDistributed::read(
@@ -127,7 +127,7 @@ Pipe StorageMaterializedView::read(
{
    QueryPlan plan;
    read(plan, column_names, metadata_snapshot, query_info, context, processed_stage, max_block_size, num_streams);
-    return plan.convertToPipe();
+    return plan.convertToPipe(QueryPlanOptimizationSettings(context.getSettingsRef()));
}

void StorageMaterializedView::read(
@@ -198,7 +198,7 @@ Pipe StorageMergeTree::read(
{
    QueryPlan plan;
    read(plan, column_names, metadata_snapshot, query_info, context, processed_stage, max_block_size, num_streams);
-    return plan.convertToPipe();
+    return plan.convertToPipe(QueryPlanOptimizationSettings(context.getSettingsRef()));
}

std::optional<UInt64> StorageMergeTree::totalRows(const Settings &) const
@@ -3821,7 +3821,7 @@ Pipe StorageReplicatedMergeTree::read(
{
    QueryPlan plan;
    read(plan, column_names, metadata_snapshot, query_info, context, processed_stage, max_block_size, num_streams);
-    return plan.convertToPipe();
+    return plan.convertToPipe(QueryPlanOptimizationSettings(context.getSettingsRef()));
}
@@ -15,7 +15,6 @@

#include <Processors/Pipe.h>
#include <Processors/Transforms/MaterializingTransform.h>
-#include <Processors/QueryPlan/MaterializingStep.h>
#include <Processors/QueryPlan/ExpressionStep.h>
#include <Processors/QueryPlan/SettingQuotaAndLimitsStep.h>

@@ -60,7 +59,7 @@ Pipe StorageView::read(
{
    QueryPlan plan;
    read(plan, column_names, metadata_snapshot, query_info, context, processed_stage, max_block_size, num_streams);
-    return plan.convertToPipe();
+    return plan.convertToPipe(QueryPlanOptimizationSettings(context.getSettingsRef()));
}

void StorageView::read(

@@ -87,7 +86,10 @@ void StorageView::read(

    /// It's expected that the columns read from storage are not constant.
    /// Because method 'getSampleBlockForColumns' is used to obtain a structure of result in InterpreterSelectQuery.
-    auto materializing = std::make_unique<MaterializingStep>(query_plan.getCurrentDataStream());
+    auto materializing_actions = std::make_shared<ActionsDAG>(query_plan.getCurrentDataStream().header.getColumnsWithTypeAndName());
+    materializing_actions->addMaterializingOutputActions();
+
+    auto materializing = std::make_unique<ExpressionStep>(query_plan.getCurrentDataStream(), std::move(materializing_actions));
    materializing->setStepDescription("Materialize constants after VIEW subquery");
    query_plan.addStep(std::move(materializing));
@@ -1,6 +1,6 @@
1 1

-0 2
+0 1
-
test1 10 0
@@ -1,7 +1,9 @@
-sipHash should be calculated after filtration
+Too many optimizations applied to query plan
+Too many optimizations applied to query plan
+> sipHash should be calculated after filtration
FUNCTION sipHash64
Filter column: equals
-sorting steps should know about limit
+> sorting steps should know about limit
Limit 10
MergingSorted
Limit 10

@@ -9,3 +11,115 @@ MergeSorting
Limit 10
PartialSorting
Limit 10
-- filter push down --
> filter should be pushed down after aggregating
Aggregating
Filter
0 1
1 2
2 3
3 4
4 5
5 6
6 7
7 8
8 9
9 10
> filter should be pushed down after aggregating, column after aggregation is const
COLUMN Const(UInt8) -> notEquals(y, 0)
Aggregating
Filter
Filter
0 1 1
1 2 1
2 3 1
3 4 1
4 5 1
5 6 1
6 7 1
7 8 1
8 9 1
9 10 1
> one condition of filter should be pushed down after aggregating, other condition is aliased
Filter column
ALIAS notEquals(s, 4) :: 1 -> and(notEquals(y, 0), notEquals(s, 4))
Aggregating
Filter column: notEquals(y, 0)
0 1
1 2
2 3
3 4
5 6
6 7
7 8
8 9
9 10
> one condition of filter should be pushed down after aggregating, other condition is casted
Filter column
FUNCTION CAST(minus(s, 4) :: 1, UInt8 :: 3) -> and(notEquals(y, 0), minus(s, 4))
Aggregating
Filter column: notEquals(y, 0)
0 1
1 2
2 3
3 4
5 6
6 7
7 8
8 9
9 10
> one condition of filter should be pushed down after aggregating, other two conditions are ANDed
Filter column
FUNCTION and(minus(s, 8) :: 1, minus(s, 4) :: 2) -> and(notEquals(y, 0), minus(s, 8), minus(s, 4))
Aggregating
Filter column: notEquals(y, 0)
0 1
1 2
2 3
3 4
5 6
6 7
7 8
9 10
> two conditions of filter should be pushed down after aggregating and ANDed, one condition is aliased
Filter column
ALIAS notEquals(s, 8) :: 1 -> and(notEquals(y, 0), notEquals(s, 8), minus(y, 4))
Aggregating
Filter column: and(notEquals(y, 0), minus(y, 4))
0 1
1 2
2 3
4 5
5 6
6 7
7 8
9 10
> filter is split, one part is filtered before ARRAY JOIN
Filter column: and(notEquals(y, 2), notEquals(x, 0))
ARRAY JOIN x
Filter column: notEquals(y, 2)
1 3
> filter is pushed down before Distinct
Distinct
Distinct
Filter column: notEquals(y, 2)
0 0
0 1
1 0
1 1
> filter is pushed down before sorting steps
MergingSorted
MergeSorting
PartialSorting
Filter column: and(notEquals(x, 0), notEquals(y, 0))
1 2
1 1
> filter is pushed down before TOTALS HAVING and aggregating
TotalsHaving
Aggregating
Filter column: notEquals(y, 2)
0 12
1 15
3 10

0 37
@@ -4,7 +4,149 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh

-echo "sipHash should be calculated after filtration"
+$CLICKHOUSE_CLIENT -q "select x + 1 from (select y + 2 as x from (select dummy + 3 as y)) settings query_plan_max_optimizations_to_apply = 1" 2>&1 |
+    grep -o "Too many optimizations applied to query plan"
+
+echo "> sipHash should be calculated after filtration"
$CLICKHOUSE_CLIENT -q "explain actions = 1 select sum(x), sum(y) from (select sipHash64(number) as x, bitAnd(number, 1024) as y from numbers_mt(1000000000) limit 1000000000) where y = 0" | grep -o "FUNCTION sipHash64\|Filter column: equals"
-echo "sorting steps should know about limit"
+echo "> sorting steps should know about limit"
$CLICKHOUSE_CLIENT -q "explain actions = 1 select number from (select number from numbers(500000000) order by -number) limit 10" | grep -o "MergingSorted\|MergeSorting\|PartialSorting\|Limit 10"

echo "-- filter push down --"
echo "> filter should be pushed down after aggregating"
$CLICKHOUSE_CLIENT -q "
    explain select * from (select sum(x), y from (
        select number as x, number + 1 as y from numbers(10)) group by y
    ) where y != 0
    settings enable_optimize_predicate_expression=0" | grep -o "Aggregating\|Filter"
$CLICKHOUSE_CLIENT -q "
    select s, y from (select sum(x) as s, y from (
        select number as x, number + 1 as y from numbers(10)) group by y
    ) where y != 0 order by s, y
    settings enable_optimize_predicate_expression=0"

echo "> filter should be pushed down after aggregating, column after aggregation is const"
$CLICKHOUSE_CLIENT -q "
    explain actions = 1 select s, y, y != 0 from (select sum(x) as s, y from (
        select number as x, number + 1 as y from numbers(10)) group by y
    ) where y != 0
    settings enable_optimize_predicate_expression=0" | grep -o "Aggregating\|Filter\|COLUMN Const(UInt8) -> notEquals(y, 0)"
$CLICKHOUSE_CLIENT -q "
    select s, y, y != 0 from (select sum(x) as s, y from (
        select number as x, number + 1 as y from numbers(10)) group by y
    ) where y != 0 order by s, y, y != 0
    settings enable_optimize_predicate_expression=0"

echo "> one condition of filter should be pushed down after aggregating, other condition is aliased"
$CLICKHOUSE_CLIENT -q "
    explain actions = 1 select s, y from (
        select sum(x) as s, y from (select number as x, number + 1 as y from numbers(10)) group by y
    ) where y != 0 and s != 4
    settings enable_optimize_predicate_expression=0" |
    grep -o "Aggregating\|Filter column\|Filter column: notEquals(y, 0)\|ALIAS notEquals(s, 4) :: 1 -> and(notEquals(y, 0), notEquals(s, 4))"
$CLICKHOUSE_CLIENT -q "
    select s, y from (
        select sum(x) as s, y from (select number as x, number + 1 as y from numbers(10)) group by y
    ) where y != 0 and s != 4 order by s, y
    settings enable_optimize_predicate_expression=0"

echo "> one condition of filter should be pushed down after aggregating, other condition is casted"
$CLICKHOUSE_CLIENT -q "
    explain actions = 1 select s, y from (
        select sum(x) as s, y from (select number as x, number + 1 as y from numbers(10)) group by y
    ) where y != 0 and s - 4
    settings enable_optimize_predicate_expression=0" |
    grep -o "Aggregating\|Filter column\|Filter column: notEquals(y, 0)\|FUNCTION CAST(minus(s, 4) :: 1, UInt8 :: 3) -> and(notEquals(y, 0), minus(s, 4))"
$CLICKHOUSE_CLIENT -q "
    select s, y from (
        select sum(x) as s, y from (select number as x, number + 1 as y from numbers(10)) group by y
    ) where y != 0 and s - 4 order by s, y
    settings enable_optimize_predicate_expression=0"

echo "> one condition of filter should be pushed down after aggregating, other two conditions are ANDed"
$CLICKHOUSE_CLIENT -q "
    explain actions = 1 select s, y from (
        select sum(x) as s, y from (select number as x, number + 1 as y from numbers(10)) group by y
    ) where y != 0 and s - 8 and s - 4
    settings enable_optimize_predicate_expression=0" |
    grep -o "Aggregating\|Filter column\|Filter column: notEquals(y, 0)\|FUNCTION and(minus(s, 8) :: 1, minus(s, 4) :: 2) -> and(notEquals(y, 0), minus(s, 8), minus(s, 4))"
$CLICKHOUSE_CLIENT -q "
    select s, y from (
        select sum(x) as s, y from (select number as x, number + 1 as y from numbers(10)) group by y
    ) where y != 0 and s - 8 and s - 4 order by s, y
    settings enable_optimize_predicate_expression=0"

echo "> two conditions of filter should be pushed down after aggregating and ANDed, one condition is aliased"
$CLICKHOUSE_CLIENT -q "
    explain actions = 1 select s, y from (
        select sum(x) as s, y from (select number as x, number + 1 as y from numbers(10)) group by y
    ) where y != 0 and s != 8 and y - 4
    settings enable_optimize_predicate_expression=0" |
    grep -o "Aggregating\|Filter column\|Filter column: and(notEquals(y, 0), minus(y, 4))\|ALIAS notEquals(s, 8) :: 1 -> and(notEquals(y, 0), notEquals(s, 8), minus(y, 4))"
$CLICKHOUSE_CLIENT -q "
    select s, y from (
        select sum(x) as s, y from (select number as x, number + 1 as y from numbers(10)) group by y
    ) where y != 0 and s != 8 and y - 4 order by s, y
    settings enable_optimize_predicate_expression=0"

echo "> filter is split, one part is filtered before ARRAY JOIN"
$CLICKHOUSE_CLIENT -q "
    explain actions = 1 select x, y from (
        select range(number) as x, number + 1 as y from numbers(3)
    ) array join x where y != 2 and x != 0" |
    grep -o "Filter column: and(notEquals(y, 2), notEquals(x, 0))\|ARRAY JOIN x\|Filter column: notEquals(y, 2)"
$CLICKHOUSE_CLIENT -q "
    select x, y from (
        select range(number) as x, number + 1 as y from numbers(3)
    ) array join x where y != 2 and x != 0 order by x, y"

# echo "> filter is split, one part is filtered before Aggregating and Cube"
# $CLICKHOUSE_CLIENT -q "
#     explain actions = 1 select * from (
#         select sum(x) as s, x, y from (select number as x, number + 1 as y from numbers(10)) group by x, y with cube
#     ) where y != 0 and s != 4
#     settings enable_optimize_predicate_expression=0" |
#     grep -o "Cube\|Aggregating\|Filter column: notEquals(y, 0)"
# $CLICKHOUSE_CLIENT -q "
#     select s, x, y from (
#         select sum(x) as s, x, y from (select number as x, number + 1 as y from numbers(10)) group by x, y with cube
#     ) where y != 0 and s != 4 order by s, x, y
#     settings enable_optimize_predicate_expression=0"

echo "> filter is pushed down before Distinct"
$CLICKHOUSE_CLIENT -q "
    explain actions = 1 select x, y from (
        select distinct x, y from (select number % 2 as x, number % 3 as y from numbers(10))
    ) where y != 2
    settings enable_optimize_predicate_expression=0" |
    grep -o "Distinct\|Filter column: notEquals(y, 2)"
$CLICKHOUSE_CLIENT -q "
    select x, y from (
        select distinct x, y from (select number % 2 as x, number % 3 as y from numbers(10))
    ) where y != 2 order by x, y
    settings enable_optimize_predicate_expression=0"

echo "> filter is pushed down before sorting steps"
$CLICKHOUSE_CLIENT -q "
    explain actions = 1 select x, y from (
        select number % 2 as x, number % 3 as y from numbers(6) order by y desc
    ) where x != 0 and y != 0
    settings enable_optimize_predicate_expression = 0" |
    grep -o "MergingSorted\|MergeSorting\|PartialSorting\|Filter column: and(notEquals(x, 0), notEquals(y, 0))"
$CLICKHOUSE_CLIENT -q "
    select x, y from (
        select number % 2 as x, number % 3 as y from numbers(6) order by y desc
    ) where x != 0 and y != 0
    settings enable_optimize_predicate_expression = 0"

echo "> filter is pushed down before TOTALS HAVING and aggregating"
$CLICKHOUSE_CLIENT -q "
    explain actions = 1 select * from (
        select y, sum(x) from (select number as x, number % 4 as y from numbers(10)) group by y with totals
    ) where y != 2
    settings enable_optimize_predicate_expression=0" |
    grep -o "TotalsHaving\|Aggregating\|Filter column: notEquals(y, 2)"
$CLICKHOUSE_CLIENT -q "
    select * from (
        select y, sum(x) from (select number as x, number % 4 as y from numbers(10)) group by y with totals
    ) where y != 2"