unbounded following frame end

This commit is contained in:
Alexander Kuzmenkov 2021-01-30 04:16:44 +03:00
parent 2dd5062cc0
commit 5519e4c134
13 changed files with 376 additions and 129 deletions

View File

@ -1,7 +1,6 @@
#include <Interpreters/AggregateDescription.h> #include <Interpreters/AggregateDescription.h>
#include <Common/FieldVisitors.h> #include <Common/FieldVisitors.h>
#include <IO/Operators.h> #include <IO/Operators.h>
#include <Parsers/ASTFunction.h>
namespace DB namespace DB
{ {
@ -100,31 +99,4 @@ void AggregateDescription::explain(WriteBuffer & out, size_t indent) const
} }
} }
std::string WindowFunctionDescription::dump() const
{
WriteBufferFromOwnString ss;
ss << "window function '" << column_name << "\n";
ss << "function node " << function_node->dumpTree() << "\n";
ss << "aggregate function '" << aggregate_function->getName() << "'\n";
if (!function_parameters.empty())
{
ss << "parameters " << toString(function_parameters) << "\n";
}
return ss.str();
}
std::string WindowDescription::dump() const
{
WriteBufferFromOwnString ss;
ss << "window '" << window_name << "'\n";
ss << "partition_by " << dumpSortDescription(partition_by) << "\n";
ss << "order_by " << dumpSortDescription(order_by) << "\n";
ss << "full_sort_description " << dumpSortDescription(full_sort_description) << "\n";
return ss.str();
}
} }

View File

