returning constant column as window function

This commit is contained in:
Alexander Kuzmenkov 2020-12-10 22:06:52 +03:00
parent eb0c817bf2
commit ac8b896f36
7 changed files with 295 additions and 85 deletions

View File

@ -37,5 +37,12 @@ void dumpSortDescription(const SortDescription & description, const Block & head
}
}
std::string dumpSortDescription(const SortDescription & description)
{
WriteBufferFromOwnString wb;
dumpSortDescription(description, Block{}, wb);
return wb.str();
}
}

View File

@ -72,4 +72,6 @@ class Block;
/// Outputs user-readable description into `out`.
void dumpSortDescription(const SortDescription & description, const Block & header, WriteBuffer & out);
/// Same as above, but dumps against an empty header block and returns the
/// resulting text as a string instead of writing into a caller-supplied buffer.
std::string dumpSortDescription(const SortDescription & description);
}

View File

@ -31,7 +31,7 @@ struct WindowFunctionDescription
{
std::string window_name;
std::string column_name;
const IAST * wrapper_node;
const ASTFunction * wrapper_node;
const ASTFunction * function_node;
AggregateFunctionPtr aggregate_function;
Array function_parameters;
@ -45,9 +45,12 @@ struct WindowDescription
{
std::string window_name;
// Always ASC for now.
std::vector<std::string> partition_by;
std::vector<std::string> order_by;
SortDescription partition_by;
SortDescription order_by;
SortDescription full_sort_description;
// No frame info as of yet.
std::string dump() const;
};
using WindowFunctionDescriptions = std::vector<WindowFunctionDescription>;

View File

@ -463,6 +463,57 @@ bool ExpressionAnalyzer::makeAggregateDescriptions(ActionsDAGPtr & actions)
return !aggregates().empty();
}
// Parses order by & partition by from window() wrapper function.
// Remove this when we have proper grammar.
//
// `ast` is a single argument of the wrapper: either one expression, or a
// `tuple(...)` call whose elements are taken one by one. Every accepted
// expression is appended to the result as a SortColumnDescription keyed by the
// expression's column name, with direction 1 and nulls_direction 1.
//
// `w` is used only to put the window name into error messages.
//
// Throws BAD_ARGUMENTS when the argument (or any tuple element) is not an
// ASTWithAlias. This helper serves both the PARTITION BY and the ORDER BY
// argument, so the messages name both clauses rather than only PARTITION BY.
static SortDescription windowArgumentToSortDescription(IAST* ast, const WindowDescription & w)
{
    SortDescription result;

    // Single place that turns an aliased expression into a sort column.
    const auto append_column = [&result](const ASTWithAlias & column)
    {
        result.push_back(
            SortColumnDescription(
                column.getColumnName(),
                1 /* direction */,
                1 /* nulls_direction */));
    };

    if (const auto * as_tuple = ast->as<ASTFunction>();
        as_tuple
        && as_tuple->name == "tuple"
        && as_tuple->arguments)
    {
        // untuple it
        for (const auto & element_ast : as_tuple->arguments->children)
        {
            const auto * with_alias = dynamic_cast<const ASTWithAlias *>(
                element_ast.get());
            if (!with_alias)
            {
                throw Exception(ErrorCodes::BAD_ARGUMENTS,
                    "(1) Expected column in PARTITION BY or ORDER BY"
                    " for window '{}', got '{}'",
                    w.window_name,
                    element_ast->formatForErrorMessage());
            }

            append_column(*with_alias);
        }
    }
    else if (const auto * with_alias
        = dynamic_cast<const ASTWithAlias *>(ast))
    {
        append_column(*with_alias);
    }
    else
    {
        throw Exception(ErrorCodes::BAD_ARGUMENTS,
            "(2) Expected tuple or column in PARTITION BY or ORDER BY"
            " for window '{}', got '{}'",
            w.window_name,
            ast->formatForErrorMessage());
    }

    return result;
}
bool ExpressionAnalyzer::makeWindowDescriptions(ActionsDAGPtr & actions)
{
@ -470,10 +521,6 @@ bool ExpressionAnalyzer::makeWindowDescriptions(ActionsDAGPtr & actions)
{
fmt::print(stderr, "window function ast: {}\n", wrapper_node->dumpTree());
// Not sure why NoMakeSet, copied from aggregate functions.
getRootActionsNoMakeSet(wrapper_node->arguments, true /* no subqueries */,
actions);
// FIXME not thread-safe, should use a per-query counter.
static int window_index = 1;
@ -490,45 +537,33 @@ bool ExpressionAnalyzer::makeWindowDescriptions(ActionsDAGPtr & actions)
const auto partition_by_ast = elist->children[1];
fmt::print(stderr, "partition by ast {}\n",
partition_by_ast->dumpTree());
if (const auto * as_tuple = partition_by_ast->as<ASTFunction>();
as_tuple
&& as_tuple->name == "tuple"
&& as_tuple->arguments)
{
// untuple it
for (const auto & element_ast
: as_tuple->arguments->children)
{
const auto * with_alias = dynamic_cast<
const ASTWithAlias *>(element_ast.get());
if (!with_alias)
{
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"(1) Expected column in PARTITION BY"
" for window '{}', got '{}'",
window_description.window_name,
element_ast->formatForErrorMessage());
}
window_description.partition_by.push_back(
with_alias->getColumnName());
}
}
else if (const auto * with_alias
= dynamic_cast<const ASTWithAlias *>(partition_by_ast.get()))
{
window_description.partition_by.push_back(
with_alias->getColumnName());
}
else
{
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"(2) Expected tuple or column in PARTITION BY"
" for window '{}', got '{}'",
window_description.window_name,
partition_by_ast->formatForErrorMessage());
}
window_description.partition_by = windowArgumentToSortDescription(
partition_by_ast.get(), window_description);
}
if (elist->children.size() == 3)
{
const auto order_by_ast = elist->children[2];
fmt::print(stderr, "order by ast {}\n",
order_by_ast->dumpTree());
window_description.order_by = windowArgumentToSortDescription(
order_by_ast.get(), window_description);
}
if (elist->children.size() > 3)
{
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Too many arguments to window function '{}'",
wrapper_node->formatForErrorMessage());
}
}
window_description.full_sort_description = window_description.partition_by;
window_description.full_sort_description.insert(
window_description.full_sort_description.end(),
window_description.order_by.begin(),
window_description.order_by.end());
WindowFunctionDescription window_function;
window_function.window_name = window_description.window_name;
@ -543,6 +578,18 @@ bool ExpressionAnalyzer::makeWindowDescriptions(ActionsDAGPtr & actions)
window_function.function_node->parameters)
: Array();
// We have to fill actions for window function arguments, so that we are
// then able to find their argument types. The `actions` passed to this
// function are temporary and are discarded.
// The same calculation is done in appendWindowFunctionsArguments.
getRootActionsNoMakeSet(wrapper_node->arguments, true /* no subqueries */,
actions);
// We have to separately get actions for the arguments of the aggregate
// function we calculate over window, because the ActionsVisitor does
// not descend into aggregate functions.
getRootActionsNoMakeSet(window_function.function_node->arguments,
true /* no subqueries */, actions);
const ASTs & arguments
= window_function.function_node->arguments->children;
window_function.argument_types.resize(arguments.size());
@ -956,29 +1003,77 @@ void SelectQueryExpressionAnalyzer::appendAggregateFunctionsArguments(Expression
}
void SelectQueryExpressionAnalyzer::appendWindowFunctionsArguments(
ExpressionActionsChain & chain, bool only_types)
ExpressionActionsChain & chain, bool /* only_types */)
{
fmt::print(stderr, "actions before window: {}\n", chain.dumpChain());
const auto * select_query = getSelectQuery();
ExpressionActionsChain::Step & step = chain.lastStep(aggregated_columns);
/*
for (const auto & desc : aggregate_descriptions)
for (const auto & name : desc.argument_names)
step.required_output.emplace_back(name);
*/
for (const auto & f : window_functions)
{
// Not sure why NoMakeSet, copied from aggregate functions.
getRootActionsNoMakeSet(f.wrapper_node->arguments,
true /* no subqueries */, step.actions());
/// Collect aggregates removing duplicates by node.getColumnName()
/// It's not clear why we recollect aggregates (for query parts) while we're able to use previously collected ones (for entire query)
/// @note The original recollection logic didn't remove duplicates.
GetAggregatesVisitor::Data data;
GetAggregatesVisitor(data).visit(select_query->select());
// We have to separately get actions for the arguments of the aggregate
// function we calculate over window, because the ActionsVisitor does
// not descend into aggregate functions.
// Not sure why NoMakeSet, copied from aggregate functions.
getRootActionsNoMakeSet(f.function_node->arguments,
true /* no subqueries */, step.actions());
/// TODO: data.aggregates -> aggregates()
for (const ASTFunction * node : data.window_functions)
for (auto & argument : node->arguments->children)
getRootActions(argument, only_types, step.actions());
// Add column with window function name and value "1".
// It is an aggregate function, so it won't be added by getRootActions.
ColumnWithTypeAndName col;
col.type = std::make_shared<DataTypeInt64>();
col.column = col.type->createColumnConst(1 /* size */, UInt64(1) /* field */);
col.name = f.column_name;
step.actions()->addColumn(col);
}
// /*
// for (const auto & desc : aggregate_descriptions)
// for (const auto & name : desc.argument_names)
// step.required_output.emplace_back(name);
// */
// const auto * select_query = getSelectQuery();
// /// Collect aggregates removing duplicates by node.getColumnName()
// /// It's not clear why we recollect aggregates (for query parts) while we're able to use previously collected ones (for entire query)
// /// @note The original recollection logic didn't remove duplicates.
// GetAggregatesVisitor::Data data;
// GetAggregatesVisitor(data).visit(select_query->select());
// // 1) just add everything there is in the AST,
// for (const ASTFunction * node : data.window_functions)
// {
// for (auto & argument : node->arguments->children)
// {
// getRootActions(argument, only_types, step.actions());
// }
// }
// 2) mark the columns that are really required:
for (const auto & f : window_functions)
{
for (const auto & a : f.function_node->arguments->children)
{
// 2.1) function arguments,
step.required_output.push_back(a->getColumnName());
}
// 2.2) function result,
step.required_output.push_back(f.column_name);
}
// 2.3) PARTITION BY and ORDER BY columns.
for (const auto & [_, w] : window_descriptions)
{
for (const auto & c : w.full_sort_description)
{
step.required_output.push_back(c.column_name);
}
}
fmt::print(stderr, "actions after window: {}\n", chain.dumpChain());
}
@ -1155,6 +1250,18 @@ ActionsDAGPtr SelectQueryExpressionAnalyzer::appendProjectResult(ExpressionActio
}
}
for (const auto & f : window_functions)
{
if (required_result_columns.empty()
|| required_result_columns.count(f.column_name))
{
result_columns.emplace_back(f.column_name, f.column_name);
step.required_output.push_back(f.column_name);
}
}
fmt::print(stderr, "chain before last projection: {}\n",
chain.dumpChain());
auto actions = chain.getLastActions();
actions->project(result_columns);
return actions;
@ -1407,6 +1514,7 @@ ExpressionAnalysisResult::ExpressionAnalysisResult(
&& !query.final()
&& join_allow_read_in_order;
/*
if (has_window)
{
query_analyzer.appendWindowFunctionsArguments(chain, only_types || !first_stage);
@ -1414,11 +1522,15 @@ ExpressionAnalysisResult::ExpressionAnalysisResult(
finalize_chain(chain);
}
*/
/// If there is aggregation, we execute expressions in SELECT and ORDER BY on the initiating server, otherwise on the source servers.
query_analyzer.appendSelect(chain, only_types || (need_aggregate ? !second_stage : !first_stage));
fmt::print(stderr, "chain after select: {}\n", chain.dumpChain());
query_analyzer.appendWindowFunctionsArguments(chain, only_types || !first_stage);
fmt::print(stderr, "chain after window: {}\n", chain.dumpChain());
selected_columns = chain.getLastStep().required_output;
has_order_by = query.orderBy() != nullptr;
before_order_and_select = query_analyzer.appendOrderBy(
@ -1609,4 +1721,15 @@ std::string WindowFunctionDescription::dump() const
return ss.str();
}
/// Renders the window as human-readable multi-line text: the window name
/// followed by the PARTITION BY, ORDER BY and combined sort descriptions,
/// one item per line.
std::string WindowDescription::dump() const
{
    std::string text;
    text += "window '" + window_name + "'\n";
    text += "partition_by " + dumpSortDescription(partition_by) + "\n";
    text += "order_by " + dumpSortDescription(order_by) + "\n";
    text += "full_sort_description " + dumpSortDescription(full_sort_description) + "\n";
    return text;
}
}

