Added WINDOW functions support

This commit is contained in:
Maksim Kita 2022-09-12 16:14:40 +02:00
parent 85ac02c9db
commit 8cadb1b318
30 changed files with 1619 additions and 200 deletions

View File

@ -1,4 +1,4 @@
#include <Analyzer/CollectAggregateFunctionVisitor.h>
#include <Analyzer/CollectAggregateFunctionNodes.h>
#include <Analyzer/InDepthQueryTreeVisitor.h>
#include <Analyzer/FunctionNode.h>
@ -30,10 +30,7 @@ public:
static void visit(const QueryTreeNodePtr & node, Data & data)
{
auto * function_node = node->as<FunctionNode>();
if (!function_node)
return;
if (!function_node->isAggregateFunction())
if (!function_node || !function_node->isAggregateFunction())
return;
if (!data.assert_no_aggregates_place_message.empty())

View File

@ -15,7 +15,7 @@ QueryTreeNodes collectAggregateFunctionNodes(const QueryTreeNodePtr & node);
*/
void collectAggregateFunctionNodes(const QueryTreeNodePtr & node, QueryTreeNodes & result);
/** Assert that there are not aggregate function nodes in node children.
/** Assert that there are no aggregate function nodes in node children.
* Do not visit subqueries.
*/
void assertNoAggregateFunctionNodes(const QueryTreeNodePtr & node, const String & assert_no_aggregates_place_message);

View File

@ -0,0 +1,87 @@
#include <Analyzer/CollectWindowFunctionNodes.h>
#include <Analyzer/IQueryTreeNode.h>
#include <Analyzer/InDepthQueryTreeVisitor.h>
#include <Analyzer/FunctionNode.h>
namespace DB
{
namespace ErrorCodes
{
extern const int ILLEGAL_AGGREGATION;
}
namespace
{
/** Matcher that looks for window function nodes in a query tree.
  *
  * Behaviour is selected through Data:
  * 1. If assert_no_window_functions_place_message is not empty, an exception is thrown for a found window function node.
  * 2. Otherwise, if window_function_nodes is set, every found window function node is appended to it.
  *
  * Children that are QUERY or UNION nodes are not visited.
  */
class CollecWindowFunctionNodesMatcher
{
public:
    using Visitor = ConstInDepthQueryTreeVisitor<CollecWindowFunctionNodesMatcher, true, false>;

    struct Data
    {
        Data() = default;

        /// When not empty, finding a window function node is an error. The text is inserted into the exception message.
        String assert_no_window_functions_place_message;

        /// Optional output list for found window function nodes. Not owned by the matcher.
        QueryTreeNodes * window_function_nodes = nullptr;
    };

    static void visit(const QueryTreeNodePtr & node, Data & data)
    {
        auto * function_node = node->as<FunctionNode>();
        const bool is_window_function = function_node != nullptr && function_node->isWindowFunction();
        if (!is_window_function)
            return;

        /// Assertion takes priority over collection
        const bool window_functions_are_forbidden = !data.assert_no_window_functions_place_message.empty();
        if (window_functions_are_forbidden)
        {
            throw Exception(ErrorCodes::ILLEGAL_AGGREGATION,
                "Window function {} is found {} in query",
                function_node->getName(),
                data.assert_no_window_functions_place_message);
        }

        if (data.window_function_nodes != nullptr)
            data.window_function_nodes->push_back(node);
    }

    static bool needChildVisit(const QueryTreeNodePtr &, const QueryTreeNodePtr & child_node)
    {
        const auto child_node_type = child_node->getNodeType();
        return child_node_type != QueryTreeNodeType::QUERY && child_node_type != QueryTreeNodeType::UNION;
    }
};
using CollectWindowFunctionNodesVisitor = CollecWindowFunctionNodesMatcher::Visitor;
}
/// Collect window function nodes found while visiting node and return them as a new list.
/// QUERY and UNION children are not visited.
QueryTreeNodes collectWindowFunctionNodes(const QueryTreeNodePtr & node)
{
    QueryTreeNodes collected_nodes;

    CollectWindowFunctionNodesVisitor::Data visitor_data;
    visitor_data.window_function_nodes = &collected_nodes;

    CollectWindowFunctionNodesVisitor(visitor_data).visit(node);

    return collected_nodes;
}
void collectWindowFunctionNodes(const QueryTreeNodePtr & node, QueryTreeNodes & result)
{
CollectWindowFunctionNodesVisitor::Data data;
data.window_function_nodes = &result;
CollectWindowFunctionNodesVisitor visitor(data);
visitor.visit(node);
}
void assertNoWindowFunctionNodes(const QueryTreeNodePtr & node, const String & assert_no_window_functions_place_message)
{
CollectWindowFunctionNodesVisitor::Data data;
data.assert_no_window_functions_place_message = assert_no_window_functions_place_message;
CollectWindowFunctionNodesVisitor(data).visit(node);
}
}

View File

@ -0,0 +1,23 @@
#pragma once
#include <Analyzer/IQueryTreeNode.h>
namespace DB
{
/** Collect window function nodes in node children.
* Do not visit subqueries.
*/
QueryTreeNodes collectWindowFunctionNodes(const QueryTreeNodePtr & node);
/** Collect window function nodes in node children and add them into result.
* Do not visit subqueries.
*/
void collectWindowFunctionNodes(const QueryTreeNodePtr & node, QueryTreeNodes & result);
/** Assert that there are no window function nodes in node children.
* Do not visit subqueries.
*/
void assertNoWindowFunctionNodes(const QueryTreeNodePtr & node, const String & assert_no_window_functions_place_message);
}

View File

@ -6,10 +6,13 @@
#include <IO/WriteBufferFromString.h>
#include <IO/Operators.h>
#include <Parsers/ASTFunction.h>
#include <Functions/IFunction.h>
#include <AggregateFunctions/IAggregateFunction.h>
#include <Parsers/ASTFunction.h>
#include <Analyzer/IdentifierNode.h>
namespace DB
{
@ -17,7 +20,7 @@ namespace DB
FunctionNode::FunctionNode(String function_name_)
: function_name(function_name_)
{
children.resize(2);
children.resize(children_size);
children[parameters_child_index] = std::make_shared<ListNode>();
children[arguments_child_index] = std::make_shared<ListNode>();
}
@ -38,6 +41,11 @@ void FunctionNode::resolveAsAggregateFunction(AggregateFunctionPtr aggregate_fun
function_name = aggregate_function->getName();
}
/// Resolve function node as window function.
/// Delegates to resolveAsAggregateFunction with the same arguments.
/// Whether the node is reported as window function is decided by isWindowFunction, which checks for a window node.
void FunctionNode::resolveAsWindowFunction(AggregateFunctionPtr window_function_value, DataTypePtr result_type_value)
{
resolveAsAggregateFunction(window_function_value, result_type_value);
}
void FunctionNode::dumpTreeImpl(WriteBuffer & buffer, FormatState & format_state, size_t indent) const
{
buffer << std::string(indent, ' ') << "FUNCTION id: " << format_state.getNodeId(this);
@ -46,7 +54,14 @@ void FunctionNode::dumpTreeImpl(WriteBuffer & buffer, FormatState & format_state
buffer << ", alias: " << getAlias();
buffer << ", function_name: " << function_name;
buffer << ", is_aggregate_function: " << isAggregateFunction();
std::string function_type = "ordinary";
if (isAggregateFunction())
function_type = "aggregate";
else if (isWindowFunction())
function_type = "window";
buffer << ", function_type: " << function_type;
if (result_type)
buffer << ", result_type: " + result_type->getName();
@ -70,6 +85,12 @@ void FunctionNode::dumpTreeImpl(WriteBuffer & buffer, FormatState & format_state
buffer << '\n' << std::string(indent + 2, ' ') << "ARGUMENTS\n";
arguments.dumpTreeImpl(buffer, format_state, indent + 4);
}
if (hasWindow())
{
buffer << '\n' << std::string(indent + 2, ' ') << "WINDOW\n";
getWindowNode()->dumpTreeImpl(buffer, format_state, indent + 4);
}
}
String FunctionNode::getName() const
@ -123,6 +144,7 @@ void FunctionNode::updateTreeHashImpl(HashState & hash_state) const
hash_state.update(function_name.size());
hash_state.update(function_name);
hash_state.update(isAggregateFunction());
hash_state.update(isWindowFunction());
if (result_type)
{
@ -148,6 +170,16 @@ ASTPtr FunctionNode::toASTImpl() const
auto function_ast = std::make_shared<ASTFunction>();
function_ast->name = function_name;
function_ast->is_window_function = isWindowFunction();
auto window_node = getWindowNode();
if (window_node)
{
if (auto * identifier_node = window_node->as<IdentifierNode>())
function_ast->window_name = identifier_node->getIdentifier().getFullName();
else
function_ast->window_definition = window_node->toAST();
}
const auto & parameters = getParameters();
if (!parameters.getNodes().empty())
@ -166,6 +198,7 @@ ASTPtr FunctionNode::toASTImpl() const
QueryTreeNodePtr FunctionNode::cloneImpl() const
{
auto result_function = std::make_shared<FunctionNode>(function_name);
/// It is valid for clone method to reuse the same function pointers, because function or aggregate function must be stateless
result_function->function = function;
result_function->aggregate_function = aggregate_function;

View File

@ -23,9 +23,12 @@ using AggregateFunctionPtr = std::shared_ptr<const IAggregateFunction>;
* Function can be:
* 1. Aggregate function. Example: quantile(0.5)(x), sum(x).
* 2. Non aggregate function. Example: plus(x, x).
* 3. Window function. Example: sum(x) OVER (PARTITION BY expr ORDER BY expr).
*
* Initially function node is initialized with function name.
* During query analysis pass function must be resolved using `resolveAsFunction` or `resolveAsAggregateFunction` methods.
* For window function client must initialize function window node.
*
* During query analysis pass function must be resolved using `resolveAsFunction`, `resolveAsAggregateFunction`, `resolveAsWindowFunction` methods.
* Resolved function is function that has result type and is initialized with concrete aggregate or non aggregate function.
*/
class FunctionNode;
@ -35,7 +38,7 @@ class FunctionNode final : public IQueryTreeNode
{
public:
/** Initialize function node with function name.
* Later during query analysis path function must be resolved.
* Later during query analysis pass function must be resolved.
*/
explicit FunctionNode(String function_name_);
@ -93,6 +96,30 @@ public:
return children[arguments_child_index];
}
/// Has window
bool hasWindow() const
{
return children[window_child_index] != nullptr;
}
/** Get window node (read-only access to the window child).
* Valid only for window function node. The returned pointer is null if no window node is set (see hasWindow).
* Can be identifier if window function is defined as expr OVER window_name.
* Or can be window node if window function is defined as expr OVER (window_name ...).
*/
const QueryTreeNodePtr & getWindowNode() const
{
return children[window_child_index];
}
/** Get window node (mutable access to the window child).
* Valid only for window function node.
* Returns a reference to the child slot itself, so assigning through it replaces the window node.
*/
QueryTreeNodePtr & getWindowNode()
{
return children[window_child_index];
}
/** Get non aggregate function.
* If function is not resolved nullptr returned.
*/
@ -116,10 +143,16 @@ public:
return result_type != nullptr && (function != nullptr || aggregate_function != nullptr);
}
/// Is window function
bool isWindowFunction() const
{
return getWindowNode() != nullptr;
}
/// Is function node resolved as aggregate function
bool isAggregateFunction() const
{
return aggregate_function != nullptr;
return aggregate_function != nullptr && !isWindowFunction();
}
/// Is function node resolved as non aggregate function
@ -142,6 +175,12 @@ public:
*/
void resolveAsAggregateFunction(AggregateFunctionPtr aggregate_function_value, DataTypePtr result_type_value);
/** Resolve function node as window function.
* It is important that function name is updated with resolved function name.
* Main motivation for this is query tree optimizations.
*/
void resolveAsWindowFunction(AggregateFunctionPtr window_function_value, DataTypePtr result_type_value);
/// Perform constant folding for function node
void performConstantFolding(ConstantValuePtr constant_folded_value)
{
@ -179,6 +218,8 @@ protected:
private:
static constexpr size_t parameters_child_index = 0;
static constexpr size_t arguments_child_index = 1;
static constexpr size_t window_child_index = 2;
static constexpr size_t children_size = window_child_index + 1;
String function_name;
FunctionOverloadResolverPtr function;

View File

@ -32,6 +32,7 @@ const char * toString(QueryTreeNodeType type)
case QueryTreeNodeType::LAMBDA: return "LAMBDA";
case QueryTreeNodeType::SORT_COLUMN: return "SORT_COLUMN";
case QueryTreeNodeType::INTERPOLATE_COLUMN: return "INTERPOLATE_COLUMN";
case QueryTreeNodeType::WINDOW: return "WINDOW";
case QueryTreeNodeType::TABLE: return "TABLE";
case QueryTreeNodeType::TABLE_FUNCTION: return "TABLE_FUNCTION";
case QueryTreeNodeType::QUERY: return "QUERY";

View File

@ -39,6 +39,7 @@ enum class QueryTreeNodeType
LAMBDA,
SORT_COLUMN,
INTERPOLATE_COLUMN,
WINDOW,
TABLE,
TABLE_FUNCTION,
QUERY,

View File

@ -42,6 +42,7 @@
#include <Analyzer/LambdaNode.h>
#include <Analyzer/SortColumnNode.h>
#include <Analyzer/InterpolateColumnNode.h>
#include <Analyzer/WindowNode.h>
#include <Analyzer/TableNode.h>
#include <Analyzer/TableFunctionNode.h>
#include <Analyzer/QueryNode.h>
@ -50,7 +51,8 @@
#include <Analyzer/UnionNode.h>
#include <Analyzer/InDepthQueryTreeVisitor.h>
#include <Analyzer/QueryTreeBuilder.h>
#include <Analyzer/CollectAggregateFunctionVisitor.h>
#include <Analyzer/CollectAggregateFunctionNodes.h>
#include <Analyzer/CollectWindowFunctionNodes.h>
#include <Databases/IDatabase.h>
@ -94,6 +96,7 @@ namespace ErrorCodes
extern const int INVALID_LIMIT_EXPRESSION;
extern const int EMPTY_LIST_OF_COLUMNS_QUERIED;
extern const int TOO_DEEP_SUBQUERIES;
extern const int UNKNOWN_AGGREGATE_FUNCTION;
extern const int NOT_AN_AGGREGATE;
extern const int ILLEGAL_AGGREGATION;
extern const int TOO_FEW_ARGUMENTS_FOR_FUNCTION;
@ -190,7 +193,6 @@ namespace ErrorCodes
* TODO: SELECT (compound_expression).*, (compound_expression).COLUMNS are not supported on parser level.
* TODO: SELECT a.b.c.*, a.b.c.COLUMNS. Qualified matcher where identifier size is greater than 2 are not supported on parser level.
* TODO: JOIN support SELF JOIN with MergeTree. JOIN support matchers.
* TODO: WINDOW functions
* TODO: Support function identifier resolve from parent query scope, if lambda in parent scope does not capture any columns.
* TODO: Support group_by_use_nulls
*/
@ -529,6 +531,9 @@ struct IdentifierResolveScope
/// CTE name to query node
std::unordered_map<std::string, QueryTreeNodePtr> cte_name_to_query_node;
/// Window name to window node
std::unordered_map<std::string, QueryTreeNodePtr> window_name_to_window_node;
/// Nodes with duplicated aliases
std::unordered_set<QueryTreeNodePtr> nodes_with_duplicated_aliases;
@ -564,6 +569,33 @@ struct IdentifierResolveScope
*/
QueryTreeNodePtr expression_join_tree_node;
/// Walk from this scope up through parent scopes and return the first scope whose scope node is a QUERY node.
/// Returns nullptr if there is no such scope.
[[maybe_unused]] const IdentifierResolveScope * getNearestQueryScope() const
{
    for (const IdentifierResolveScope * current_scope = this; current_scope != nullptr; current_scope = current_scope->parent_scope)
    {
        if (current_scope->scope_node->getNodeType() == QueryTreeNodeType::QUERY)
            return current_scope;
    }

    return nullptr;
}
/// Walk from this scope up through parent scopes and return the first scope whose scope node is a QUERY node.
/// Returns nullptr if there is no such scope.
IdentifierResolveScope * getNearestQueryScope()
{
    for (IdentifierResolveScope * current_scope = this; current_scope != nullptr; current_scope = current_scope->parent_scope)
    {
        if (current_scope->scope_node->getNodeType() == QueryTreeNodeType::QUERY)
            return current_scope;
    }

    return nullptr;
}
TableExpressionData & getTableExpressionDataOrThrow(QueryTreeNodePtr table_expression_node)
{
@ -851,6 +883,8 @@ private:
static void validateTableExpressionModifiers(const QueryTreeNodePtr & table_expression_node, IdentifierResolveScope & scope);
static void mergeWindowWithParentWindow(const QueryTreeNodePtr & window_node, const QueryTreeNodePtr & parent_window_node, IdentifierResolveScope & scope);
/// Resolve identifier functions
QueryTreeNodePtr tryResolveTableIdentifierFromDatabaseCatalog(const Identifier & table_identifier);
@ -889,6 +923,8 @@ private:
QueryTreeNodePtr resolveMatcher(QueryTreeNodePtr & matcher_node, IdentifierResolveScope & scope);
void resolveWindow(QueryTreeNodePtr & window_node, IdentifierResolveScope & scope);
void resolveLambda(const QueryTreeNodePtr & lambda_node, const QueryTreeNodes & lambda_arguments, IdentifierResolveScope & scope);
void resolveFunction(QueryTreeNodePtr & function_node, IdentifierResolveScope & scope);
@ -901,6 +937,8 @@ private:
void resolveInterpolateColumnsNodeList(QueryTreeNodePtr & sort_columns_node_list, IdentifierResolveScope & scope);
void resolveWindowNodeList(QueryTreeNodePtr & window_node_list, IdentifierResolveScope & scope);
String calculateProjectionNodeDisplayName(QueryTreeNodePtr & node, IdentifierResolveScope & scope);
NamesAndTypes resolveProjectionExpressionNodeList(QueryTreeNodePtr & projection_node_list, IdentifierResolveScope & scope);
@ -1157,6 +1195,52 @@ void QueryAnalyzer::validateTableExpressionModifiers(const QueryTreeNodePtr & ta
}
}
/** Merge derived window definition (window_node) with the parent window definition it refers to (parent_window_node).
  *
  * Rules follow https://www.postgresql.org/docs/current/sql-select.html :
  * If an existing_window_name is specified it must refer to an earlier entry in the WINDOW list;
  * the new window copies its partitioning clause from that entry, as well as its ordering clause if any.
  * 1. The new window cannot specify its own PARTITION BY clause.
  * 2. The new window can specify ORDER BY only if the copied window does not have one.
  * 3. The new window always uses its own frame clause; the copied window must not specify a frame clause.
  *
  * Violation of any rule results in BAD_ARGUMENTS exception.
  * On success PARTITION BY of the derived window is replaced with a clone of parent PARTITION BY,
  * and, if parent has ORDER BY, ORDER BY of the derived window is replaced with a clone of parent ORDER BY.
  */
void QueryAnalyzer::mergeWindowWithParentWindow(const QueryTreeNodePtr & window_node, const QueryTreeNodePtr & parent_window_node, IdentifierResolveScope & scope)
{
    auto & derived_window = window_node->as<WindowNode &>();
    auto & parent_window = parent_window_node->as<WindowNode &>();

    /// Rule 1: PARTITION BY is always inherited
    if (derived_window.hasPartitionBy())
        throw Exception(ErrorCodes::BAD_ARGUMENTS,
            "Derived window definition '{}' is not allowed to override PARTITION BY. In scope {}",
            derived_window.formatASTForErrorMessage(),
            scope.scope_node->formatASTForErrorMessage());

    const bool parent_has_order_by = parent_window.hasOrderBy();

    /// Rule 2: ORDER BY can be specified only if parent does not have one
    if (derived_window.hasOrderBy() && parent_has_order_by)
        throw Exception(ErrorCodes::BAD_ARGUMENTS,
            "Derived window definition '{}' is not allowed to override a non-empty ORDER BY. In scope {}",
            derived_window.formatASTForErrorMessage(),
            scope.scope_node->formatASTForErrorMessage());

    /// Rule 3: parent window must use the default frame
    if (!parent_window.getWindowFrame().is_default)
        throw Exception(ErrorCodes::BAD_ARGUMENTS,
            "Parent window '{}' is not allowed to define a frame: while processing derived window definition '{}'. In scope {}",
            derived_window.getParentWindowName(),
            derived_window.formatASTForErrorMessage(),
            scope.scope_node->formatASTForErrorMessage());

    derived_window.getPartitionByNode() = parent_window.getPartitionBy().clone();

    if (parent_has_order_by)
        derived_window.getOrderByNode() = parent_window.getOrderBy().clone();
}
/// Resolve identifier functions implementation
/// Try resolve table identifier from database catalog
@ -2581,6 +2665,92 @@ QueryTreeNodePtr QueryAnalyzer::resolveMatcher(QueryTreeNodePtr & matcher_node,
return list;
}
/** Resolve window function window node.
  *
  * Node can be identifier or window node.
  * Example: SELECT count(*) OVER w FROM test_table WINDOW w AS (PARTITION BY id);
  * Example: SELECT count(*) OVER (PARTITION BY id);
  *
  * If node has parent window name specified, then parent window definition is searched in nearest query scope WINDOW section.
  * If node is identifier, then node is replaced with a clone of the window definition.
  * If node is window, then window node is merged with parent window node.
  *
  * Window node PARTITION BY and ORDER BY parts are resolved.
  * If window node has frame begin OFFSET or frame end OFFSET specified, they are resolved, and window node frame constants are updated.
  * Window node frame is validated.
  *
  * Throws BAD_ARGUMENTS exception if the referenced window is not found,
  * or if a frame OFFSET expression is not a constant with native numeric type.
  */
void QueryAnalyzer::resolveWindow(QueryTreeNodePtr & node, IdentifierResolveScope & scope)
{
    std::string parent_window_name;

    auto * identifier_node = node->as<IdentifierNode>();
    if (identifier_node)
    {
        /// expr OVER window_name: identifier itself is the name of the window to use
        parent_window_name = identifier_node->getIdentifier().getFullName();
    }
    else if (auto * window_node = node->as<WindowNode>())
    {
        /// expr OVER (window_name ...): window definition may be derived from a named window
        parent_window_name = window_node->getParentWindowName();
    }

    if (!parent_window_name.empty())
    {
        auto * nearest_query_scope = scope.getNearestQueryScope();

        if (!nearest_query_scope)
            throw Exception(ErrorCodes::BAD_ARGUMENTS, "Window '{}' does not exists.", parent_window_name);

        auto & scope_window_name_to_window_node = nearest_query_scope->window_name_to_window_node;

        auto window_node_it = scope_window_name_to_window_node.find(parent_window_name);
        if (window_node_it == scope_window_name_to_window_node.end())
            throw Exception(ErrorCodes::BAD_ARGUMENTS,
                "Window '{}' does not exists. In scope {}",
                parent_window_name,
                nearest_query_scope->scope_node->formatASTForErrorMessage());

        /// Clone named window definition, so each window function gets its own window node to resolve
        if (identifier_node)
            node = window_node_it->second->clone();
        else
            mergeWindowWithParentWindow(node, window_node_it->second, scope);
    }

    auto & window_node = node->as<WindowNode &>();

    /// Parent window (if any) is already merged into this node
    window_node.setParentWindowName({});

    resolveExpressionNodeList(window_node.getPartitionByNode(), scope, false /*allow_lambda_expression*/, false /*allow_table_expression*/);
    resolveSortColumnsNodeList(window_node.getOrderByNode(), scope);

    if (window_node.hasFrameBeginOffset())
    {
        resolveExpressionNode(window_node.getFrameBeginOffsetNode(), scope, false /*allow_lambda_expression*/, false /*allow_table_expression*/);

        const auto window_frame_begin_constant_value = window_node.getFrameBeginOffsetNode()->getConstantValueOrNull();
        if (!window_frame_begin_constant_value || !isNativeNumber(removeNullable(window_frame_begin_constant_value->getType())))
            throw Exception(ErrorCodes::BAD_ARGUMENTS,
                "Window frame begin OFFSET expression must be constant with numeric type. Actual {}. In scope {}",
                window_node.getFrameBeginOffsetNode()->formatASTForErrorMessage(),
                scope.scope_node->formatASTForErrorMessage());

        window_node.getWindowFrame().begin_offset = window_frame_begin_constant_value->getValue();
    }

    if (window_node.hasFrameEndOffset())
    {
        resolveExpressionNode(window_node.getFrameEndOffsetNode(), scope, false /*allow_lambda_expression*/, false /*allow_table_expression*/);

        const auto window_frame_end_constant_value = window_node.getFrameEndOffsetNode()->getConstantValueOrNull();
        if (!window_frame_end_constant_value || !isNativeNumber(removeNullable(window_frame_end_constant_value->getType())))
            throw Exception(ErrorCodes::BAD_ARGUMENTS,
                "Window frame end OFFSET expression must be constant with numeric type. Actual {}. In scope {}",
                window_node.getFrameEndOffsetNode()->formatASTForErrorMessage(),
                scope.scope_node->formatASTForErrorMessage());

        window_node.getWindowFrame().end_offset = window_frame_end_constant_value->getValue();
    }

    window_node.getWindowFrame().checkValid();
}
/** Resolve lambda function.
* This function modifies lambda_node during resolve. It is caller responsibility to clone lambda before resolve
@ -2673,25 +2843,24 @@ void QueryAnalyzer::resolveLambda(const QueryTreeNodePtr & lambda_node, const Qu
*
* Steps:
* 1. Resolve function parameters. Validate that each function parameter must be constant node.
* 2. Resolve function arguments list, lambda expressions are allowed as function arguments.
* 3. Initialize argument_columns, argument_types, function_lambda_arguments_indexes arrays from function arguments.
* 4. Try to resolve function node name identifier as function.
* 5. If function name identifier was not resolved as function, try to lookup lambda from sql user defined functions factory.
* 6. If function was resolve as lambda from step 4, or 5, then resolve lambda using function arguments and replace function node with lambda result.
* 2. Try to lookup function as lambda in current scope. If it is lambda we can skip `in` and `count` special handling.
* 3. If function is count function, that take unqualified ASTERISK matcher, remove it from its arguments. Example: SELECT count(*) FROM test_table;
* 4. If function is `IN` function, then right part of `IN` function is replaced as subquery.
* 5. Resolve function arguments list, lambda expressions are allowed as function arguments.
* For `IN` function table expressions are allowed as function arguments.
* 6. Initialize argument_columns, argument_types, function_lambda_arguments_indexes arrays from function arguments.
* 7. If function name identifier was not resolved as function in current scope, try to lookup lambda from sql user defined functions factory.
* 8. If function was resolved as lambda from step 2 or 7, then resolve lambda using function arguments and replace function node with lambda result.
* After that function node is resolved.
* 7. If function was not resolved during step 6 as lambda, then try to resolve function as executable user defined function or aggregate function or
* non aggregate function.
* 9. If function was not resolved during step 8 as lambda, then try to resolve function as window function or executable user defined function
* or ordinary function or aggregate function.
*
* Special case is `untuple` function that takes single compound argument expression. If argument is not compound expression throw exception.
* Wrap compound expression subcolumns into `tupleElement` and replace function node with them. After that `untuple` function node is resolved.
*
* If function is resolved as executable user defined function or aggregate function, function node is resolved
* If function is resolved as window function or executable user defined function or aggregate function, function node is resolved
* no additional special handling is required.
*
* 8. If function was resolved as non aggregate function. Then if on step 3 there were lambdas, their result types need to be initialized and
* 10. If function was resolved as non aggregate function. Then if some of function arguments are lambda expressions, their result types need to be initialized and
* they must be resolved.
* 9. If function is suitable for constant folding, try to replace function node with constant result.
*
* 11. If function is suitable for constant folding, try to perform constant folding for function node.
*/
void QueryAnalyzer::resolveFunction(QueryTreeNodePtr & node, IdentifierResolveScope & scope)
{
@ -2726,7 +2895,23 @@ void QueryAnalyzer::resolveFunction(QueryTreeNodePtr & node, IdentifierResolveSc
parameters.push_back(constant_value->getValue());
}
bool is_special_function_in = isNameOfInFunction(function_name);
//// If function node is not window function try to lookup function node name as lambda identifier.
QueryTreeNodePtr lambda_expression_untyped;
if (!function_node.isWindowFunction())
{
auto function_lookup_result = tryResolveIdentifier({Identifier{function_node.getFunctionName()}, IdentifierLookupContext::FUNCTION}, scope);
lambda_expression_untyped = function_lookup_result.resolved_identifier;
}
bool is_special_function_in = false;
if (!lambda_expression_untyped)
{
is_special_function_in = isNameOfInFunction(function_name);
/// Handle SELECT count(*) FROM test_table
if (function_name == "count")
function_node.getArguments().getNodes().clear();
}
/// Resolve function arguments
@ -2844,105 +3029,125 @@ void QueryAnalyzer::resolveFunction(QueryTreeNodePtr & node, IdentifierResolveSc
argument_columns.emplace_back(std::move(argument_column));
}
/** Lookup function node name as lambda identifier.
* If no lambda node exists with function node name identifier, try to resolve it as lambda from sql user defined functions.
/** Try to resolve function as
* 1. Lambda function in current scope. Example: WITH (x -> x + 1) AS lambda SELECT lambda(1);
* 2. Lambda function from sql user defined functions.
* 3. Special `untuple` function.
* 4. Special `grouping` function.
* 5. Window function.
* 6. Executable user defined function.
* 7. Ordinary function.
* 8. Aggregate function.
*
* TODO: Provide better error hints.
*/
auto function_lookup_result = tryResolveIdentifier({Identifier{function_node.getFunctionName()}, IdentifierLookupContext::FUNCTION}, scope);
auto lambda_expression_untyped = function_lookup_result.resolved_identifier;
if (!lambda_expression_untyped)
lambda_expression_untyped = tryGetLambdaFromSQLUserDefinedFunction(function_node.getFunctionName());
/** If function is resolved as lambda.
* Clone lambda before resolve.
* Initialize lambda arguments as function arguments
* Resolve lambda and then replace function node with resolved lambda expression body.
* Example: WITH (x -> x + 1) AS lambda SELECT lambda(value) FROM test_table;
* Result: SELECT value + 1 FROM test_table;
*/
if (lambda_expression_untyped)
if (!function_node.isWindowFunction())
{
auto * lambda_expression = lambda_expression_untyped->as<LambdaNode>();
if (!lambda_expression)
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Function identifier {} must be resolved as lambda. Actual {}. In scope {}",
function_node.getFunctionName(),
lambda_expression_untyped->formatASTForErrorMessage(),
scope.scope_node->formatASTForErrorMessage());
if (!lambda_expression_untyped)
lambda_expression_untyped = tryGetLambdaFromSQLUserDefinedFunction(function_node.getFunctionName());
auto lambda_expression_clone = lambda_expression_untyped->clone();
IdentifierResolveScope lambda_scope(lambda_expression_clone, &scope /*parent_scope*/);
resolveLambda(lambda_expression_clone, function_arguments, lambda_scope);
auto & resolved_lambda = lambda_expression_clone->as<LambdaNode &>();
node = resolved_lambda.getExpression();
return;
}
if (function_name == "untuple")
{
/// Special handling of `untuple` function
if (function_arguments.size() != 1)
throw Exception(ErrorCodes::UNSUPPORTED_METHOD,
"Function 'untuple' must have 1 argument. In scope {}",
scope.scope_node->formatASTForErrorMessage());
const auto & tuple_argument = function_arguments[0];
auto result_type = tuple_argument->getResultType();
const auto * tuple_data_type = typeid_cast<const DataTypeTuple *>(result_type.get());
if (!tuple_data_type)
throw Exception(ErrorCodes::UNSUPPORTED_METHOD,
"Function untuple argument must be have compound type. Actual type {}. In scope {}",
result_type->getName(),
scope.scope_node->formatASTForErrorMessage());
const auto & element_names = tuple_data_type->getElementNames();
auto result_list = std::make_shared<ListNode>();
result_list->getNodes().reserve(element_names.size());
for (const auto & element_name : element_names)
/** If function is resolved as lambda.
* Clone lambda before resolve.
* Initialize lambda arguments as function arguments.
* Resolve lambda and then replace function node with resolved lambda expression body.
* Example: WITH (x -> x + 1) AS lambda SELECT lambda(value) FROM test_table;
* Result: SELECT value + 1 FROM test_table;
*/
if (lambda_expression_untyped)
{
auto tuple_element_function = std::make_shared<FunctionNode>("tupleElement");
tuple_element_function->getArguments().getNodes().push_back(tuple_argument);
tuple_element_function->getArguments().getNodes().push_back(std::make_shared<ConstantNode>(element_name));
auto * lambda_expression = lambda_expression_untyped->as<LambdaNode>();
if (!lambda_expression)
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Function identifier {} must be resolved as lambda. Actual {}. In scope {}",
function_node.getFunctionName(),
lambda_expression_untyped->formatASTForErrorMessage(),
scope.scope_node->formatASTForErrorMessage());
QueryTreeNodePtr function_query_node = tuple_element_function;
resolveFunction(function_query_node, scope);
auto lambda_expression_clone = lambda_expression_untyped->clone();
if (scope.projection_names_calculation_stage && node->hasAlias())
scope.node_to_projection_name.emplace(function_query_node, node->getAlias() + '.' + element_name);
IdentifierResolveScope lambda_scope(lambda_expression_clone, &scope /*parent_scope*/);
resolveLambda(lambda_expression_clone, function_arguments, lambda_scope);
result_list->getNodes().push_back(std::move(function_query_node));
auto & resolved_lambda = lambda_expression_clone->as<LambdaNode &>();
node = resolved_lambda.getExpression();
return;
}
node = result_list;
return;
if (function_name == "untuple")
{
/// Special handling of `untuple` function
if (function_arguments.size() != 1)
throw Exception(ErrorCodes::UNSUPPORTED_METHOD,
"Function 'untuple' must have 1 argument. In scope {}",
scope.scope_node->formatASTForErrorMessage());
const auto & tuple_argument = function_arguments[0];
auto result_type = tuple_argument->getResultType();
const auto * tuple_data_type = typeid_cast<const DataTypeTuple *>(result_type.get());
if (!tuple_data_type)
throw Exception(ErrorCodes::UNSUPPORTED_METHOD,
"Function untuple argument must be have compound type. Actual type {}. In scope {}",
result_type->getName(),
scope.scope_node->formatASTForErrorMessage());
const auto & element_names = tuple_data_type->getElementNames();
auto result_list = std::make_shared<ListNode>();
result_list->getNodes().reserve(element_names.size());
for (const auto & element_name : element_names)
{
auto tuple_element_function = std::make_shared<FunctionNode>("tupleElement");
tuple_element_function->getArguments().getNodes().push_back(tuple_argument);
tuple_element_function->getArguments().getNodes().push_back(std::make_shared<ConstantNode>(element_name));
QueryTreeNodePtr function_query_node = tuple_element_function;
resolveFunction(function_query_node, scope);
if (scope.projection_names_calculation_stage && node->hasAlias())
scope.node_to_projection_name.emplace(function_query_node, node->getAlias() + '.' + element_name);
result_list->getNodes().push_back(std::move(function_query_node));
}
node = result_list;
return;
}
else if (function_name == "grouping")
{
/// It is responsibility of planner to perform additional handling of grouping function
if (function_arguments_size == 0)
throw Exception(ErrorCodes::TOO_FEW_ARGUMENTS_FOR_FUNCTION,
"Function GROUPING expects at least one argument");
else if (function_arguments_size > 64)
throw Exception(ErrorCodes::TOO_MANY_ARGUMENTS_FOR_FUNCTION,
"Function GROUPING can have up to 64 arguments, but {} provided",
function_arguments_size);
auto grouping_function = std::make_shared<FunctionGrouping>();
auto grouping_function_adaptor = std::make_shared<FunctionToOverloadResolverAdaptor>(std::move(grouping_function));
function_node.resolveAsFunction(std::move(grouping_function_adaptor), std::make_shared<DataTypeUInt64>());
return;
}
}
else if (function_name == "grouping")
if (function_node.isWindowFunction())
{
/// It is responsibility of planner to perform additional handling of grouping function
if (function_arguments_size == 0)
throw Exception(ErrorCodes::TOO_FEW_ARGUMENTS_FOR_FUNCTION,
"Function GROUPING expects at least one argument");
else if (function_arguments_size > 64)
throw Exception(ErrorCodes::TOO_MANY_ARGUMENTS_FOR_FUNCTION,
"Function GROUPING can have up to 64 arguments, but {} provided",
function_arguments_size);
if (!AggregateFunctionFactory::instance().isAggregateFunctionName(function_name))
throw Exception(ErrorCodes::UNKNOWN_AGGREGATE_FUNCTION,
"Aggregate function with name {} does not exists. In scope {}",
function_name,
scope.scope_node->formatASTForErrorMessage());
auto grouping_function = std::make_shared<FunctionGrouping>();
auto grouping_function_adaptor = std::make_shared<FunctionToOverloadResolverAdaptor>(std::move(grouping_function));
function_node.resolveAsFunction(std::move(grouping_function_adaptor), std::make_shared<DataTypeUInt64>());
AggregateFunctionProperties properties;
auto aggregate_function = AggregateFunctionFactory::instance().get(function_name, argument_types, parameters, properties);
function_node.resolveAsWindowFunction(aggregate_function, aggregate_function->getReturnType());
resolveWindow(function_node.getWindowNode(), scope);
return;
}
/** Try to resolve function as
* 1. Executable user defined function.
* 2. Aggregate function.
* 3. Non aggregate function.
* TODO: Provide better hints.
*/
FunctionOverloadResolverPtr function = UserDefinedExecutableFunctionFactory::instance().tryGet(function_name, context, parameters);
if (!function)
@ -2951,7 +3156,7 @@ void QueryAnalyzer::resolveFunction(QueryTreeNodePtr & node, IdentifierResolveSc
if (!function)
{
if (!AggregateFunctionFactory::instance().isAggregateFunctionName(function_name))
throw Exception(ErrorCodes::BAD_ARGUMENTS,
throw Exception(ErrorCodes::UNKNOWN_AGGREGATE_FUNCTION,
"Function with name {} does not exists. In scope {}",
function_name,
scope.scope_node->formatASTForErrorMessage());
@ -3287,6 +3492,13 @@ void QueryAnalyzer::resolveExpressionNode(QueryTreeNodePtr & node, IdentifierRes
node->formatASTForErrorMessage(),
scope.scope_node->formatASTForErrorMessage());
}
case QueryTreeNodeType::WINDOW:
{
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Window {} is not allowed in expression. In scope {}",
node->formatASTForErrorMessage(),
scope.scope_node->formatASTForErrorMessage());
}
case QueryTreeNodeType::TABLE:
{
if (!allow_table_expression)
@ -3467,6 +3679,15 @@ void QueryAnalyzer::resolveInterpolateColumnsNodeList(QueryTreeNodePtr & sort_co
}
}
/** Resolve every window node stored in the window node list.
  * Each list element is passed to resolveWindow together with the current scope.
  */
void QueryAnalyzer::resolveWindowNodeList(QueryTreeNodePtr & window_node_list, IdentifierResolveScope & scope)
{
    auto & window_nodes = window_node_list->as<ListNode &>().getNodes();

    for (auto & window_node : window_nodes)
        resolveWindow(window_node, scope);
}
class SubqueryToProjectionNameMatcher
{
public:
@ -4436,10 +4657,10 @@ void assertNoGroupingFunction(const QueryTreeNodePtr & node, const String & asse
* 3. Resolve FROM section.
* 4. Resolve projection columns.
* 5. Resolve expressions in other query parts.
* 6. Remove WITH section from query.
* 7. Validate nodes with duplicate aliases.
* 8. Remove aliases from expression and lambda nodes.
* 9. Validate aggregates.
* 6. Validate nodes with duplicate aliases.
* 7. Validate aggregates, aggregate functions, GROUPING function, window functions.
* 8. Remove WITH and WINDOW sections from query.
* 9. Remove aliases from expression and lambda nodes.
* 10. Resolve query tree node with projection columns.
*/
void QueryAnalyzer::resolveQuery(const QueryTreeNodePtr & query_node, IdentifierResolveScope & scope)
@ -4475,6 +4696,9 @@ void QueryAnalyzer::resolveQuery(const QueryTreeNodePtr & query_node, Identifier
if (query_node_typed.hasHaving())
visitor.visit(query_node_typed.getHaving());
if (query_node_typed.hasWindow())
visitor.visit(query_node_typed.getWindowNode());
if (query_node_typed.hasOrderBy())
visitor.visit(query_node_typed.getOrderByNode());
@ -4521,6 +4745,26 @@ void QueryAnalyzer::resolveQuery(const QueryTreeNodePtr & query_node, Identifier
return subquery_node && subquery_node->isCTE();
});
/** Register windows from WINDOW section in scope.
  *
  * If window refers to parent window, parent window is looked up among windows that are
  * already registered in scope.window_name_to_window_node, window is merged with it and
  * parent window name is cleared.
  *
  * Window names must be unique. Insertion result is checked, otherwise duplicate
  * definition would be silently ignored and the first definition would be used.
  */
for (auto & window_node : query_node_typed.getWindow().getNodes())
{
    auto & window_node_typed = window_node->as<WindowNode &>();

    /// Copy is intentional, parent window name of node is reset below
    auto parent_window_name = window_node_typed.getParentWindowName();
    if (!parent_window_name.empty())
    {
        auto window_node_it = scope.window_name_to_window_node.find(parent_window_name);
        if (window_node_it == scope.window_name_to_window_node.end())
            throw Exception(ErrorCodes::BAD_ARGUMENTS,
                "Window '{}' does not exists. In scope {}",
                parent_window_name,
                scope.scope_node->formatASTForErrorMessage());

        mergeWindowWithParentWindow(window_node, window_node_it->second, scope);
        window_node_typed.setParentWindowName({});
    }

    const bool inserted = scope.window_name_to_window_node.emplace(window_node_typed.getAlias(), window_node).second;
    if (!inserted)
        throw Exception(ErrorCodes::BAD_ARGUMENTS,
            "Window '{}' is already defined. In scope {}",
            window_node_typed.getAlias(),
            scope.scope_node->formatASTForErrorMessage());
}
/** Disable identifier cache during JOIN TREE resolve.
* Depending on JOIN expression section, identifier with same name
* can be resolved in different columns.
@ -4579,6 +4823,9 @@ void QueryAnalyzer::resolveQuery(const QueryTreeNodePtr & query_node, Identifier
if (query_node_typed.hasHaving())
resolveExpressionNode(query_node_typed.getHaving(), scope, false /*allow_lambda_expression*/, false /*allow_table_expression*/);
if (query_node_typed.hasWindow())
resolveWindowNodeList(query_node_typed.getWindowNode(), scope);
if (query_node_typed.hasOrderBy())
resolveSortColumnsNodeList(query_node_typed.getOrderByNode(), scope);
@ -4612,13 +4859,6 @@ void QueryAnalyzer::resolveQuery(const QueryTreeNodePtr & query_node, Identifier
validateLimitOffsetExpression(query_node_typed.getOffset(), "OFFSET", scope);
}
/** WITH section can be safely removed, because WITH section only can provide aliases to query expressions
* and CTE for other sections to use.
*
* Example: WITH 1 AS constant, (x -> x + 1) AS lambda, a AS (SELECT * FROM test_table);
*/
query_node_typed.getWithNode() = std::make_shared<ListNode>();
/** Resolve nodes with duplicate aliases.
* Table expressions cannot have duplicate aliases.
*
@ -4672,51 +4912,56 @@ void QueryAnalyzer::resolveQuery(const QueryTreeNodePtr & query_node, Identifier
node->removeAlias();
}
/// Remove aliases from expression and lambda nodes
for (auto & [_, node] : scope.alias_name_to_expression_node)
node->removeAlias();
for (auto & [_, node] : scope.alias_name_to_lambda_node)
node->removeAlias();
/** Validate aggregates
*
* 1. Check that there are no aggregate functions in WHERE.
* 2. Check that there are no aggregate functions in PREWHERE.
* 3. Check that there are no aggregate functions in another aggregate functions.
* 4. Check that there are no columns that are not specified in GROUP BY keys.
* 5. Validate GROUP BY modifiers.
* 1. Check that there are no aggregate functions and GROUPING function in WHERE, in PREWHERE, in another aggregate functions.
* 2. Check that there are no window functions in WHERE, in PREWHERE, in HAVING, in WINDOW, inside another aggregate function,
* inside window function arguments, inside window function window definition.
* 3. Check that there are no columns that are not specified in GROUP BY keys.
* 4. Validate GROUP BY modifiers.
*/
if (query_node_typed.hasWhere())
{
assertNoAggregateFunctionNodes(query_node_typed.getWhere(), "in WHERE");
assertNoGroupingFunction(query_node_typed.getWhere(), "in WHERE");
assertNoWindowFunctionNodes(query_node_typed.getWhere(), "in WHERE");
}
if (query_node_typed.hasPrewhere())
{
assertNoAggregateFunctionNodes(query_node_typed.getPrewhere(), "in PREWHERE");
assertNoGroupingFunction(query_node_typed.getPrewhere(), "in PREWHERE");
assertNoWindowFunctionNodes(query_node_typed.getPrewhere(), "in PREWHERE");
}
QueryTreeNodes aggregate_function_nodes;
if (query_node_typed.hasHaving())
collectAggregateFunctionNodes(query_node_typed.getHaving(), aggregate_function_nodes);
assertNoWindowFunctionNodes(query_node_typed.getHaving(), "in HAVING");
if (query_node_typed.hasOrderBy())
collectAggregateFunctionNodes(query_node_typed.getOrderByNode(), aggregate_function_nodes);
if (query_node_typed.hasWindow())
assertNoWindowFunctionNodes(query_node_typed.getWindowNode(), "in WINDOW");
collectAggregateFunctionNodes(query_node_typed.getProjectionNode(), aggregate_function_nodes);
QueryTreeNodes aggregate_function_nodes;
QueryTreeNodes window_function_nodes;
collectAggregateFunctionNodes(query_node, aggregate_function_nodes);
collectWindowFunctionNodes(query_node, window_function_nodes);
for (auto & aggregate_function_node : aggregate_function_nodes)
{
for (auto & aggregate_function_node_child : aggregate_function_node->getChildren())
{
assertNoAggregateFunctionNodes(aggregate_function_node_child, "inside another aggregate function");
assertNoGroupingFunction(aggregate_function_node_child, "inside another aggregate function");
}
auto & aggregate_function_node_typed = aggregate_function_node->as<FunctionNode &>();
assertNoAggregateFunctionNodes(aggregate_function_node_typed.getArgumentsNode(), "inside another aggregate function");
assertNoGroupingFunction(aggregate_function_node_typed.getArgumentsNode(), "inside another aggregate function");
assertNoWindowFunctionNodes(aggregate_function_node_typed.getArgumentsNode(), "inside an aggregate function");
}
for (auto & window_function_node : window_function_nodes)
{
auto & window_function_node_typed = window_function_node->as<FunctionNode &>();
assertNoWindowFunctionNodes(window_function_node_typed.getArgumentsNode(), "inside another window function");
if (query_node_typed.hasWindow())
assertNoWindowFunctionNodes(window_function_node_typed.getWindowNode(), "inside window definition");
}
QueryTreeNodes group_by_keys_nodes;
@ -4793,6 +5038,27 @@ void QueryAnalyzer::resolveQuery(const QueryTreeNodePtr & query_node, Identifier
if (query_node_typed.hasHaving() && query_node_typed.isGroupByWithTotals() && is_rollup_or_cube)
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "WITH TOTALS and WITH ROLLUP or CUBE are not supported together in presence of HAVING");
/** WITH section can be safely removed, because WITH section only can provide aliases to query expressions
* and CTE for other sections to use.
*
* Example: WITH 1 AS constant, (x -> x + 1) AS lambda, a AS (SELECT * FROM test_table);
*/
query_node_typed.getWith().getNodes().clear();
/** WINDOW section can be safely removed, because WINDOW section can only provide window definition to window functions.
*
* Example: SELECT count(*) OVER w FROM test_table WINDOW w AS (PARTITION BY id);
*/
query_node_typed.getWindow().getNodes().clear();
/// Remove aliases from expression and lambda nodes
for (auto & [_, node] : scope.alias_name_to_expression_node)
node->removeAlias();
for (auto & [_, node] : scope.alias_name_to_lambda_node)
node->removeAlias();
query_node_typed.resolveProjectionColumns(std::move(projection_columns));
}

View File

@ -33,6 +33,7 @@ namespace DB
*
* Constness of function parameters.
* Constness of LIMIT and OFFSET.
* Window functions frame. Constness of window functions frame begin OFFSET, end OFFSET.
* In SELECT, ORDER BY only columns that are specified in GROUP BY keys after GROUP BY are used.
* GROUPING function arguments are specified in GROUP BY keys.
* No GROUPING function if there is no GROUP BY.

View File

@ -28,6 +28,7 @@ QueryNode::QueryNode()
children[with_child_index] = std::make_shared<ListNode>();
children[projection_child_index] = std::make_shared<ListNode>();
children[group_by_child_index] = std::make_shared<ListNode>();
children[window_child_index] = std::make_shared<ListNode>();
children[order_by_child_index] = std::make_shared<ListNode>();
children[limit_by_child_index] = std::make_shared<ListNode>();
}
@ -75,6 +76,12 @@ String QueryNode::getName() const
buffer << getHaving()->getName();
}
if (hasWindow())
{
buffer << " WINDOW ";
buffer << getWindow().getName();
}
if (hasOrderBy())
{
buffer << " ORDER BY ";
@ -132,9 +139,16 @@ void QueryNode::dumpTreeImpl(WriteBuffer & buffer, FormatState & format_state, s
buffer << ", is_distinct: " << is_distinct;
buffer << ", is_limit_with_ties: " << is_limit_with_ties;
buffer << ", is_group_by_with_totals: " << is_group_by_with_totals;
buffer << ", is_group_by_with_rollup: " << is_group_by_with_rollup;
buffer << ", is_group_by_with_cube: " << is_group_by_with_cube;
buffer << ", is_group_by_with_grouping_sets: " << is_group_by_with_grouping_sets;
std::string group_by_type = "ordinary";
if (is_group_by_with_rollup)
group_by_type = "rollup";
else if (is_group_by_with_cube)
group_by_type = "cube";
else if (is_group_by_with_grouping_sets)
group_by_type = "grouping_sets";
buffer << ", group_by_type: " << group_by_type;
if (!cte_name.empty())
buffer << ", cte_name: " << cte_name;
@ -206,6 +220,12 @@ void QueryNode::dumpTreeImpl(WriteBuffer & buffer, FormatState & format_state, s
getHaving()->dumpTreeImpl(buffer, format_state, indent + 4);
}
if (hasWindow())
{
buffer << '\n' << std::string(indent + 2, ' ') << "WINDOW\n";
getWindow().dumpTreeImpl(buffer, format_state, indent + 4);
}
if (hasOrderBy())
{
buffer << '\n' << std::string(indent + 2, ' ') << "ORDER BY\n";
@ -339,6 +359,9 @@ ASTPtr QueryNode::toASTImpl() const
if (hasHaving())
select_query->setExpression(ASTSelectQuery::Expression::HAVING, getHaving()->toAST());
if (hasWindow())
select_query->setExpression(ASTSelectQuery::Expression::WINDOW, getWindow().toAST());
if (hasOrderBy())
select_query->setExpression(ASTSelectQuery::Expression::ORDER_BY, getOrderBy().toAST());

View File

@ -189,6 +189,31 @@ public:
return children[join_tree_child_index];
}
/// Returns true if window list contains at least one window node, false otherwise
bool hasWindow() const
{
    const auto & window_nodes = getWindow().getNodes();
    return !window_nodes.empty();
}
/// Get window child as typed list node
const ListNode & getWindow() const
{
    const auto & window_child = children[window_child_index];
    return window_child->as<const ListNode &>();
}
/// Get window child as typed list node
ListNode & getWindow()
{
    auto & window_child = children[window_child_index];
    return window_child->as<ListNode &>();
}
/// Get window child node pointer
const QueryTreeNodePtr & getWindowNode() const
{
    const auto & window_child = children[window_child_index];
    return window_child;
}
/// Get window child node pointer, pointer can be replaced by the caller
QueryTreeNodePtr & getWindowNode()
{
    auto & window_child = children[window_child_index];
    return window_child;
}
bool hasPrewhere() const
{
return children[prewhere_child_index] != nullptr;
@ -453,13 +478,14 @@ private:
static constexpr size_t where_child_index = 4;
static constexpr size_t group_by_child_index = 5;
static constexpr size_t having_child_index = 6;
static constexpr size_t order_by_child_index = 7;
static constexpr size_t interpolate_child_index = 8;
static constexpr size_t limit_by_limit_child_index = 9;
static constexpr size_t limit_by_offset_child_index = 10;
static constexpr size_t limit_by_child_index = 11;
static constexpr size_t limit_child_index = 12;
static constexpr size_t offset_child_index = 13;
static constexpr size_t window_child_index = 7;
static constexpr size_t order_by_child_index = 8;
static constexpr size_t interpolate_child_index = 9;
static constexpr size_t limit_by_limit_child_index = 10;
static constexpr size_t limit_by_offset_child_index = 11;
static constexpr size_t limit_by_child_index = 12;
static constexpr size_t limit_child_index = 13;
static constexpr size_t offset_child_index = 14;
static constexpr size_t children_size = offset_child_index + 1;
};

View File

@ -24,6 +24,7 @@
#include <Parsers/ASTOrderByElement.h>
#include <Parsers/ASTInterpolateElement.h>
#include <Parsers/ASTSampleRatio.h>
#include <Parsers/ASTWindowDefinition.h>
#include <Analyzer/IdentifierNode.h>
#include <Analyzer/MatcherNode.h>
@ -34,6 +35,7 @@
#include <Analyzer/LambdaNode.h>
#include <Analyzer/SortColumnNode.h>
#include <Analyzer/InterpolateColumnNode.h>
#include <Analyzer/WindowNode.h>
#include <Analyzer/TableNode.h>
#include <Analyzer/TableFunctionNode.h>
#include <Analyzer/QueryNode.h>
@ -84,10 +86,14 @@ private:
QueryTreeNodePtr buildInterpolateColumnList(const ASTPtr & interpolate_expression_list) const;
QueryTreeNodePtr buildWindowList(const ASTPtr & window_definition_list) const;
QueryTreeNodePtr buildExpressionList(const ASTPtr & expression_list) const;
QueryTreeNodePtr buildExpression(const ASTPtr & expression) const;
QueryTreeNodePtr buildWindow(const ASTPtr & window_definition) const;
QueryTreeNodePtr buildJoinTree(const ASTPtr & tables_in_select_query) const;
ColumnTransformersNodes buildColumnTransformers(const ASTPtr & matcher_expression, size_t start_child_index) const;
@ -250,6 +256,10 @@ QueryTreeNodePtr QueryTreeBuilder::buildSelectExpression(const ASTPtr & select_q
if (having_expression)
current_query_tree->getHaving() = buildExpression(having_expression);
auto window_list = select_query_typed.window();
if (window_list)
current_query_tree->getWindowNode() = buildWindowList(window_list);
auto select_order_by_list = select_query_typed.orderBy();
if (select_order_by_list)
current_query_tree->getOrderByNode() = buildSortColumnList(select_order_by_list);
@ -342,6 +352,26 @@ QueryTreeNodePtr QueryTreeBuilder::buildInterpolateColumnList(const ASTPtr & int
return list_node;
}
/** Build list node with window nodes from WINDOW section expression list.
  * Each list element is expected to be ASTWindowListElement.
  * Window list element name is used as alias of the created window node.
  */
QueryTreeNodePtr QueryTreeBuilder::buildWindowList(const ASTPtr & window_definition_list) const
{
    auto window_list = std::make_shared<ListNode>();
    auto & window_nodes = window_list->getNodes();

    auto & window_list_elements = window_definition_list->as<ASTExpressionList &>().children;
    window_nodes.reserve(window_list_elements.size());

    for (auto & list_element : window_list_elements)
    {
        const auto & list_element_typed = list_element->as<const ASTWindowListElement &>();

        auto window = buildWindow(list_element_typed.definition);
        window->setAlias(list_element_typed.name);

        window_nodes.push_back(std::move(window));
    }

    return window_list;
}
QueryTreeNodePtr QueryTreeBuilder::buildExpressionList(const ASTPtr & expression_list) const
{
auto list_node = std::make_shared<ListNode>();
@ -451,6 +481,14 @@ QueryTreeNodePtr QueryTreeBuilder::buildExpression(const ASTPtr & expression) co
function_node->getArguments().getNodes().push_back(buildExpression(argument));
}
if (function->is_window_function)
{
if (function->window_definition)
function_node->getWindowNode() = buildWindow(function->window_definition);
else
function_node->getWindowNode() = std::make_shared<IdentifierNode>(Identifier(function->window_name));
}
result = function_node;
}
}
@ -522,6 +560,41 @@ QueryTreeNodePtr QueryTreeBuilder::buildExpression(const ASTPtr & expression) co
return result;
}
/** Build window node from ASTWindowDefinition.
  *
  * Window frame is copied from definition only if frame is not default.
  * PARTITION BY, ORDER BY, frame begin offset and frame end offset children
  * are built only if they are specified in definition.
  */
QueryTreeNodePtr QueryTreeBuilder::buildWindow(const ASTPtr & window_definition) const
{
    const auto & definition = window_definition->as<const ASTWindowDefinition &>();

    WindowFrame frame;
    const bool is_frame_specified = !definition.frame_is_default;

    if (is_frame_specified)
    {
        frame.is_default = false;
        frame.type = definition.frame_type;

        frame.begin_type = definition.frame_begin_type;
        frame.begin_preceding = definition.frame_begin_preceding;

        frame.end_type = definition.frame_end_type;
        frame.end_preceding = definition.frame_end_preceding;
    }

    auto result = std::make_shared<WindowNode>(frame);
    result->setParentWindowName(definition.parent_window_name);

    if (const auto & partition_by = definition.partition_by)
        result->getPartitionByNode() = buildExpressionList(partition_by);

    if (const auto & order_by = definition.order_by)
        result->getOrderByNode() = buildSortColumnList(order_by);

    if (const auto & frame_begin_offset = definition.frame_begin_offset)
        result->getFrameBeginOffsetNode() = buildExpression(frame_begin_offset);

    if (const auto & frame_end_offset = definition.frame_end_offset)
        result->getFrameEndOffsetNode() = buildExpression(frame_end_offset);

    result->setOriginalAST(window_definition);

    return result;
}
QueryTreeNodePtr QueryTreeBuilder::buildJoinTree(const ASTPtr & tables_in_select_query) const
{
if (!tables_in_select_query)

207
src/Analyzer/WindowNode.cpp Normal file
View File

@ -0,0 +1,207 @@
#include <Analyzer/WindowNode.h>
#include <Common/SipHash.h>
#include <IO/WriteBufferFromString.h>
#include <IO/Operators.h>
#include <Parsers/ASTWindowDefinition.h>
namespace DB
{
/// Initialize window node with window frame and empty PARTITION BY and ORDER BY lists.
/// Remaining children slots are left empty.
WindowNode::WindowNode(WindowFrame window_frame_)
    : window_frame(std::move(window_frame_))
{
    children.resize(children_size);

    auto & order_by_child = children[order_by_child_index];
    auto & partition_by_child = children[partition_by_child_index];

    partition_by_child = std::make_shared<ListNode>();
    order_by_child = std::make_shared<ListNode>();
}
/** Get window node name.
  *
  * Name consists of space separated sections, sections that are not specified are skipped:
  * PARTITION BY <partition by list name>
  * ORDER BY <order by list name>
  * <frame type> BETWEEN <frame begin> AND <frame end>, only if window frame is not default.
  */
String WindowNode::getName() const
{
    String result;

    if (hasPartitionBy())
    {
        result += "PARTITION BY ";
        result += getPartitionBy().getName();
    }

    if (hasOrderBy())
    {
        /// Separate ORDER BY section from PARTITION BY section
        if (hasPartitionBy())
            result += ' ';

        result += "ORDER BY ";
        result += getOrderBy().getName();
    }

    if (!window_frame.is_default)
    {
        if (hasPartitionBy() || hasOrderBy())
            result += ' ';

        if (window_frame.type == WindowFrame::FrameType::ROWS)
            result += "ROWS";
        else if (window_frame.type == WindowFrame::FrameType::GROUPS)
            result += "GROUPS";
        else if (window_frame.type == WindowFrame::FrameType::RANGE)
            result += "RANGE";

        /// Frame begin and frame end boundaries are formatted in the same way.
        /// Offset node is accessed only if boundary type is not CURRENT ROW and not UNBOUNDED.
        auto append_frame_boundary = [&result](WindowFrame::BoundaryType boundary_type, bool boundary_preceding, const QueryTreeNodePtr & offset_node)
        {
            if (boundary_type == WindowFrame::BoundaryType::Current)
            {
                result += "CURRENT ROW";
                return;
            }

            if (boundary_type == WindowFrame::BoundaryType::Unbounded)
                result += "UNBOUNDED";
            else
                result += offset_node->getName();

            result += " ";
            result += (boundary_preceding ? "PRECEDING" : "FOLLOWING");
        };

        result += " BETWEEN ";
        append_frame_boundary(window_frame.begin_type, window_frame.begin_preceding, getFrameBeginOffsetNode());

        result += " AND ";
        append_frame_boundary(window_frame.end_type, window_frame.end_preceding, getFrameEndOffsetNode());
    }

    return result;
}
/** Dump window node into buffer.
  *
  * First line contains node id, alias if node has alias, parent window name, frame type,
  * frame begin and frame end boundary types.
  * After that PARTITION BY, ORDER BY, FRAME BEGIN OFFSET, FRAME END OFFSET sections are dumped
  * if they are specified.
  */
void WindowNode::dumpTreeImpl(WriteBuffer & buffer, FormatState & format_state, size_t indent) const
{
    const std::string section_indent(indent + 2, ' ');
    const size_t section_child_indent = indent + 4;

    /// CURRENT ROW boundary has no direction, other boundaries are suffixed with preceding or following
    auto describe_frame_bound = [](WindowFrame::BoundaryType boundary_type, bool boundary_preceding) -> std::string
    {
        if (boundary_type == WindowFrame::BoundaryType::Current)
            return "current";

        std::string description;

        if (boundary_type == WindowFrame::BoundaryType::Unbounded)
            description = "unbounded";
        else if (boundary_type == WindowFrame::BoundaryType::Offset)
            description = "offset";

        description += boundary_preceding ? " preceding" : " following";
        return description;
    };

    buffer << std::string(indent, ' ') << "WINDOW id: " << format_state.getNodeId(this);

    if (hasAlias())
        buffer << ", alias: " << getAlias();

    buffer << ", parent_window_name: " << parent_window_name;
    buffer << ", frame_type: " << window_frame.type;

    buffer << ", frame_begin_type: " << describe_frame_bound(window_frame.begin_type, window_frame.begin_preceding);
    buffer << ", frame_end_type: " << describe_frame_bound(window_frame.end_type, window_frame.end_preceding);

    if (hasPartitionBy())
    {
        buffer << '\n' << section_indent << "PARTITION BY\n";
        getPartitionBy().dumpTreeImpl(buffer, format_state, section_child_indent);
    }

    if (hasOrderBy())
    {
        buffer << '\n' << section_indent << "ORDER BY\n";
        getOrderBy().dumpTreeImpl(buffer, format_state, section_child_indent);
    }

    if (hasFrameBeginOffset())
    {
        buffer << '\n' << section_indent << "FRAME BEGIN OFFSET\n";
        getFrameBeginOffsetNode()->dumpTreeImpl(buffer, format_state, section_child_indent);
    }

    if (hasFrameEndOffset())
    {
        buffer << '\n' << section_indent << "FRAME END OFFSET\n";
        getFrameEndOffsetNode()->dumpTreeImpl(buffer, format_state, section_child_indent);
    }
}
/// Window nodes are equal if their window frames and parent window names are equal.
/// Children are not compared here.
bool WindowNode::isEqualImpl(const IQueryTreeNode & rhs) const
{
    const auto & other_window = assert_cast<const WindowNode &>(rhs);

    if (!(window_frame == other_window.window_frame))
        return false;

    return parent_window_name == other_window.parent_window_name;
}
/** Update tree hash with window node own state.
  *
  * Hash includes all window frame fields and parent window name,
  * the same state that is compared in isEqualImpl.
  */
void WindowNode::updateTreeHashImpl(HashState & hash_state) const
{
    hash_state.update(window_frame.is_default);
    hash_state.update(window_frame.type);
    hash_state.update(window_frame.begin_type);
    hash_state.update(window_frame.begin_preceding);
    hash_state.update(window_frame.end_type);
    hash_state.update(window_frame.end_preceding);

    /// Size is hashed before string content, so hashed state boundaries are unambiguous
    hash_state.update(parent_window_name.size());
    hash_state.update(parent_window_name);
}
/** Convert window node into ASTWindowDefinition.
  *
  * Parent window name and window frame are copied into definition.
  * PARTITION BY, ORDER BY, frame begin offset and frame end offset are added into definition
  * only if they are specified in window node, absent sections stay null in definition,
  * the same representation that is expected when window node is built from AST.
  */
ASTPtr WindowNode::toASTImpl() const
{
    auto window_definition = std::make_shared<ASTWindowDefinition>();

    window_definition->parent_window_name = parent_window_name;

    if (hasPartitionBy())
    {
        window_definition->children.push_back(getPartitionByNode()->toAST());
        window_definition->partition_by = window_definition->children.back();
    }

    if (hasOrderBy())
    {
        window_definition->children.push_back(getOrderByNode()->toAST());
        window_definition->order_by = window_definition->children.back();
    }

    window_definition->frame_is_default = window_frame.is_default;
    window_definition->frame_type = window_frame.type;

    window_definition->frame_begin_type = window_frame.begin_type;
    window_definition->frame_begin_preceding = window_frame.begin_preceding;

    if (hasFrameBeginOffset())
    {
        window_definition->children.push_back(getFrameBeginOffsetNode()->toAST());
        window_definition->frame_begin_offset = window_definition->children.back();
    }

    window_definition->frame_end_type = window_frame.end_type;
    window_definition->frame_end_preceding = window_frame.end_preceding;

    if (hasFrameEndOffset())
    {
        window_definition->children.push_back(getFrameEndOffsetNode()->toAST());
        window_definition->frame_end_offset = window_definition->children.back();
    }

    return window_definition;
}
/// Create window node with the same window frame and parent window name.
/// Children are not copied here.
QueryTreeNodePtr WindowNode::cloneImpl() const
{
    auto cloned_window = std::make_shared<WindowNode>(getWindowFrame());
    cloned_window->setParentWindowName(getParentWindowName());

    return cloned_window;
}
}

193
src/Analyzer/WindowNode.h Normal file
View File

@ -0,0 +1,193 @@
#pragma once
#include <Analyzer/IQueryTreeNode.h>
#include <Analyzer/ListNode.h>
#include <Interpreters/WindowDescription.h>
namespace DB
{
/** Window node represents window function window description.
*
* Example: SELECT * FROM test_table WINDOW window AS (PARTITION BY id);
* window AS (PARTITION BY id) - window node.
*
* Example: SELECT count() OVER (PARTITION BY id) FROM test_table;
* PARTITION BY id - window node.
*
* Window node can also refer to its parent window node.
* Example: SELECT count() OVER (parent_window ORDER BY id) FROM test_table WINDOW parent_window AS (PARTITION BY id);
* parent_window ORDER BY id - window node.
*
* Window node is initially initialized with window frame.
*
* If window frame has OFFSET begin type, additionally frame begin offset node must be initialized.
* If window frame has OFFSET end type, additionally frame end offset node must be initialized.
* During query analysis pass they must be resolved, validated and window node window frame offset constants must be updated.
*/
class WindowNode;
using WindowNodePtr = std::shared_ptr<WindowNode>;
class WindowNode final : public IQueryTreeNode
{
public:
    /// Initialize window node with window frame
    explicit WindowNode(WindowFrame window_frame_);

    /// Get window node window frame
    const WindowFrame & getWindowFrame() const
    {
        return window_frame;
    }

    /// Get window node window frame
    WindowFrame & getWindowFrame()
    {
        return window_frame;
    }

    /// Returns true if window node has non empty parent window name, false otherwise
    bool hasParentWindowName() const
    {
        return !parent_window_name.empty();
    }

    /// Get parent window name, name is empty if window node does not refer to parent window
    const String & getParentWindowName() const
    {
        return parent_window_name;
    }

    /// Set parent window name
    void setParentWindowName(String parent_window_name_value)
    {
        parent_window_name = std::move(parent_window_name_value);
    }

    /// Returns true if window node has non empty order by list, false otherwise
    bool hasOrderBy() const
    {
        return !getOrderBy().getNodes().empty();
    }

    /// Get order by list
    const ListNode & getOrderBy() const
    {
        return children[order_by_child_index]->as<const ListNode &>();
    }

    /// Get order by list
    ListNode & getOrderBy()
    {
        return children[order_by_child_index]->as<ListNode &>();
    }

    /// Get order by node
    const QueryTreeNodePtr & getOrderByNode() const
    {
        return children[order_by_child_index];
    }

    /// Get order by node
    QueryTreeNodePtr & getOrderByNode()
    {
        return children[order_by_child_index];
    }

    /// Returns true if window node has non empty partition by list, false otherwise
    bool hasPartitionBy() const
    {
        return !getPartitionBy().getNodes().empty();
    }

    /// Get partition by list
    const ListNode & getPartitionBy() const
    {
        return children[partition_by_child_index]->as<const ListNode &>();
    }

    /// Get partition by list
    ListNode & getPartitionBy()
    {
        return children[partition_by_child_index]->as<ListNode &>();
    }

    /// Get partition by node
    const QueryTreeNodePtr & getPartitionByNode() const
    {
        return children[partition_by_child_index];
    }

    /// Get partition by node
    QueryTreeNodePtr & getPartitionByNode()
    {
        return children[partition_by_child_index];
    }

    /// Returns true if frame begin offset node is initialized, false otherwise
    bool hasFrameBeginOffset() const
    {
        return getFrameBeginOffsetNode() != nullptr;
    }

    /// Get FRAME begin offset node, can be nullptr
    const QueryTreeNodePtr & getFrameBeginOffsetNode() const
    {
        return children[frame_begin_offset_child_index];
    }

    /// Get FRAME begin offset node, can be nullptr
    QueryTreeNodePtr & getFrameBeginOffsetNode()
    {
        return children[frame_begin_offset_child_index];
    }

    /// Returns true if frame end offset node is initialized, false otherwise
    bool hasFrameEndOffset() const
    {
        return getFrameEndOffsetNode() != nullptr;
    }

    /// Get FRAME end offset node, can be nullptr
    const QueryTreeNodePtr & getFrameEndOffsetNode() const
    {
        return children[frame_end_offset_child_index];
    }

    /// Get FRAME end offset node, can be nullptr
    QueryTreeNodePtr & getFrameEndOffsetNode()
    {
        return children[frame_end_offset_child_index];
    }

    QueryTreeNodeType getNodeType() const override
    {
        return QueryTreeNodeType::WINDOW;
    }

    String getName() const override;

    void dumpTreeImpl(WriteBuffer & buffer, FormatState & format_state, size_t indent) const override;

    bool isEqualImpl(const IQueryTreeNode & rhs) const override;

    void updateTreeHashImpl(HashState & hash_state) const override;

protected:
    ASTPtr toASTImpl() const override;

    QueryTreeNodePtr cloneImpl() const override;

private:
    /// Children layout.
    /// NOTE(review): index 2 is not assigned to any child, confirm whether the gap is intended.
    static constexpr size_t order_by_child_index = 0;
    static constexpr size_t partition_by_child_index = 1;
    static constexpr size_t frame_begin_offset_child_index = 3;
    static constexpr size_t frame_end_offset_child_index = 4;
    static constexpr size_t children_size = frame_end_offset_child_index + 1;

    WindowFrame window_frame;
    String parent_window_name;
};
}

View File

@ -20,7 +20,8 @@ std::string WindowFunctionDescription::dump() const
WriteBufferFromOwnString ss;
ss << "window function '" << column_name << "\n";
ss << "function node " << function_node->dumpTree() << "\n";
if (function_node)
ss << "function node " << function_node->dumpTree() << "\n";
ss << "aggregate function '" << aggregate_function->getName() << "'\n";
if (!function_parameters.empty())
{

View File

@ -99,7 +99,6 @@ struct WindowDescription
// The window functions that are calculated for this window.
std::vector<WindowFunctionDescription> window_functions;
std::string dump() const;
void checkValid() const;

View File

@ -33,6 +33,8 @@ void ActionsChainStep::finalizeInputAndOutputColumns(const NameSet & child_input
actions->addOrReplaceInOutputs(*required_output_node);
actions->removeUnusedActions();
/// TODO: Analyzer fix ActionsDAG input and constant nodes with same name
actions->projectInput();
initialize();
}
@ -69,15 +71,30 @@ void ActionsChainStep::initialize()
available_output_columns.clear();
/// TODO: Analyzer fix ActionsDAG input and constant nodes with same name
std::unordered_set<std::string_view> available_output_columns_names;
if (available_output_columns_strategy == AvailableOutputColumnsStrategy::ALL_NODES)
{
for (const auto & node : actions->getNodes())
{
if (available_output_columns_names.contains(node.result_name))
continue;
available_output_columns.emplace_back(node.column, node.result_type, node.result_name);
available_output_columns_names.insert(node.result_name);
}
}
else if (available_output_columns_strategy == AvailableOutputColumnsStrategy::OUTPUT_NODES)
{
for (const auto & node : actions->getOutputs())
{
if (available_output_columns_names.contains(node->result_name))
continue;
available_output_columns.emplace_back(node->column, node->result_type, node->result_name);
available_output_columns_names.insert(node->result_name);
}
}
available_output_columns.insert(available_output_columns.end(), additional_output_columns.begin(), additional_output_columns.end());

View File

@ -65,10 +65,10 @@ public:
explicit ActionsChainStep(ActionsDAGPtr actions_,
AvailableOutputColumnsStrategy available_output_columns_stategy_,
const ColumnsWithTypeAndName & additional_output_columns_)
ColumnsWithTypeAndName additional_output_columns_)
: actions(std::move(actions_))
, available_output_columns_strategy(available_output_columns_stategy_)
, additional_output_columns(additional_output_columns_)
, additional_output_columns(std::move(additional_output_columns_))
{
initialize();
}

View File

@ -29,6 +29,7 @@
#include <Processors/QueryPlan/RollupStep.h>
#include <Processors/QueryPlan/CubeStep.h>
#include <Processors/QueryPlan/LimitByStep.h>
#include <Processors/QueryPlan/WindowStep.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <Interpreters/Context.h>
@ -47,6 +48,7 @@
#include <Analyzer/LambdaNode.h>
#include <Analyzer/SortColumnNode.h>
#include <Analyzer/InterpolateColumnNode.h>
#include <Analyzer/WindowNode.h>
#include <Analyzer/TableNode.h>
#include <Analyzer/TableFunctionNode.h>
#include <Analyzer/QueryNode.h>
@ -56,7 +58,8 @@
#include <Analyzer/QueryTreeBuilder.h>
#include <Analyzer/QueryTreePassManager.h>
#include <Analyzer/InDepthQueryTreeVisitor.h>
#include <Analyzer/CollectAggregateFunctionVisitor.h>
#include <Analyzer/CollectAggregateFunctionNodes.h>
#include <Analyzer/CollectWindowFunctionNodes.h>
#include <Planner/Utils.h>
#include <Planner/PlannerContext.h>
@ -64,6 +67,7 @@
#include <Planner/PlannerJoins.h>
#include <Planner/PlannerAggregation.h>
#include <Planner/PlannerSorting.h>
#include <Planner/PlannerWindowFunctions.h>
#include <Planner/ActionsChain.h>
#include <Planner/CollectSets.h>
#include <Planner/CollectTableExpressionData.h>
@ -87,7 +91,6 @@ namespace ErrorCodes
* TODO: Support distributed query processing
* TODO: Support PREWHERE
* TODO: Support ORDER BY
* TODO: Support WINDOW FUNCTIONS
* TODO: Support DISTINCT
* TODO: Support trivial count optimization
* TODO: Support projections
@ -473,18 +476,22 @@ QueryPlan buildQueryPlanForJoinNode(QueryTreeNodePtr join_tree_node,
auto result_plan = QueryPlan();
result_plan.unitePlans(std::move(join_step), {std::move(plans)});
auto drop_unused_columns_after_join = std::make_shared<ActionsDAG>(result_plan.getCurrentDataStream().header.getColumnsWithTypeAndName());
auto drop_unused_columns_after_join_actions_dag = std::make_shared<ActionsDAG>(result_plan.getCurrentDataStream().header.getColumnsWithTypeAndName());
ActionsDAG::NodeRawConstPtrs updated_outputs;
std::unordered_set<std::string_view> updated_outputs_names;
for (auto & output : drop_unused_columns_after_join->getOutputs())
for (auto & output : drop_unused_columns_after_join_actions_dag->getOutputs())
{
if (planner_context->getGlobalPlannerContext()->hasColumnIdentifier(output->result_name))
updated_outputs.push_back(output);
if (updated_outputs_names.contains(output->result_name) || !planner_context->getGlobalPlannerContext()->hasColumnIdentifier(output->result_name))
continue;
updated_outputs.push_back(output);
updated_outputs_names.insert(output->result_name);
}
drop_unused_columns_after_join->getOutputs() = std::move(updated_outputs);
drop_unused_columns_after_join_actions_dag->getOutputs() = std::move(updated_outputs);
auto drop_unused_columns_after_join_transform_step = std::make_unique<ExpressionStep>(result_plan.getCurrentDataStream(), std::move(drop_unused_columns_after_join));
auto drop_unused_columns_after_join_transform_step = std::make_unique<ExpressionStep>(result_plan.getCurrentDataStream(), std::move(drop_unused_columns_after_join_actions_dag));
drop_unused_columns_after_join_transform_step->setStepDescription("DROP unused columns after JOIN");
result_plan.addStep(std::move(drop_unused_columns_after_join_transform_step));
@ -783,7 +790,7 @@ void Planner::buildQueryPlanIfNeeded()
where_action_step_index = actions_chain.getLastStepIndex();
}
auto aggregate_function_nodes = extractAggregateFunctionNodes(query_tree);
auto aggregate_function_nodes = collectAggregateFunctionNodes(query_tree);
AggregateDescriptions aggregates_descriptions = extractAggregateDescriptions(aggregate_function_nodes, *planner_context);
ColumnsWithTypeAndName aggregates_columns;
aggregates_columns.reserve(aggregates_descriptions.size());
@ -935,6 +942,83 @@ void Planner::buildQueryPlanIfNeeded()
having_action_step_index = actions_chain.getLastStepIndex();
}
auto window_function_nodes = collectWindowFunctionNodes(query_tree);
auto window_descriptions = extractWindowDescriptions(window_function_nodes, *planner_context);
std::optional<size_t> before_window_step_index;
if (!window_function_nodes.empty())
{
    /** Build the actions step that must run before WINDOW functions are calculated.
      * Its outputs are window function arguments, PARTITION BY expressions and
      * ORDER BY sort expressions of every window function in the query.
      */
    chain_available_output_columns = actions_chain.getLastStepAvailableOutputColumnsOrNull();
    const auto & window_input = chain_available_output_columns ? *chain_available_output_columns
        : query_plan.getCurrentDataStream().header.getColumnsWithTypeAndName();

    ActionsDAGPtr before_window_actions_dag = std::make_shared<ActionsDAG>(window_input);
    before_window_actions_dag->getOutputs().clear();

    /// Result names of nodes that were already added to the DAG outputs
    std::unordered_set<std::string_view> before_window_actions_dag_output_nodes_names;

    /// Append expression nodes to the DAG outputs, skipping nodes whose result name is already an output
    auto add_unique_outputs = [&](const auto & expression_dag_nodes)
    {
        for (const auto & expression_dag_node : expression_dag_nodes)
        {
            if (!before_window_actions_dag_output_nodes_names.insert(expression_dag_node->result_name).second)
                continue;

            before_window_actions_dag->getOutputs().push_back(expression_dag_node);
        }
    };

    for (auto & window_function_node : window_function_nodes)
    {
        auto & window_function_node_typed = window_function_node->as<FunctionNode &>();
        auto & window_node = window_function_node_typed.getWindowNode()->as<WindowNode &>();

        add_unique_outputs(actions_visitor.visit(before_window_actions_dag, window_function_node_typed.getArgumentsNode()));
        add_unique_outputs(actions_visitor.visit(before_window_actions_dag, window_node.getPartitionByNode()));

        /** Only the sort expressions of ORDER BY columns are added to the before WINDOW actions DAG.
          * WITH FILL expressions must be constant nodes.
          */
        for (auto & sort_column_node : window_node.getOrderBy().getNodes())
        {
            auto & sort_column_node_typed = sort_column_node->as<SortColumnNode &>();
            add_unique_outputs(actions_visitor.visit(before_window_actions_dag, sort_column_node_typed.getExpression()));
        }
    }

    /// Window function results are not produced by the DAG itself, so they are registered as additional step columns
    ColumnsWithTypeAndName window_functions_additional_columns;

    for (auto & window_description : window_descriptions)
        for (auto & window_function : window_description.window_functions)
            window_functions_additional_columns.emplace_back(nullptr, window_function.aggregate_function->getReturnType(), window_function.column_name);

    /// The columns are not used after this point, so they are moved into the step instead of being copied
    auto before_window_step = std::make_unique<ActionsChainStep>(std::move(before_window_actions_dag),
        ActionsChainStep::AvailableOutputColumnsStrategy::ALL_NODES,
        std::move(window_functions_additional_columns));
    actions_chain.addStep(std::move(before_window_step));
    before_window_step_index = actions_chain.getLastStepIndex();
}
chain_available_output_columns = actions_chain.getLastStepAvailableOutputColumnsOrNull();
const auto & projection_input = chain_available_output_columns ? *chain_available_output_columns : query_plan.getCurrentDataStream().header.getColumnsWithTypeAndName();
auto projection_actions = buildActionsDAGFromExpressionNode(query_node.getProjectionNode(), projection_input, planner_context);
@ -1196,6 +1280,53 @@ void Planner::buildQueryPlanIfNeeded()
query_plan.addStep(std::move(having_step));
}
if (before_window_step_index)
{
auto & before_window_actions_chain_node = actions_chain.at(*before_window_step_index);
auto expression_step_before_window = std::make_unique<ExpressionStep>(query_plan.getCurrentDataStream(),
before_window_actions_chain_node->getActions());
expression_step_before_window->setStepDescription("Before WINDOW");
query_plan.addStep(std::move(expression_step_before_window));
sortWindowDescriptions(window_descriptions);
size_t window_descriptions_size = window_descriptions.size();
const auto & settings = query_context->getSettingsRef();
for (size_t i = 0; i < window_descriptions_size; ++i)
{
const auto & window_description = window_descriptions[i];
/** We don't need to sort again if the input from previous window already
* has suitable sorting. Also don't create sort steps when there are no
* columns to sort by, because the sort nodes are confused by this. It
* happens in case of `over ()`.
*/
if (!window_description.full_sort_description.empty() &&
(i == 0 || !sortDescriptionIsPrefix(window_description.full_sort_description, window_descriptions[i - 1].full_sort_description)))
{
auto sorting_step = std::make_unique<SortingStep>(
query_plan.getCurrentDataStream(),
window_description.full_sort_description,
settings.max_block_size,
0 /* LIMIT */,
SizeLimits(settings.max_rows_to_sort, settings.max_bytes_to_sort, settings.sort_overflow_mode),
settings.max_bytes_before_remerge_sort,
settings.remerge_sort_lowered_memory_bytes_ratio,
settings.max_bytes_before_external_sort,
query_context->getTemporaryVolume(),
settings.min_free_disk_space_for_temporary_data,
settings.optimize_sorting_by_input_stream_properties);
sorting_step->setStepDescription("Sorting for window '" + window_description.window_name + "'");
query_plan.addStep(std::move(sorting_step));
}
auto window_step = std::make_unique<WindowStep>(query_plan.getCurrentDataStream(), window_description, window_description.window_functions);
window_step->setStepDescription("Window step for window '" + window_description.window_name + "'");
query_plan.addStep(std::move(window_step));
}
}
auto & projection_actions_chain_node = actions_chain.at(projection_step_index);
auto expression_step_projection = std::make_unique<ExpressionStep>(query_plan.getCurrentDataStream(),
projection_actions_chain_node->getActions());

View File

@ -6,6 +6,8 @@
#include <Analyzer/FunctionNode.h>
#include <Analyzer/ColumnNode.h>
#include <Analyzer/LambdaNode.h>
#include <Analyzer/SortColumnNode.h>
#include <Analyzer/WindowNode.h>
#include <Analyzer/UnionNode.h>
#include <Analyzer/QueryNode.h>
#include <Analyzer/ConstantValue.h>
@ -419,7 +421,7 @@ PlannerActionsVisitorImpl::NodeNameAndNodeMinLevel PlannerActionsVisitorImpl::vi
auto function_node_name = calculateActionNodeName(node, *planner_context, node_to_node_name);
if (function_node.isAggregateFunction())
if (function_node.isAggregateFunction() || function_node.isWindowFunction())
{
size_t actions_stack_size = actions_stack.size();
@ -573,6 +575,13 @@ String calculateActionNodeName(const QueryTreeNodePtr & node, const PlannerConte
buffer << ')';
if (function_node.isWindowFunction())
{
buffer << " OVER (";
buffer << calculateWindowNodeActionName(function_node.getWindowNode(), planner_context, node_to_name);
buffer << ')';
}
result = buffer.str();
}
break;
@ -627,4 +636,119 @@ String calculateConstantActionNodeName(const Field & constant_literal)
return calculateConstantActionNodeName(constant_literal, applyVisitor(FieldToDataType(), constant_literal));
}
String calculateWindowNodeActionName(const QueryTreeNodePtr & node, const PlannerContext & planner_context, QueryTreeNodeToName & node_to_name)
{
    /** The name is assembled from up to three space separated sections:
      * PARTITION BY list, ORDER BY list and the frame, which is written only when it is not the default one.
      */
    auto & window_node = node->as<WindowNode &>();
    const bool has_partition_by = window_node.hasPartitionBy();
    const bool has_order_by = window_node.hasOrderBy();

    WriteBufferFromOwnString buffer;

    if (has_partition_by)
    {
        buffer << "PARTITION BY ";

        bool is_first_node = true;
        for (auto & partition_by_node : window_node.getPartitionBy().getNodes())
        {
            if (!is_first_node)
                buffer << ", ";
            is_first_node = false;

            buffer << calculateActionNodeName(partition_by_node, planner_context, node_to_name);
        }
    }

    if (has_order_by)
    {
        if (has_partition_by)
            buffer << ' ';

        buffer << "ORDER BY ";

        bool is_first_node = true;
        for (auto & order_by_node : window_node.getOrderBy().getNodes())
        {
            if (!is_first_node)
                buffer << ", ";
            is_first_node = false;

            auto & sort_column_node = order_by_node->as<SortColumnNode &>();
            buffer << calculateActionNodeName(sort_column_node.getExpression(), planner_context, node_to_name);

            auto sort_direction = sort_column_node.getSortDirection();
            buffer << (sort_direction == SortDirection::ASCENDING ? " ASC" : " DESC");

            /// NULLS direction equal to sort direction is written as NULLS LAST, otherwise as NULLS FIRST
            auto nulls_sort_direction = sort_column_node.getNullsSortDirection();
            if (nulls_sort_direction)
                buffer << " NULLS " << (nulls_sort_direction == sort_direction ? "LAST" : "FIRST");

            if (auto collator = sort_column_node.getCollator())
                buffer << " COLLATE " << collator->getLocale();

            if (!sort_column_node.withFill())
                continue;

            buffer << " WITH FILL";

            if (sort_column_node.hasFillFrom())
                buffer << " FROM " << calculateActionNodeName(sort_column_node.getFillFrom(), planner_context, node_to_name);

            if (sort_column_node.hasFillTo())
                buffer << " TO " << calculateActionNodeName(sort_column_node.getFillTo(), planner_context, node_to_name);

            if (sort_column_node.hasFillStep())
                buffer << " STEP " << calculateActionNodeName(sort_column_node.getFillStep(), planner_context, node_to_name);
        }
    }

    auto & window_frame = window_node.getWindowFrame();
    if (window_frame.is_default)
        return buffer.str();

    /// Write single frame boundary. Offset node is taken from the window node only for offset boundaries.
    auto write_frame_boundary = [&](WindowFrame::BoundaryType boundary_type, bool is_preceding, bool is_begin_boundary)
    {
        if (boundary_type == WindowFrame::BoundaryType::Current)
        {
            buffer << "CURRENT ROW";
            return;
        }

        if (boundary_type == WindowFrame::BoundaryType::Unbounded)
        {
            buffer << "UNBOUNDED";
        }
        else
        {
            auto & offset_node = is_begin_boundary ? window_node.getFrameBeginOffsetNode() : window_node.getFrameEndOffsetNode();
            buffer << calculateActionNodeName(offset_node, planner_context, node_to_name);
        }

        buffer << " " << (is_preceding ? "PRECEDING" : "FOLLOWING");
    };

    if (has_partition_by || has_order_by)
        buffer << ' ';

    buffer << window_frame.type << " BETWEEN ";
    write_frame_boundary(window_frame.begin_type, window_frame.begin_preceding, true /* is_begin_boundary */);
    buffer << " AND ";
    write_frame_boundary(window_frame.end_type, window_frame.end_preceding, false /* is_begin_boundary */);

    return buffer.str();
}
String calculateWindowNodeActionName(const QueryTreeNodePtr & node, const PlannerContext & planner_context)
{
    /// Convenience overload: delegate to the three argument version with a fresh node to name map
    QueryTreeNodeToName node_to_name;
    return calculateWindowNodeActionName(node, planner_context, node_to_name);
}
}

View File

@ -53,7 +53,7 @@ private:
using QueryTreeNodeToName = std::unordered_map<QueryTreeNodePtr, String>;
String calculateActionNodeName(const QueryTreeNodePtr & node, const PlannerContext & planner_context, QueryTreeNodeToName & node_to_name);
/** Calculate query tree expression node name action dag name.
/** Calculate query tree expression node action dag name.
*
* For column node column node identifier from planner context is used.
*/
@ -65,4 +65,14 @@ String calculateConstantActionNodeName(const Field & constant_literal, const Dat
/// Calculate action node name for constant, data type will be derived from constant literal value
String calculateConstantActionNodeName(const Field & constant_literal);
/** Calculate action node name for window node.
* Window node action name can only be part of window function action name.
*/
String calculateWindowNodeActionName(const QueryTreeNodePtr & node, const PlannerContext & planner_context, QueryTreeNodeToName & node_to_name);
/** Calculate action node name for window node.
* Window node action name can only be part of window function action name.
*/
String calculateWindowNodeActionName(const QueryTreeNodePtr & node, const PlannerContext & planner_context);
}

View File

@ -6,7 +6,7 @@
#include <Analyzer/ColumnNode.h>
#include <Analyzer/FunctionNode.h>
#include <Analyzer/QueryNode.h>
#include <Analyzer/CollectAggregateFunctionVisitor.h>
#include <Analyzer/CollectAggregateFunctionNodes.h>
#include <Planner/PlannerActionsVisitor.h>
@ -193,22 +193,6 @@ void resolveGroupingFunctions(QueryTreeNodePtr & query_node,
planner_context);
}
QueryTreeNodes extractAggregateFunctionNodes(const QueryTreeNodePtr & query_node)
{
const auto & query_node_typed = query_node->as<QueryNode &>();
QueryTreeNodes aggregate_function_nodes;
if (query_node_typed.hasHaving())
collectAggregateFunctionNodes(query_node_typed.getHaving(), aggregate_function_nodes);
if (query_node_typed.hasOrderBy())
collectAggregateFunctionNodes(query_node_typed.getOrderByNode(), aggregate_function_nodes);
collectAggregateFunctionNodes(query_node_typed.getProjectionNode(), aggregate_function_nodes);
return aggregate_function_nodes;
}
AggregateDescriptions extractAggregateDescriptions(const QueryTreeNodes & aggregate_function_nodes, const PlannerContext & planner_context)
{
QueryTreeNodeToName node_to_name;
@ -217,16 +201,16 @@ AggregateDescriptions extractAggregateDescriptions(const QueryTreeNodes & aggreg
for (const auto & aggregate_function_node : aggregate_function_nodes)
{
const auto & aggregagte_function_node_typed = aggregate_function_node->as<FunctionNode &>();
const auto & aggregate_function_node_typed = aggregate_function_node->as<FunctionNode &>();
String node_name = calculateActionNodeName(aggregate_function_node, planner_context, node_to_name);
auto [_, inserted] = unique_aggregate_action_node_names.emplace(node_name);
if (!inserted)
continue;
AggregateDescription aggregate_description;
aggregate_description.function = aggregagte_function_node_typed.getAggregateFunction();
aggregate_description.function = aggregate_function_node_typed.getAggregateFunction();
const auto & parameters_nodes = aggregagte_function_node_typed.getParameters().getNodes();
const auto & parameters_nodes = aggregate_function_node_typed.getParameters().getNodes();
aggregate_description.parameters.reserve(parameters_nodes.size());
for (const auto & parameter_node : parameters_nodes)
@ -235,7 +219,7 @@ AggregateDescriptions extractAggregateDescriptions(const QueryTreeNodes & aggreg
aggregate_description.parameters.push_back(parameter_node->getConstantValue().getValue());
}
const auto & arguments_nodes = aggregagte_function_node_typed.getArguments().getNodes();
const auto & arguments_nodes = aggregate_function_node_typed.getArguments().getNodes();
aggregate_description.argument_names.reserve(arguments_nodes.size());
for (const auto & argument_node : arguments_nodes)

View File

@ -20,9 +20,6 @@ void resolveGroupingFunctions(QueryTreeNodePtr & query_node,
const GroupingSetsParamsList & grouping_sets_parameters_list,
const PlannerContext & planner_context);
/// Extract aggregate function nodes from query node
QueryTreeNodes extractAggregateFunctionNodes(const QueryTreeNodePtr & query_node);
/// Extract aggregate descriptions from aggregate function nodes
AggregateDescriptions extractAggregateDescriptions(const QueryTreeNodes & aggregate_function_nodes, const PlannerContext & planner_context);

View File

@ -7,7 +7,7 @@
namespace DB
{
/// Extract sort description from query order by node
/// Extract sort description from order by node
SortDescription extractSortDescription(const QueryTreeNodePtr & order_by_node, const PlannerContext & planner_context);
}

View File

@ -0,0 +1,145 @@
#include <Planner/PlannerWindowFunctions.h>
#include <Analyzer/InDepthQueryTreeVisitor.h>
#include <Analyzer/FunctionNode.h>
#include <Analyzer/WindowNode.h>
#include <Interpreters/Context.h>
#include <Planner/PlannerSorting.h>
#include <Planner/PlannerActionsVisitor.h>
namespace DB
{
namespace
{
/// Build window description (name, PARTITION BY, ORDER BY, full sort description, frame) from window node
WindowDescription extractWindowDescriptionFromWindowNode(const QueryTreeNodePtr & node, const PlannerContext & planner_context)
{
    auto & window_node = node->as<WindowNode &>();

    WindowDescription result;
    result.window_name = calculateWindowNodeActionName(node, planner_context);

    /// PARTITION BY expressions are described as sort columns with direction 1 and nulls direction 1
    for (const auto & partition_by_node : window_node.getPartitionBy().getNodes())
    {
        auto partition_by_column_name = calculateActionNodeName(partition_by_node, planner_context);
        result.partition_by.push_back(SortColumnDescription(partition_by_column_name, 1 /* direction */, 1 /* nulls_direction */));
    }

    result.order_by = extractSortDescription(window_node.getOrderByNode(), planner_context);

    /// Full sort description is PARTITION BY columns followed by ORDER BY columns
    result.full_sort_description = result.partition_by;
    result.full_sort_description.insert(
        result.full_sort_description.end(),
        result.order_by.begin(),
        result.order_by.end());

    /// WINDOW frame is validated during query analysis stage
    result.frame = window_node.getWindowFrame();

    const auto & query_context = planner_context.getQueryContext();
    const auto & settings = query_context->getSettingsRef();

    bool compile_sort_description = settings.compile_sort_description;
    size_t min_count_to_compile_sort_description = settings.min_count_to_compile_sort_description;

    /// Same sort description compilation settings are applied to all three sort descriptions
    auto apply_compile_settings = [&](auto & sort_description)
    {
        sort_description.compile_sort_description = compile_sort_description;
        sort_description.min_count_to_compile_sort_description = min_count_to_compile_sort_description;
    };

    apply_compile_settings(result.partition_by);
    apply_compile_settings(result.order_by);
    apply_compile_settings(result.full_sort_description);

    return result;
}
}
/** Group window functions by window and build a description for each distinct window.
  *
  * Windows are identified by the window name calculated from the window node, so window
  * functions that have the same window name are put into the same description.
  * Result is built by iterating an unordered map, so the order of descriptions is unspecified.
  */
std::vector<WindowDescription> extractWindowDescriptions(const QueryTreeNodes & window_function_nodes, const PlannerContext & planner_context)
{
    std::unordered_map<std::string, WindowDescription> window_name_to_description;

    for (const auto & window_function_node : window_function_nodes)
    {
        auto & window_function_node_typed = window_function_node->as<FunctionNode &>();

        auto function_window_description = extractWindowDescriptionFromWindowNode(window_function_node_typed.getWindowNode(), planner_context);

        /// Name is copied because description is moved into the map below. If window already exists, existing description is reused.
        auto window_name = function_window_description.window_name;
        auto [it, _] = window_name_to_description.emplace(window_name, std::move(function_window_description));
        auto & window_description = it->second;

        WindowFunctionDescription window_function;
        window_function.function_node = nullptr;
        window_function.column_name = calculateActionNodeName(window_function_node, planner_context);
        window_function.aggregate_function = window_function_node_typed.getAggregateFunction();

        const auto & parameters_nodes = window_function_node_typed.getParameters().getNodes();
        window_function.function_parameters.reserve(parameters_nodes.size());

        for (const auto & parameter_node : parameters_nodes)
        {
            /// Function parameters constness validated during analysis stage
            window_function.function_parameters.push_back(parameter_node->getConstantValue().getValue());
        }

        const auto & arguments_nodes = window_function_node_typed.getArguments().getNodes();
        window_function.argument_names.reserve(arguments_nodes.size());
        window_function.argument_types.reserve(arguments_nodes.size());

        for (const auto & argument_node : arguments_nodes)
        {
            String argument_node_name = calculateActionNodeName(argument_node, planner_context);
            window_function.argument_names.emplace_back(std::move(argument_node_name));
            window_function.argument_types.emplace_back(argument_node->getResultType());
        }

        /// Window function description is not used after this point, move it to avoid copying its vectors
        window_description.window_functions.push_back(std::move(window_function));
    }

    std::vector<WindowDescription> result;
    result.reserve(window_name_to_description.size());

    for (auto && [_, window_description] : window_name_to_description)
        result.push_back(std::move(window_description));

    return result;
}
void sortWindowDescriptions(std::vector<WindowDescription> & window_descriptions)
{
    /// Order descriptions by full sort description: column by column on (column name, direction, nulls direction)
    auto compare_by_full_sort_description = [](const WindowDescription & lhs, const WindowDescription & rhs)
    {
        const auto & lhs_sort_description = lhs.full_sort_description;
        const auto & rhs_sort_description = rhs.full_sort_description;

        for (size_t i = 0; i < std::min(lhs_sort_description.size(), rhs_sort_description.size()); ++i)
        {
            const auto & lhs_column = lhs_sort_description[i];
            const auto & rhs_column = rhs_sort_description[i];

            if (lhs_column.column_name != rhs_column.column_name)
                return lhs_column.column_name < rhs_column.column_name;

            if (lhs_column.direction != rhs_column.direction)
                return lhs_column.direction < rhs_column.direction;

            if (lhs_column.nulls_direction != rhs_column.nulls_direction)
                return lhs_column.nulls_direction < rhs_column.nulls_direction;

            assert(lhs_column == rhs_column);
        }

        /** Length is compared last, so that sort descriptions with a common prefix
          * but different length are placed together, longer description first.
          */
        return lhs_sort_description.size() > rhs_sort_description.size();
    };

    ::sort(window_descriptions.begin(), window_descriptions.end(), compare_by_full_sort_description);
}
}

View File

@ -0,0 +1,20 @@
#pragma once
#include <Core/SortDescription.h>
#include <Planner/PlannerContext.h>
#include <Interpreters/WindowDescription.h>
namespace DB
{
/// Extract window descriptions from window function nodes. Result is not sorted, see sortWindowDescriptions.
std::vector<WindowDescription> extractWindowDescriptions(const QueryTreeNodes & window_function_nodes, const PlannerContext & planner_context);
/** Try to sort window descriptions in such an order that the window with the longest
* sort description goes first, and all windows that use its prefixes follow.
*/
void sortWindowDescriptions(std::vector<WindowDescription> & window_descriptions);
}

View File

@ -180,4 +180,19 @@ ActionsDAGPtr buildActionsDAGFromExpressionNode(const QueryTreeNodePtr & express
return action_dag;
}
bool sortDescriptionIsPrefix(const SortDescription & prefix, const SortDescription & full)
{
    const size_t prefix_size = prefix.size();

    /// Description that is longer than full description cannot be its prefix
    if (full.size() < prefix_size)
        return false;

    /// Advance while leading sort columns match, prefix matches only if all of its columns were consumed
    size_t position = 0;
    while (position < prefix_size && !(full[position] != prefix[position]))
        ++position;

    return position == prefix_size;
}
}

View File

@ -1,6 +1,7 @@
#pragma once
#include <Core/Block.h>
#include <Core/SortDescription.h>
#include <Parsers/IAST.h>
@ -44,4 +45,7 @@ ActionsDAGPtr buildActionsDAGFromExpressionNode(const QueryTreeNodePtr & express
const ColumnsWithTypeAndName & input_columns,
const PlannerContextPtr & planner_context);
/// Returns true if prefix sort description is a prefix of full sort description, false otherwise
bool sortDescriptionIsPrefix(const SortDescription & prefix, const SortDescription & full);
}

View File

@ -62,7 +62,7 @@ size_t tryReuseStorageOrderingForWindowFunctions(QueryPlan::Node * parent_node,
}
auto context = read_from_merge_tree->getContext();
if (!context->getSettings().optimize_read_in_window_order)
if (!context->getSettings().optimize_read_in_window_order || context->getSettingsRef().use_analyzer)
{
return 0;
}