@ -1,18 +1,14 @@
#pragma once #pragma once
#include <AggregateFunctions/IAggregateFunction.h> #include <AggregateFunctions/IAggregateFunction.h>
#include <DataTypes/IDataType.h>
#include <Core/ColumnNumbers.h> #include <Core/ColumnNumbers.h>
#include <Core/Names.h> #include <Core/Names.h>
#include <Core/SortDescription.h> #include <Core/Types.h>
#include <Parsers/IAST_fwd.h>
namespace DB namespace DB
{ {
class ASTFunction;
struct AggregateDescription struct AggregateDescription
{ {
AggregateFunctionPtr function; AggregateFunctionPtr function;
@ -26,72 +22,4 @@ struct AggregateDescription
using AggregateDescriptions = std::vector<AggregateDescription>; using AggregateDescriptions = std::vector<AggregateDescription>;
struct WindowFunctionDescription
{
std::string column_name;
const ASTFunction * function_node;
AggregateFunctionPtr aggregate_function;
Array function_parameters;
DataTypes argument_types;
Names argument_names;
std::string dump() const;
};
struct WindowFrame
{
enum class FrameType { Rows, Groups, Range };
enum class OffsetType { Unbounded, Current, Offset };
// This flag signifies that the frame properties were not set explicitly by
// user, but the fields of this structure still have to contain proper values
// for the default frame of RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW.
bool is_default = true;
FrameType type = FrameType::Range;
/*
* We don't need these yet.
* OffsetType begin_offset = Unbounded;
* OffsetType end_offset = Current;
*/
bool operator == (const WindowFrame & other) const
{
// We don't compare is_default because it's not a real property of the
// frame, and only influences how we display it.
return other.type == type;
}
};
struct WindowDescription
{
std::string window_name;
// We don't care about the particular order of keys for PARTITION BY, only
// that they are sorted. For now we always require ASC, but we could be more
// flexible and match any direction, or even different order of columns.
SortDescription partition_by;
SortDescription order_by;
// To calculate the window function, we sort input data first by PARTITION BY,
// then by ORDER BY. This field holds this combined sort order.
SortDescription full_sort_description;
WindowFrame frame;
// The window functions that are calculated for this window.
std::vector<WindowFunctionDescription> window_functions;
std::string dump() const;
};
using WindowFunctionDescriptions = std::vector<WindowFunctionDescription>;
using WindowDescriptions = std::unordered_map<std::string, WindowDescription>;
} }

View File

@ -3,6 +3,7 @@
#include <DataStreams/IBlockStream_fwd.h> #include <DataStreams/IBlockStream_fwd.h>
#include <Columns/FilterDescription.h> #include <Columns/FilterDescription.h>
#include <Interpreters/AggregateDescription.h> #include <Interpreters/AggregateDescription.h>
#include <Interpreters/WindowDescription.h>
#include <Interpreters/TreeRewriter.h> #include <Interpreters/TreeRewriter.h>
#include <Interpreters/SubqueryForSet.h> #include <Interpreters/SubqueryForSet.h>
#include <Parsers/IAST_fwd.h> #include <Parsers/IAST_fwd.h>

View File

@ -0,0 +1,36 @@
#include <Interpreters/WindowDescription.h>
#include <IO/Operators.h>
#include <Parsers/ASTFunction.h>
namespace DB
{
std::string WindowFunctionDescription::dump() const
{
WriteBufferFromOwnString ss;
ss << "window function '" << column_name << "\n";
ss << "function node " << function_node->dumpTree() << "\n";
ss << "aggregate function '" << aggregate_function->getName() << "'\n";
if (!function_parameters.empty())
{
ss << "parameters " << toString(function_parameters) << "\n";
}
return ss.str();
}
std::string WindowDescription::dump() const
{
WriteBufferFromOwnString ss;
ss << "window '" << window_name << "'\n";
ss << "partition_by " << dumpSortDescription(partition_by) << "\n";
ss << "order_by " << dumpSortDescription(order_by) << "\n";
ss << "full_sort_description " << dumpSortDescription(full_sort_description) << "\n";
return ss.str();
}
}

View File

@ -0,0 +1,140 @@
#pragma once
// ASTLiteral.h
#include <Core/Field.h>
#include <Parsers/ASTWithAlias.h>
#include <Parsers/TokenIterator.h>
#include <Common/FieldVisitors.h>
#include <optional>
// ASTLiteral.cpp
#include <Common/SipHash.h>
#include <Common/FieldVisitors.h>
#include <Parsers/ASTLiteral.h>
#include <IO/WriteHelpers.h>
#include <IO/WriteBufferFromString.h>
#include <IO/Operators.h>
// The really needed includes follow
#include <Common/FieldVisitors.h>
#include <Core/Field.h>
#include <Parsers/IAST_fwd.h>
#include <AggregateFunctions/IAggregateFunction.h>
#include <Core/SortDescription.h>
#include <DataTypes/IDataType.h>
#include <Core/Names.h>
#include <Core/Types.h>
namespace DB
{
class ASTFunction;
struct WindowFunctionDescription
{
std::string column_name;
const ASTFunction * function_node;
AggregateFunctionPtr aggregate_function;
Array function_parameters;
DataTypes argument_types;
Names argument_names;
std::string dump() const;
};
struct WindowFrame
{
enum class FrameType { Rows, Groups, Range };
enum class BoundaryType { Unbounded, Current, Offset };
// This flag signifies that the frame properties were not set explicitly by
// user, but the fields of this structure still have to contain proper values
// for the default frame of RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW.
bool is_default = true;
FrameType type = FrameType::Range;
// UNBOUNDED FOLLOWING for the frame end doesn't make much sense, so
// Unbounded here means UNBOUNDED PRECEDING.
// Offset might be both preceding and following, preceding is negative
// (be careful, this is not symmetric w/the frame end unlike in the grammar,
// so a positive literal in PRECEDING will give a negative number here).
BoundaryType begin_type = BoundaryType::Unbounded;
// This should have been a Field but I'm getting some crazy linker errors.
int64_t begin_offset = 0;
// Here as well, Unbounded is UNBOUNDED FOLLOWING, and positive Offset is
// following.
BoundaryType end_type = BoundaryType::Current;
int64_t end_offset = 0;
bool operator == (const WindowFrame & other) const
{
// We don't compare is_default because it's not a real property of the
// frame, and only influences how we display it.
return other.type == type
&& other.begin_type == begin_type
&& other.begin_offset == begin_offset
&& other.end_type == end_type
&& other.end_offset == end_offset
;
}
static std::string toString(FrameType type)
{
switch (type)
{
case FrameType::Rows:
return "ROWS";
case FrameType::Groups:
return "GROUPS";
case FrameType::Range:
return "RANGE";
}
}
static std::string toString(BoundaryType type)
{
switch (type)
{
case BoundaryType::Unbounded:
return "UNBOUNDED";
case BoundaryType::Offset:
return "OFFSET";
case BoundaryType::Current:
return "CURRENT ROW";
}
}
};
struct WindowDescription
{
std::string window_name;
// We don't care about the particular order of keys for PARTITION BY, only
// that they are sorted. For now we always require ASC, but we could be more
// flexible and match any direction, or even different order of columns.
SortDescription partition_by;
SortDescription order_by;
// To calculate the window function, we sort input data first by PARTITION BY,
// then by ORDER BY. This field holds this combined sort order.
SortDescription full_sort_description;
WindowFrame frame;
// The window functions that are calculated for this window.
std::vector<WindowFunctionDescription> window_functions;
std::string dump() const;
};
using WindowFunctionDescriptions = std::vector<WindowFunctionDescription>;
using WindowDescriptions = std::unordered_map<std::string, WindowDescription>;
}

View File

@ -59,11 +59,36 @@ void ASTWindowDefinition::formatImpl(const FormatSettings & settings,
if (!frame.is_default) if (!frame.is_default)
{ {
const auto * name = frame.type == WindowFrame::FrameType::Rows settings.ostr << WindowFrame::toString(frame.type) << " BETWEEN ";
? "ROWS" : frame.type == WindowFrame::FrameType::Groups if (frame.begin_type == WindowFrame::BoundaryType::Current)
? "GROUPS" : "RANGE"; {
settings.ostr << "CURRENT ROW";
settings.ostr << name << " UNBOUNDED PRECEDING"; }
else if (frame.begin_type == WindowFrame::BoundaryType::Unbounded)
{
settings.ostr << "UNBOUNDED PRECEDING";
}
else
{
settings.ostr << abs(frame.begin_offset);
settings.ostr << " "
<< (frame.begin_offset > 0 ? "FOLLOWING" : "PRECEDING");
}
settings.ostr << " AND ";
if (frame.end_type == WindowFrame::BoundaryType::Current)
{
settings.ostr << "CURRENT ROW";
}
else if (frame.end_type == WindowFrame::BoundaryType::Unbounded)
{
settings.ostr << "UNBOUNDED PRECEDING";
}
else
{
settings.ostr << abs(frame.end_offset);
settings.ostr << " "
<< (frame.end_offset > 0 ? "FOLLOWING" : "PRECEDING");
}
} }
} }

View File

@ -1,6 +1,6 @@
#pragma once #pragma once
#include <Interpreters/AggregateDescription.h> #include <Interpreters/WindowDescription.h>
#include <Parsers/IAST.h> #include <Parsers/IAST.h>

View File

@ -532,6 +532,7 @@ static bool tryParseFrameDefinition(ASTWindowDefinition * node, IParser::Pos & p
ParserKeyword keyword_between("BETWEEN"); ParserKeyword keyword_between("BETWEEN");
ParserKeyword keyword_unbounded("UNBOUNDED"); ParserKeyword keyword_unbounded("UNBOUNDED");
ParserKeyword keyword_preceding("PRECEDING"); ParserKeyword keyword_preceding("PRECEDING");
ParserKeyword keyword_following("FOLLOWING");
ParserKeyword keyword_and("AND"); ParserKeyword keyword_and("AND");
ParserKeyword keyword_current_row("CURRENT ROW"); ParserKeyword keyword_current_row("CURRENT ROW");
@ -539,40 +540,92 @@ static bool tryParseFrameDefinition(ASTWindowDefinition * node, IParser::Pos & p
// 1) ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW // 1) ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
// 2) ROWS UNBOUNDED PRECEDING // 2) ROWS UNBOUNDED PRECEDING
// When the frame end is not specified (2), it defaults to CURRENT ROW. // When the frame end is not specified (2), it defaults to CURRENT ROW.
if (keyword_between.ignore(pos, expected)) const bool has_frame_end = keyword_between.ignore(pos, expected);
if (keyword_current_row.ignore(pos, expected))
{ {
// 1) ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW node->frame.begin_type = WindowFrame::BoundaryType::Current;
if (!keyword_unbounded.ignore(pos, expected)) }
else
{
ParserLiteral parser_literal;
ASTPtr ast_literal;
if (keyword_unbounded.ignore(pos, expected))
{
node->frame.begin_type = WindowFrame::BoundaryType::Unbounded;
}
else if (parser_literal.parse(pos, ast_literal, expected))
{
node->frame.begin_offset = ast_literal->as<ASTLiteral &>().value.safeGet<Int64>();
}
else
{ {
return false; return false;
} }
if (!keyword_preceding.ignore(pos, expected)) if (keyword_preceding.ignore(pos, expected))
{
node->frame.begin_offset = - node->frame.begin_offset;
}
else if (keyword_following.ignore(pos, expected))
{
if (node->frame.begin_type == WindowFrame::BoundaryType::Unbounded)
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED,
"Frame start UNBOUNDED FOLLOWING is not implemented");
}
}
else
{ {
return false; return false;
} }
}
if (has_frame_end)
{
if (!keyword_and.ignore(pos, expected)) if (!keyword_and.ignore(pos, expected))
{ {
return false; return false;
} }
if (!keyword_current_row.ignore(pos, expected)) if (keyword_current_row.ignore(pos, expected))
{ {
return false; node->frame.end_type = WindowFrame::BoundaryType::Current;
} }
} else
else
{
// 2) ROWS UNBOUNDED PRECEDING
if (!keyword_unbounded.ignore(pos, expected))
{ {
return false; ParserLiteral parser_literal;
} ASTPtr ast_literal;
if (keyword_unbounded.ignore(pos, expected))
{
node->frame.end_type = WindowFrame::BoundaryType::Unbounded;
}
else if (parser_literal.parse(pos, ast_literal, expected))
{
node->frame.end_offset = ast_literal->as<ASTLiteral &>().value.safeGet<Int64>();
}
else
{
return false;
}
if (!keyword_preceding.ignore(pos, expected)) if (keyword_preceding.ignore(pos, expected))
{ {
return false; if (node->frame.end_type == WindowFrame::BoundaryType::Unbounded)
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED,
"Frame end UNBOUNDED PRECEDING is not implemented");
}
node->frame.end_offset = -node->frame.end_offset;
}
else if (keyword_following.ignore(pos, expected))
{
}
else
{
return false;
}
} }
} }