View File

@ -76,7 +76,7 @@ struct ExpressionAnalyzerData
*
* NOTE: if `ast` is a SELECT query from a table, the structure of this table should not change during the lifetime of ExpressionAnalyzer.
*/
class ExpressionAnalyzer : protected ExpressionAnalyzerData, private boost::noncopyable
class ExpressionAnalyzer : public ExpressionAnalyzerData, private boost::noncopyable
{
private:
/// Extracts settings to enlight which are used (and avoid copy of others).

View File

@ -33,36 +33,37 @@
#include <Interpreters/QueryAliasesVisitor.h>
#include <Processors/Pipe.h>
#include <Processors/Sources/SourceFromInputStream.h>
#include <Processors/Sources/NullSource.h>
#include <Processors/Transforms/ExpressionTransform.h>
#include <Processors/Transforms/JoiningTransform.h>
#include <Processors/Transforms/AggregatingTransform.h>
#include <Processors/Transforms/FilterTransform.h>
#include <Processors/QueryPlan/ArrayJoinStep.h>
#include <Processors/QueryPlan/SettingQuotaAndLimitsStep.h>
#include <Processors/QueryPlan/ExpressionStep.h>
#include <Processors/QueryPlan/FilterStep.h>
#include <Processors/QueryPlan/ReadNothingStep.h>
#include <Processors/QueryPlan/ReadFromPreparedSource.h>
#include <Processors/QueryPlan/PartialSortingStep.h>
#include <Processors/QueryPlan/MergeSortingStep.h>
#include <Processors/QueryPlan/MergingSortedStep.h>
#include <Processors/QueryPlan/DistinctStep.h>
#include <Processors/QueryPlan/LimitByStep.h>
#include <Processors/QueryPlan/LimitStep.h>
#include <Processors/QueryPlan/MergingAggregatedStep.h>
#include <Processors/QueryPlan/AddingDelayedSourceStep.h>
#include <Processors/QueryPlan/AggregatingStep.h>
#include <Processors/QueryPlan/ArrayJoinStep.h>
#include <Processors/QueryPlan/CreatingSetsStep.h>
#include <Processors/QueryPlan/TotalsHavingStep.h>
#include <Processors/QueryPlan/RollupStep.h>
#include <Processors/QueryPlan/CubeStep.h>
#include <Processors/QueryPlan/FillingStep.h>
#include <Processors/QueryPlan/DistinctStep.h>
#include <Processors/QueryPlan/ExpressionStep.h>
#include <Processors/QueryPlan/ExtremesStep.h>
#include <Processors/QueryPlan/OffsetStep.h>
#include <Processors/QueryPlan/FillingStep.h>
#include <Processors/QueryPlan/FilterStep.h>
#include <Processors/QueryPlan/FinishSortingStep.h>
#include <Processors/QueryPlan/LimitByStep.h>
#include <Processors/QueryPlan/LimitStep.h>
#include <Processors/QueryPlan/MergeSortingStep.h>
#include <Processors/QueryPlan/MergingAggregatedStep.h>
#include <Processors/QueryPlan/MergingSortedStep.h>
#include <Processors/QueryPlan/OffsetStep.h>
#include <Processors/QueryPlan/PartialSortingStep.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <Processors/QueryPlan/ReadFromPreparedSource.h>
#include <Processors/QueryPlan/ReadNothingStep.h>
#include <Processors/QueryPlan/RollupStep.h>
#include <Processors/QueryPlan/SettingQuotaAndLimitsStep.h>
#include <Processors/QueryPlan/TotalsHavingStep.h>
#include <Processors/QueryPlan/WindowStep.h>
#include <Processors/Sources/NullSource.h>
#include <Processors/Sources/SourceFromInputStream.h>
#include <Processors/Transforms/AggregatingTransform.h>
#include <Processors/Transforms/ExpressionTransform.h>
#include <Processors/Transforms/FilterTransform.h>
#include <Processors/Transforms/JoiningTransform.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/MergeTreeWhereOptimizer.h>
@ -949,13 +950,15 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, const BlockInpu
if (expressions.need_aggregate)
{
executeAggregation(query_plan, expressions.before_aggregation, aggregate_overflow_row, aggregate_final, query_info.input_order_info);
executeAggregation(query_plan, expressions.before_aggregation, aggregate_overflow_row, aggregate_final, query_info.input_order_info);
/// We need to reset input order info, so that executeOrder can't use it
query_info.input_order_info.reset();
}
else
{
/// FIXME calculate windows here
executeExpression(query_plan, expressions.before_order_and_select, "Before ORDER BY and SELECT");
executeWindow(query_plan);
executeDistinct(query_plan, true, expressions.selected_columns, true);
}
@ -1001,7 +1004,9 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, const BlockInpu
else if (expressions.hasHaving())
executeHaving(query_plan, expressions.before_having);
/// FIXME calculate windows here
executeExpression(query_plan, expressions.before_order_and_select, "Before ORDER BY and SELECT");
executeWindow(query_plan);
executeDistinct(query_plan, true, expressions.selected_columns, true);
}
@ -1735,6 +1740,74 @@ void InterpreterSelectQuery::executeExpression(QueryPlan & query_plan, const Act
}
/// For every window function known to the query analyzer, appends to the plan,
/// in this order: a per-block partial sort, a merge sort, a merge of sorted
/// streams (all three by the window's full sort description), and finally a
/// WindowStep whose DAG contains a single constant column named after the
/// function.
void InterpreterSelectQuery::executeWindow(QueryPlan & query_plan)
{
    for (const auto & window_function : query_analyzer->window_functions)
    {
        const auto & window
            = query_analyzer->window_descriptions[window_function.window_name];

        // Diagnostic output to stderr.
        fmt::print(stderr, "window function '{}' over window '{}'\n",
            window_function.column_name, window_function.window_name);
        fmt::print(stderr, "{}\n{}\n", window_function.dump(), window.dump());

        const Settings & settings = context->getSettingsRef();

        // Quoted window name, shared by the descriptions of the sorting steps.
        const std::string quoted_window = "'" + window.window_name + "'";

        // 1) Sort every block on its own.
        const SizeLimits sort_limits(
            settings.max_rows_to_sort,
            settings.max_bytes_to_sort,
            settings.sort_overflow_mode);
        auto sort_blocks_step = std::make_unique<PartialSortingStep>(
            query_plan.getCurrentDataStream(),
            window.full_sort_description,
            0 /* LIMIT */,
            sort_limits);
        sort_blocks_step->setStepDescription(
            "Sort each block for window " + quoted_window);
        query_plan.addStep(std::move(sort_blocks_step));

        // 2) Merge the sorted blocks.
        auto merge_blocks_step = std::make_unique<MergeSortingStep>(
            query_plan.getCurrentDataStream(),
            window.full_sort_description,
            settings.max_block_size,
            0 /* LIMIT */,
            settings.max_bytes_before_remerge_sort,
            settings.max_bytes_before_external_sort,
            context->getTemporaryVolume(),
            settings.min_free_disk_space_for_temporary_data);
        merge_blocks_step->setStepDescription(
            "Merge sorted blocks for window " + quoted_window);
        query_plan.addStep(std::move(merge_blocks_step));

        // 3) Merge the sorted streams (MergingSorted, as opposed to the
        // MergeSorting step above).
        auto merge_streams_step = std::make_unique<MergingSortedStep>(
            query_plan.getCurrentDataStream(),
            window.full_sort_description,
            settings.max_block_size,
            0 /* LIMIT */);
        merge_streams_step->setStepDescription(
            "Merge sorted streams for window " + quoted_window);
        query_plan.addStep(std::move(merge_streams_step));

        // 4) Add column with window function name and value "1".
        ColumnWithTypeAndName constant_column;
        constant_column.type = std::make_shared<DataTypeInt64>();
        constant_column.column = constant_column.type->createColumnConst(
            1 /* size */, UInt64(1) /* field */);
        constant_column.name = window_function.column_name;

        ActionsDAGPtr window_dag = std::make_shared<ActionsDAG>();
        window_dag->addColumn(constant_column);
        fmt::print(stderr, "window dag: {}\n", window_dag->dumpDAG());

        auto window_step = std::make_unique<WindowStep>(
            query_plan.getCurrentDataStream(),
            window_dag);
        window_step->setStepDescription(
            "Window step for function '" + window_function.column_name + "'");
        query_plan.addStep(std::move(window_step));
    }
}
void InterpreterSelectQuery::executeOrderOptimized(QueryPlan & query_plan, InputOrderInfoPtr input_sorting_info, UInt64 limit, SortDescription & output_order_descr)
{
const Settings & settings = context->getSettingsRef();

View File

@ -120,6 +120,8 @@ private:
void executeTotalsAndHaving(QueryPlan & query_plan, bool has_having, const ActionsDAGPtr & expression, bool overflow_row, bool final);
void executeHaving(QueryPlan & query_plan, const ActionsDAGPtr & expression);
static void executeExpression(QueryPlan & query_plan, const ActionsDAGPtr & expression, const std::string & description);
/// FIXME should go through ActionsDAG to behave as a proper function
void executeWindow(QueryPlan & query_plan);
void executeOrder(QueryPlan & query_plan, InputOrderInfoPtr sorting_info);
void executeOrderOptimized(QueryPlan & query_plan, InputOrderInfoPtr sorting_info, UInt64 limit, SortDescription & output_order_descr);
void executeWithFill(QueryPlan & query_plan);