Multithreading after window functions (#50771)

* feat: Preserve number of streams after evaluation the window functions to allow parallel stream processing

* fix style

* fix style

* fix style

* setting query_plan_preserve_num_streams_after_window_functions default true

* fix tests by SETTINGS query_plan_preserve_num_streams_after_window_functions=0

* fix test references

* Resize the streams after the last window function, to keep the order between WindowTransforms (and WindowTransform works on single stream anyway).

* feat: Preserve number of streams after evaluation the window functions to allow parallel stream processing

* fix style

* fix style

* fix style

* setting query_plan_preserve_num_streams_after_window_functions default true

* fix tests by SETTINGS query_plan_preserve_num_streams_after_window_functions=0

* fix test references

* Resize the streams after the last window function, to keep the order between WindowTransforms (and WindowTransform works on single stream anyway).

* add perf test

* perf: change the dataset from 50M to 5M

* rename query_plan_preserve_num_streams_after_window_functions -> query_plan_enable_multithreading_after_window_functions

* update test reference

* fix clang-tidy

---------

Co-authored-by: Nikita Taranov <nikita.taranov@clickhouse.com>
This commit is contained in:
frinkr 2023-10-27 18:36:28 +08:00 committed by GitHub
parent f5890a5b4c
commit 18c50c11b3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 119 additions and 27 deletions

View File

@ -681,6 +681,7 @@ class IColumn;
M(Bool, query_plan_aggregation_in_order, true, "Use query plan for aggregation-in-order optimisation", 0) \
M(Bool, query_plan_remove_redundant_sorting, true, "Remove redundant sorting in query plan. For example, sorting steps related to ORDER BY clauses in subqueries", 0) \
M(Bool, query_plan_remove_redundant_distinct, true, "Remove redundant Distinct step in query plan", 0) \
M(Bool, query_plan_enable_multithreading_after_window_functions, true, "Enable multithreading after evaluating window functions to allow parallel stream processing", 0) \
M(UInt64, regexp_max_matches_per_row, 1000, "Max matches of any single regexp per row, used to safeguard 'extractAllGroupsHorizontal' against consuming too much memory with greedy RE.", 0) \
\
M(UInt64, limit, 0, "Limit on read rows from the most 'end' result for select query, default 0 means no limit length", 0) \

View File

@ -2936,7 +2936,11 @@ void InterpreterSelectQuery::executeWindow(QueryPlan & query_plan)
query_plan.addStep(std::move(sorting_step));
}
auto window_step = std::make_unique<WindowStep>(query_plan.getCurrentDataStream(), window, window.window_functions);
// Fan out streams only for the last window to preserve the ordering between windows,
// and WindowTransform works on single stream anyway.
const bool streams_fan_out = settings.query_plan_enable_multithreading_after_window_functions && ((i + 1) == windows_sorted.size());
auto window_step = std::make_unique<WindowStep>(query_plan.getCurrentDataStream(), window, window.window_functions, streams_fan_out);
window_step->setStepDescription("Window step for window '" + window.window_name + "'");
query_plan.addStep(std::move(window_step));

View File

@ -905,8 +905,12 @@ void addWindowSteps(QueryPlan & query_plan,
query_plan.addStep(std::move(sorting_step));
}
// Fan out streams only for the last window to preserve the ordering between windows,
// and WindowTransform works on single stream anyway.
const bool streams_fan_out = settings.query_plan_enable_multithreading_after_window_functions && ((i + 1) == window_descriptions_size);
auto window_step
= std::make_unique<WindowStep>(query_plan.getCurrentDataStream(), window_description, window_description.window_functions);
= std::make_unique<WindowStep>(query_plan.getCurrentDataStream(), window_description, window_description.window_functions, streams_fan_out);
window_step->setStepDescription("Window step for window '" + window_description.window_name + "'");
query_plan.addStep(std::move(window_step));
}

View File