View File

@ -1,7 +1,7 @@
#pragma once #pragma once
#include <Processors/QueryPlan/ITransformingStep.h> #include <Processors/QueryPlan/ITransformingStep.h>
#include <Interpreters/AggregateDescription.h> #include <Interpreters/WindowDescription.h>
namespace DB namespace DB
{ {

View File

@ -7,6 +7,11 @@
namespace DB namespace DB
{ {
namespace ErrorCodes
{
const extern int NOT_IMPLEMENTED;
}
WindowTransform::WindowTransform(const Block & input_header_, WindowTransform::WindowTransform(const Block & input_header_,
const Block & output_header_, const Block & output_header_,
const WindowDescription & window_description_, const WindowDescription & window_description_,
@ -164,6 +169,13 @@ void WindowTransform::advanceFrameStart()
{ {
// Frame start is always UNBOUNDED PRECEDING for now, so we don't have to // Frame start is always UNBOUNDED PRECEDING for now, so we don't have to
// move it. It is initialized when the new partition starts. // move it. It is initialized when the new partition starts.
if (window_description.frame.begin_type
!= WindowFrame::BoundaryType::Unbounded)
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED,
"Frame start type '{}' is not implemented",
WindowFrame::toString(window_description.frame.begin_type));
}
} }
bool WindowTransform::arePeers(const RowNumber & x, const RowNumber & y) const bool WindowTransform::arePeers(const RowNumber & x, const RowNumber & y) const
@ -180,7 +192,7 @@ bool WindowTransform::arePeers(const RowNumber & x, const RowNumber & y) const
return false; return false;
} }
// For RANGE frame, rows that compare equal w/ORDER BY are peers. // For RANGE and GROUPS frames, rows that compare equal w/ORDER BY are peers.
assert(window_description.frame.type == WindowFrame::FrameType::Range); assert(window_description.frame.type == WindowFrame::FrameType::Range);
const size_t n = order_by_indices.size(); const size_t n = order_by_indices.size();
if (n == 0) if (n == 0)
@ -272,6 +284,13 @@ void WindowTransform::advanceFrameEndCurrentRow()
frame_ended = partition_ended; frame_ended = partition_ended;
} }
void WindowTransform::advanceFrameEndUnbounded()
{
// The UNBOUNDED FOLLOWING frame ends when the partition ends.
frame_end = partition_end;
frame_ended = partition_ended;
}
void WindowTransform::advanceFrameEnd() void WindowTransform::advanceFrameEnd()
{ {
// No reason for this function to be called again after it succeeded. // No reason for this function to be called again after it succeeded.
@ -279,8 +298,20 @@ void WindowTransform::advanceFrameEnd()
const auto frame_end_before = frame_end; const auto frame_end_before = frame_end;
// The only frame end we have for now is CURRENT ROW. switch (window_description.frame.end_type)
advanceFrameEndCurrentRow(); {
case WindowFrame::BoundaryType::Current:
// The only frame end we have for now is CURRENT ROW.
advanceFrameEndCurrentRow();
break;
case WindowFrame::BoundaryType::Unbounded:
advanceFrameEndUnbounded();
break;
case WindowFrame::BoundaryType::Offset:
throw Exception(ErrorCodes::NOT_IMPLEMENTED,
"The frame end type '{}' is not implemented",
WindowFrame::toString(window_description.frame.end_type));
}
// fmt::print(stderr, "frame_end {} -> {}\n", frame_end_before, frame_end); // fmt::print(stderr, "frame_end {} -> {}\n", frame_end_before, frame_end);

View File

@ -1,7 +1,8 @@
#pragma once #pragma once
#include <Processors/ISimpleTransform.h>
#include <Interpreters/AggregateDescription.h> #include <Interpreters/WindowDescription.h>
#include <Processors/IProcessor.h>
#include <Common/AlignedBuffer.h> #include <Common/AlignedBuffer.h>
@ -103,6 +104,7 @@ private:
void advanceFrameStart(); void advanceFrameStart();
void advanceFrameEnd(); void advanceFrameEnd();
void advanceFrameEndCurrentRow(); void advanceFrameEndCurrentRow();
void advanceFrameEndUnbounded();
bool arePeers(const RowNumber & x, const RowNumber & y) const; bool arePeers(const RowNumber & x, const RowNumber & y) const;
void writeOutCurrentRow(); void writeOutCurrentRow();

View File

@ -472,3 +472,47 @@ select min(number) over (partition by p) from (select number, intDiv(number, 3)
6 6
6 6
9 9
-- UNBOUNDED FOLLOWING frame end
select
min(number) over wa, min(number) over wo,
max(number) over wa, max(number) over wo
from
(select number, intDiv(number, 3) p, mod(number, 5) o
from numbers(31))
window
wa as (partition by p order by o
range between unbounded preceding and unbounded following),
wo as (partition by p order by o
rows between unbounded preceding and unbounded following)
settings max_block_size = 2;
0 0 2 2
0 0 2 2
0 0 2 2
3 3 5 5
3 3 5 5
3 3 5 5
6 6 8 8
6 6 8 8
6 6 8 8
9 9 11 11
9 9 11 11
9 9 11 11
12 12 14 14
12 12 14 14
12 12 14 14
15 15 17 17
15 15 17 17
15 15 17 17
18 18 20 20
18 18 20 20
18 18 20 20
21 21 23 23
21 21 23 23
21 21 23 23
24 24 26 26
24 24 26 26
24 24 26 26
27 27 29 29
27 27 29 29
27 27 29 29
30 30 30 30

View File

@ -148,3 +148,18 @@ settings max_block_size = 5
-- A case where the partition end is in the current block, and the frame end -- A case where the partition end is in the current block, and the frame end
-- is triggered by the partition end. -- is triggered by the partition end.
select min(number) over (partition by p) from (select number, intDiv(number, 3) p from numbers(10)); select min(number) over (partition by p) from (select number, intDiv(number, 3) p from numbers(10));
-- UNBOUNDED FOLLOWING frame end
select
min(number) over wa, min(number) over wo,
max(number) over wa, max(number) over wo
from
(select number, intDiv(number, 3) p, mod(number, 5) o
from numbers(31))
window
wa as (partition by p order by o
range between unbounded preceding and unbounded following),
wo as (partition by p order by o
rows between unbounded preceding and unbounded following)
settings max_block_size = 2;