Merge pull request #38731 from azat/views-max_insert_threads

Fix number of threads for pushing to views
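For context, here is a minimal sketch of the scenario this commit fixes (issue #37900), assembled from the regression test added at the bottom of this diff; the table and view names (t, t_mv) are taken from that test:

    create table t (a UInt64) Engine = Null;
    create materialized view t_mv Engine = Null AS select now() as ts, max(a) from t group by ts;
    -- Previously, the part of the pipeline pushing to t_mv did not scale to the requested number of threads.
    insert into t select * from numbers_mt(10e6) settings max_threads = 16, max_insert_threads = 16;
    -- After this change, query_log should report at least 16 distinct threads for the INSERT:
    system flush logs;
    select arrayUniq(thread_ids) >= 16 from system.query_log
    where event_date >= yesterday() and current_database = currentDatabase()
        and type = 'QueryFinish' and startsWith(query, 'insert');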
Alexey Milovidov 2022-07-04 07:43:26 +03:00 committed by GitHub
commit c711012399
7 changed files with 268 additions and 55 deletions

View File

@@ -457,20 +457,26 @@ BlockIO InterpreterInsertQuery::execute()
      });

      size_t num_select_threads = pipeline.getNumThreads();
-     size_t num_insert_threads = std::max_element(out_chains.begin(), out_chains.end(), [&](const auto &a, const auto &b)
-     {
-         return a.getNumThreads() < b.getNumThreads();
-     })->getNumThreads();

      for (auto & chain : out_chains)
          resources = chain.detachResources();

      pipeline.addChains(std::move(out_chains));

-     pipeline.setMaxThreads(num_insert_threads);
-     /// Don't use more threads for insert then for select to reduce memory consumption.
-     if (!settings.parallel_view_processing && pipeline.getNumThreads() > num_select_threads)
-         pipeline.setMaxThreads(num_select_threads);
+     if (!settings.parallel_view_processing)
+     {
+         /// Don't use more threads for INSERT than for SELECT to reduce memory consumption.
+         if (pipeline.getNumThreads() > num_select_threads)
+             pipeline.setMaxThreads(num_select_threads);
+     }
+     else if (pipeline.getNumThreads() < settings.max_threads)
+     {
+         /// It is possible for query to have max_threads=1, due to optimize_trivial_insert_select,
+         /// however in case of parallel_view_processing and multiple views, views can still be processed in parallel.
+         ///
+         /// Note, number of threads will be limited by buildPushingToViewsChain() to max_threads.
+         pipeline.setMaxThreads(settings.max_threads);
+     }

      pipeline.setSinks([&](const Block & cur_header, QueryPipelineBuilder::StreamType) -> ProcessorPtr
      {

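The new else-if branch addresses the case where optimize_trivial_insert_select collapses the SELECT side to a single thread: with parallel_view_processing enabled and several materialized views, the pipeline is now allowed to scale back up to max_threads (and is still capped by buildPushingToViewsChain()). A sketch of that case, reusing the testX table and the testXA/testXB/testXC materialized views defined in the parallel-MV test further down in this diff (the insert fails with FUNCTION_THROW_IF_VALUE_IS_NON_ZERO because testXB contains throwIf(A=1)); the expected count of 5 distinct threads comes from the updated reference file:

    insert into testX select number from numbers(10) settings
        log_queries=1,
        parallel_view_processing=1,
        optimize_trivial_insert_select=1,
        max_insert_threads=0; -- { serverError FUNCTION_THROW_IF_VALUE_IS_NON_ZERO }
    system flush logs;
    select arrayUniq(thread_ids) from system.query_log where
        current_database = currentDatabase() and
        type != 'QueryStart' and
        query like '%insert into testX %' and
        Settings['parallel_view_processing'] = '1' and
        Settings['optimize_trivial_insert_select'] = '1' and
        Settings['max_insert_threads'] = '0';
    -- expected: 5 distinct threads instead of the single SELECT thread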
View File

@@ -0,0 +1,11 @@
<!-- https://github.com/ClickHouse/ClickHouse/issues/37900 -->
<test>
<create_query>create table views_max_insert_threads_null (a UInt64) Engine = Null</create_query>
<create_query>create materialized view views_max_insert_threads_mv Engine = Null AS select now() as ts, max(a) from views_max_insert_threads_null group by ts</create_query>
<query>insert into views_max_insert_threads_null select * from numbers_mt(3000000000) settings max_threads = 16, max_insert_threads=16</query>
<drop_query>drop table if exists views_max_insert_threads_null</drop_query>
<drop_query>drop table if exists views_max_insert_threads_mv</drop_query>
</test>

View File

@@ -1,9 +1,23 @@
-- { echoOn }
set parallel_view_processing=1;
insert into testX select number from numbers(10) settings log_queries=1; -- { serverError FUNCTION_THROW_IF_VALUE_IS_NON_ZERO }
select 'optimize_trivial_insert_select=0', 'max_insert_threads=0';
optimize_trivial_insert_select=0 max_insert_threads=0
insert into testX select number from numbers(10) settings
log_queries=1,
parallel_view_processing=0,
optimize_trivial_insert_select=0,
max_insert_threads=0; -- { serverError FUNCTION_THROW_IF_VALUE_IS_NON_ZERO }
system flush logs;
select length(thread_ids) >= 8 from system.query_log where current_database = currentDatabase() and type != 'QueryStart' and query like '%insert into testX %' and Settings['parallel_view_processing'] = '1';
1
select arrayUniq(thread_ids) from system.query_log where
current_database = currentDatabase() and
type != 'QueryStart' and
query like '%insert into testX %' and
Settings['parallel_view_processing'] = '0' and
Settings['optimize_trivial_insert_select'] = '0' and
Settings['max_insert_threads'] = '0';
2
select count() from testX;
10
select count() from testXA;
@@ -12,11 +26,22 @@ select count() from testXB;
0
select count() from testXC;
10
set parallel_view_processing=0;
insert into testX select number from numbers(10) settings log_queries=1; -- { serverError FUNCTION_THROW_IF_VALUE_IS_NON_ZERO }
select 'optimize_trivial_insert_select=0', 'max_insert_threads=16';
optimize_trivial_insert_select=0 max_insert_threads=16
insert into testX select number from numbers(10) settings
log_queries=1,
parallel_view_processing=0,
optimize_trivial_insert_select=0,
max_insert_threads=16; -- { serverError FUNCTION_THROW_IF_VALUE_IS_NON_ZERO }
system flush logs;
select length(thread_ids) >= 5 from system.query_log where current_database = currentDatabase() and type != 'QueryStart' and query like '%insert into testX %' and Settings['parallel_view_processing'] = '0';
1
select arrayUniq(thread_ids) from system.query_log where
current_database = currentDatabase() and
type != 'QueryStart' and
query like '%insert into testX %' and
Settings['parallel_view_processing'] = '0' and
Settings['optimize_trivial_insert_select'] = '0' and
Settings['max_insert_threads'] = '16';
2
select count() from testX;
20
select count() from testXA;
@@ -25,3 +50,147 @@ select count() from testXB;
0
select count() from testXC;
20
select 'optimize_trivial_insert_select=1', 'max_insert_threads=0';
optimize_trivial_insert_select=1 max_insert_threads=0
insert into testX select number from numbers(10) settings
log_queries=1,
parallel_view_processing=0,
optimize_trivial_insert_select=1,
max_insert_threads=0; -- { serverError FUNCTION_THROW_IF_VALUE_IS_NON_ZERO }
system flush logs;
select arrayUniq(thread_ids) from system.query_log where
current_database = currentDatabase() and
type != 'QueryStart' and
query like '%insert into testX %' and
Settings['parallel_view_processing'] = '0' and
Settings['optimize_trivial_insert_select'] = '1' and
Settings['max_insert_threads'] = '0';
2
select count() from testX;
30
select count() from testXA;
30
select count() from testXB;
0
select count() from testXC;
30
select 'optimize_trivial_insert_select=1', 'max_insert_threads=16';
optimize_trivial_insert_select=1 max_insert_threads=16
insert into testX select number from numbers(10) settings
log_queries=1,
parallel_view_processing=0,
optimize_trivial_insert_select=1,
max_insert_threads=16; -- { serverError FUNCTION_THROW_IF_VALUE_IS_NON_ZERO }
system flush logs;
select arrayUniq(thread_ids) from system.query_log where
current_database = currentDatabase() and
type != 'QueryStart' and
query like '%insert into testX %' and
Settings['parallel_view_processing'] = '0' and
Settings['optimize_trivial_insert_select'] = '1' and
Settings['max_insert_threads'] = '16';
2
select count() from testX;
40
select count() from testXA;
40
select count() from testXB;
0
select count() from testXC;
40
select 'optimize_trivial_insert_select=0', 'max_insert_threads=0';
optimize_trivial_insert_select=0 max_insert_threads=0
insert into testX select number from numbers(10) settings
log_queries=1,
parallel_view_processing=1,
optimize_trivial_insert_select=0,
max_insert_threads=0; -- { serverError FUNCTION_THROW_IF_VALUE_IS_NON_ZERO }
system flush logs;
select arrayUniq(thread_ids) from system.query_log where
current_database = currentDatabase() and
type != 'QueryStart' and
query like '%insert into testX %' and
Settings['parallel_view_processing'] = '1' and
Settings['optimize_trivial_insert_select'] = '0' and
Settings['max_insert_threads'] = '0';
5
select count() from testX;
50
select count() from testXA;
50
select count() from testXB;
0
select count() from testXC;
50
select 'optimize_trivial_insert_select=0', 'max_insert_threads=16';
optimize_trivial_insert_select=0 max_insert_threads=16
insert into testX select number from numbers(10) settings
log_queries=1,
parallel_view_processing=1,
optimize_trivial_insert_select=0,
max_insert_threads=16; -- { serverError FUNCTION_THROW_IF_VALUE_IS_NON_ZERO }
system flush logs;
select arrayUniq(thread_ids) from system.query_log where
current_database = currentDatabase() and
type != 'QueryStart' and
query like '%insert into testX %' and
Settings['parallel_view_processing'] = '1' and
Settings['optimize_trivial_insert_select'] = '0' and
Settings['max_insert_threads'] = '16';
5
select count() from testX;
60
select count() from testXA;
60
select count() from testXB;
0
select count() from testXC;
60
select 'optimize_trivial_insert_select=1', 'max_insert_threads=0';
optimize_trivial_insert_select=1 max_insert_threads=0
insert into testX select number from numbers(10) settings
log_queries=1,
parallel_view_processing=1,
optimize_trivial_insert_select=1,
max_insert_threads=0; -- { serverError FUNCTION_THROW_IF_VALUE_IS_NON_ZERO }
system flush logs;
select arrayUniq(thread_ids) from system.query_log where
current_database = currentDatabase() and
type != 'QueryStart' and
query like '%insert into testX %' and
Settings['parallel_view_processing'] = '1' and
Settings['optimize_trivial_insert_select'] = '1' and
Settings['max_insert_threads'] = '0';
5
select count() from testX;
70
select count() from testXA;
70
select count() from testXB;
0
select count() from testXC;
70
select 'optimize_trivial_insert_select=1', 'max_insert_threads=16';
optimize_trivial_insert_select=1 max_insert_threads=16
insert into testX select number from numbers(10) settings
log_queries=1,
parallel_view_processing=1,
optimize_trivial_insert_select=1,
max_insert_threads=16; -- { serverError FUNCTION_THROW_IF_VALUE_IS_NON_ZERO }
system flush logs;
select arrayUniq(thread_ids) from system.query_log where
current_database = currentDatabase() and
type != 'QueryStart' and
query like '%insert into testX %' and
Settings['parallel_view_processing'] = '1' and
Settings['optimize_trivial_insert_select'] = '1' and
Settings['max_insert_threads'] = '16';
5
select count() from testX;
80
select count() from testXA;
80
select count() from testXB;
0
select count() from testXC;
80

View File

@@ -1,39 +0,0 @@
set max_threads = 0;
drop table if exists testX;
drop table if exists testXA;
drop table if exists testXB;
drop table if exists testXC;
create table testX (A Int64) engine=MergeTree order by tuple();
create materialized view testXA engine=MergeTree order by tuple() as select sleep(1) from testX;
create materialized view testXB engine=MergeTree order by tuple() as select sleep(2), throwIf(A=1) from testX;
create materialized view testXC engine=MergeTree order by tuple() as select sleep(1) from testX;
-- { echoOn }
set parallel_view_processing=1;
insert into testX select number from numbers(10) settings log_queries=1; -- { serverError FUNCTION_THROW_IF_VALUE_IS_NON_ZERO }
system flush logs;
select length(thread_ids) >= 8 from system.query_log where current_database = currentDatabase() and type != 'QueryStart' and query like '%insert into testX %' and Settings['parallel_view_processing'] = '1';
select count() from testX;
select count() from testXA;
select count() from testXB;
select count() from testXC;
set parallel_view_processing=0;
insert into testX select number from numbers(10) settings log_queries=1; -- { serverError FUNCTION_THROW_IF_VALUE_IS_NON_ZERO }
system flush logs;
select length(thread_ids) >= 5 from system.query_log where current_database = currentDatabase() and type != 'QueryStart' and query like '%insert into testX %' and Settings['parallel_view_processing'] = '0';
select count() from testX;
select count() from testXA;
select count() from testXB;
select count() from testXC;
-- { echoOff }
drop table testX;
drop view testXA;
drop view testXB;
drop view testXC;

View File

@@ -0,0 +1,50 @@
-- Tags: no-s3-storage
-- no-s3-storage: s3 has 20 more threads
-- avoid settings randomization by clickhouse-test
set max_threads = 0;
drop table if exists testX;
drop table if exists testXA;
drop table if exists testXB;
drop table if exists testXC;
create table testX (A Int64) engine=MergeTree order by tuple();
create materialized view testXA engine=MergeTree order by tuple() as select sleep(0.1) from testX;
create materialized view testXB engine=MergeTree order by tuple() as select sleep(0.2), throwIf(A=1) from testX;
create materialized view testXC engine=MergeTree order by tuple() as select sleep(0.1) from testX;
-- { echoOn }
{% for parallel_view_processing in [0, 1] %}
{% for optimize_trivial_insert_select in [0, 1] %}
{% for max_insert_threads in [0, 16] %}
select 'optimize_trivial_insert_select={{ optimize_trivial_insert_select }}', 'max_insert_threads={{ max_insert_threads }}';
insert into testX select number from numbers(10) settings
log_queries=1,
parallel_view_processing={{ parallel_view_processing }},
optimize_trivial_insert_select={{ optimize_trivial_insert_select }},
max_insert_threads={{ max_insert_threads }}; -- { serverError FUNCTION_THROW_IF_VALUE_IS_NON_ZERO }
system flush logs;
select arrayUniq(thread_ids) from system.query_log where
current_database = currentDatabase() and
type != 'QueryStart' and
query like '%insert into testX %' and
Settings['parallel_view_processing'] = '{{ parallel_view_processing }}' and
Settings['optimize_trivial_insert_select'] = '{{ optimize_trivial_insert_select }}' and
Settings['max_insert_threads'] = '{{ max_insert_threads }}';
select count() from testX;
select count() from testXA;
select count() from testXB;
select count() from testXC;
{% endfor %}
{% endfor %}
{% endfor %}
-- { echoOff }
drop table testX;
drop view testXA;
drop view testXB;
drop view testXC;

View File

@@ -0,0 +1,15 @@
-- https://github.com/ClickHouse/ClickHouse/issues/37900
drop table if exists t;
drop table if exists t_mv;
create table t (a UInt64) Engine = Null;
create materialized view t_mv Engine = Null AS select now() as ts, max(a) from t group by ts;
insert into t select * from numbers_mt(10e6) settings max_threads = 16, max_insert_threads=16;
system flush logs;
select arrayUniq(thread_ids)>=16 from system.query_log where
event_date >= yesterday() and
current_database = currentDatabase() and
type = 'QueryFinish' and
startsWith(query, 'insert');