@ -10,14 +10,14 @@
namespace DB
{
static ITransformingStep::Traits getTraits()
static ITransformingStep::Traits getTraits(bool preserves_sorting)
{
return ITransformingStep::Traits
{
{
.returns_single_stream = false,
.preserves_number_of_streams = true,
.preserves_sorting = true,
.preserves_sorting = preserves_sorting,
},
{
.preserves_number_of_rows = true
@ -46,10 +46,12 @@ static Block addWindowFunctionResultColumns(const Block & block,
WindowStep::WindowStep(
const DataStream & input_stream_,
const WindowDescription & window_description_,
const std::vector<WindowFunctionDescription> & window_functions_)
: ITransformingStep(input_stream_, addWindowFunctionResultColumns(input_stream_.header, window_functions_), getTraits())
const std::vector<WindowFunctionDescription> & window_functions_,
bool streams_fan_out_)
: ITransformingStep(input_stream_, addWindowFunctionResultColumns(input_stream_.header, window_functions_), getTraits(!streams_fan_out_))
, window_description(window_description_)
, window_functions(window_functions_)
, streams_fan_out(streams_fan_out_)
{
// We don't remove any columns, only add, so probably we don't have to update
// the output DataStream::distinct_columns.
@ -60,6 +62,8 @@ WindowStep::WindowStep(
void WindowStep::transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
{
auto num_threads = pipeline.getNumThreads();
// This resize is needed for cases such as `over ()` when we don't have a
// sort node, and the input might have multiple streams. The sort node would
// have resized it.
@ -72,6 +76,11 @@ void WindowStep::transformPipeline(QueryPipelineBuilder & pipeline, const BuildQ
input_streams.front().header, output_stream->header, window_description, window_functions);
});
if (streams_fan_out)
{
pipeline.resize(num_threads);
}
assertBlocksHaveEqualStructure(pipeline.getHeader(), output_stream->header,
"WindowStep transform for '" + window_description.window_name + "'");
}

View File

@ -16,7 +16,8 @@ class WindowStep : public ITransformingStep
public:
explicit WindowStep(const DataStream & input_stream_,
const WindowDescription & window_description_,
const std::vector<WindowFunctionDescription> & window_functions_);
const std::vector<WindowFunctionDescription> & window_functions_,
bool streams_fan_out_);
String getName() const override { return "Window"; }
@ -32,6 +33,7 @@ private:
WindowDescription window_description;
std::vector<WindowFunctionDescription> window_functions;
bool streams_fan_out;
};
}

View File

@ -0,0 +1,69 @@
<test>
<create_query>
CREATE TABLE
window_test(id Int64, value Int64, partition Int64, msg String)
Engine=MergeTree
ORDER BY id
</create_query>
<fill_query>
INSERT INTO window_test
SELECT number, rand(1) % 500, number % 3000, randomPrintableASCII(2) FROM numbers(5000000)
</fill_query>
<query>
SELECT id,
AVG(value) OVER (ORDER BY id ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) AS frame1,
MAX(value) OVER (ORDER BY id ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) AS frame2,
sipHash64(frame1),
sipHash64(frame2)
FROM window_test
</query>
<query>
SELECT id AS key,
sipHash64(sum(frame)) AS value
FROM (
SELECT id,
AVG(value) OVER (ORDER BY id ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) AS frame
FROM window_test)
GROUP BY key
ORDER BY key, value
</query>
<query>
SELECT id % 100000 AS key,
sipHash64(sum(frame)) AS value
FROM (
SELECT id,
AVG(value) OVER (ORDER BY id ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) AS frame
FROM window_test)
GROUP BY key
ORDER BY key, value
</query>
<query>
WITH 'xxxxyyyyxxxxyyyyxxxxyyyyxxxxyyyy' AS cipherKey
SELECT id,
AVG(value) OVER (ORDER BY id ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) AS frame,
toString(frame) AS str,
encrypt('aes-256-ofb', str, cipherKey) AS enc,
decrypt('aes-256-ofb', str, cipherKey) AS dec
FROM window_test
</query>
<query>
SELECT id,
AVG(value) OVER (PARTITION by partition ORDER BY id ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) AS frame
FROM window_test
ORDER BY id
</query>
<query>
SELECT DISTINCT AVG(value) OVER (PARTITION by partition ORDER BY id ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) AS frame
FROM window_test
ORDER BY frame
</query>
<drop_query>DROP TABLE IF EXISTS window_test</drop_query>
</test>

View File

@ -1,18 +1,18 @@
-- { echo }
select row_number() over (order by dummy) from (select * from remote('127.0.0.{1,2}', system, one));
select row_number() over (order by dummy) as x from (select * from remote('127.0.0.{1,2}', system, one)) order by x;
1
2
select row_number() over (order by dummy) from remote('127.0.0.{1,2}', system, one);
select row_number() over (order by dummy) as x from remote('127.0.0.{1,2}', system, one) order by x;
1
2
select max(identity(dummy + 1)) over () from remote('127.0.0.{1,2}', system, one);
select max(identity(dummy + 1)) over () as x from remote('127.0.0.{1,2}', system, one) order by x;
1
1
drop table if exists t_01568;
create table t_01568 engine Memory as
select intDiv(number, 3) p, modulo(number, 3) o, number
from numbers(9);
select sum(number) over w, max(number) over w from t_01568 window w as (partition by p);
select sum(number) over w as x, max(number) over w as y from t_01568 window w as (partition by p) order by x, y;
3 2
3 2
3 2
@ -22,7 +22,7 @@ select sum(number) over w, max(number) over w from t_01568 window w as (partitio
21 8
21 8
21 8
select sum(number) over w, max(number) over w from remote('127.0.0.{1,2}', '', t_01568) window w as (partition by p);
select sum(number) over w as x, max(number) over w as y from remote('127.0.0.{1,2}', '', t_01568) window w as (partition by p) order by x, y;
6 2
6 2
6 2
@ -41,23 +41,23 @@ select sum(number) over w, max(number) over w from remote('127.0.0.{1,2}', '', t
42 8
42 8
42 8
select distinct sum(number) over w, max(number) over w from remote('127.0.0.{1,2}', '', t_01568) window w as (partition by p);
select distinct sum(number) over w as x, max(number) over w as y from remote('127.0.0.{1,2}', '', t_01568) window w as (partition by p) order by x, y;
6 2
24 5
42 8
-- window functions + aggregation w/shards
select groupArray(groupArray(number)) over (rows unbounded preceding) from remote('127.0.0.{1,2}', '', t_01568) group by mod(number, 3);
select groupArray(groupArray(number)) over (rows unbounded preceding) as x from remote('127.0.0.{1,2}', '', t_01568) group by mod(number, 3) order by x;
[[0,3,6,0,3,6]]
[[0,3,6,0,3,6],[1,4,7,1,4,7]]
[[0,3,6,0,3,6],[1,4,7,1,4,7],[2,5,8,2,5,8]]
select groupArray(groupArray(number)) over (rows unbounded preceding) from remote('127.0.0.{1,2}', '', t_01568) group by mod(number, 3) settings distributed_group_by_no_merge=1;
select groupArray(groupArray(number)) over (rows unbounded preceding) as x from remote('127.0.0.{1,2}', '', t_01568) group by mod(number, 3) order by x settings distributed_group_by_no_merge=1;
[[0,3,6]]
[[0,3,6],[1,4,7]]
[[0,3,6],[1,4,7],[2,5,8]]
[[0,3,6]]
[[0,3,6],[1,4,7]]
[[0,3,6],[1,4,7],[2,5,8]]
select groupArray(groupArray(number)) over (rows unbounded preceding) from remote('127.0.0.{1,2}', '', t_01568) group by mod(number, 3) settings distributed_group_by_no_merge=2; -- { serverError 48 }
select groupArray(groupArray(number)) over (rows unbounded preceding) as x from remote('127.0.0.{1,2}', '', t_01568) group by mod(number, 3) order by x settings distributed_group_by_no_merge=2; -- { serverError 48 }
-- proper ORDER BY w/window functions
select p, o, count() over (partition by p)
from remote('127.0.0.{1,2}', '', t_01568)

View File

@ -1,11 +1,11 @@
-- Tags: distributed
-- { echo }
select row_number() over (order by dummy) from (select * from remote('127.0.0.{1,2}', system, one));
select row_number() over (order by dummy) as x from (select * from remote('127.0.0.{1,2}', system, one)) order by x;
select row_number() over (order by dummy) from remote('127.0.0.{1,2}', system, one);
select row_number() over (order by dummy) as x from remote('127.0.0.{1,2}', system, one) order by x;
select max(identity(dummy + 1)) over () from remote('127.0.0.{1,2}', system, one);
select max(identity(dummy + 1)) over () as x from remote('127.0.0.{1,2}', system, one) order by x;
drop table if exists t_01568;
@ -13,16 +13,16 @@ create table t_01568 engine Memory as
select intDiv(number, 3) p, modulo(number, 3) o, number
from numbers(9);
select sum(number) over w, max(number) over w from t_01568 window w as (partition by p);
select sum(number) over w as x, max(number) over w as y from t_01568 window w as (partition by p) order by x, y;
select sum(number) over w, max(number) over w from remote('127.0.0.{1,2}', '', t_01568) window w as (partition by p);
select sum(number) over w as x, max(number) over w as y from remote('127.0.0.{1,2}', '', t_01568) window w as (partition by p) order by x, y;
select distinct sum(number) over w, max(number) over w from remote('127.0.0.{1,2}', '', t_01568) window w as (partition by p);
select distinct sum(number) over w as x, max(number) over w as y from remote('127.0.0.{1,2}', '', t_01568) window w as (partition by p) order by x, y;
-- window functions + aggregation w/shards
select groupArray(groupArray(number)) over (rows unbounded preceding) from remote('127.0.0.{1,2}', '', t_01568) group by mod(number, 3);
select groupArray(groupArray(number)) over (rows unbounded preceding) from remote('127.0.0.{1,2}', '', t_01568) group by mod(number, 3) settings distributed_group_by_no_merge=1;
select groupArray(groupArray(number)) over (rows unbounded preceding) from remote('127.0.0.{1,2}', '', t_01568) group by mod(number, 3) settings distributed_group_by_no_merge=2; -- { serverError 48 }
select groupArray(groupArray(number)) over (rows unbounded preceding) as x from remote('127.0.0.{1,2}', '', t_01568) group by mod(number, 3) order by x;
select groupArray(groupArray(number)) over (rows unbounded preceding) as x from remote('127.0.0.{1,2}', '', t_01568) group by mod(number, 3) order by x settings distributed_group_by_no_merge=1;
select groupArray(groupArray(number)) over (rows unbounded preceding) as x from remote('127.0.0.{1,2}', '', t_01568) group by mod(number, 3) order by x settings distributed_group_by_no_merge=2; -- { serverError 48 }
-- proper ORDER BY w/window functions
select p, o, count() over (partition by p)

View File

@ -19,7 +19,8 @@ system stop merges order_by_const;
INSERT INTO order_by_const(a, b, c, d) VALUES (1, 1, 101, 1), (1, 2, 102, 1), (1, 3, 103, 1), (1, 4, 104, 1);
INSERT INTO order_by_const(a, b, c, d) VALUES (1, 5, 104, 1), (1, 6, 105, 1), (2, 1, 106, 2), (2, 1, 107, 2);
INSERT INTO order_by_const(a, b, c, d) VALUES (2, 2, 107, 2), (2, 3, 108, 2), (2, 4, 109, 2);
SELECT row_number() OVER (order by 1, a) FROM order_by_const;
-- output 1 sorted stream
SELECT row_number() OVER (order by 1, a) FROM order_by_const SETTINGS query_plan_enable_multithreading_after_window_functions=0;
1
2
3

View File

@ -20,7 +20,9 @@ system stop merges order_by_const;
INSERT INTO order_by_const(a, b, c, d) VALUES (1, 1, 101, 1), (1, 2, 102, 1), (1, 3, 103, 1), (1, 4, 104, 1);
INSERT INTO order_by_const(a, b, c, d) VALUES (1, 5, 104, 1), (1, 6, 105, 1), (2, 1, 106, 2), (2, 1, 107, 2);
INSERT INTO order_by_const(a, b, c, d) VALUES (2, 2, 107, 2), (2, 3, 108, 2), (2, 4, 109, 2);
SELECT row_number() OVER (order by 1, a) FROM order_by_const;
-- output 1 sorted stream
SELECT row_number() OVER (order by 1, a) FROM order_by_const SETTINGS query_plan_enable_multithreading_after_window_functions=0;
drop table order_by_const;