Merge pull request #22704 from ClickHouse/aku/window-multiple

fix window functions with multiple input streams and no sorting
This commit is contained in:
Alexander Kuzmenkov 2021-04-07 15:15:47 +03:00 committed by GitHub
commit 06bb58cb69
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 87 additions and 48 deletions

View File

@ -37,34 +37,33 @@ namespace ErrorCodes
Field QueryFuzzer::getRandomField(int type)
{
static constexpr Int64 bad_int64_values[]
= {-2, -1, 0, 1, 2, 3, 7, 10, 100, 255, 256, 257, 1023, 1024,
1025, 65535, 65536, 65537, 1024 * 1024 - 1, 1024 * 1024,
1024 * 1024 + 1, INT_MIN - 1ll, INT_MIN, INT_MIN + 1,
INT_MAX - 1, INT_MAX, INT_MAX + 1ll, INT64_MIN, INT64_MIN + 1,
INT64_MAX - 1, INT64_MAX};
switch (type)
{
case 0:
{
static constexpr Int64 values[]
= {-2, -1, 0, 1, 2, 3, 7, 10, 100, 255, 256, 257, 1023, 1024,
1025, 65535, 65536, 65537, 1024 * 1024 - 1, 1024 * 1024,
1024 * 1024 + 1, INT64_MIN, INT64_MAX};
return values[fuzz_rand() % (sizeof(values) / sizeof(*values))];
return bad_int64_values[fuzz_rand() % (sizeof(bad_int64_values)
/ sizeof(*bad_int64_values))];
}
case 1:
{
static constexpr float values[]
= {NAN, INFINITY, -INFINITY, 0., 0.0001, 0.5, 0.9999,
1., 1.0001, 2., 10.0001, 100.0001, 1000.0001};
return values[fuzz_rand() % (sizeof(values) / sizeof(*values))];
= {NAN, INFINITY, -INFINITY, 0., -0., 0.0001, 0.5, 0.9999,
1., 1.0001, 2., 10.0001, 100.0001, 1000.0001, 1e10, 1e20,
FLT_MIN, FLT_MIN + FLT_EPSILON, FLT_MAX, FLT_MAX + FLT_EPSILON}; return values[fuzz_rand() % (sizeof(values) / sizeof(*values))];
}
case 2:
{
static constexpr Int64 values[]
= {-2, -1, 0, 1, 2, 3, 7, 10, 100, 255, 256, 257, 1023, 1024,
1025, 65535, 65536, 65537, 1024 * 1024 - 1, 1024 * 1024,
1024 * 1024 + 1, INT64_MIN, INT64_MAX};
static constexpr UInt64 scales[] = {0, 1, 2, 10};
return DecimalField<Decimal64>(
values[fuzz_rand() % (sizeof(values) / sizeof(*values))],
scales[fuzz_rand() % (sizeof(scales) / sizeof(*scales))]
);
bad_int64_values[fuzz_rand() % (sizeof(bad_int64_values)
/ sizeof(*bad_int64_values))],
scales[fuzz_rand() % (sizeof(scales) / sizeof(*scales))]);
}
default:
assert(false);

View File

@ -1624,6 +1624,7 @@ void InterpreterSelectQuery::executeFetchColumns(QueryProcessingStage::Enum proc
&& !query.limitBy()
&& query.limitLength()
&& !query_analyzer->hasAggregation()
&& !query_analyzer->hasWindow()
&& limit_length <= std::numeric_limits<UInt64>::max() - limit_offset
&& limit_length + limit_offset < max_block_size)
{

View File

@ -86,6 +86,38 @@ void WindowFrame::toString(WriteBuffer & buf) const
void WindowFrame::checkValid() const
{
// Check the validity of offsets.
if (type == WindowFrame::FrameType::Rows
|| type == WindowFrame::FrameType::Groups)
{
if (begin_type == BoundaryType::Offset
&& !((begin_offset.getType() == Field::Types::UInt64
|| begin_offset.getType() == Field::Types::Int64)
&& begin_offset.get<Int64>() >= 0
&& begin_offset.get<Int64>() < INT_MAX))
{
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Frame start offset for '{}' frame must be a nonnegative 32-bit integer, '{}' of type '{}' given.",
toString(type),
applyVisitor(FieldVisitorToString(), begin_offset),
Field::Types::toString(begin_offset.getType()));
}
if (end_type == BoundaryType::Offset
&& !((end_offset.getType() == Field::Types::UInt64
|| end_offset.getType() == Field::Types::Int64)
&& end_offset.get<Int64>() >= 0
&& end_offset.get<Int64>() < INT_MAX))
{
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Frame end offset for '{}' frame must be a nonnegative 32-bit integer, '{}' of type '{}' given.",
toString(type),
applyVisitor(FieldVisitorToString(), end_offset),
Field::Types::toString(end_offset.getType()));
}
}
// Check relative positioning of offsets.
// UNBOUNDED PRECEDING end and UNBOUNDED FOLLOWING start should have been
// forbidden at the parsing level.
assert(!(begin_type == BoundaryType::Unbounded && !begin_preceding));

View File

@ -580,18 +580,6 @@ static bool tryParseFrameDefinition(ASTWindowDefinition * node, IParser::Pos & p
else if (parser_literal.parse(pos, ast_literal, expected))
{
const Field & value = ast_literal->as<ASTLiteral &>().value;
if ((node->frame.type == WindowFrame::FrameType::Rows
|| node->frame.type == WindowFrame::FrameType::Groups)
&& !(value.getType() == Field::Types::UInt64
|| (value.getType() == Field::Types::Int64
&& value.get<Int64>() >= 0)))
{
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Frame offset for '{}' frame must be a nonnegative integer, '{}' of type '{}' given.",
WindowFrame::toString(node->frame.type),
applyVisitor(FieldVisitorToString(), value),
Field::Types::toString(value.getType()));
}
node->frame.begin_offset = value;
node->frame.begin_type = WindowFrame::BoundaryType::Offset;
}
@ -641,18 +629,6 @@ static bool tryParseFrameDefinition(ASTWindowDefinition * node, IParser::Pos & p
else if (parser_literal.parse(pos, ast_literal, expected))
{
const Field & value = ast_literal->as<ASTLiteral &>().value;
if ((node->frame.type == WindowFrame::FrameType::Rows
|| node->frame.type == WindowFrame::FrameType::Groups)
&& !(value.getType() == Field::Types::UInt64
|| (value.getType() == Field::Types::Int64
&& value.get<Int64>() >= 0)))
{
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Frame offset for '{}' frame must be a nonnegative integer, '{}' of type '{}' given.",
WindowFrame::toString(node->frame.type),
applyVisitor(FieldVisitorToString(), value),
Field::Types::toString(value.getType()));
}
node->frame.end_offset = value;
node->frame.end_type = WindowFrame::BoundaryType::Offset;
}

View File

@ -64,6 +64,11 @@ WindowStep::WindowStep(const DataStream & input_stream_,
void WindowStep::transformPipeline(QueryPipeline & pipeline, const BuildQueryPipelineSettings &)
{
// This resize is needed for cases such as `over ()` when we don't have a
// sort node, and the input might have multiple streams. The sort node would
// have resized it.
pipeline.resize(1);
pipeline.addSimpleTransform([&](const Block & /*header*/)
{
return std::make_shared<WindowTransform>(input_header,

View File

@ -257,10 +257,9 @@ WindowTransform::WindowTransform(const Block & input_header_,
const IColumn * column = entry.column.get();
APPLY_FOR_TYPES(compareValuesWithOffset)
// Check that the offset type matches the window type.
// Convert the offsets to the ORDER BY column type. We can't just check
// that it matches, because e.g. the int literals are always (U)Int64,
// but the column might be Int8 and so on.
// that the type matches, because e.g. the int literals are always
// (U)Int64, but the column might be Int8 and so on.
if (window_description.frame.begin_type
== WindowFrame::BoundaryType::Offset)
{
@ -435,6 +434,9 @@ auto WindowTransform::moveRowNumberNoCheck(const RowNumber & _x, int offset) con
assertValid(x);
assert(offset <= 0);
// abs(offset) is less than INT_MAX, as checked in the parser, so
// this negation should always work.
assert(offset >= -INT_MAX);
if (x.row >= static_cast<uint64_t>(-offset))
{
x.row -= -offset;
@ -1500,6 +1502,12 @@ struct WindowFunctionLagLeadInFrame final : public WindowFunction
"The offset for function {} must be nonnegative, {} given",
getName(), offset);
}
if (offset > INT_MAX)
{
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"The offset for function {} must be less than {}, {} given",
getName(), INT_MAX, offset);
}
}
const auto [target_row, offset_left] = transform->moveRowNumber(

View File

@ -0,0 +1,5 @@
-- { echo }
set allow_experimental_window_functions = 1;
select row_number() over (order by dummy) from (select * from remote('127.0.0.{1,2}', system, one));
1
2

View File

@ -0,0 +1,4 @@
-- { echo }
set allow_experimental_window_functions = 1;
select row_number() over (order by dummy) from (select * from remote('127.0.0.{1,2}', system, one));

View File

@ -942,8 +942,9 @@ FROM numbers(2)
;
1 0
1 1
-- optimize_read_in_order conflicts with sorting for window functions, must
-- be disabled.
-- optimize_read_in_order conflicts with sorting for window functions, check that
-- it is disabled.
drop table if exists window_mt;
create table window_mt engine MergeTree order by number
as select number, mod(number, 3) p from numbers(100);
select number, count(*) over (partition by p)
@ -1096,7 +1097,7 @@ select count() over (order by toInt64(number) range between -1 preceding and unb
select count() over (order by toInt64(number) range between -1 following and unbounded following) from numbers(1); -- { serverError 36 }
select count() over (order by toInt64(number) range between unbounded preceding and -1 preceding) from numbers(1); -- { serverError 36 }
select count() over (order by toInt64(number) range between unbounded preceding and -1 following) from numbers(1); -- { serverError 36 }
---- a test with aggregate function that allocates memory in arena
-- a test with aggregate function that allocates memory in arena
select sum(a[length(a)])
from (
select groupArray(number) over (partition by modulo(number, 11)
@ -1104,3 +1105,6 @@ from (
from numbers_mt(10000)
) settings max_block_size = 7;
49995000
-- -INT_MIN row offset that can lead to problems with negation, found when fuzzing
-- under UBSan. Should be limited to at most INT_MAX.
select count() over (rows between 2147483648 preceding and 2147493648 following) from numbers(2); -- { serverError 36 }

View File

@ -329,8 +329,9 @@ SELECT
FROM numbers(2)
;
-- optimize_read_in_order conflicts with sorting for window functions, must
-- be disabled.
-- optimize_read_in_order conflicts with sorting for window functions, check that
-- it is disabled.
drop table if exists window_mt;
create table window_mt engine MergeTree order by number
as select number, mod(number, 3) p from numbers(100);
@ -402,10 +403,14 @@ select count() over (order by toInt64(number) range between -1 following and unb
select count() over (order by toInt64(number) range between unbounded preceding and -1 preceding) from numbers(1); -- { serverError 36 }
select count() over (order by toInt64(number) range between unbounded preceding and -1 following) from numbers(1); -- { serverError 36 }
---- a test with aggregate function that allocates memory in arena
-- a test with aggregate function that allocates memory in arena
select sum(a[length(a)])
from (
select groupArray(number) over (partition by modulo(number, 11)
order by modulo(number, 1111), number) a
from numbers_mt(10000)
) settings max_block_size = 7;
-- -INT_MIN row offset that can lead to problems with negation, found when fuzzing
-- under UBSan. Should be limited to at most INT_MAX.
select count() over (rows between 2147483648 preceding and 2147493648 following) from numbers(2); -- { serverError 36 }