one plan step per window

This commit is contained in:
Alexander Kuzmenkov 2020-12-22 04:37:45 +03:00
parent 77b816caea
commit 52ace7a3d1
11 changed files with 131 additions and 26 deletions

View File

@ -62,6 +62,11 @@ struct WindowDescription
// No frame info as of yet.
// Reverse map to function descriptions, for convenience of building the
// plan. Just copy them because it's more convenient.
std::vector<WindowFunctionDescription> window_functions;
std::string dump() const;
};

View File

@ -475,22 +475,20 @@ bool ExpressionAnalyzer::makeAggregateDescriptions(ActionsDAGPtr & actions)
bool ExpressionAnalyzer::makeWindowDescriptions(ActionsDAGPtr & actions)
{
// Convenient to check here because at least we have the Context.
if (!windowFunctions().empty() &&
if (!syntax->window_function_asts.empty() &&
!context.getSettingsRef().allow_experimental_window_functions)
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED,
"Window functions are not implemented (while processing '{}')",
windowFunctions()[0]->formatForErrorMessage());
syntax->window_function_asts[0]->formatForErrorMessage());
}
int next_window_index = 1;
for (const ASTFunction * function_node : windowFunctions())
for (const ASTFunction * function_node : syntax->window_function_asts)
{
assert(function_node->is_window_function);
WindowDescription window_description;
window_description.window_name = fmt::format("window_{}",
next_window_index++);
window_description.window_name = function_node->getWindowDescription();
if (function_node->window_partition_by)
{
@ -581,12 +579,28 @@ bool ExpressionAnalyzer::makeWindowDescriptions(ActionsDAGPtr & actions)
window_function.argument_types,
window_function.function_parameters, properties);
window_descriptions.insert({window_description.window_name,
window_description});
window_functions.push_back(window_function);
window_function_descriptions.push_back(window_function);
if (auto it = window_descriptions.find(window_description.window_name);
it != window_descriptions.end())
{
assert(it->second.full_sort_description
== window_description.full_sort_description);
}
else
{
window_descriptions.insert({window_description.window_name,
window_description});
}
}
return !windowFunctions().empty();
// Populate the reverse map.
for (const auto & f : window_function_descriptions)
{
window_descriptions[f.window_name].window_functions.push_back(f);
}
return !window_function_descriptions.empty();
}
@ -965,7 +979,7 @@ void SelectQueryExpressionAnalyzer::appendWindowFunctionsArguments(
{
ExpressionActionsChain::Step & step = chain.lastStep(aggregated_columns);
for (const auto & f : window_functions)
for (const auto & f : window_function_descriptions)
{
// Requiring a constant reference to a shared pointer to non-const AST
// doesn't really look sane, but the visitor does indeed require it.
@ -982,7 +996,7 @@ void SelectQueryExpressionAnalyzer::appendWindowFunctionsArguments(
}
// 2) mark the columns that are really required:
for (const auto & f : window_functions)
for (const auto & f : window_function_descriptions)
{
for (const auto & a : f.function_node->arguments->children)
{

View File

@ -62,7 +62,7 @@ struct ExpressionAnalyzerData
bool has_window = false;
WindowDescriptions window_descriptions;
WindowFunctionDescriptions window_functions;
WindowFunctionDescriptions window_function_descriptions;
NamesAndTypesList window_columns;
bool has_global_subqueries = false;
@ -141,7 +141,6 @@ protected:
const TableJoin & analyzedJoin() const { return *syntax->analyzed_join; }
const NamesAndTypesList & sourceColumns() const { return syntax->required_source_columns; }
const std::vector<const ASTFunction *> & aggregates() const { return syntax->aggregates; }
const std::vector<const ASTFunction *> & windowFunctions() const { return syntax->window_functions; }
/// Find global subqueries in the GLOBAL IN/JOIN sections. Fills in external_tables.
void initGlobalSubqueriesAndExternalTables(bool do_global);

View File

@ -1754,10 +1754,8 @@ void InterpreterSelectQuery::executeExpression(QueryPlan & query_plan, const Act
void InterpreterSelectQuery::executeWindow(QueryPlan & query_plan)
{
for (const auto & f : query_analyzer->window_functions)
for (const auto & [_, w] : query_analyzer->window_descriptions)
{
const auto & w = query_analyzer->window_descriptions[f.window_name];
const Settings & settings = context->getSettingsRef();
auto partial_sorting = std::make_unique<PartialSortingStep>(
@ -1784,7 +1782,7 @@ void InterpreterSelectQuery::executeWindow(QueryPlan & query_plan)
+ w.window_name + "'");
query_plan.addStep(std::move(merge_sorting_step));
// First MergeSorted, now MergingSorted......
// First MergeSorted, now MergingSorted.
auto merging_sorted = std::make_unique<MergingSortedStep>(
query_plan.getCurrentDataStream(),
w.full_sort_description,
@ -1797,9 +1795,9 @@ void InterpreterSelectQuery::executeWindow(QueryPlan & query_plan)
auto window_step = std::make_unique<WindowStep>(
query_plan.getCurrentDataStream(),
w,
std::vector<WindowFunctionDescription>(1, f));
window_step->setStepDescription("Window step for function '"
+ f.column_name + "'");
w.window_functions);
window_step->setStepDescription("Window step for window '"
+ w.window_name + "'");
query_plan.addStep(std::move(window_step));
}

View File

@ -751,7 +751,7 @@ TreeRewriterResultPtr TreeRewriter::analyzeSelect(
collectJoinedColumns(*result.analyzed_join, *select_query, tables_with_columns, result.aliases);
result.aggregates = getAggregates(query, *select_query);
result.window_functions = getWindowFunctions(query, *select_query);
result.window_function_asts = getWindowFunctions(query, *select_query);
result.collectUsedColumns(query, true);
result.ast_join = select_query->join();

View File

@ -35,7 +35,7 @@ struct TreeRewriterResult
Aliases aliases;
std::vector<const ASTFunction *> aggregates;
std::vector<const ASTFunction *> window_functions;
std::vector<const ASTFunction *> window_function_asts;
/// Which column is needed to be ARRAY-JOIN'ed to get the specified.
/// For example, for `SELECT s.v ... ARRAY JOIN a AS s` will get "s.v" -> "a.v".

View File

@ -475,21 +475,44 @@ void ASTFunction::formatImplWithoutAlias(const FormatSettings & settings, Format
}
settings.ostr << " OVER (";
appendWindowDescription(settings, state, nested_dont_need_parens);
settings.ostr << ")";
}
std::string ASTFunction::getWindowDescription() const
{
WriteBufferFromOwnString ostr;
FormatSettings settings{ostr, true /* one_line */};
FormatState state;
FormatStateStacked frame;
appendWindowDescription(settings, state, frame);
return ostr.str();
}
void ASTFunction::appendWindowDescription(const FormatSettings & settings,
FormatState & state, FormatStateStacked frame) const
{
if (!is_window_function)
{
return;
}
if (window_partition_by)
{
settings.ostr << "PARTITION BY ";
window_partition_by->formatImpl(settings, state, nested_dont_need_parens);
window_partition_by->formatImpl(settings, state, frame);
}
if (window_partition_by && window_order_by)
{
settings.ostr << " ";
}
if (window_order_by)
{
settings.ostr << "ORDER BY ";
window_order_by->formatImpl(settings, state, nested_dont_need_parens);
window_order_by->formatImpl(settings, state, frame);
}
settings.ostr << ")";
}
}

View File

@ -39,6 +39,11 @@ public:
ASTPtr toLiteral() const; // Try to convert functions like Array or Tuple to a literal form.
void appendWindowDescription(const FormatSettings & settings,
FormatState & state, FormatStateStacked frame) const;
std::string getWindowDescription() const;
protected:
void formatImplWithoutAlias(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const override;
void appendColumnNameImpl(WriteBuffer & ostr) const override;

View File

@ -158,6 +158,7 @@ public:
bool always_quote_identifiers = false;
IdentifierQuotingStyle identifier_quoting_style = IdentifierQuotingStyle::Backticks;
// Newline or whitespace.
char nl_or_ws;
FormatSettings(WriteBuffer & ostr_, bool one_line_)

View File

@ -87,6 +87,10 @@ select number, max(number) over (partition by intDiv(number, 3) order by number
-- window count() is overwritten with aggregate count()
-- select number, count(), count() over (partition by intDiv(number, 3)) from numbers(10) group by number order by count() desc;
-- different windows
-- an explain test would also be helpful, but it's too immature now and I don't
-- want to change reference all the time
9 9
6 8
7 8
@ -97,3 +101,49 @@ select number, max(number) over (partition by intDiv(number, 3) order by number
0 2
1 2
2 2
select number, max(number) over (partition by intDiv(number, 3) order by number desc), count(number) over (partition by intDiv(number, 5) order by number) as m from numbers(31) order by number settings max_block_size = 2;
-- two functions over the same window
-- an explain test would also be helpful, but it's too immature now and I don't
-- want to change reference all the time
0 2 1
1 2 2
2 2 3
3 5 4
4 5 5
5 5 1
6 8 2
7 8 3
8 8 4
9 11 5
10 11 1
11 11 2
12 14 3
13 14 4
14 14 5
15 17 1
16 17 2
17 17 3
18 20 4
19 20 5
20 20 1
21 23 2
22 23 3
23 23 4
24 26 5
25 26 1
26 26 2
27 29 3
28 29 4
29 29 5
30 30 1
select number, max(number) over (partition by intDiv(number, 3) order by number desc), count(number) over (partition by intDiv(number, 3) order by number desc) as m from numbers(7) order by number settings max_block_size = 2;
0 2 3
1 2 2
2 2 1
3 5 3
4 5 2
5 5 1
6 6 1

View File

@ -30,3 +30,13 @@ select number, max(number) over (partition by intDiv(number, 3) order by number
-- this one doesn't work yet -- looks like the column names clash, and the
-- window count() is overwritten with aggregate count()
-- select number, count(), count() over (partition by intDiv(number, 3)) from numbers(10) group by number order by count() desc;
-- different windows
-- an explain test would also be helpful, but it's too immature now and I don't
-- want to change reference all the time
select number, max(number) over (partition by intDiv(number, 3) order by number desc), count(number) over (partition by intDiv(number, 5) order by number) as m from numbers(31) order by number settings max_block_size = 2;
-- two functions over the same window
-- an explain test would also be helpful, but it's too immature now and I don't
-- want to change reference all the time
select number, max(number) over (partition by intDiv(number, 3) order by number desc), count(number) over (partition by intDiv(number, 3) order by number desc) as m from numbers(7) order by number settings max_block_size = 2;