Merge branch 'master' into nav-fixes

This commit is contained in:
Rich Raposa 2023-03-23 14:22:48 -06:00 committed by GitHub
commit 7f2076c56e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
35 changed files with 1693 additions and 656 deletions

View File

@ -338,6 +338,12 @@ UserID.binURL.bin和EventTime.bin是<font face = "monospace">UserID</font>
:::note
- 最后一个颗粒1082颗粒是少于8192行的。
- 我们在本指南开头的“DDL 语句详细信息”中提到,我们禁用了自适应索引粒度(为了简化本指南中的讨论,并使图表和结果可重现)。
因此,示例表中所有颗粒(除了最后一个)都具有相同大小。
- 对于具有自适应索引粒度的表(默认情况下索引粒度是自适应的),某些粒度的大小可以小于 8192 行,具体取决于行数据大小。
- 我们将主键列(<font face = "monospace">UserID</font>, <font face = "monospace">URL</font>)中的一些列值标记为橙色。
这些橙色标记的列值是每个颗粒中每个主键列的最小值。这里的例外是最后一个颗粒(上图中的颗粒1082),最后一个颗粒我们标记的是最大的值。

View File

@ -111,7 +111,6 @@ namespace ErrorCodes
extern const int NOT_IMPLEMENTED;
extern const int ALIAS_REQUIRED;
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int ILLEGAL_PREWHERE;
extern const int UNKNOWN_TABLE;
}
@ -6856,13 +6855,7 @@ void QueryAnalyzer::resolveQuery(const QueryTreeNodePtr & query_node, Identifier
if (query_node_typed.isGroupByAll())
expandGroupByAll(query_node_typed);
if (query_node_typed.hasPrewhere())
assertNoFunctionNodes(query_node_typed.getPrewhere(),
"arrayJoin",
ErrorCodes::ILLEGAL_PREWHERE,
"ARRAY JOIN",
"in PREWHERE");
validateFilters(query_node);
validateAggregates(query_node, { .group_by_use_nulls = scope.group_by_use_nulls });
for (const auto & column : projection_columns)

View File

@ -17,8 +17,50 @@ namespace ErrorCodes
extern const int NOT_AN_AGGREGATE;
extern const int NOT_IMPLEMENTED;
extern const int BAD_ARGUMENTS;
extern const int ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER;
extern const int ILLEGAL_PREWHERE;
}
namespace
{
void validateFilter(const QueryTreeNodePtr & filter_node, std::string_view exception_place_message, const QueryTreeNodePtr & query_node)
{
auto filter_node_result_type = filter_node->getResultType();
if (!filter_node_result_type->canBeUsedInBooleanContext())
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER,
"Invalid type for filter in {}: {}. In query {}",
exception_place_message,
filter_node_result_type->getName(),
query_node->formatASTForErrorMessage());
}
}
void validateFilters(const QueryTreeNodePtr & query_node)
{
const auto & query_node_typed = query_node->as<QueryNode &>();
if (query_node_typed.hasPrewhere())
{
validateFilter(query_node_typed.getPrewhere(), "PREWHERE", query_node);
assertNoFunctionNodes(query_node_typed.getPrewhere(),
"arrayJoin",
ErrorCodes::ILLEGAL_PREWHERE,
"ARRAY JOIN",
"in PREWHERE");
}
if (query_node_typed.hasWhere())
validateFilter(query_node_typed.getWhere(), "WHERE", query_node);
if (query_node_typed.hasHaving())
validateFilter(query_node_typed.getHaving(), "HAVING", query_node);
}
namespace
{
class ValidateGroupByColumnsVisitor : public ConstInDepthQueryTreeVisitor<ValidateGroupByColumnsVisitor>
{
public:
@ -106,7 +148,9 @@ private:
const QueryTreeNodePtr & query_node;
};
void validateAggregates(const QueryTreeNodePtr & query_node, ValidationParams params)
}
void validateAggregates(const QueryTreeNodePtr & query_node, AggregatesValidationParams params)
{
const auto & query_node_typed = query_node->as<QueryNode &>();
auto join_tree_node_type = query_node_typed.getJoinTree()->getNodeType();

View File

@ -5,7 +5,10 @@
namespace DB
{
struct ValidationParams
/// Validate PREWHERE, WHERE, HAVING in query node
void validateFilters(const QueryTreeNodePtr & query_node);
struct AggregatesValidationParams
{
bool group_by_use_nulls = false;
};
@ -20,7 +23,7 @@ struct ValidationParams
* PROJECTION.
* 5. Throws exception if there is GROUPING SETS or ROLLUP or CUBE or WITH TOTALS without aggregation.
*/
void validateAggregates(const QueryTreeNodePtr & query_node, ValidationParams params);
void validateAggregates(const QueryTreeNodePtr & query_node, AggregatesValidationParams params);
/** Assert that there are no function nodes with specified function name in node children.
* Do not visit subqueries.

View File

@ -9,6 +9,7 @@
#include <Functions/materialize.h>
#include <Functions/FunctionsLogical.h>
#include <Functions/CastOverloadResolver.h>
#include <Functions/indexHint.h>
#include <Interpreters/Context.h>
#include <Interpreters/ArrayJoinAction.h>
#include <IO/WriteBufferFromString.h>
@ -1364,6 +1365,83 @@ void ActionsDAG::mergeInplace(ActionsDAG && second)
first.projected_output = second.projected_output;
}
void ActionsDAG::mergeNodes(ActionsDAG && second)
{
std::unordered_map<std::string, const ActionsDAG::Node *> node_name_to_node;
for (auto & node : nodes)
node_name_to_node.emplace(node.result_name, &node);
struct Frame
{
ActionsDAG::Node * node = nullptr;
bool visited_children = false;
};
std::unordered_map<const ActionsDAG::Node *, ActionsDAG::Node *> const_node_to_node;
for (auto & node : second.nodes)
const_node_to_node.emplace(&node, &node);
std::vector<Frame> nodes_to_process;
nodes_to_process.reserve(second.getOutputs().size());
for (auto & node : second.getOutputs())
nodes_to_process.push_back({const_node_to_node.at(node), false /*visited_children*/});
std::unordered_set<const ActionsDAG::Node *> nodes_to_move_from_second_dag;
while (!nodes_to_process.empty())
{
auto & node_to_process = nodes_to_process.back();
auto * node = node_to_process.node;
auto node_it = node_name_to_node.find(node->result_name);
if (node_it != node_name_to_node.end())
{
nodes_to_process.pop_back();
continue;
}
if (!node_to_process.visited_children)
{
node_to_process.visited_children = true;
for (auto & child : node->children)
nodes_to_process.push_back({const_node_to_node.at(child), false /*visited_children*/});
/// If node has children process them first
if (!node->children.empty())
continue;
}
for (auto & child : node->children)
child = node_name_to_node.at(child->result_name);
node_name_to_node.emplace(node->result_name, node);
nodes_to_move_from_second_dag.insert(node);
nodes_to_process.pop_back();
}
if (nodes_to_move_from_second_dag.empty())
return;
auto second_nodes_end = second.nodes.end();
for (auto second_node_it = second.nodes.begin(); second_node_it != second_nodes_end;)
{
if (!nodes_to_move_from_second_dag.contains(&(*second_node_it)))
{
++second_node_it;
continue;
}
auto node_to_move_it = second_node_it;
++second_node_it;
nodes.splice(nodes.end(), second.nodes, node_to_move_it);
if (node_to_move_it->type == ActionType::INPUT)
inputs.push_back(&(*node_to_move_it));
}
}
ActionsDAG::SplitResult ActionsDAG::split(std::unordered_set<const Node *> split_nodes) const
{
/// Split DAG into two parts.
@ -2193,7 +2271,8 @@ bool ActionsDAG::isSortingPreserved(
ActionsDAGPtr ActionsDAG::buildFilterActionsDAG(
const NodeRawConstPtrs & filter_nodes,
const std::unordered_map<std::string, ColumnWithTypeAndName> & node_name_to_input_node_column,
const ContextPtr & context)
const ContextPtr & context,
bool single_output_condition_node)
{
if (filter_nodes.empty())
return nullptr;
@ -2281,13 +2360,35 @@ ActionsDAGPtr ActionsDAG::buildFilterActionsDAG(
NodeRawConstPtrs function_children;
function_children.reserve(node->children.size());
FunctionOverloadResolverPtr function_overload_resolver;
if (node->function_base->getName() == "indexHint")
{
ActionsDAG::NodeRawConstPtrs children;
if (const auto * adaptor = typeid_cast<const FunctionToFunctionBaseAdaptor *>(node->function_base.get()))
{
if (const auto * index_hint = typeid_cast<const FunctionIndexHint *>(adaptor->getFunction().get()))
{
auto index_hint_filter_dag = buildFilterActionsDAG(index_hint->getActions()->getOutputs(),
node_name_to_input_node_column,
context,
false /*single_output_condition_node*/);
auto index_hint_function_clone = std::make_shared<FunctionIndexHint>();
index_hint_function_clone->setActions(std::move(index_hint_filter_dag));
function_overload_resolver = std::make_shared<FunctionToOverloadResolverAdaptor>(std::move(index_hint_function_clone));
}
}
}
for (const auto & child : node->children)
function_children.push_back(node_to_result_node.find(child)->second);
auto [arguments, all_const] = getFunctionArguments(function_children);
auto function_base = function_overload_resolver ? function_overload_resolver->build(arguments) : node->function_base;
result_node = &result_dag->addFunctionImpl(
node->function_base,
function_base,
std::move(function_children),
std::move(arguments),
{},
@ -2307,7 +2408,7 @@ ActionsDAGPtr ActionsDAG::buildFilterActionsDAG(
for (const auto & node : filter_nodes)
result_dag_outputs.push_back(node_to_result_node.find(node)->second);
if (result_dag_outputs.size() > 1)
if (result_dag_outputs.size() > 1 && single_output_condition_node)
{
auto function_builder = FunctionFactory::instance().get("and", context);
result_dag_outputs = { &result_dag->addFunction(function_builder, result_dag_outputs, {}) };

View File

@ -290,6 +290,9 @@ public:
/// So that pointers to nodes are kept valid.
void mergeInplace(ActionsDAG && second);
/// Merge current nodes with specified dag nodes
void mergeNodes(ActionsDAG && second);
using SplitResult = std::pair<ActionsDAGPtr, ActionsDAGPtr>;
/// Split ActionsDAG into two DAGs, where first part contains all nodes from split_nodes and their children.
@ -344,15 +347,18 @@ public:
* Additionally during dag construction if node has name that exists in node_name_to_input_column map argument
* in final dag this node is represented as INPUT node with specified column.
*
* Result dag has only single output node:
* If single_output_condition_node = true, result dag has single output node:
* 1. If there is single filter node, result dag output will contain this node.
* 2. If there are multiple filter nodes, result dag output will contain single `and` function node
* and children of this node will be filter nodes.
*
* If single_output_condition_node = false, result dag has multiple output nodes.
*/
static ActionsDAGPtr buildFilterActionsDAG(
const NodeRawConstPtrs & filter_nodes,
const std::unordered_map<std::string, ColumnWithTypeAndName> & node_name_to_input_node_column,
const ContextPtr & context);
const ContextPtr & context,
bool single_output_condition_node = true);
private:
NodeRawConstPtrs getParents(const Node * target) const;

View File

@ -636,14 +636,14 @@ InterpreterSelectQuery::InterpreterSelectQuery(
Names queried_columns = syntax_analyzer_result->requiredSourceColumns();
const auto & supported_prewhere_columns = storage->supportedPrewhereColumns();
MergeTreeWhereOptimizer{
current_info,
context,
MergeTreeWhereOptimizer where_optimizer{
std::move(column_compressed_sizes),
metadata_snapshot,
queried_columns,
supported_prewhere_columns,
log};
where_optimizer.optimize(current_info, context);
}
}

View File

@ -10,6 +10,7 @@
#include <Analyzer/TableFunctionNode.h>
#include <Planner/PlannerContext.h>
#include <Planner/PlannerActionsVisitor.h>
namespace DB
{
@ -17,6 +18,7 @@ namespace DB
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int ILLEGAL_PREWHERE;
}
namespace
@ -78,23 +80,128 @@ public:
static bool needChildVisit(const QueryTreeNodePtr &, const QueryTreeNodePtr & child_node)
{
return !(child_node->getNodeType() == QueryTreeNodeType::QUERY || child_node->getNodeType() == QueryTreeNodeType::UNION);
auto child_node_type = child_node->getNodeType();
return !(child_node_type == QueryTreeNodeType::QUERY || child_node_type == QueryTreeNodeType::UNION);
}
private:
PlannerContext & planner_context;
};
class CollectPrewhereTableExpressionVisitor : public ConstInDepthQueryTreeVisitor<CollectPrewhereTableExpressionVisitor>
{
public:
explicit CollectPrewhereTableExpressionVisitor(const QueryTreeNodePtr & query_node_)
: query_node(query_node_)
{}
const QueryTreeNodePtr & getPrewhereTableExpression() const
{
return table_expression;
}
void visitImpl(const QueryTreeNodePtr & node)
{
auto * column_node = node->as<ColumnNode>();
if (!column_node)
return;
auto column_source = column_node->getColumnSourceOrNull();
if (!column_source)
throw Exception(ErrorCodes::ILLEGAL_PREWHERE,
"Invalid column {} in PREWHERE. In query {}",
column_node->formatASTForErrorMessage(),
query_node->formatASTForErrorMessage());
auto * table_column_source = column_source->as<TableNode>();
auto * table_function_column_source = column_source->as<TableFunctionNode>();
if (!table_column_source && !table_function_column_source)
throw Exception(ErrorCodes::ILLEGAL_PREWHERE,
"Invalid column {} in PREWHERE. Expected column source to be table or table function. Actual {}. In query {}",
column_node->formatASTForErrorMessage(),
column_source->formatASTForErrorMessage(),
query_node->formatASTForErrorMessage());
if (table_expression && table_expression.get() != column_source.get())
throw Exception(ErrorCodes::ILLEGAL_PREWHERE,
"Invalid column {} in PREWHERE. Expected columns from single table or table function {}. Actual {}. In query {}",
column_node->formatASTForErrorMessage(),
table_expression->formatASTForErrorMessage(),
column_source->formatASTForErrorMessage(),
query_node->formatASTForErrorMessage());
if (!table_expression)
{
const auto & storage = table_column_source ? table_column_source->getStorage() : table_function_column_source->getStorage();
if (!storage->supportsPrewhere())
throw Exception(ErrorCodes::ILLEGAL_PREWHERE,
"Storage {} (table {}) does not support PREWHERE",
storage->getName(),
storage->getStorageID().getNameForLogs());
table_expression = std::move(column_source);
table_supported_prewhere_columns = storage->supportedPrewhereColumns();
}
if (table_supported_prewhere_columns && !table_supported_prewhere_columns->contains(column_node->getColumnName()))
throw Exception(ErrorCodes::ILLEGAL_PREWHERE,
"Table expression {} does not support column {} in PREWHERE. In query {}",
table_expression->formatASTForErrorMessage(),
column_node->formatASTForErrorMessage(),
query_node->formatASTForErrorMessage());
}
static bool needChildVisit(const QueryTreeNodePtr &, const QueryTreeNodePtr & child_node)
{
auto child_node_type = child_node->getNodeType();
return !(child_node_type == QueryTreeNodeType::QUERY || child_node_type == QueryTreeNodeType::UNION);
}
private:
QueryTreeNodePtr query_node;
QueryTreeNodePtr table_expression;
std::optional<NameSet> table_supported_prewhere_columns;
};
void checkStorageSupportPrewhere(const QueryTreeNodePtr & table_expression)
{
if (auto * table_node = table_expression->as<TableNode>())
{
auto storage = table_node->getStorage();
if (!storage->supportsPrewhere())
throw Exception(ErrorCodes::ILLEGAL_PREWHERE,
"Storage {} (table {}) does not support PREWHERE",
storage->getName(),
storage->getStorageID().getNameForLogs());
}
else if (auto * table_function_node = table_expression->as<TableFunctionNode>())
{
auto storage = table_function_node->getStorage();
if (!storage->supportsPrewhere())
throw Exception(ErrorCodes::ILLEGAL_PREWHERE,
"Table function storage {} (table {}) does not support PREWHERE",
storage->getName(),
storage->getStorageID().getNameForLogs());
}
else
{
throw Exception(ErrorCodes::ILLEGAL_PREWHERE,
"Subquery {} does not support PREWHERE",
table_expression->formatASTForErrorMessage());
}
}
void collectTableExpressionData(QueryTreeNodePtr & query_node, PlannerContext & planner_context)
}
void collectTableExpressionData(QueryTreeNodePtr & query_node, PlannerContextPtr & planner_context)
{
auto & query_node_typed = query_node->as<QueryNode &>();
auto table_expressions_nodes = extractTableExpressions(query_node_typed.getJoinTree());
for (auto & table_expression_node : table_expressions_nodes)
{
auto & table_expression_data = planner_context.getOrCreateTableExpressionData(table_expression_node);
auto & table_expression_data = planner_context->getOrCreateTableExpressionData(table_expression_node);
if (auto * table_node = table_expression_node->as<TableNode>())
{
@ -108,8 +215,60 @@ void collectTableExpressionData(QueryTreeNodePtr & query_node, PlannerContext &
}
}
CollectSourceColumnsVisitor collect_source_columns_visitor(planner_context);
collect_source_columns_visitor.visit(query_node);
CollectSourceColumnsVisitor collect_source_columns_visitor(*planner_context);
for (auto & node : query_node_typed.getChildren())
{
if (!node || node == query_node_typed.getPrewhere())
continue;
auto node_type = node->getNodeType();
if (node_type == QueryTreeNodeType::QUERY || node_type == QueryTreeNodeType::UNION)
continue;
collect_source_columns_visitor.visit(node);
}
if (query_node_typed.hasPrewhere())
{
CollectPrewhereTableExpressionVisitor collect_prewhere_table_expression_visitor(query_node);
collect_prewhere_table_expression_visitor.visit(query_node_typed.getPrewhere());
auto prewhere_table_expression = collect_prewhere_table_expression_visitor.getPrewhereTableExpression();
if (!prewhere_table_expression)
{
prewhere_table_expression = table_expressions_nodes[0];
checkStorageSupportPrewhere(prewhere_table_expression);
}
auto & table_expression_data = planner_context->getOrCreateTableExpressionData(prewhere_table_expression);
const auto & column_names = table_expression_data.getColumnNames();
NameSet required_column_names_without_prewhere(column_names.begin(), column_names.end());
collect_source_columns_visitor.visit(query_node_typed.getPrewhere());
auto prewhere_actions_dag = std::make_shared<ActionsDAG>();
PlannerActionsVisitor visitor(planner_context, false /*use_column_identifier_as_action_node_name*/);
auto expression_nodes = visitor.visit(prewhere_actions_dag, query_node_typed.getPrewhere());
if (expression_nodes.size() != 1)
throw Exception(ErrorCodes::ILLEGAL_PREWHERE,
"Invalid PREWHERE. Expected single boolean expression. In query {}",
query_node->formatASTForErrorMessage());
prewhere_actions_dag->getOutputs().push_back(expression_nodes[0]);
for (const auto & prewhere_input_node : prewhere_actions_dag->getInputs())
if (required_column_names_without_prewhere.contains(prewhere_input_node->result_name))
prewhere_actions_dag->getOutputs().push_back(prewhere_input_node);
table_expression_data.setPrewhereFilterActions(std::move(prewhere_actions_dag));
}
}
void collectSourceColumns(QueryTreeNodePtr & expression_node, PlannerContextPtr & planner_context)
{
CollectSourceColumnsVisitor collect_source_columns_visitor(*planner_context);
collect_source_columns_visitor.visit(expression_node);
}
}

View File

@ -12,6 +12,13 @@ namespace DB
*
* ALIAS table column nodes are registered in table expression data and replaced in query tree with inner alias expression.
*/
void collectTableExpressionData(QueryTreeNodePtr & query_node, PlannerContext & planner_context);
void collectTableExpressionData(QueryTreeNodePtr & query_node, PlannerContextPtr & planner_context);
/** Collect source columns for expression node.
* Collected source columns are registered in planner context.
*
* ALIAS table column nodes are registered in table expression data and replaced in query tree with inner alias expression.
*/
void collectSourceColumns(QueryTreeNodePtr & expression_node, PlannerContextPtr & planner_context);
}

View File

@ -79,26 +79,14 @@ namespace ErrorCodes
extern const int BAD_ARGUMENTS;
extern const int TOO_DEEP_SUBQUERIES;
extern const int NOT_IMPLEMENTED;
extern const int ILLEGAL_PREWHERE;
}
/** ClickHouse query planner.
*
* TODO: Support JOIN with JOIN engine.
* TODO: Support VIEWs.
* TODO: JOIN drop unnecessary columns after ON, USING section
* TODO: Support RBAC. Support RBAC for ALIAS columns
* TODO: Support PREWHERE
* TODO: Support DISTINCT
* TODO: Support trivial count optimization
* TODO: Support projections
* TODO: Support read in order optimization
* TODO: UNION storage limits
* TODO: Support max streams
* TODO: Support ORDER BY read in order optimization
* TODO: Support GROUP BY read in order optimization
* TODO: Support Key Condition. Support indexes for IN function.
* TODO: Better support for quota and limits.
* TODO: Support projections.
* TODO: Support trivial count using partition predicates.
* TODO: Support trivial count for table functions.
* TODO: Support indexes for IN function.
*/
namespace
@ -135,37 +123,6 @@ void checkStoragesSupportTransactions(const PlannerContextPtr & planner_context)
}
}
void checkStorageSupportPrewhere(const QueryTreeNodePtr & query_node)
{
auto & query_node_typed = query_node->as<QueryNode &>();
auto table_expression = extractLeftTableExpression(query_node_typed.getJoinTree());
if (auto * table_node = table_expression->as<TableNode>())
{
auto storage = table_node->getStorage();
if (!storage->supportsPrewhere())
throw Exception(ErrorCodes::ILLEGAL_PREWHERE,
"Storage {} (table {}) does not support PREWHERE",
storage->getName(),
storage->getStorageID().getNameForLogs());
}
else if (auto * table_function_node = table_expression->as<TableFunctionNode>())
{
auto storage = table_function_node->getStorage();
if (!storage->supportsPrewhere())
throw Exception(ErrorCodes::ILLEGAL_PREWHERE,
"Table function storage {} (table {}) does not support PREWHERE",
storage->getName(),
storage->getStorageID().getNameForLogs());
}
else
{
throw Exception(ErrorCodes::ILLEGAL_PREWHERE,
"Subquery {} does not support PREWHERE",
query_node->formatASTForErrorMessage());
}
}
/// Extend lifetime of query context, storages, and table locks
void extendQueryContextAndStoragesLifetime(QueryPlan & query_plan, const PlannerContextPtr & planner_context)
{
@ -1140,18 +1097,6 @@ void Planner::buildPlanForQueryNode()
auto & query_node = query_tree->as<QueryNode &>();
const auto & query_context = planner_context->getQueryContext();
if (query_node.hasPrewhere())
{
checkStorageSupportPrewhere(query_tree);
if (query_node.hasWhere())
query_node.getWhere() = mergeConditionNodes({query_node.getPrewhere(), query_node.getWhere()}, query_context);
else
query_node.getWhere() = query_node.getPrewhere();
query_node.getPrewhere() = {};
}
if (query_node.hasWhere())
{
auto condition_constant = tryExtractConstantFromConditionNode(query_node.getWhere());
@ -1185,8 +1130,8 @@ void Planner::buildPlanForQueryNode()
}
checkStoragesSupportTransactions(planner_context);
collectTableExpressionData(query_tree, *planner_context);
collectSets(query_tree, *planner_context);
collectTableExpressionData(query_tree, planner_context);
auto top_level_identifiers = collectTopLevelColumnIdentifiers(query_tree, planner_context);
auto join_tree_query_plan = buildJoinTreeQueryPlan(query_tree,
@ -1215,6 +1160,12 @@ void Planner::buildPlanForQueryNode()
std::vector<ActionsDAGPtr> result_actions_to_execute;
for (auto & [_, table_expression_data] : planner_context->getTableExpressionNodeToData())
{
if (table_expression_data.getPrewhereFilterActions())
result_actions_to_execute.push_back(table_expression_data.getPrewhereFilterActions());
}
if (query_processing_info.isIntermediateStage())
{
addPreliminarySortOrDistinctOrLimitStepsIfNeeded(query_plan,

View File

@ -44,6 +44,264 @@ namespace ErrorCodes
namespace
{
class ActionNodeNameHelper
{
public:
ActionNodeNameHelper(QueryTreeNodeToName & node_to_name_,
const PlannerContext & planner_context_,
bool use_column_identifier_as_action_node_name_)
: node_to_name(node_to_name_)
, planner_context(planner_context_)
, use_column_identifier_as_action_node_name(use_column_identifier_as_action_node_name_)
{
}
String calculateActionNodeName(const QueryTreeNodePtr & node)
{
auto it = node_to_name.find(node);
if (it != node_to_name.end())
return it->second;
String result;
auto node_type = node->getNodeType();
switch (node_type)
{
case QueryTreeNodeType::COLUMN:
{
const ColumnIdentifier * column_identifier = nullptr;
if (use_column_identifier_as_action_node_name)
column_identifier = planner_context.getColumnNodeIdentifierOrNull(node);
if (column_identifier)
{
result = *column_identifier;
}
else
{
const auto & column_node = node->as<ColumnNode &>();
result = column_node.getColumnName();
}
break;
}
case QueryTreeNodeType::CONSTANT:
{
const auto & constant_node = node->as<ConstantNode &>();
result = calculateConstantActionNodeName(constant_node.getValue(), constant_node.getResultType());
break;
}
case QueryTreeNodeType::FUNCTION:
{
const auto & function_node = node->as<FunctionNode &>();
String in_function_second_argument_node_name;
if (isNameOfInFunction(function_node.getFunctionName()))
{
const auto & in_second_argument_node = function_node.getArguments().getNodes().at(1);
in_function_second_argument_node_name = planner_context.createSetKey(in_second_argument_node);
}
WriteBufferFromOwnString buffer;
buffer << function_node.getFunctionName();
const auto & function_parameters_nodes = function_node.getParameters().getNodes();
if (!function_parameters_nodes.empty())
{
buffer << '(';
size_t function_parameters_nodes_size = function_parameters_nodes.size();
for (size_t i = 0; i < function_parameters_nodes_size; ++i)
{
const auto & function_parameter_node = function_parameters_nodes[i];
buffer << calculateActionNodeName(function_parameter_node);
if (i + 1 != function_parameters_nodes_size)
buffer << ", ";
}
buffer << ')';
}
const auto & function_arguments_nodes = function_node.getArguments().getNodes();
String function_argument_name;
buffer << '(';
size_t function_arguments_nodes_size = function_arguments_nodes.size();
for (size_t i = 0; i < function_arguments_nodes_size; ++i)
{
if (i == 1 && !in_function_second_argument_node_name.empty())
{
function_argument_name = in_function_second_argument_node_name;
}
else
{
const auto & function_argument_node = function_arguments_nodes[i];
function_argument_name = calculateActionNodeName(function_argument_node);
}
buffer << function_argument_name;
if (i + 1 != function_arguments_nodes_size)
buffer << ", ";
}
buffer << ')';
if (function_node.isWindowFunction())
{
buffer << " OVER (";
buffer << calculateWindowNodeActionName(function_node.getWindowNode());
buffer << ')';
}
result = buffer.str();
break;
}
case QueryTreeNodeType::LAMBDA:
{
auto lambda_hash = node->getTreeHash();
result = "__lambda_" + toString(lambda_hash.first) + '_' + toString(lambda_hash.second);
break;
}
default:
{
throw Exception(ErrorCodes::LOGICAL_ERROR, "Invalid action query tree node {}", node->formatASTForErrorMessage());
}
}
node_to_name.emplace(node, result);
return result;
}
static String calculateConstantActionNodeName(const Field & constant_literal, const DataTypePtr & constant_type)
{
auto constant_name = applyVisitor(FieldVisitorToString(), constant_literal);
return constant_name + "_" + constant_type->getName();
}
static String calculateConstantActionNodeName(const Field & constant_literal)
{
return calculateConstantActionNodeName(constant_literal, applyVisitor(FieldToDataType(), constant_literal));
}
String calculateWindowNodeActionName(const QueryTreeNodePtr & node)
{
auto & window_node = node->as<WindowNode &>();
WriteBufferFromOwnString buffer;
if (window_node.hasPartitionBy())
{
buffer << "PARTITION BY ";
auto & partition_by_nodes = window_node.getPartitionBy().getNodes();
size_t partition_by_nodes_size = partition_by_nodes.size();
for (size_t i = 0; i < partition_by_nodes_size; ++i)
{
auto & partition_by_node = partition_by_nodes[i];
buffer << calculateActionNodeName(partition_by_node);
if (i + 1 != partition_by_nodes_size)
buffer << ", ";
}
}
if (window_node.hasOrderBy())
{
if (window_node.hasPartitionBy())
buffer << ' ';
buffer << "ORDER BY ";
auto & order_by_nodes = window_node.getOrderBy().getNodes();
size_t order_by_nodes_size = order_by_nodes.size();
for (size_t i = 0; i < order_by_nodes_size; ++i)
{
auto & sort_node = order_by_nodes[i]->as<SortNode &>();
buffer << calculateActionNodeName(sort_node.getExpression());
auto sort_direction = sort_node.getSortDirection();
buffer << (sort_direction == SortDirection::ASCENDING ? " ASC" : " DESC");
auto nulls_sort_direction = sort_node.getNullsSortDirection();
if (nulls_sort_direction)
buffer << " NULLS " << (nulls_sort_direction == sort_direction ? "LAST" : "FIRST");
if (auto collator = sort_node.getCollator())
buffer << " COLLATE " << collator->getLocale();
if (sort_node.withFill())
{
buffer << " WITH FILL";
if (sort_node.hasFillFrom())
buffer << " FROM " << calculateActionNodeName(sort_node.getFillFrom());
if (sort_node.hasFillTo())
buffer << " TO " << calculateActionNodeName(sort_node.getFillTo());
if (sort_node.hasFillStep())
buffer << " STEP " << calculateActionNodeName(sort_node.getFillStep());
}
if (i + 1 != order_by_nodes_size)
buffer << ", ";
}
}
auto & window_frame = window_node.getWindowFrame();
if (!window_frame.is_default)
{
if (window_node.hasPartitionBy() || window_node.hasOrderBy())
buffer << ' ';
buffer << window_frame.type << " BETWEEN ";
if (window_frame.begin_type == WindowFrame::BoundaryType::Current)
{
buffer << "CURRENT ROW";
}
else if (window_frame.begin_type == WindowFrame::BoundaryType::Unbounded)
{
buffer << "UNBOUNDED";
buffer << " " << (window_frame.begin_preceding ? "PRECEDING" : "FOLLOWING");
}
else
{
buffer << calculateActionNodeName(window_node.getFrameBeginOffsetNode());
buffer << " " << (window_frame.begin_preceding ? "PRECEDING" : "FOLLOWING");
}
buffer << " AND ";
if (window_frame.end_type == WindowFrame::BoundaryType::Current)
{
buffer << "CURRENT ROW";
}
else if (window_frame.end_type == WindowFrame::BoundaryType::Unbounded)
{
buffer << "UNBOUNDED";
buffer << " " << (window_frame.end_preceding ? "PRECEDING" : "FOLLOWING");
}
else
{
buffer << calculateActionNodeName(window_node.getFrameEndOffsetNode());
buffer << " " << (window_frame.end_preceding ? "PRECEDING" : "FOLLOWING");
}
}
return buffer.str();
}
private:
std::unordered_map<QueryTreeNodePtr, std::string> & node_to_name;
const PlannerContext & planner_context;
bool use_column_identifier_as_action_node_name = true;
};
class ActionsScopeNode
{
public:
@ -165,7 +423,9 @@ private:
class PlannerActionsVisitorImpl
{
public:
PlannerActionsVisitorImpl(ActionsDAGPtr actions_dag, const PlannerContextPtr & planner_context_);
PlannerActionsVisitorImpl(ActionsDAGPtr actions_dag,
const PlannerContextPtr & planner_context_,
bool use_column_identifier_as_action_node_name_);
ActionsDAG::NodeRawConstPtrs visit(QueryTreeNodePtr expression_node);
@ -189,10 +449,14 @@ private:
std::vector<ActionsScopeNode> actions_stack;
std::unordered_map<QueryTreeNodePtr, std::string> node_to_node_name;
const PlannerContextPtr planner_context;
ActionNodeNameHelper action_node_name_helper;
};
PlannerActionsVisitorImpl::PlannerActionsVisitorImpl(ActionsDAGPtr actions_dag, const PlannerContextPtr & planner_context_)
PlannerActionsVisitorImpl::PlannerActionsVisitorImpl(ActionsDAGPtr actions_dag,
const PlannerContextPtr & planner_context_,
bool use_column_identifier_as_action_node_name_)
: planner_context(planner_context_)
, action_node_name_helper(node_to_node_name, *planner_context, use_column_identifier_as_action_node_name_)
{
actions_stack.emplace_back(std::move(actions_dag), nullptr);
}
@ -236,7 +500,7 @@ PlannerActionsVisitorImpl::NodeNameAndNodeMinLevel PlannerActionsVisitorImpl::vi
PlannerActionsVisitorImpl::NodeNameAndNodeMinLevel PlannerActionsVisitorImpl::visitColumn(const QueryTreeNodePtr & node)
{
auto column_node_name = calculateActionNodeName(node, *planner_context, node_to_node_name);
auto column_node_name = action_node_name_helper.calculateActionNodeName(node);
const auto & column_node = node->as<ColumnNode &>();
Int64 actions_stack_size = static_cast<Int64>(actions_stack.size() - 1);
@ -386,7 +650,7 @@ PlannerActionsVisitorImpl::NodeNameAndNodeMinLevel PlannerActionsVisitorImpl::ma
PlannerActionsVisitorImpl::NodeNameAndNodeMinLevel PlannerActionsVisitorImpl::visitIndexHintFunction(const QueryTreeNodePtr & node)
{
const auto & function_node = node->as<FunctionNode &>();
auto function_node_name = calculateActionNodeName(node, *planner_context, node_to_node_name);
auto function_node_name = action_node_name_helper.calculateActionNodeName(node);
auto index_hint_actions_dag = std::make_shared<ActionsDAG>();
auto & index_hint_actions_dag_outputs = index_hint_actions_dag->getOutputs();
@ -428,7 +692,7 @@ PlannerActionsVisitorImpl::NodeNameAndNodeMinLevel PlannerActionsVisitorImpl::vi
if (isNameOfInFunction(function_node.getFunctionName()))
in_function_second_argument_node_name_with_level = makeSetForInFunction(node);
auto function_node_name = calculateActionNodeName(node, *planner_context, node_to_node_name);
auto function_node_name = action_node_name_helper.calculateActionNodeName(node);
/* Aggregate functions, window functions, and GROUP BY expressions were already analyzed in the previous steps.
* If we have already visited some expression, we don't need to revisit it or its arguments again.
@ -516,266 +780,57 @@ PlannerActionsVisitorImpl::NodeNameAndNodeMinLevel PlannerActionsVisitorImpl::vi
}
PlannerActionsVisitor::PlannerActionsVisitor(const PlannerContextPtr & planner_context_)
PlannerActionsVisitor::PlannerActionsVisitor(const PlannerContextPtr & planner_context_, bool use_column_identifier_as_action_node_name_)
: planner_context(planner_context_)
, use_column_identifier_as_action_node_name(use_column_identifier_as_action_node_name_)
{}
ActionsDAG::NodeRawConstPtrs PlannerActionsVisitor::visit(ActionsDAGPtr actions_dag, QueryTreeNodePtr expression_node)
{
PlannerActionsVisitorImpl actions_visitor_impl(actions_dag, planner_context);
PlannerActionsVisitorImpl actions_visitor_impl(actions_dag, planner_context, use_column_identifier_as_action_node_name);
return actions_visitor_impl.visit(expression_node);
}
String calculateActionNodeName(const QueryTreeNodePtr & node, const PlannerContext & planner_context, QueryTreeNodeToName & node_to_name)
String calculateActionNodeName(const QueryTreeNodePtr & node,
const PlannerContext & planner_context,
QueryTreeNodeToName & node_to_name,
bool use_column_identifier_as_action_node_name)
{
auto it = node_to_name.find(node);
if (it != node_to_name.end())
return it->second;
String result;
auto node_type = node->getNodeType();
switch (node_type)
{
case QueryTreeNodeType::COLUMN:
{
const auto * column_identifier = planner_context.getColumnNodeIdentifierOrNull(node);
if (column_identifier)
{
result = *column_identifier;
}
else
{
const auto & column_node = node->as<ColumnNode &>();
result = column_node.getColumnName();
}
break;
}
case QueryTreeNodeType::CONSTANT:
{
const auto & constant_node = node->as<ConstantNode &>();
result = calculateConstantActionNodeName(constant_node.getValue(), constant_node.getResultType());
break;
}
case QueryTreeNodeType::FUNCTION:
{
const auto & function_node = node->as<FunctionNode &>();
String in_function_second_argument_node_name;
if (isNameOfInFunction(function_node.getFunctionName()))
{
const auto & in_second_argument_node = function_node.getArguments().getNodes().at(1);
in_function_second_argument_node_name = planner_context.createSetKey(in_second_argument_node);
}
WriteBufferFromOwnString buffer;
buffer << function_node.getFunctionName();
const auto & function_parameters_nodes = function_node.getParameters().getNodes();
if (!function_parameters_nodes.empty())
{
buffer << '(';
size_t function_parameters_nodes_size = function_parameters_nodes.size();
for (size_t i = 0; i < function_parameters_nodes_size; ++i)
{
const auto & function_parameter_node = function_parameters_nodes[i];
buffer << calculateActionNodeName(function_parameter_node, planner_context, node_to_name);
if (i + 1 != function_parameters_nodes_size)
buffer << ", ";
}
buffer << ')';
}
const auto & function_arguments_nodes = function_node.getArguments().getNodes();
String function_argument_name;
buffer << '(';
size_t function_arguments_nodes_size = function_arguments_nodes.size();
for (size_t i = 0; i < function_arguments_nodes_size; ++i)
{
if (i == 1 && !in_function_second_argument_node_name.empty())
{
function_argument_name = in_function_second_argument_node_name;
}
else
{
const auto & function_argument_node = function_arguments_nodes[i];
function_argument_name = calculateActionNodeName(function_argument_node, planner_context, node_to_name);
}
buffer << function_argument_name;
if (i + 1 != function_arguments_nodes_size)
buffer << ", ";
}
buffer << ')';
if (function_node.isWindowFunction())
{
buffer << " OVER (";
buffer << calculateWindowNodeActionName(function_node.getWindowNode(), planner_context, node_to_name);
buffer << ')';
}
result = buffer.str();
break;
}
case QueryTreeNodeType::LAMBDA:
{
auto lambda_hash = node->getTreeHash();
result = "__lambda_" + toString(lambda_hash.first) + '_' + toString(lambda_hash.second);
break;
}
default:
{
throw Exception(ErrorCodes::LOGICAL_ERROR, "Invalid action query tree node {}", node->formatASTForErrorMessage());
}
}
node_to_name.emplace(node, result);
return result;
ActionNodeNameHelper helper(node_to_name, planner_context, use_column_identifier_as_action_node_name);
return helper.calculateActionNodeName(node);
}
String calculateActionNodeName(const QueryTreeNodePtr & node, const PlannerContext & planner_context)
String calculateActionNodeName(const QueryTreeNodePtr & node, const PlannerContext & planner_context, bool use_column_identifier_as_action_node_name)
{
QueryTreeNodeToName empty_map;
return calculateActionNodeName(node, planner_context, empty_map);
ActionNodeNameHelper helper(empty_map, planner_context, use_column_identifier_as_action_node_name);
return helper.calculateActionNodeName(node);
}
String calculateConstantActionNodeName(const Field & constant_literal, const DataTypePtr & constant_type)
{
auto constant_name = applyVisitor(FieldVisitorToString(), constant_literal);
return constant_name + "_" + constant_type->getName();
return ActionNodeNameHelper::calculateConstantActionNodeName(constant_literal, constant_type);
}
String calculateConstantActionNodeName(const Field & constant_literal)
{
return calculateConstantActionNodeName(constant_literal, applyVisitor(FieldToDataType(), constant_literal));
return ActionNodeNameHelper::calculateConstantActionNodeName(constant_literal);
}
String calculateWindowNodeActionName(const QueryTreeNodePtr & node, const PlannerContext & planner_context, QueryTreeNodeToName & node_to_name)
String calculateWindowNodeActionName(const QueryTreeNodePtr & node,
const PlannerContext & planner_context,
QueryTreeNodeToName & node_to_name,
bool use_column_identifier_as_action_node_name)
{
auto & window_node = node->as<WindowNode &>();
WriteBufferFromOwnString buffer;
if (window_node.hasPartitionBy())
{
buffer << "PARTITION BY ";
auto & partition_by_nodes = window_node.getPartitionBy().getNodes();
size_t partition_by_nodes_size = partition_by_nodes.size();
for (size_t i = 0; i < partition_by_nodes_size; ++i)
{
auto & partition_by_node = partition_by_nodes[i];
buffer << calculateActionNodeName(partition_by_node, planner_context, node_to_name);
if (i + 1 != partition_by_nodes_size)
buffer << ", ";
}
}
if (window_node.hasOrderBy())
{
if (window_node.hasPartitionBy())
buffer << ' ';
buffer << "ORDER BY ";
auto & order_by_nodes = window_node.getOrderBy().getNodes();
size_t order_by_nodes_size = order_by_nodes.size();
for (size_t i = 0; i < order_by_nodes_size; ++i)
{
auto & sort_node = order_by_nodes[i]->as<SortNode &>();
buffer << calculateActionNodeName(sort_node.getExpression(), planner_context, node_to_name);
auto sort_direction = sort_node.getSortDirection();
buffer << (sort_direction == SortDirection::ASCENDING ? " ASC" : " DESC");
auto nulls_sort_direction = sort_node.getNullsSortDirection();
if (nulls_sort_direction)
buffer << " NULLS " << (nulls_sort_direction == sort_direction ? "LAST" : "FIRST");
if (auto collator = sort_node.getCollator())
buffer << " COLLATE " << collator->getLocale();
if (sort_node.withFill())
{
buffer << " WITH FILL";
if (sort_node.hasFillFrom())
buffer << " FROM " << calculateActionNodeName(sort_node.getFillFrom(), planner_context, node_to_name);
if (sort_node.hasFillTo())
buffer << " TO " << calculateActionNodeName(sort_node.getFillTo(), planner_context, node_to_name);
if (sort_node.hasFillStep())
buffer << " STEP " << calculateActionNodeName(sort_node.getFillStep(), planner_context, node_to_name);
}
if (i + 1 != order_by_nodes_size)
buffer << ", ";
}
}
auto & window_frame = window_node.getWindowFrame();
if (!window_frame.is_default)
{
if (window_node.hasPartitionBy() || window_node.hasOrderBy())
buffer << ' ';
buffer << window_frame.type << " BETWEEN ";
if (window_frame.begin_type == WindowFrame::BoundaryType::Current)
{
buffer << "CURRENT ROW";
}
else if (window_frame.begin_type == WindowFrame::BoundaryType::Unbounded)
{
buffer << "UNBOUNDED";
buffer << " " << (window_frame.begin_preceding ? "PRECEDING" : "FOLLOWING");
}
else
{
buffer << calculateActionNodeName(window_node.getFrameBeginOffsetNode(), planner_context, node_to_name);
buffer << " " << (window_frame.begin_preceding ? "PRECEDING" : "FOLLOWING");
}
buffer << " AND ";
if (window_frame.end_type == WindowFrame::BoundaryType::Current)
{
buffer << "CURRENT ROW";
}
else if (window_frame.end_type == WindowFrame::BoundaryType::Unbounded)
{
buffer << "UNBOUNDED";
buffer << " " << (window_frame.end_preceding ? "PRECEDING" : "FOLLOWING");
}
else
{
buffer << calculateActionNodeName(window_node.getFrameEndOffsetNode(), planner_context, node_to_name);
buffer << " " << (window_frame.end_preceding ? "PRECEDING" : "FOLLOWING");
}
}
return buffer.str();
ActionNodeNameHelper helper(node_to_name, planner_context, use_column_identifier_as_action_node_name);
return helper.calculateWindowNodeActionName(node);
}
String calculateWindowNodeActionName(const QueryTreeNodePtr & node, const PlannerContext & planner_context)
String calculateWindowNodeActionName(const QueryTreeNodePtr & node, const PlannerContext & planner_context, bool use_column_identifier_as_action_node_name)
{
QueryTreeNodeToName empty_map;
return calculateWindowNodeActionName(node, planner_context, empty_map);
ActionNodeNameHelper helper(empty_map, planner_context, use_column_identifier_as_action_node_name);
return helper.calculateWindowNodeActionName(node);
}
}

View File

@ -23,7 +23,7 @@ using PlannerContextPtr = std::shared_ptr<PlannerContext>;
* Preconditions:
* 1. Table expression data for table expression nodes is collected in planner context.
* For column node, that has column table expression source, identifier for column name in table expression data
* is used as action dag node name.
* is used as action dag node name, if use_column_identifier_as_action_node_name = true.
* 2. Sets for IN functions are already collected in planner context.
*
* During actions build, there is special handling for following functions:
@ -33,7 +33,7 @@ using PlannerContextPtr = std::shared_ptr<PlannerContext>;
class PlannerActionsVisitor
{
public:
explicit PlannerActionsVisitor(const PlannerContextPtr & planner_context_);
explicit PlannerActionsVisitor(const PlannerContextPtr & planner_context_, bool use_column_identifier_as_action_node_name_ = true);
/** Add actions necessary to calculate expression node into expression dag.
* Necessary actions are not added in actions dag output.
@ -43,21 +43,27 @@ public:
private:
const PlannerContextPtr planner_context;
bool use_column_identifier_as_action_node_name = true;
};
/** Calculate query tree expression node action dag name and add them into node to name map.
* If node exists in map, name from map is used.
*
* For column node column node identifier from planner context is used.
* For column node column node identifier from planner context is used, if use_column_identifier_as_action_node_name = true.
*/
using QueryTreeNodeToName = std::unordered_map<QueryTreeNodePtr, String>;
String calculateActionNodeName(const QueryTreeNodePtr & node, const PlannerContext & planner_context, QueryTreeNodeToName & node_to_name);
String calculateActionNodeName(const QueryTreeNodePtr & node,
const PlannerContext & planner_context,
QueryTreeNodeToName & node_to_name,
bool use_column_identifier_as_action_node_name = true);
/** Calculate query tree expression node action dag name.
*
* For column node column node identifier from planner context is used.
* For column node column node identifier from planner context is used, if use_column_identifier_as_action_node_name = true.
*/
String calculateActionNodeName(const QueryTreeNodePtr & node, const PlannerContext & planner_context);
String calculateActionNodeName(const QueryTreeNodePtr & node,
const PlannerContext & planner_context,
bool use_column_identifier_as_action_node_name = true);
/// Calculate action node name for constant
String calculateConstantActionNodeName(const Field & constant_literal, const DataTypePtr & constant_type);
@ -67,12 +73,19 @@ String calculateConstantActionNodeName(const Field & constant_literal);
/** Calculate action node name for window node.
* Window node action name can only be part of window function action name.
* For column node column node identifier from planner context is used, if use_column_identifier_as_action_node_name = true.
*/
String calculateWindowNodeActionName(const QueryTreeNodePtr & node, const PlannerContext & planner_context, QueryTreeNodeToName & node_to_name);
String calculateWindowNodeActionName(const QueryTreeNodePtr & node,
const PlannerContext & planner_context,
QueryTreeNodeToName & node_to_name,
bool use_column_identifier_as_action_node_name = true);
/** Calculate action node name for window node.
* Window node action name can only be part of window function action name.
* For column node column node identifier from planner context is used, if use_column_identifier_as_action_node_name = true.
*/
String calculateWindowNodeActionName(const QueryTreeNodePtr & node, const PlannerContext & planner_context);
String calculateWindowNodeActionName(const QueryTreeNodePtr & node,
const PlannerContext & planner_context,
bool use_column_identifier_as_action_node_name = true);
}

View File

@ -1,11 +1,17 @@
#include <Planner/PlannerJoinTree.h>
#include <Common/scope_guard_safe.h>
#include <Columns/ColumnAggregateFunction.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeAggregateFunction.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <Functions/FunctionFactory.h>
#include <AggregateFunctions/AggregateFunctionCount.h>
#include <Access/Common/AccessFlags.h>
#include <Access/ContextAccess.h>
@ -14,6 +20,7 @@
#include <Analyzer/ConstantNode.h>
#include <Analyzer/ColumnNode.h>
#include <Analyzer/FunctionNode.h>
#include <Analyzer/TableNode.h>
#include <Analyzer/TableFunctionNode.h>
#include <Analyzer/QueryNode.h>
@ -22,13 +29,15 @@
#include <Analyzer/ArrayJoinNode.h>
#include <Analyzer/Utils.h>
#include <Analyzer/AggregationUtils.h>
#include <Analyzer/FunctionNode.h>
#include <Analyzer/Passes/QueryAnalysisPass.h>
#include <Analyzer/QueryTreeBuilder.h>
#include <Processors/Sources/NullSource.h>
#include <Processors/QueryPlan/SortingStep.h>
#include <Processors/QueryPlan/CreateSetAndFilterOnTheFlyStep.h>
#include <Processors/QueryPlan/ReadFromPreparedSource.h>
#include <Processors/QueryPlan/ExpressionStep.h>
#include <Processors/QueryPlan/FilterStep.h>
#include <Processors/QueryPlan/JoinStep.h>
#include <Processors/QueryPlan/ArrayJoinStep.h>
#include <Processors/Sources/SourceFromSingleChunk.h>
@ -44,10 +53,9 @@
#include <Planner/PlannerJoins.h>
#include <Planner/PlannerActionsVisitor.h>
#include <Planner/Utils.h>
#include <Planner/CollectSets.h>
#include <Planner/CollectTableExpressionData.h>
#include <AggregateFunctions/AggregateFunctionCount.h>
#include <Columns/ColumnAggregateFunction.h>
#include <Common/scope_guard_safe.h>
namespace DB
{
@ -62,6 +70,7 @@ namespace ErrorCodes
extern const int PARAMETER_OUT_OF_BOUND;
extern const int TOO_MANY_COLUMNS;
extern const int UNSUPPORTED_METHOD;
extern const int BAD_ARGUMENTS;
}
namespace
@ -163,19 +172,39 @@ bool applyTrivialCountIfPossible(
if (!settings.optimize_trivial_count_query)
return false;
/// can't apply if FINAL
if (table_node.getTableExpressionModifiers().has_value() && table_node.getTableExpressionModifiers()->hasFinal())
return false;
auto & main_query_node = query_tree->as<QueryNode &>();
if (main_query_node.hasGroupBy())
return false;
const auto & storage = table_node.getStorage();
if (!storage || storage->hasLightweightDeletedMask())
auto storage_id = storage->getStorageID();
auto row_policy_filter = query_context->getRowPolicyFilter(storage_id.getDatabaseName(),
storage_id.getTableName(),
RowPolicyFilterType::SELECT_FILTER);
if (row_policy_filter)
return {};
/** Transaction check here is necessary because
* MergeTree maintains total count for all parts in Active state and it simply returns that number for trivial select count() from table query.
* But if we have current transaction, then we should return number of rows in current snapshot (that may include parts in Outdated state),
* so we have to use totalRowsByPartitionPredicate() instead of totalRows even for trivial query
* See https://github.com/ClickHouse/ClickHouse/pull/24258/files#r828182031
*/
if (query_context->getCurrentTransaction())
return false;
if (settings.max_parallel_replicas > 1 || settings.allow_experimental_query_deduplication
/// can't apply if FINAL
if (table_node.getTableExpressionModifiers().has_value() &&
(table_node.getTableExpressionModifiers()->hasFinal() || table_node.getTableExpressionModifiers()->hasSampleSizeRatio() ||
table_node.getTableExpressionModifiers()->hasSampleOffsetRatio()))
return false;
// TODO: It's possible to optimize count() given only partition predicates
auto & main_query_node = query_tree->as<QueryNode &>();
if (main_query_node.hasGroupBy() || main_query_node.hasPrewhere() || main_query_node.hasWhere())
return false;
if (storage->hasLightweightDeletedMask())
return false;
if (settings.max_parallel_replicas > 1 ||
settings.allow_experimental_query_deduplication
|| settings.empty_result_for_aggregation_by_empty_set)
return false;
@ -189,31 +218,12 @@ bool applyTrivialCountIfPossible(
if (!count_func)
return false;
/// get number of rows
std::optional<UInt64> num_rows{};
/// Transaction check here is necessary because
/// MergeTree maintains total count for all parts in Active state and it simply returns that number for trivial select count() from table query.
/// But if we have current transaction, then we should return number of rows in current snapshot (that may include parts in Outdated state),
/// so we have to use totalRowsByPartitionPredicate() instead of totalRows even for trivial query
/// See https://github.com/ClickHouse/ClickHouse/pull/24258/files#r828182031
if (!main_query_node.hasPrewhere() && !main_query_node.hasWhere() && !query_context->getCurrentTransaction())
{
num_rows = storage->totalRows(settings);
}
// TODO:
// else // It's possible to optimize count() given only partition predicates
// {
// SelectQueryInfo temp_query_info;
// temp_query_info.query = query_ptr;
// temp_query_info.syntax_analyzer_result = syntax_analyzer_result;
// temp_query_info.prepared_sets = query_analyzer->getPreparedSets();
// num_rows = storage->totalRowsByPartitionPredicate(temp_query_info, context);
// }
/// Get number of rows
std::optional<UInt64> num_rows = storage->totalRows(settings);
if (!num_rows)
return false;
/// set aggregation state
/// Set aggregation state
const AggregateFunctionCount & agg_count = *count_func;
std::vector<char> state(agg_count.sizeOfData());
AggregateDataPtr place = state.data();
@ -307,6 +317,115 @@ void prepareBuildQueryPlanForTableExpression(const QueryTreeNodePtr & table_expr
settings.max_columns_to_read);
}
void updatePrewhereOutputsIfNeeded(SelectQueryInfo & table_expression_query_info,
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot)
{
if (!table_expression_query_info.prewhere_info)
return;
auto & prewhere_actions = table_expression_query_info.prewhere_info->prewhere_actions;
NameSet required_columns;
if (column_names.size() == 1)
required_columns.insert(column_names[0]);
auto & table_expression_modifiers = table_expression_query_info.table_expression_modifiers;
if (table_expression_modifiers)
{
if (table_expression_modifiers->hasSampleSizeRatio() ||
table_expression_query_info.planner_context->getQueryContext()->getSettingsRef().parallel_replicas_count > 1)
{
/// We evaluate sampling for Merge lazily so we need to get all the columns
if (storage_snapshot->storage.getName() == "Merge")
{
const auto columns = storage_snapshot->getMetadataForQuery()->getColumns().getAll();
for (const auto & column : columns)
required_columns.insert(column.name);
}
else
{
auto columns_required_for_sampling = storage_snapshot->getMetadataForQuery()->getColumnsRequiredForSampling();
required_columns.insert(columns_required_for_sampling.begin(), columns_required_for_sampling.end());
}
}
if (table_expression_modifiers->hasFinal())
{
auto columns_required_for_final = storage_snapshot->getMetadataForQuery()->getColumnsRequiredForFinal();
required_columns.insert(columns_required_for_final.begin(), columns_required_for_final.end());
}
}
std::unordered_set<const ActionsDAG::Node *> required_output_nodes;
for (const auto * input : prewhere_actions->getInputs())
{
if (required_columns.contains(input->result_name))
required_output_nodes.insert(input);
}
if (required_output_nodes.empty())
return;
auto & prewhere_outputs = prewhere_actions->getOutputs();
for (const auto & output : prewhere_outputs)
{
auto required_output_node_it = required_output_nodes.find(output);
if (required_output_node_it == required_output_nodes.end())
continue;
required_output_nodes.erase(required_output_node_it);
}
prewhere_outputs.insert(prewhere_outputs.end(), required_output_nodes.begin(), required_output_nodes.end());
}
FilterDAGInfo buildRowPolicyFilterIfNeeded(const StoragePtr & storage,
SelectQueryInfo & table_expression_query_info,
PlannerContextPtr & planner_context)
{
auto storage_id = storage->getStorageID();
const auto & query_context = planner_context->getQueryContext();
auto row_policy_filter = query_context->getRowPolicyFilter(storage_id.getDatabaseName(), storage_id.getTableName(), RowPolicyFilterType::SELECT_FILTER);
if (!row_policy_filter)
return {};
auto row_policy_filter_query_tree = buildQueryTree(row_policy_filter->expression, query_context);
QueryAnalysisPass query_analysis_pass(table_expression_query_info.table_expression);
query_analysis_pass.run(row_policy_filter_query_tree, query_context);
auto & table_expression_data = planner_context->getTableExpressionDataOrThrow(table_expression_query_info.table_expression);
const auto table_expression_names = table_expression_data.getColumnNames();
NameSet table_expression_required_names_without_row_policy(table_expression_names.begin(), table_expression_names.end());
collectSourceColumns(row_policy_filter_query_tree, planner_context);
collectSets(row_policy_filter_query_tree, *planner_context);
auto row_policy_actions_dag = std::make_shared<ActionsDAG>();
PlannerActionsVisitor actions_visitor(planner_context, false /*use_column_identifier_as_action_node_name*/);
auto expression_nodes = actions_visitor.visit(row_policy_actions_dag, row_policy_filter_query_tree);
if (expression_nodes.size() != 1)
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Row policy filter actions must return single output node. Actual {}",
expression_nodes.size());
auto & row_policy_actions_outputs = row_policy_actions_dag->getOutputs();
row_policy_actions_outputs = std::move(expression_nodes);
std::string filter_node_name = row_policy_actions_outputs[0]->result_name;
bool remove_filter_column = true;
for (const auto & row_policy_input_node : row_policy_actions_dag->getInputs())
if (table_expression_required_names_without_row_policy.contains(row_policy_input_node->result_name))
row_policy_actions_outputs.push_back(row_policy_input_node);
return {std::move(row_policy_actions_dag), std::move(filter_node_name), remove_filter_column};
}
JoinTreeQueryPlan buildQueryPlanForTableExpression(QueryTreeNodePtr table_expression,
const SelectQueryInfo & select_query_info,
const SelectQueryOptions & select_query_options,
@ -428,9 +547,7 @@ JoinTreeQueryPlan buildQueryPlanForTableExpression(QueryTreeNodePtr table_expres
else
table_expression_query_info.table_expression_modifiers = table_function_node->getTableExpressionModifiers();
auto columns_names = table_expression_data.getColumnNames();
bool need_rewrite_query_with_final = storage->needRewriteQueryWithFinal(columns_names);
bool need_rewrite_query_with_final = storage->needRewriteQueryWithFinal(table_expression_data.getColumnNames());
if (need_rewrite_query_with_final)
{
if (table_expression_query_info.table_expression_modifiers)
@ -452,8 +569,11 @@ JoinTreeQueryPlan buildQueryPlanForTableExpression(QueryTreeNodePtr table_expres
}
/// Apply trivial_count optimization if possible
bool is_trivial_count_applied = !select_query_options.only_analyze && is_single_table_expression && table_node && select_query_info.has_aggregates
&& applyTrivialCountIfPossible(query_plan, *table_node, select_query_info.query_tree, planner_context->getQueryContext(), columns_names);
bool is_trivial_count_applied = !select_query_options.only_analyze &&
is_single_table_expression &&
table_node &&
select_query_info.has_aggregates &&
applyTrivialCountIfPossible(query_plan, *table_node, select_query_info.query_tree, planner_context->getQueryContext(), table_expression_data.getColumnNames());
if (is_trivial_count_applied)
{
@ -463,9 +583,67 @@ JoinTreeQueryPlan buildQueryPlanForTableExpression(QueryTreeNodePtr table_expres
{
if (!select_query_options.only_analyze)
{
const auto & prewhere_actions = table_expression_data.getPrewhereFilterActions();
if (prewhere_actions)
{
table_expression_query_info.prewhere_info = std::make_shared<PrewhereInfo>();
table_expression_query_info.prewhere_info->prewhere_actions = prewhere_actions;
table_expression_query_info.prewhere_info->prewhere_column_name = prewhere_actions->getOutputs().at(0)->result_name;
table_expression_query_info.prewhere_info->remove_prewhere_column = true;
table_expression_query_info.prewhere_info->need_filter = true;
}
updatePrewhereOutputsIfNeeded(table_expression_query_info, table_expression_data.getColumnNames(), storage_snapshot);
auto row_policy_filter_info = buildRowPolicyFilterIfNeeded(storage, table_expression_query_info, planner_context);
bool moved_row_policy_to_prewhere = false;
if (row_policy_filter_info.actions)
{
bool is_final = table_expression_query_info.table_expression_modifiers &&
table_expression_query_info.table_expression_modifiers->hasFinal();
bool optimize_move_to_prewhere = settings.optimize_move_to_prewhere && (!is_final || settings.optimize_move_to_prewhere_if_final);
if (storage->supportsPrewhere() && optimize_move_to_prewhere)
{
if (!table_expression_query_info.prewhere_info)
table_expression_query_info.prewhere_info = std::make_shared<PrewhereInfo>();
if (!table_expression_query_info.prewhere_info->prewhere_actions)
{
table_expression_query_info.prewhere_info->prewhere_actions = row_policy_filter_info.actions;
table_expression_query_info.prewhere_info->prewhere_column_name = row_policy_filter_info.column_name;
table_expression_query_info.prewhere_info->remove_prewhere_column = row_policy_filter_info.do_remove_column;
}
else
{
table_expression_query_info.prewhere_info->row_level_filter = row_policy_filter_info.actions;
table_expression_query_info.prewhere_info->row_level_column_name = row_policy_filter_info.column_name;
}
table_expression_query_info.prewhere_info->need_filter = true;
moved_row_policy_to_prewhere = true;
}
}
const auto & columns_names = table_expression_data.getColumnNames();
from_stage = storage->getQueryProcessingStage(query_context, select_query_options.to_stage, storage_snapshot, table_expression_query_info);
storage->read(query_plan, columns_names, storage_snapshot, table_expression_query_info, query_context, from_stage, max_block_size, max_streams);
if (query_plan.isInitialized() &&
from_stage == QueryProcessingStage::FetchColumns &&
row_policy_filter_info.actions &&
!moved_row_policy_to_prewhere)
{
auto row_level_filter_step = std::make_unique<FilterStep>(query_plan.getCurrentDataStream(),
row_policy_filter_info.actions,
row_policy_filter_info.column_name,
row_policy_filter_info.do_remove_column);
row_level_filter_step->setStepDescription("Row-level security filter");
query_plan.addStep(std::move(row_level_filter_step));
}
if (query_context->hasQueryContext() && !select_query_options.is_internal)
{
auto local_storage_id = storage->getStorageID();
@ -493,7 +671,7 @@ JoinTreeQueryPlan buildQueryPlanForTableExpression(QueryTreeNodePtr table_expres
else
{
/// Create step which reads from empty source if storage has no data.
auto source_header = storage_snapshot->getSampleBlockForColumns(columns_names);
auto source_header = storage_snapshot->getSampleBlockForColumns(table_expression_data.getColumnNames());
Pipe pipe(std::make_shared<NullSource>(source_header));
auto read_from_pipe = std::make_unique<ReadFromPreparedSource>(std::move(pipe));
read_from_pipe->setStepDescription("Read from NullSource");

View File

@ -3,6 +3,8 @@
#include <Core/Names.h>
#include <Core/NamesAndTypes.h>
#include <Interpreters/ActionsDAG.h>
namespace DB
{
@ -238,6 +240,26 @@ public:
is_remote = is_remote_value;
}
const ActionsDAGPtr & getPrewhereFilterActions() const
{
return prewhere_filter_actions;
}
void setPrewhereFilterActions(ActionsDAGPtr prewhere_filter_actions_value)
{
prewhere_filter_actions = std::move(prewhere_filter_actions_value);
}
const ActionsDAGPtr & getFilterActions() const
{
return filter_actions;
}
void setFilterActions(ActionsDAGPtr filter_actions_value)
{
filter_actions = std::move(filter_actions_value);
}
private:
void addColumnImpl(const NameAndTypePair & column, const ColumnIdentifier & column_identifier)
{
@ -262,6 +284,12 @@ private:
/// Valid for table, table function, array join, query, union nodes
ColumnIdentifierToColumnName column_identifier_to_column_name;
/// Valid for table, table function
ActionsDAGPtr filter_actions;
/// Valid for table, table function
ActionsDAGPtr prewhere_filter_actions;
/// Is storage remote
bool is_remote = false;
};

View File

@ -192,7 +192,9 @@ StorageLimits buildStorageLimits(const Context & context, const SelectQueryOptio
return {limits, leaf_limits};
}
ActionsDAGPtr buildActionsDAGFromExpressionNode(const QueryTreeNodePtr & expression_node, const ColumnsWithTypeAndName & input_columns, const PlannerContextPtr & planner_context)
ActionsDAGPtr buildActionsDAGFromExpressionNode(const QueryTreeNodePtr & expression_node,
const ColumnsWithTypeAndName & input_columns,
const PlannerContextPtr & planner_context)
{
ActionsDAGPtr action_dag = std::make_shared<ActionsDAG>(input_columns);
PlannerActionsVisitor actions_visitor(planner_context);

View File

@ -105,6 +105,7 @@ using Stack = std::vector<Frame>;
/// Second pass optimizations
void optimizePrimaryKeyCondition(const Stack & stack);
void optimizePrewhere(Stack & stack, QueryPlan::Nodes & nodes);
void optimizeReadInOrder(QueryPlan::Node & node, QueryPlan::Nodes & nodes);
void optimizeAggregationInOrder(QueryPlan::Node & node, QueryPlan::Nodes &);

View File

@ -0,0 +1,371 @@
#include <Processors/QueryPlan/Optimizations/Optimizations.h>
#include <Processors/QueryPlan/ExpressionStep.h>
#include <Processors/QueryPlan/FilterStep.h>
#include <Processors/QueryPlan/ReadFromMergeTree.h>
#include <Storages/MergeTree/MergeTreeWhereOptimizer.h>
#include <Interpreters/ActionsDAG.h>
#include <Planner/ActionsChain.h>
#include <deque>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
namespace
{
void matchDAGOutputNodesOrderWithHeader(ActionsDAGPtr & actions_dag, const Block & expected_header)
{
std::unordered_map<std::string, const ActionsDAG::Node *> output_name_to_node;
for (const auto * output_node : actions_dag->getOutputs())
output_name_to_node.emplace(output_node->result_name, output_node);
std::unordered_set<const ActionsDAG::Node *> used_output_nodes;
ActionsDAG::NodeRawConstPtrs updated_outputs;
updated_outputs.reserve(expected_header.columns());
for (const auto & column : expected_header)
{
auto output_node_it = output_name_to_node.find(column.name);
if (output_node_it == output_name_to_node.end())
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Invalid move to PREWHERE optimization. Cannot find column {} in output",
column.name);
updated_outputs.push_back(output_node_it->second);
used_output_nodes.insert(output_node_it->second);
}
ActionsDAG::NodeRawConstPtrs unused_outputs;
for (const auto * output_node : actions_dag->getOutputs())
{
if (used_output_nodes.contains(output_node))
continue;
unused_outputs.push_back(output_node);
}
auto & actions_dag_outputs = actions_dag->getOutputs();
actions_dag_outputs = std::move(updated_outputs);
actions_dag_outputs.insert(actions_dag_outputs.end(), unused_outputs.begin(), unused_outputs.end());
}
}
namespace QueryPlanOptimizations
{
void optimizePrewhere(Stack & stack, QueryPlan::Nodes & nodes)
{
if (stack.size() < 3)
return;
const auto & frame = stack.back();
/** Assume that on stack there are at least 3 nodes:
*
* 1. SomeNode
* 2. FilterNode
* 3. ReadFromMergeTreeNode
*/
auto * read_from_merge_tree = typeid_cast<ReadFromMergeTree *>(frame.node->step.get());
if (!read_from_merge_tree)
return;
const auto & storage_prewhere_info = read_from_merge_tree->getPrewhereInfo();
if (storage_prewhere_info && storage_prewhere_info->prewhere_actions)
return;
const QueryPlan::Node * filter_node = (stack.rbegin() + 1)->node;
const auto * filter_step = typeid_cast<FilterStep *>(filter_node->step.get());
if (!filter_step)
return;
/** Collect required filter output columns.
* Collect output nodes that are mapped to input nodes.
* Collect input node to output nodes mapping.
*/
ColumnsWithTypeAndName required_columns_after_filter;
std::unordered_set<std::string> output_nodes_mapped_to_input;
std::unordered_map<std::string, std::vector<std::string>> input_node_to_output_names;
for (const auto * output_node : filter_step->getExpression()->getOutputs())
{
const auto * node_without_alias = output_node;
while (node_without_alias->type == ActionsDAG::ActionType::ALIAS)
node_without_alias = node_without_alias->children[0];
if (node_without_alias->type == ActionsDAG::ActionType::INPUT)
{
output_nodes_mapped_to_input.emplace(output_node->result_name);
auto output_names_it = input_node_to_output_names.find(node_without_alias->result_name);
if (output_names_it == input_node_to_output_names.end())
{
auto [insert_it, _] = input_node_to_output_names.emplace(node_without_alias->result_name, std::vector<std::string>());
output_names_it = insert_it;
}
output_names_it->second.push_back(output_node->result_name);
}
if (output_node->result_name == filter_step->getFilterColumnName() && filter_step->removesFilterColumn())
continue;
required_columns_after_filter.push_back(ColumnWithTypeAndName(output_node->result_type, output_node->result_name));
}
const auto & context = read_from_merge_tree->getContext();
const auto & settings = context->getSettingsRef();
if (!settings.allow_experimental_analyzer)
return;
const auto & table_expression_modifiers = read_from_merge_tree->getQueryInfo().table_expression_modifiers;
bool is_final = table_expression_modifiers && table_expression_modifiers->hasFinal();
bool optimize_move_to_prewhere = settings.optimize_move_to_prewhere && (!is_final || settings.optimize_move_to_prewhere_if_final);
if (!optimize_move_to_prewhere)
return;
const auto & storage = read_from_merge_tree->getStorageSnapshot()->storage;
const auto & storage_metadata = read_from_merge_tree->getStorageSnapshot()->metadata;
auto column_sizes = storage.getColumnSizes();
if (column_sizes.empty())
return;
/// Extract column compressed sizes
std::unordered_map<std::string, UInt64> column_compressed_sizes;
for (const auto & [name, sizes] : column_sizes)
column_compressed_sizes[name] = sizes.data_compressed;
Names queried_columns = read_from_merge_tree->getRealColumnNames();
MergeTreeWhereOptimizer where_optimizer{
std::move(column_compressed_sizes),
storage_metadata,
queried_columns,
storage.supportedPrewhereColumns(),
&Poco::Logger::get("QueryPlanOptimizePrewhere")};
auto optimize_result = where_optimizer.optimize(filter_step->getExpression(),
filter_step->getFilterColumnName(),
read_from_merge_tree->getContext(),
is_final);
if (!optimize_result.has_value())
return;
PrewhereInfoPtr prewhere_info;
if (storage_prewhere_info)
prewhere_info = storage_prewhere_info->clone();
else
prewhere_info = std::make_shared<PrewhereInfo>();
prewhere_info->need_filter = true;
auto & prewhere_filter_actions = optimize_result->prewhere_filter_actions;
ActionsChain actions_chain;
std::string prewere_filter_node_name = prewhere_filter_actions->getOutputs().at(0)->result_name;
actions_chain.addStep(std::make_unique<ActionsChainStep>(prewhere_filter_actions));
auto & filter_actions = optimize_result->filter_actions;
/** Merge tree where optimizer splits conjunctions in filter expression into 2 parts:
* 1. Filter expressions.
* 2. Prewhere filter expressions.
*
* There can be cases when all expressions are moved to PREWHERE, but it is not
* enough to produce required filter output columns.
*
* Example: SELECT (a AND b) AS cond FROM test_table WHERE cond AND c;
* In this example condition expressions `a`, `b`, `c` can move to PREWHERE, but PREWHERE will not contain expression `and(a, b)`.
* It will contain only `a`, `b`, `c`, `and(a, b, c)` expressions.
*
* In such scenario we need to create additional step to calculate `and(a, b)` expression after PREWHERE.
*/
bool need_additional_filter_after_prewhere = false;
if (!filter_actions)
{
/// Any node from PREWHERE filter actions can be used as possible output node
std::unordered_set<std::string> possible_prewhere_output_nodes;
for (const auto & node : prewhere_filter_actions->getNodes())
possible_prewhere_output_nodes.insert(node.result_name);
for (auto & required_column : required_columns_after_filter)
{
if (!possible_prewhere_output_nodes.contains(required_column.name) &&
!output_nodes_mapped_to_input.contains(required_column.name))
{
need_additional_filter_after_prewhere = true;
break;
}
}
}
/** If there are additional filter actions after PREWHERE filter actions, we create filter actions dag using PREWHERE filter
* actions output columns as filter actions dag input columns.
* Then we merge this filter actions dag nodes with old filter step actions dag nodes, to reuse some expressions from
* PREWHERE filter actions.
*/
if (need_additional_filter_after_prewhere || filter_actions)
{
auto merged_filter_actions = std::make_shared<ActionsDAG>(actions_chain.getLastStepAvailableOutputColumns());
merged_filter_actions->getOutputs().clear();
merged_filter_actions->mergeNodes(std::move(*filter_step->getExpression()->clone()));
/// Add old filter step filter column to outputs
for (const auto & node : merged_filter_actions->getNodes())
{
if (node.result_name == filter_step->getFilterColumnName())
{
merged_filter_actions->getOutputs().push_back(&node);
break;
}
}
filter_actions = std::move(merged_filter_actions);
/// If there is filter after PREWHERE, we can ignore filtering during PREWHERE stage
prewhere_info->need_filter = false;
actions_chain.addStep(std::make_unique<ActionsChainStep>(filter_actions));
}
auto required_output_actions = std::make_shared<ActionsDAG>(required_columns_after_filter);
actions_chain.addStep(std::make_unique<ActionsChainStep>(required_output_actions));
actions_chain.finalize();
prewhere_filter_actions->projectInput(false);
auto & prewhere_actions_chain_node = actions_chain[0];
prewhere_info->prewhere_actions = std::move(prewhere_filter_actions);
prewhere_info->prewhere_column_name = prewere_filter_node_name;
prewhere_info->remove_prewhere_column = !prewhere_actions_chain_node->getChildRequiredOutputColumnsNames().contains(prewere_filter_node_name);
read_from_merge_tree->updatePrewhereInfo(prewhere_info);
QueryPlan::Node * replace_old_filter_node = nullptr;
bool remove_filter_node = false;
if (filter_actions)
{
filter_actions->projectInput(false);
/// Match dag output nodes with old filter step header
matchDAGOutputNodesOrderWithHeader(filter_actions, filter_step->getOutputStream().header);
auto & filter_actions_chain_node = actions_chain[1];
bool remove_filter_column = !filter_actions_chain_node->getChildRequiredOutputColumnsNames().contains(filter_step->getFilterColumnName());
auto after_prewhere_filter_step = std::make_unique<FilterStep>(read_from_merge_tree->getOutputStream(),
filter_actions,
filter_step->getFilterColumnName(),
remove_filter_column);
auto & node = nodes.emplace_back();
node.children.emplace_back(frame.node);
node.step = std::move(after_prewhere_filter_step);
replace_old_filter_node = &node;
}
else
{
auto rename_actions_dag = std::make_shared<ActionsDAG>(read_from_merge_tree->getOutputStream().header.getColumnsWithTypeAndName());
bool apply_rename_step = false;
ActionsDAG::NodeRawConstPtrs updated_outputs;
/** If in output after read from merge tree there are column names without aliases,
* apply old filter step aliases to them.
*/
for (const auto * output_node : rename_actions_dag->getOutputs())
{
const auto alias_it = input_node_to_output_names.find(output_node->result_name);
if (alias_it == input_node_to_output_names.end())
{
updated_outputs.push_back(output_node);
continue;
}
for (auto & output_name : alias_it->second)
{
if (output_name == output_node->result_name)
{
updated_outputs.push_back(output_node);
continue;
}
updated_outputs.push_back(&rename_actions_dag->addAlias(*output_node, output_name));
apply_rename_step = true;
}
}
rename_actions_dag->getOutputs() = std::move(updated_outputs);
bool apply_match_step = false;
/// If column order does not match old filter step column order, match dag output nodes with header
if (!blocksHaveEqualStructure(read_from_merge_tree->getOutputStream().header, filter_step->getOutputStream().header))
{
apply_match_step = true;
matchDAGOutputNodesOrderWithHeader(rename_actions_dag, filter_step->getOutputStream().header);
}
if (apply_rename_step || apply_match_step)
{
auto rename_step = std::make_unique<ExpressionStep>(read_from_merge_tree->getOutputStream(), rename_actions_dag);
if (apply_rename_step)
rename_step->setStepDescription("Change column names to column identifiers");
auto & node = nodes.emplace_back();
node.children.emplace_back(frame.node);
node.step = std::move(rename_step);
replace_old_filter_node = &node;
}
else
{
replace_old_filter_node = frame.node;
remove_filter_node = true;
}
}
QueryPlan::Node * filter_parent_node = (stack.rbegin() + 2)->node;
for (auto & filter_parent_child : filter_parent_node->children)
{
if (filter_parent_child == filter_node)
{
filter_parent_child = replace_old_filter_node;
size_t stack_size = stack.size();
/** If filter step is completely replaced with PREWHERE filter actions, remove it from stack.
* Otherwise replace old filter step with new filter step after PREWHERE.
*/
if (remove_filter_node)
{
std::swap(stack[stack_size - 1], stack[stack_size - 2]);
stack.pop_back();
}
else
{
stack[stack_size - 2] = Frame{.node = replace_old_filter_node, .next_child = 1};
}
break;
}
}
}
}
}

View File

@ -183,7 +183,7 @@ void buildSortingDAG(QueryPlan::Node & node, ActionsDAGPtr & dag, FixedColumns &
IQueryPlanStep * step = node.step.get();
if (auto * reading = typeid_cast<ReadFromMergeTree *>(step))
{
if (const auto * prewhere_info = reading->getPrewhereInfo())
if (const auto prewhere_info = reading->getPrewhereInfo())
{
/// Should ignore limit if there is filtering.
limit = 0;

View File

@ -131,6 +131,7 @@ void optimizeTreeSecondPass(const QueryPlanOptimizationSettings & optimization_s
continue;
}
optimizePrewhere(stack, nodes);
optimizePrimaryKeyCondition(stack);
enableMemoryBoundMerging(*frame.node, nodes);

View File

@ -1397,6 +1397,17 @@ bool ReadFromMergeTree::requestReadingInOrder(size_t prefix_size, int direction,
return true;
}
void ReadFromMergeTree::updatePrewhereInfo(const PrewhereInfoPtr & prewhere_info_value)
{
query_info.prewhere_info = prewhere_info_value;
prewhere_info = prewhere_info_value;
output_stream = DataStream{.header = IMergeTreeSelectAlgorithm::transformHeader(
storage_snapshot->getSampleBlockForColumns(real_column_names),
prewhere_info_value,
data.getPartitionValueType(),
virt_column_names)};
}
bool ReadFromMergeTree::requestOutputEachPartitionThroughSeparatePort()
{
if (isQueryWithFinal())

View File

@ -121,7 +121,11 @@ public:
void describeActions(JSONBuilder::JSONMap & map) const override;
void describeIndexes(JSONBuilder::JSONMap & map) const override;
const Names & getRealColumnNames() const { return real_column_names; }
const Names & getVirtualColumnNames() const { return virt_column_names; }
StorageID getStorageID() const { return data.getStorageID(); }
const StorageSnapshotPtr & getStorageSnapshot() const { return storage_snapshot; }
UInt64 getSelectedParts() const { return selected_parts; }
UInt64 getSelectedRows() const { return selected_rows; }
UInt64 getSelectedMarks() const { return selected_marks; }
@ -144,11 +148,13 @@ public:
ContextPtr getContext() const { return context; }
const SelectQueryInfo & getQueryInfo() const { return query_info; }
StorageMetadataPtr getStorageMetadata() const { return metadata_for_reading; }
const PrewhereInfo * getPrewhereInfo() const { return prewhere_info.get(); }
const PrewhereInfoPtr & getPrewhereInfo() const { return prewhere_info; }
/// Returns `false` if requested reading cannot be performed.
bool requestReadingInOrder(size_t prefix_size, int direction, size_t limit);
void updatePrewhereInfo(const PrewhereInfoPtr & prewhere_info_value);
static bool isFinal(const SelectQueryInfo & query_info);
/// Returns true if the optimisation is applicable (and applies it then).

View File

@ -78,7 +78,6 @@ SelectQueryDescription buildSelectQueryDescription(const ASTPtr & select_query,
{
ASTPtr inner_query = select_query;
std::optional<StorageID> dependent_table_storage_id;
bool allow_experimental_analyzer = context->getSettingsRef().allow_experimental_analyzer;
while (true)
{
@ -100,10 +99,6 @@ SelectQueryDescription buildSelectQueryDescription(const ASTPtr & select_query,
if (auto db_and_table = getDatabaseAndTable(*inner_select_query, 0))
{
const auto * table_expression = getTableExpression(*inner_select_query, 0);
if (allow_experimental_analyzer && table_expression->database_and_table_name->tryGetAlias().empty())
table_expression->database_and_table_name->setAlias("__dependent_table");
String select_database_name = db_and_table->database;
String select_table_name = db_and_table->table;

View File

@ -635,6 +635,7 @@ Block IMergeTreeSelectAlgorithm::applyPrewhereActions(Block block, const Prewher
}
if (prewhere_info->prewhere_actions)
{
block = prewhere_info->prewhere_actions->updateHeader(std::move(block));
auto & prewhere_column = block.getByName(prewhere_info->prewhere_column_name);
@ -645,18 +646,22 @@ Block IMergeTreeSelectAlgorithm::applyPrewhereActions(Block block, const Prewher
}
if (prewhere_info->remove_prewhere_column)
{
block.erase(prewhere_info->prewhere_column_name);
else
}
else if (prewhere_info->need_filter)
{
WhichDataType which(removeNullable(recursiveRemoveLowCardinality(prewhere_column.type)));
if (which.isNativeInt() || which.isNativeUInt())
prewhere_column.column = prewhere_column.type->createColumnConst(block.rows(), 1u)->convertToFullColumnIfConst();
else if (which.isFloat())
prewhere_column.column = prewhere_column.type->createColumnConst(block.rows(), 1.0f)->convertToFullColumnIfConst();
else
throw Exception(
ErrorCodes::ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER,
"Illegal type {} of column for filter", prewhere_column.type->getName());
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER,
"Illegal type {} of column for filter",
prewhere_column.type->getName());
}
}
}

View File

@ -93,7 +93,13 @@ const ActionsDAG::Node & addClonedDAGToDAG(const ActionsDAG::Node * original_dag
return new_node;
}
/// TODO: Do we need to handle ALIAS nodes in cloning?
if (original_dag_node->type == ActionsDAG::ActionType::ALIAS)
{
const auto & alias_child = addClonedDAGToDAG(original_dag_node->children[0], new_dag, node_remap);
const auto & new_node = new_dag->addAlias(alias_child, original_dag_node->result_name);
node_remap[node_name] = {new_dag, &new_node};
return new_node;
}
/// If the node is a function, add it as a function and add its children
if (original_dag_node->type == ActionsDAG::ActionType::FUNCTION)

View File

@ -12,23 +12,18 @@
#include <Interpreters/misc.h>
#include <Common/typeid_cast.h>
#include <DataTypes/NestedUtils.h>
#include <Interpreters/ActionsDAG.h>
#include <base/map.h>
namespace DB
{
namespace ErrorCodes
{
}
/// Conditions like "x = N" are considered good if abs(N) > threshold.
/// This is used to assume that condition is likely to have good selectivity.
static constexpr auto threshold = 2;
MergeTreeWhereOptimizer::MergeTreeWhereOptimizer(
SelectQueryInfo & query_info,
ContextPtr context,
std::unordered_map<std::string, UInt64> column_sizes_,
const StorageMetadataPtr & metadata_snapshot,
const Names & queried_columns_,
@ -40,11 +35,8 @@ MergeTreeWhereOptimizer::MergeTreeWhereOptimizer(
, supported_columns{supported_columns_}
, sorting_key_names{NameSet(
metadata_snapshot->getSortingKey().column_names.begin(), metadata_snapshot->getSortingKey().column_names.end())}
, block_with_constants{KeyCondition::getBlockWithConstants(query_info.query->clone(), query_info.syntax_analyzer_result, context)}
, log{log_}
, column_sizes{std::move(column_sizes_)}
, move_all_conditions_to_prewhere(context->getSettingsRef().move_all_conditions_to_prewhere)
, log_queries_cut_to_length(context->getSettingsRef().log_queries_cut_to_length)
{
for (const auto & name : queried_columns)
{
@ -52,180 +44,214 @@ MergeTreeWhereOptimizer::MergeTreeWhereOptimizer(
if (it != column_sizes.end())
total_size_of_queried_columns += it->second;
}
determineArrayJoinedNames(query_info.query->as<ASTSelectQuery &>());
optimize(query_info.query->as<ASTSelectQuery &>());
}
static void collectIdentifiersNoSubqueries(const ASTPtr & ast, NameSet & set)
void MergeTreeWhereOptimizer::optimize(SelectQueryInfo & select_query_info, const ContextPtr & context) const
{
if (auto opt_name = tryGetIdentifierName(ast))
return (void)set.insert(*opt_name);
if (ast->as<ASTSubquery>())
auto & select = select_query_info.query->as<ASTSelectQuery &>();
if (!select.where() || select.prewhere())
return;
for (const auto & child : ast->children)
collectIdentifiersNoSubqueries(child, set);
auto block_with_constants = KeyCondition::getBlockWithConstants(select_query_info.query->clone(),
select_query_info.syntax_analyzer_result,
context);
WhereOptimizerContext where_optimizer_context;
where_optimizer_context.context = context;
where_optimizer_context.array_joined_names = determineArrayJoinedNames(select);
where_optimizer_context.move_all_conditions_to_prewhere = context->getSettingsRef().move_all_conditions_to_prewhere;
where_optimizer_context.is_final = select.final();
RPNBuilderTreeContext tree_context(context, std::move(block_with_constants), {} /*prepared_sets*/);
RPNBuilderTreeNode node(select.where().get(), tree_context);
auto optimize_result = optimizeImpl(node, where_optimizer_context);
if (!optimize_result)
return;
/// Rewrite the SELECT query.
auto where_filter_ast = reconstructAST(optimize_result->where_conditions);
auto prewhere_filter_ast = reconstructAST(optimize_result->prewhere_conditions);
select.setExpression(ASTSelectQuery::Expression::WHERE, std::move(where_filter_ast));
select.setExpression(ASTSelectQuery::Expression::PREWHERE, std::move(prewhere_filter_ast));
UInt64 log_queries_cut_to_length = context->getSettingsRef().log_queries_cut_to_length;
LOG_DEBUG(log, "MergeTreeWhereOptimizer: condition \"{}\" moved to PREWHERE", select.prewhere()->formatForLogging(log_queries_cut_to_length));
}
static bool isConditionGood(const ASTPtr & condition)
std::optional<MergeTreeWhereOptimizer::FilterActionsOptimizeResult> MergeTreeWhereOptimizer::optimize(const ActionsDAGPtr & filter_dag,
const std::string & filter_column_name,
const ContextPtr & context,
bool is_final)
{
const auto * function = condition->as<ASTFunction>();
if (!function)
WhereOptimizerContext where_optimizer_context;
where_optimizer_context.context = context;
where_optimizer_context.array_joined_names = {};
where_optimizer_context.move_all_conditions_to_prewhere = context->getSettingsRef().move_all_conditions_to_prewhere;
where_optimizer_context.is_final = is_final;
RPNBuilderTreeContext tree_context(context);
RPNBuilderTreeNode node(&filter_dag->findInOutputs(filter_column_name), tree_context);
auto optimize_result = optimizeImpl(node, where_optimizer_context);
if (!optimize_result)
return {};
auto filter_actions = reconstructDAG(optimize_result->where_conditions, context);
auto prewhere_filter_actions = reconstructDAG(optimize_result->prewhere_conditions, context);
FilterActionsOptimizeResult result = { std::move(filter_actions), std::move(prewhere_filter_actions) };
return result;
}
static void collectColumns(const RPNBuilderTreeNode & node, const NameSet & columns_names, NameSet & result_set, bool & has_invalid_column)
{
if (node.isConstant())
return;
if (!node.isFunction())
{
auto column_name = node.getColumnName();
if (!columns_names.contains(column_name))
{
has_invalid_column = true;
return;
}
result_set.insert(column_name);
return;
}
auto function_node = node.toFunctionNode();
size_t arguments_size = function_node.getArgumentsSize();
for (size_t i = 0; i < arguments_size; ++i)
{
auto function_argument = function_node.getArgumentAt(i);
collectColumns(function_argument, columns_names, result_set, has_invalid_column);
}
}
static bool isConditionGood(const RPNBuilderTreeNode & condition, const NameSet & columns_names)
{
if (!condition.isFunction())
return false;
/** we are only considering conditions of form `equals(one, another)` or `one = another`,
* especially if either `one` or `another` is ASTIdentifier */
if (function->name != "equals")
auto function_node = condition.toFunctionNode();
/** We are only considering conditions of form `equals(one, another)` or `one = another`,
* especially if either `one` or `another` is ASTIdentifier
*/
if (function_node.getFunctionName() != "equals" || function_node.getArgumentsSize() != 2)
return false;
auto * left_arg = function->arguments->children.front().get();
auto * right_arg = function->arguments->children.back().get();
auto lhs_argument = function_node.getArgumentAt(0);
auto rhs_argument = function_node.getArgumentAt(1);
/// try to ensure left_arg points to ASTIdentifier
if (!left_arg->as<ASTIdentifier>() && right_arg->as<ASTIdentifier>())
std::swap(left_arg, right_arg);
auto lhs_argument_column_name = lhs_argument.getColumnName();
auto rhs_argument_column_name = rhs_argument.getColumnName();
if (left_arg->as<ASTIdentifier>())
{
/// condition may be "good" if only right_arg is a constant and its value is outside the threshold
if (const auto * literal = right_arg->as<ASTLiteral>())
{
const auto & field = literal->value;
const auto type = field.getType();
bool lhs_argument_is_column = columns_names.contains(lhs_argument_column_name);
bool rhs_argument_is_column = columns_names.contains(rhs_argument_column_name);
bool lhs_argument_is_constant = lhs_argument.isConstant();
bool rhs_argument_is_constant = rhs_argument.isConstant();
RPNBuilderTreeNode * constant_node = nullptr;
if (lhs_argument_is_column && rhs_argument_is_constant)
constant_node = &rhs_argument;
else if (lhs_argument_is_constant && rhs_argument_is_column)
constant_node = &lhs_argument;
else
return false;
Field output_value;
DataTypePtr output_type;
if (!constant_node->tryGetConstant(output_value, output_type))
return false;
const auto type = output_value.getType();
/// check the value with respect to threshold
if (type == Field::Types::UInt64)
{
const auto value = field.get<UInt64>();
const auto value = output_value.get<UInt64>();
return value > threshold;
}
else if (type == Field::Types::Int64)
{
const auto value = field.get<Int64>();
const auto value = output_value.get<Int64>();
return value < -threshold || threshold < value;
}
else if (type == Field::Types::Float64)
{
const auto value = field.get<Float64>();
const auto value = output_value.get<Float64>();
return value < threshold || threshold < value;
}
}
}
return false;
}
static const ASTFunction * getAsTuple(const ASTPtr & node)
void MergeTreeWhereOptimizer::analyzeImpl(Conditions & res, const RPNBuilderTreeNode & node, const WhereOptimizerContext & where_optimizer_context) const
{
if (const auto * func = node->as<ASTFunction>(); func && func->name == "tuple")
return func;
return {};
}
auto function_node_optional = node.toFunctionNodeOrNull();
static bool getAsTupleLiteral(const ASTPtr & node, Tuple & tuple)
{
if (const auto * value_tuple = node->as<ASTLiteral>())
return value_tuple && value_tuple->value.tryGet<Tuple>(tuple);
return false;
}
bool MergeTreeWhereOptimizer::tryAnalyzeTuple(Conditions & res, const ASTFunction * func, bool is_final) const
{
if (!func || func->name != "equals" || func->arguments->children.size() != 2)
return false;
Tuple tuple_lit;
const ASTFunction * tuple_other = nullptr;
if (getAsTupleLiteral(func->arguments->children[0], tuple_lit))
tuple_other = getAsTuple(func->arguments->children[1]);
else if (getAsTupleLiteral(func->arguments->children[1], tuple_lit))
tuple_other = getAsTuple(func->arguments->children[0]);
if (!tuple_other || tuple_lit.size() != tuple_other->arguments->children.size())
return false;
for (size_t i = 0; i < tuple_lit.size(); ++i)
if (function_node_optional.has_value() && function_node_optional->getFunctionName() == "and")
{
const auto & child = tuple_other->arguments->children[i];
std::shared_ptr<IAST> fetch_sign_column = nullptr;
/// tuple in tuple like (a, (b, c)) = (1, (2, 3))
if (const auto * child_func = getAsTuple(child))
fetch_sign_column = std::make_shared<ASTFunction>(*child_func);
else if (const auto * child_ident = child->as<ASTIdentifier>())
fetch_sign_column = std::make_shared<ASTIdentifier>(child_ident->name());
else
return false;
size_t arguments_size = function_node_optional->getArgumentsSize();
ASTPtr fetch_sign_value = std::make_shared<ASTLiteral>(tuple_lit.at(i));
ASTPtr func_node = makeASTFunction("equals", fetch_sign_column, fetch_sign_value);
analyzeImpl(res, func_node, is_final);
}
return true;
}
void MergeTreeWhereOptimizer::analyzeImpl(Conditions & res, const ASTPtr & node, bool is_final) const
{
const auto * func = node->as<ASTFunction>();
if (func && func->name == "and")
for (size_t i = 0; i < arguments_size; ++i)
{
for (const auto & elem : func->arguments->children)
analyzeImpl(res, elem, is_final);
auto argument = function_node_optional->getArgumentAt(i);
analyzeImpl(res, argument, where_optimizer_context);
}
else if (tryAnalyzeTuple(res, func, is_final))
{
/// analyzed
}
else
{
Condition cond;
cond.node = node;
Condition cond(node);
bool has_invalid_column = false;
collectColumns(node, table_columns, cond.table_columns, has_invalid_column);
collectIdentifiersNoSubqueries(node, cond.identifiers);
cond.columns_size = getIdentifiersColumnSize(cond.identifiers);
cond.columns_size = getColumnsSize(cond.table_columns);
cond.viable =
!has_invalid_column &&
/// Condition depend on some column. Constant expressions are not moved.
!cond.identifiers.empty()
&& !cannotBeMoved(node, is_final)
!cond.table_columns.empty()
&& !cannotBeMoved(node, where_optimizer_context)
/// When use final, do not take into consideration the conditions with non-sorting keys. Because final select
/// need to use all sorting keys, it will cause correctness issues if we filter other columns before final merge.
&& (!is_final || isExpressionOverSortingKey(node))
/// Only table columns are considered. Not array joined columns. NOTE We're assuming that aliases was expanded.
&& isSubsetOfTableColumns(cond.identifiers)
&& (!where_optimizer_context.is_final || isExpressionOverSortingKey(node))
/// Some identifiers can unable to support PREWHERE (usually because of different types in Merge engine)
&& identifiersSupportsPrewhere(cond.identifiers)
&& columnsSupportPrewhere(cond.table_columns)
/// Do not move conditions involving all queried columns.
&& cond.identifiers.size() < queried_columns.size();
&& cond.table_columns.size() < queried_columns.size();
if (cond.viable)
cond.good = isConditionGood(node);
cond.good = isConditionGood(node, table_columns);
res.emplace_back(std::move(cond));
}
}
/// Transform conjunctions chain in WHERE expression to Conditions list.
MergeTreeWhereOptimizer::Conditions MergeTreeWhereOptimizer::analyze(const ASTPtr & expression, bool is_final) const
MergeTreeWhereOptimizer::Conditions MergeTreeWhereOptimizer::analyze(const RPNBuilderTreeNode & node,
const WhereOptimizerContext & where_optimizer_context) const
{
Conditions res;
analyzeImpl(res, expression, is_final);
analyzeImpl(res, node, where_optimizer_context);
return res;
}
/// Transform Conditions list to WHERE or PREWHERE expression.
ASTPtr MergeTreeWhereOptimizer::reconstruct(const Conditions & conditions)
ASTPtr MergeTreeWhereOptimizer::reconstructAST(const Conditions & conditions)
{
if (conditions.empty())
return {};
if (conditions.size() == 1)
return conditions.front().node;
return conditions.front().node.getASTNode()->clone();
const auto function = std::make_shared<ASTFunction>();
@ -234,18 +260,29 @@ ASTPtr MergeTreeWhereOptimizer::reconstruct(const Conditions & conditions)
function->children.push_back(function->arguments);
for (const auto & elem : conditions)
function->arguments->children.push_back(elem.node);
function->arguments->children.push_back(elem.node.getASTNode()->clone());
return function;
}
void MergeTreeWhereOptimizer::optimize(ASTSelectQuery & select) const
ActionsDAGPtr MergeTreeWhereOptimizer::reconstructDAG(const Conditions & conditions, const ContextPtr & context)
{
if (!select.where() || select.prewhere())
return;
if (conditions.empty())
return {};
Conditions where_conditions = analyze(select.where(), select.final());
ActionsDAG::NodeRawConstPtrs filter_nodes;
filter_nodes.reserve(conditions.size());
for (const auto & condition : conditions)
filter_nodes.push_back(condition.node.getDAGNode());
return ActionsDAG::buildFilterActionsDAG(filter_nodes, {} /*node_name_to_input_node_column*/, context);
}
std::optional<MergeTreeWhereOptimizer::OptimizeResult> MergeTreeWhereOptimizer::optimizeImpl(const RPNBuilderTreeNode & node,
const WhereOptimizerContext & where_optimizer_context) const
{
Conditions where_conditions = analyze(node, where_optimizer_context);
Conditions prewhere_conditions;
UInt64 total_size_of_moved_conditions = 0;
@ -256,12 +293,12 @@ void MergeTreeWhereOptimizer::optimize(ASTSelectQuery & select) const
{
prewhere_conditions.splice(prewhere_conditions.end(), where_conditions, cond_it);
total_size_of_moved_conditions += cond_it->columns_size;
total_number_of_moved_columns += cond_it->identifiers.size();
total_number_of_moved_columns += cond_it->table_columns.size();
/// Move all other viable conditions that depend on the same set of columns.
for (auto jt = where_conditions.begin(); jt != where_conditions.end();)
{
if (jt->viable && jt->columns_size == cond_it->columns_size && jt->identifiers == cond_it->identifiers)
if (jt->viable && jt->columns_size == cond_it->columns_size && jt->table_columns == cond_it->table_columns)
prewhere_conditions.splice(prewhere_conditions.end(), where_conditions, jt++);
else
++jt;
@ -278,7 +315,7 @@ void MergeTreeWhereOptimizer::optimize(ASTSelectQuery & select) const
if (!it->viable)
break;
if (!move_all_conditions_to_prewhere)
if (!where_optimizer_context.move_all_conditions_to_prewhere)
{
bool moved_enough = false;
if (total_size_of_queried_columns > 0)
@ -292,7 +329,7 @@ void MergeTreeWhereOptimizer::optimize(ASTSelectQuery & select) const
/// Otherwise, use number of moved columns as a fallback.
/// It can happen, if table has only compact parts. 25% ratio is just a guess.
moved_enough = total_number_of_moved_columns > 0
&& (total_number_of_moved_columns + it->identifiers.size()) * 4 > queried_columns.size();
&& (total_number_of_moved_columns + it->table_columns.size()) * 4 > queried_columns.size();
}
if (moved_enough)
@ -304,129 +341,130 @@ void MergeTreeWhereOptimizer::optimize(ASTSelectQuery & select) const
/// Nothing was moved.
if (prewhere_conditions.empty())
return;
return {};
/// Rewrite the SELECT query.
select.setExpression(ASTSelectQuery::Expression::WHERE, reconstruct(where_conditions));
select.setExpression(ASTSelectQuery::Expression::PREWHERE, reconstruct(prewhere_conditions));
LOG_DEBUG(log, "MergeTreeWhereOptimizer: condition \"{}\" moved to PREWHERE", select.prewhere()->formatForLogging(log_queries_cut_to_length));
OptimizeResult result = {std::move(where_conditions), std::move(prewhere_conditions)};
return result;
}
UInt64 MergeTreeWhereOptimizer::getIdentifiersColumnSize(const NameSet & identifiers) const
UInt64 MergeTreeWhereOptimizer::getColumnsSize(const NameSet & columns) const
{
UInt64 size = 0;
for (const auto & identifier : identifiers)
if (column_sizes.contains(identifier))
size += column_sizes.at(identifier);
for (const auto & column : columns)
if (column_sizes.contains(column))
size += column_sizes.at(column);
return size;
}
bool MergeTreeWhereOptimizer::identifiersSupportsPrewhere(const NameSet & identifiers) const
bool MergeTreeWhereOptimizer::columnsSupportPrewhere(const NameSet & columns) const
{
if (!supported_columns.has_value())
return true;
for (const auto & identifier : identifiers)
if (!supported_columns->contains(identifier))
for (const auto & column : columns)
if (!supported_columns->contains(column))
return false;
return true;
}
bool MergeTreeWhereOptimizer::isExpressionOverSortingKey(const ASTPtr & ast) const
bool MergeTreeWhereOptimizer::isExpressionOverSortingKey(const RPNBuilderTreeNode & node) const
{
if (const auto * func = ast->as<ASTFunction>())
if (node.isFunction())
{
const auto & args = func->arguments->children;
for (const auto & arg : args)
auto function_node = node.toFunctionNode();
size_t arguments_size = function_node.getArgumentsSize();
for (size_t i = 0; i < arguments_size; ++i)
{
if (isConstant(ast) || sorting_key_names.contains(arg->getColumnName()))
auto argument = function_node.getArgumentAt(i);
auto argument_column_name = argument.getColumnName();
if (argument.isConstant() || sorting_key_names.contains(argument_column_name))
continue;
if (!isExpressionOverSortingKey(arg))
if (!isExpressionOverSortingKey(argument))
return false;
}
return true;
}
return isConstant(ast) || sorting_key_names.contains(ast->getColumnName());
return node.isConstant() || sorting_key_names.contains(node.getColumnName());
}
bool MergeTreeWhereOptimizer::isSortingKey(const String & column_name) const
{
return sorting_key_names.contains(column_name);
}
bool MergeTreeWhereOptimizer::isConstant(const ASTPtr & expr) const
bool MergeTreeWhereOptimizer::isSubsetOfTableColumns(const NameSet & columns) const
{
const auto column_name = expr->getColumnName();
return expr->as<ASTLiteral>()
|| (block_with_constants.has(column_name) && isColumnConst(*block_with_constants.getByName(column_name).column));
}
bool MergeTreeWhereOptimizer::isSubsetOfTableColumns(const NameSet & identifiers) const
{
for (const auto & identifier : identifiers)
if (!table_columns.contains(identifier))
for (const auto & column : columns)
if (!table_columns.contains(column))
return false;
return true;
}
bool MergeTreeWhereOptimizer::cannotBeMoved(const ASTPtr & ptr, bool is_final) const
bool MergeTreeWhereOptimizer::cannotBeMoved(const RPNBuilderTreeNode & node, const WhereOptimizerContext & where_optimizer_context) const
{
if (const auto * function_ptr = ptr->as<ASTFunction>())
if (node.isFunction())
{
auto function_node = node.toFunctionNode();
auto function_name = function_node.getFunctionName();
/// disallow arrayJoin expressions to be moved to PREWHERE for now
if ("arrayJoin" == function_ptr->name)
if (function_name == "arrayJoin")
return true;
/// disallow GLOBAL IN, GLOBAL NOT IN
/// TODO why?
if ("globalIn" == function_ptr->name
|| "globalNotIn" == function_ptr->name)
if (function_name == "globalIn" || function_name == "globalNotIn")
return true;
/// indexHint is a special function that it does not make sense to transfer to PREWHERE
if ("indexHint" == function_ptr->name)
if (function_name == "indexHint")
return true;
}
else if (auto opt_name = IdentifierSemantic::getColumnName(ptr))
{
/// disallow moving result of ARRAY JOIN to PREWHERE
if (array_joined_names.contains(*opt_name) ||
array_joined_names.contains(Nested::extractTableName(*opt_name)) ||
(is_final && !isSortingKey(*opt_name)))
return true;
}
for (const auto & child : ptr->children)
if (cannotBeMoved(child, is_final))
size_t arguments_size = function_node.getArgumentsSize();
for (size_t i = 0; i < arguments_size; ++i)
{
auto argument = function_node.getArgumentAt(i);
if (cannotBeMoved(argument, where_optimizer_context))
return true;
}
}
else
{
auto column_name = node.getColumnName();
/// disallow moving result of ARRAY JOIN to PREWHERE
if (where_optimizer_context.array_joined_names.contains(column_name) ||
where_optimizer_context.array_joined_names.contains(Nested::extractTableName(column_name)) ||
(table_columns.contains(column_name) && where_optimizer_context.is_final && !isSortingKey(column_name)))
return true;
}
return false;
}
void MergeTreeWhereOptimizer::determineArrayJoinedNames(ASTSelectQuery & select)
NameSet MergeTreeWhereOptimizer::determineArrayJoinedNames(const ASTSelectQuery & select)
{
auto [array_join_expression_list, _] = select.arrayJoinExpressionList();
/// much simplified code from ExpressionAnalyzer::getArrayJoinedColumns()
if (!array_join_expression_list)
return;
return {};
NameSet array_joined_names;
for (const auto & ast : array_join_expression_list->children)
array_joined_names.emplace(ast->getAliasOrColumnName());
return array_joined_names;
}
}

View File

@ -3,6 +3,7 @@
#include <Core/Block.h>
#include <Interpreters/Context_fwd.h>
#include <Storages/SelectQueryInfo.h>
#include <Storages/MergeTree/RPNBuilder.h>
#include <boost/noncopyable.hpp>
@ -34,22 +35,36 @@ class MergeTreeWhereOptimizer : private boost::noncopyable
{
public:
MergeTreeWhereOptimizer(
SelectQueryInfo & query_info,
ContextPtr context,
std::unordered_map<std::string, UInt64> column_sizes_,
const StorageMetadataPtr & metadata_snapshot,
const Names & queried_columns_,
const std::optional<NameSet> & supported_columns_,
Poco::Logger * log_);
private:
void optimize(ASTSelectQuery & select) const;
void optimize(SelectQueryInfo & select_query_info, const ContextPtr & context) const;
struct FilterActionsOptimizeResult
{
ActionsDAGPtr filter_actions;
ActionsDAGPtr prewhere_filter_actions;
};
std::optional<FilterActionsOptimizeResult> optimize(const ActionsDAGPtr & filter_dag,
const std::string & filter_column_name,
const ContextPtr & context,
bool is_final);
private:
struct Condition
{
ASTPtr node;
explicit Condition(RPNBuilderTreeNode node_)
: node(std::move(node_))
{}
RPNBuilderTreeNode node;
UInt64 columns_size = 0;
NameSet identifiers;
NameSet table_columns;
/// Can condition be moved to prewhere?
bool viable = false;
@ -59,7 +74,7 @@ private:
auto tuple() const
{
return std::make_tuple(!viable, !good, columns_size, identifiers.size());
return std::make_tuple(!viable, !good, columns_size, table_columns.size());
}
/// Is condition a better candidate for moving to PREWHERE?
@ -71,27 +86,46 @@ private:
using Conditions = std::list<Condition>;
bool tryAnalyzeTuple(Conditions & res, const ASTFunction * func, bool is_final) const;
void analyzeImpl(Conditions & res, const ASTPtr & node, bool is_final) const;
struct WhereOptimizerContext
{
ContextPtr context;
NameSet array_joined_names;
bool move_all_conditions_to_prewhere = false;
bool is_final = false;
};
struct OptimizeResult
{
Conditions where_conditions;
Conditions prewhere_conditions;
};
std::optional<OptimizeResult> optimizeImpl(const RPNBuilderTreeNode & node, const WhereOptimizerContext & where_optimizer_context) const;
void analyzeImpl(Conditions & res, const RPNBuilderTreeNode & node, const WhereOptimizerContext & where_optimizer_context) const;
/// Transform conjunctions chain in WHERE expression to Conditions list.
Conditions analyze(const ASTPtr & expression, bool is_final) const;
Conditions analyze(const RPNBuilderTreeNode & node, const WhereOptimizerContext & where_optimizer_context) const;
/// Transform Conditions list to WHERE or PREWHERE expression.
static ASTPtr reconstruct(const Conditions & conditions);
/// Reconstruct AST from conditions
static ASTPtr reconstructAST(const Conditions & conditions);
/// Reconstruct DAG from conditions
static ActionsDAGPtr reconstructDAG(const Conditions & conditions, const ContextPtr & context);
void optimizeArbitrary(ASTSelectQuery & select) const;
UInt64 getIdentifiersColumnSize(const NameSet & identifiers) const;
bool identifiersSupportsPrewhere(const NameSet & identifiers) const;
UInt64 getColumnsSize(const NameSet & columns) const;
bool isExpressionOverSortingKey(const ASTPtr & ast) const;
bool columnsSupportPrewhere(const NameSet & columns) const;
bool isExpressionOverSortingKey(const RPNBuilderTreeNode & node) const;
bool isSortingKey(const String & column_name) const;
bool isConstant(const ASTPtr & expr) const;
bool isSubsetOfTableColumns(const NameSet & identifiers) const;
bool isSubsetOfTableColumns(const NameSet & columns) const;
/** ARRAY JOIN'ed columns as well as arrayJoin() result cannot be used in PREWHERE, therefore expressions
* containing said columns should not be moved to PREWHERE at all.
@ -99,23 +133,17 @@ private:
*
* Also, disallow moving expressions with GLOBAL [NOT] IN.
*/
bool cannotBeMoved(const ASTPtr & ptr, bool is_final) const;
bool cannotBeMoved(const RPNBuilderTreeNode & node, const WhereOptimizerContext & where_optimizer_context) const;
void determineArrayJoinedNames(ASTSelectQuery & select);
static NameSet determineArrayJoinedNames(const ASTSelectQuery & select);
using StringSet = std::unordered_set<std::string>;
const StringSet table_columns;
const NameSet table_columns;
const Names queried_columns;
const std::optional<NameSet> supported_columns;
const NameSet sorting_key_names;
const Block block_with_constants;
Poco::Logger * log;
std::unordered_map<std::string, UInt64> column_sizes;
UInt64 total_size_of_queried_columns = 0;
NameSet array_joined_names;
const bool move_all_conditions_to_prewhere = false;
UInt64 log_queries_cut_to_length = 0;
};

View File

@ -86,6 +86,16 @@ String getColumnNameWithoutAlias(const ActionsDAG::Node & node, bool allow_exper
return std::move(out.str());
}
const ActionsDAG::Node * getNodeWithoutAlias(const ActionsDAG::Node * node)
{
const ActionsDAG::Node * result = node;
while (result->type == ActionsDAG::ActionType::ALIAS)
result = result->children[0];
return result;
}
}
RPNBuilderTreeContext::RPNBuilderTreeContext(ContextPtr query_context_)
@ -137,9 +147,14 @@ std::string RPNBuilderTreeNode::getColumnNameWithModuloLegacy() const
bool RPNBuilderTreeNode::isFunction() const
{
if (ast_node)
{
return typeid_cast<const ASTFunction *>(ast_node);
}
else
return dag_node->type == ActionsDAG::ActionType::FUNCTION;
{
const auto * node_without_alias = getNodeWithoutAlias(dag_node);
return node_without_alias->type == ActionsDAG::ActionType::FUNCTION;
}
}
bool RPNBuilderTreeNode::isConstant() const
@ -160,7 +175,8 @@ bool RPNBuilderTreeNode::isConstant() const
}
else
{
return dag_node->column && isColumnConst(*dag_node->column);
const auto * node_without_alias = getNodeWithoutAlias(dag_node);
return node_without_alias->column && isColumnConst(*node_without_alias->column);
}
}
@ -189,8 +205,9 @@ ColumnWithTypeAndName RPNBuilderTreeNode::getConstantColumn() const
}
else
{
result.type = dag_node->result_type;
result.column = dag_node->column;
const auto * node_without_alias = getNodeWithoutAlias(dag_node);
result.type = node_without_alias->result_type;
result.column = node_without_alias->column;
}
return result;
@ -238,10 +255,12 @@ bool RPNBuilderTreeNode::tryGetConstant(Field & output_value, DataTypePtr & outp
}
else
{
if (dag_node->column && isColumnConst(*dag_node->column))
const auto * node_without_alias = getNodeWithoutAlias(dag_node);
if (node_without_alias->column && isColumnConst(*node_without_alias->column))
{
output_value = (*dag_node->column)[0];
output_type = dag_node->result_type;
output_value = (*node_without_alias->column)[0];
output_type = node_without_alias->result_type;
if (!output_value.isNull())
output_type = removeNullable(output_type);
@ -291,7 +310,8 @@ ConstSetPtr RPNBuilderTreeNode::tryGetPreparedSet() const
}
else if (dag_node)
{
return tryGetSetFromDAGNode(dag_node);
const auto * node_without_alias = getNodeWithoutAlias(dag_node);
return tryGetSetFromDAGNode(node_without_alias);
}
return {};
@ -310,7 +330,8 @@ ConstSetPtr RPNBuilderTreeNode::tryGetPreparedSet(const DataTypes & data_types)
}
else if (dag_node)
{
return tryGetSetFromDAGNode(dag_node);
const auto * node_without_alias = getNodeWithoutAlias(dag_node);
return tryGetSetFromDAGNode(node_without_alias);
}
return nullptr;
@ -351,9 +372,11 @@ ConstSetPtr RPNBuilderTreeNode::tryGetPreparedSet(
return set;
}
}
else if (dag_node->column)
else
{
return tryGetSetFromDAGNode(dag_node);
const auto * node_without_alias = getNodeWithoutAlias(dag_node);
if (node_without_alias->column)
return tryGetSetFromDAGNode(node_without_alias);
}
return nullptr;
@ -364,10 +387,10 @@ RPNBuilderFunctionTreeNode RPNBuilderTreeNode::toFunctionNode() const
if (!isFunction())
throw Exception(ErrorCodes::LOGICAL_ERROR, "RPNBuilderTree node is not a function");
if (this->ast_node)
return RPNBuilderFunctionTreeNode(this->ast_node, tree_context);
if (ast_node)
return RPNBuilderFunctionTreeNode(ast_node, tree_context);
else
return RPNBuilderFunctionTreeNode(this->dag_node, tree_context);
return RPNBuilderFunctionTreeNode(getNodeWithoutAlias(dag_node), tree_context);
}
std::optional<RPNBuilderFunctionTreeNode> RPNBuilderTreeNode::toFunctionNodeOrNull() const
@ -375,10 +398,10 @@ std::optional<RPNBuilderFunctionTreeNode> RPNBuilderTreeNode::toFunctionNodeOrNu
if (!isFunction())
return {};
if (this->ast_node)
if (ast_node)
return RPNBuilderFunctionTreeNode(this->ast_node, tree_context);
else
return RPNBuilderFunctionTreeNode(this->dag_node, tree_context);
return RPNBuilderFunctionTreeNode(getNodeWithoutAlias(dag_node), tree_context);
}
std::string RPNBuilderFunctionTreeNode::getFunctionName() const

View File

@ -78,6 +78,12 @@ public:
/// Construct RPNBuilderTreeNode with non null ast node and tree context
explicit RPNBuilderTreeNode(const IAST * ast_node_, RPNBuilderTreeContext & tree_context_);
/// Get AST node
const IAST * getASTNode() const { return ast_node; }
/// Get DAG node
const ActionsDAG::Node * getDAGNode() const { return dag_node; }
/// Get column name
std::string getColumnName() const;

View File

@ -1,5 +1,6 @@
-- Tags: no-replicated-database, no-parallel, no-fasttest
SET allow_experimental_analyzer = 1;
SET allow_experimental_live_view = 1;
DROP TABLE IF EXISTS lv;
@ -8,21 +9,17 @@ DROP TABLE IF EXISTS mt;
CREATE TABLE mt (a Int32) Engine=MergeTree order by tuple();
CREATE LIVE VIEW lv AS SELECT sum(a) AS sum_a FROM mt PREWHERE a > 1;
CREATE LIVE VIEW lv2 AS SELECT sum(number) AS sum_number FROM system.numbers PREWHERE number > 1;
CREATE LIVE VIEW lv2 AS SELECT sum(number) AS sum_number FROM system.numbers PREWHERE number > 1; -- { serverError 182 }
INSERT INTO mt VALUES (1),(2),(3);
SELECT *,_version FROM lv;
SELECT *,_version FROM lv PREWHERE sum_a > 5; -- { serverError 182 }
SELECT *, _version FROM lv;
SELECT *, _version FROM lv PREWHERE sum_a > 5; -- { serverError 182 }
INSERT INTO mt VALUES (1),(2),(3);
SELECT *,_version FROM lv;
SELECT *,_version FROM lv PREWHERE sum_a > 10; -- { serverError 182 }
SELECT *,_version FROM lv2; -- { serverError 182 }
SELECT *,_version FROM lv2 PREWHERE sum_number > 10; -- { serverError 182 }
SELECT *, _version FROM lv;
SELECT *, _version FROM lv PREWHERE sum_a > 10; -- { serverError 182 }
DROP TABLE lv;
DROP TABLE lv2;
DROP TABLE mt;

View File

@ -1,2 +1,2 @@
SELECT * FROM numbers(4) GROUP BY number WITH TOTALS HAVING sum(number) <= arrayJoin([]); -- { serverError 44 }
SELECT * FROM numbers(4) GROUP BY number WITH TOTALS HAVING sum(number) <= arrayJoin([]); -- { serverError 44, 59 }
SELECT * FROM numbers(4) GROUP BY number WITH TOTALS HAVING sum(number) <= arrayJoin([3, 2, 1, 0]) ORDER BY number; -- { serverError 44 }

View File

@ -1,5 +1,5 @@
drop table if exists tab;
create table tab (x UInt64, `arr.a` Array(UInt64), `arr.b` Array(UInt64)) engine = MergeTree order by x;
select x from tab array join arr prewhere x != 0 where arr; -- { serverError 43, 47 }
select x from tab array join arr prewhere arr where x != 0; -- { serverError 43, 47 }
select x from tab array join arr prewhere x != 0 where arr; -- { serverError 47, 59 }
select x from tab array join arr prewhere arr where x != 0; -- { serverError 47, 59 }
drop table if exists tab;

View File

@ -86,8 +86,8 @@ FROM test
GROUP BY
1 + greatest(x1, 1),
x2
select max(x1), x2 from test group by 1, 2; -- { serverError 43 }
select 1 + max(x1), x2 from test group by 1, 2; -- { serverError 43 }
select max(x1), x2 from test group by 1, 2; -- { serverError 43, 184 }
select 1 + max(x1), x2 from test group by 1, 2; -- { serverError 43, 184 }
explain syntax select x1 + x3, x3 from test group by 1, 2;
SELECT
x1 + x3,

View File

@ -30,8 +30,8 @@ explain syntax select x3, if(x3 > 10, x3, plus(x1, x2)), x1 + x2 from test order
explain syntax select max(x1), x2 from test group by 2 order by 1, 2;
explain syntax select 1 + greatest(x1, 1), x2 from test group by 1, 2;
select max(x1), x2 from test group by 1, 2; -- { serverError 43 }
select 1 + max(x1), x2 from test group by 1, 2; -- { serverError 43 }
select max(x1), x2 from test group by 1, 2; -- { serverError 43, 184 }
select 1 + max(x1), x2 from test group by 1, 2; -- { serverError 43, 184 }
explain syntax select x1 + x3, x3 from test group by 1, 2;

View File

@ -1 +1,3 @@
SELECT (NULL, NULL, NULL, NULL, NULL, NULL, NULL) FROM numbers(0) GROUP BY number WITH TOTALS HAVING sum(number) <= arrayJoin([]);
SET allow_experimental_analyzer = 1;
SELECT (NULL, NULL, NULL, NULL, NULL, NULL, NULL) FROM numbers(0) GROUP BY number WITH TOTALS HAVING sum(number) <= arrayJoin([]) -- { serverError 59 };

View File

@ -1,5 +1,6 @@
SET allow_experimental_analyzer = 1;
SET single_join_prefer_left_table = 0;
SET optimize_move_to_prewhere = 0;
DROP TABLE IF EXISTS test_table;
CREATE TABLE test_table