From 4ae7db83697bba58b0669ff3229c3c62cedae133 Mon Sep 17 00:00:00 2001 From: Azat Khuzhin Date: Sat, 2 Jul 2022 17:59:02 +0300 Subject: [PATCH 1/4] Fix max_insert_threads while pushing to views Signed-off-by: Azat Khuzhin --- src/Interpreters/InterpreterInsertQuery.cpp | 5 ----- tests/performance/views_max_insert_threads.xml | 11 +++++++++++ .../02350_views_max_insert_threads.reference | 1 + .../02350_views_max_insert_threads.sql | 15 +++++++++++++++ 4 files changed, 27 insertions(+), 5 deletions(-) create mode 100644 tests/performance/views_max_insert_threads.xml create mode 100644 tests/queries/0_stateless/02350_views_max_insert_threads.reference create mode 100644 tests/queries/0_stateless/02350_views_max_insert_threads.sql diff --git a/src/Interpreters/InterpreterInsertQuery.cpp b/src/Interpreters/InterpreterInsertQuery.cpp index 4ed293e8530..f4394dc613d 100644 --- a/src/Interpreters/InterpreterInsertQuery.cpp +++ b/src/Interpreters/InterpreterInsertQuery.cpp @@ -457,17 +457,12 @@ BlockIO InterpreterInsertQuery::execute() }); size_t num_select_threads = pipeline.getNumThreads(); - size_t num_insert_threads = std::max_element(out_chains.begin(), out_chains.end(), [&](const auto &a, const auto &b) - { - return a.getNumThreads() < b.getNumThreads(); - })->getNumThreads(); for (auto & chain : out_chains) resources = chain.detachResources(); pipeline.addChains(std::move(out_chains)); - pipeline.setMaxThreads(num_insert_threads); /// Don't use more threads for insert then for select to reduce memory consumption. if (!settings.parallel_view_processing && pipeline.getNumThreads() > num_select_threads) pipeline.setMaxThreads(num_select_threads); diff --git a/tests/performance/views_max_insert_threads.xml b/tests/performance/views_max_insert_threads.xml new file mode 100644 index 00000000000..2988984f5d8 --- /dev/null +++ b/tests/performance/views_max_insert_threads.xml @@ -0,0 +1,11 @@ + + + create table views_max_insert_threads_null (a UInt64) Engine = Null + create materialized view views_max_insert_threads_mv Engine = Null AS select now() as ts, max(a) from views_max_insert_threads_null group by ts + + insert into views_max_insert_threads_null select * from numbers_mt(3000000000) settings max_threads = 16, max_insert_threads=16 + + drop table if exists views_max_insert_threads_null + drop table if exists views_max_insert_threads_mv + + diff --git a/tests/queries/0_stateless/02350_views_max_insert_threads.reference b/tests/queries/0_stateless/02350_views_max_insert_threads.reference new file mode 100644 index 00000000000..d00491fd7e5 --- /dev/null +++ b/tests/queries/0_stateless/02350_views_max_insert_threads.reference @@ -0,0 +1 @@ +1 diff --git a/tests/queries/0_stateless/02350_views_max_insert_threads.sql b/tests/queries/0_stateless/02350_views_max_insert_threads.sql new file mode 100644 index 00000000000..e19ad465b49 --- /dev/null +++ b/tests/queries/0_stateless/02350_views_max_insert_threads.sql @@ -0,0 +1,15 @@ +-- https://github.com/ClickHouse/ClickHouse/issues/37900 + +drop table if exists t; +drop table if exists t_mv; +create table t (a UInt64) Engine = Null; +create materialized view t_mv Engine = Null AS select now() as ts, max(a) from t group by ts; + +insert into t select * from numbers_mt(10e6) settings max_threads = 16, max_insert_threads=16; +system flush logs; + +select arrayUniq(thread_ids)>=16 from system.query_log where + event_date >= yesterday() and + current_database = currentDatabase() and + type = 'QueryFinish' and + startsWith(query, 'insert'); From dd3515da9805ce12bee79953936e6d5461424ad7 Mon Sep 17 00:00:00 2001 From: Azat Khuzhin Date: Sat, 2 Jul 2022 20:02:21 +0300 Subject: [PATCH 2/4] Fix parallel_view_processing with optimize_trivial_insert_select=1 --- src/Interpreters/InterpreterInsertQuery.cpp | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/src/Interpreters/InterpreterInsertQuery.cpp b/src/Interpreters/InterpreterInsertQuery.cpp index f4394dc613d..7b6066575ae 100644 --- a/src/Interpreters/InterpreterInsertQuery.cpp +++ b/src/Interpreters/InterpreterInsertQuery.cpp @@ -463,9 +463,20 @@ BlockIO InterpreterInsertQuery::execute() pipeline.addChains(std::move(out_chains)); - /// Don't use more threads for insert then for select to reduce memory consumption. - if (!settings.parallel_view_processing && pipeline.getNumThreads() > num_select_threads) - pipeline.setMaxThreads(num_select_threads); + if (!settings.parallel_view_processing) + { + /// Don't use more threads for INSERT than for SELECT to reduce memory consumption. + if (pipeline.getNumThreads() > num_select_threads) + pipeline.setMaxThreads(num_select_threads); + } + else if (pipeline.getNumThreads() < settings.max_threads) + { + /// It is possible for query to have max_threads=1, due to optimize_trivial_insert_select, + /// however in case of parallel_view_processing and multiple views, views can still be processed in parallel. + /// + /// Note, number of threads will be limited by buildPushingToViewsChain() to max_threads. + pipeline.setMaxThreads(settings.max_threads); + } pipeline.setSinks([&](const Block & cur_header, QueryPipelineBuilder::StreamType) -> ProcessorPtr { From 9225256dea808233feadc7e75265fbfeabfb0ffa Mon Sep 17 00:00:00 2001 From: Azat Khuzhin Date: Sat, 2 Jul 2022 19:49:41 +0300 Subject: [PATCH 3/4] tests: improve 01275_parallel_mv - cover optimize_trivial_insert_select=0/1 - cover max_insert_threads - convert parallel_view_processing into jinja2 Signed-off-by: Azat Khuzhin --- .../0_stateless/01275_parallel_mv.reference | 185 +++++++++++++++++- .../queries/0_stateless/01275_parallel_mv.sql | 39 ---- .../0_stateless/01275_parallel_mv.sql.j2 | 47 +++++ 3 files changed, 224 insertions(+), 47 deletions(-) delete mode 100644 tests/queries/0_stateless/01275_parallel_mv.sql create mode 100644 tests/queries/0_stateless/01275_parallel_mv.sql.j2 diff --git a/tests/queries/0_stateless/01275_parallel_mv.reference b/tests/queries/0_stateless/01275_parallel_mv.reference index 9021ae2bb1a..a9801e3b910 100644 --- a/tests/queries/0_stateless/01275_parallel_mv.reference +++ b/tests/queries/0_stateless/01275_parallel_mv.reference @@ -1,9 +1,23 @@ -- { echoOn } -set parallel_view_processing=1; -insert into testX select number from numbers(10) settings log_queries=1; -- { serverError FUNCTION_THROW_IF_VALUE_IS_NON_ZERO } + + + +select 'optimize_trivial_insert_select=0', 'max_insert_threads=0'; +optimize_trivial_insert_select=0 max_insert_threads=0 +insert into testX select number from numbers(10) settings + log_queries=1, + parallel_view_processing=0, + optimize_trivial_insert_select=0, + max_insert_threads=0; -- { serverError FUNCTION_THROW_IF_VALUE_IS_NON_ZERO } system flush logs; -select length(thread_ids) >= 8 from system.query_log where current_database = currentDatabase() and type != 'QueryStart' and query like '%insert into testX %' and Settings['parallel_view_processing'] = '1'; -1 +select arrayUniq(thread_ids) from system.query_log where + current_database = currentDatabase() and + type != 'QueryStart' and + query like '%insert into testX %' and + Settings['parallel_view_processing'] = '0' and + Settings['optimize_trivial_insert_select'] = '0' and + Settings['max_insert_threads'] = '0'; +2 select count() from testX; 10 select count() from testXA; @@ -12,11 +26,22 @@ select count() from testXB; 0 select count() from testXC; 10 -set parallel_view_processing=0; -insert into testX select number from numbers(10) settings log_queries=1; -- { serverError FUNCTION_THROW_IF_VALUE_IS_NON_ZERO } +select 'optimize_trivial_insert_select=0', 'max_insert_threads=16'; +optimize_trivial_insert_select=0 max_insert_threads=16 +insert into testX select number from numbers(10) settings + log_queries=1, + parallel_view_processing=0, + optimize_trivial_insert_select=0, + max_insert_threads=16; -- { serverError FUNCTION_THROW_IF_VALUE_IS_NON_ZERO } system flush logs; -select length(thread_ids) >= 5 from system.query_log where current_database = currentDatabase() and type != 'QueryStart' and query like '%insert into testX %' and Settings['parallel_view_processing'] = '0'; -1 +select arrayUniq(thread_ids) from system.query_log where + current_database = currentDatabase() and + type != 'QueryStart' and + query like '%insert into testX %' and + Settings['parallel_view_processing'] = '0' and + Settings['optimize_trivial_insert_select'] = '0' and + Settings['max_insert_threads'] = '16'; +2 select count() from testX; 20 select count() from testXA; @@ -25,3 +50,147 @@ select count() from testXB; 0 select count() from testXC; 20 +select 'optimize_trivial_insert_select=1', 'max_insert_threads=0'; +optimize_trivial_insert_select=1 max_insert_threads=0 +insert into testX select number from numbers(10) settings + log_queries=1, + parallel_view_processing=0, + optimize_trivial_insert_select=1, + max_insert_threads=0; -- { serverError FUNCTION_THROW_IF_VALUE_IS_NON_ZERO } +system flush logs; +select arrayUniq(thread_ids) from system.query_log where + current_database = currentDatabase() and + type != 'QueryStart' and + query like '%insert into testX %' and + Settings['parallel_view_processing'] = '0' and + Settings['optimize_trivial_insert_select'] = '1' and + Settings['max_insert_threads'] = '0'; +2 +select count() from testX; +30 +select count() from testXA; +30 +select count() from testXB; +0 +select count() from testXC; +30 +select 'optimize_trivial_insert_select=1', 'max_insert_threads=16'; +optimize_trivial_insert_select=1 max_insert_threads=16 +insert into testX select number from numbers(10) settings + log_queries=1, + parallel_view_processing=0, + optimize_trivial_insert_select=1, + max_insert_threads=16; -- { serverError FUNCTION_THROW_IF_VALUE_IS_NON_ZERO } +system flush logs; +select arrayUniq(thread_ids) from system.query_log where + current_database = currentDatabase() and + type != 'QueryStart' and + query like '%insert into testX %' and + Settings['parallel_view_processing'] = '0' and + Settings['optimize_trivial_insert_select'] = '1' and + Settings['max_insert_threads'] = '16'; +2 +select count() from testX; +40 +select count() from testXA; +40 +select count() from testXB; +0 +select count() from testXC; +40 +select 'optimize_trivial_insert_select=0', 'max_insert_threads=0'; +optimize_trivial_insert_select=0 max_insert_threads=0 +insert into testX select number from numbers(10) settings + log_queries=1, + parallel_view_processing=1, + optimize_trivial_insert_select=0, + max_insert_threads=0; -- { serverError FUNCTION_THROW_IF_VALUE_IS_NON_ZERO } +system flush logs; +select arrayUniq(thread_ids) from system.query_log where + current_database = currentDatabase() and + type != 'QueryStart' and + query like '%insert into testX %' and + Settings['parallel_view_processing'] = '1' and + Settings['optimize_trivial_insert_select'] = '0' and + Settings['max_insert_threads'] = '0'; +5 +select count() from testX; +50 +select count() from testXA; +50 +select count() from testXB; +0 +select count() from testXC; +50 +select 'optimize_trivial_insert_select=0', 'max_insert_threads=16'; +optimize_trivial_insert_select=0 max_insert_threads=16 +insert into testX select number from numbers(10) settings + log_queries=1, + parallel_view_processing=1, + optimize_trivial_insert_select=0, + max_insert_threads=16; -- { serverError FUNCTION_THROW_IF_VALUE_IS_NON_ZERO } +system flush logs; +select arrayUniq(thread_ids) from system.query_log where + current_database = currentDatabase() and + type != 'QueryStart' and + query like '%insert into testX %' and + Settings['parallel_view_processing'] = '1' and + Settings['optimize_trivial_insert_select'] = '0' and + Settings['max_insert_threads'] = '16'; +5 +select count() from testX; +60 +select count() from testXA; +60 +select count() from testXB; +0 +select count() from testXC; +60 +select 'optimize_trivial_insert_select=1', 'max_insert_threads=0'; +optimize_trivial_insert_select=1 max_insert_threads=0 +insert into testX select number from numbers(10) settings + log_queries=1, + parallel_view_processing=1, + optimize_trivial_insert_select=1, + max_insert_threads=0; -- { serverError FUNCTION_THROW_IF_VALUE_IS_NON_ZERO } +system flush logs; +select arrayUniq(thread_ids) from system.query_log where + current_database = currentDatabase() and + type != 'QueryStart' and + query like '%insert into testX %' and + Settings['parallel_view_processing'] = '1' and + Settings['optimize_trivial_insert_select'] = '1' and + Settings['max_insert_threads'] = '0'; +5 +select count() from testX; +70 +select count() from testXA; +70 +select count() from testXB; +0 +select count() from testXC; +70 +select 'optimize_trivial_insert_select=1', 'max_insert_threads=16'; +optimize_trivial_insert_select=1 max_insert_threads=16 +insert into testX select number from numbers(10) settings + log_queries=1, + parallel_view_processing=1, + optimize_trivial_insert_select=1, + max_insert_threads=16; -- { serverError FUNCTION_THROW_IF_VALUE_IS_NON_ZERO } +system flush logs; +select arrayUniq(thread_ids) from system.query_log where + current_database = currentDatabase() and + type != 'QueryStart' and + query like '%insert into testX %' and + Settings['parallel_view_processing'] = '1' and + Settings['optimize_trivial_insert_select'] = '1' and + Settings['max_insert_threads'] = '16'; +5 +select count() from testX; +80 +select count() from testXA; +80 +select count() from testXB; +0 +select count() from testXC; +80 diff --git a/tests/queries/0_stateless/01275_parallel_mv.sql b/tests/queries/0_stateless/01275_parallel_mv.sql deleted file mode 100644 index 27b8ef96e0b..00000000000 --- a/tests/queries/0_stateless/01275_parallel_mv.sql +++ /dev/null @@ -1,39 +0,0 @@ -set max_threads = 0; - -drop table if exists testX; -drop table if exists testXA; -drop table if exists testXB; -drop table if exists testXC; - -create table testX (A Int64) engine=MergeTree order by tuple(); - -create materialized view testXA engine=MergeTree order by tuple() as select sleep(1) from testX; -create materialized view testXB engine=MergeTree order by tuple() as select sleep(2), throwIf(A=1) from testX; -create materialized view testXC engine=MergeTree order by tuple() as select sleep(1) from testX; - --- { echoOn } -set parallel_view_processing=1; -insert into testX select number from numbers(10) settings log_queries=1; -- { serverError FUNCTION_THROW_IF_VALUE_IS_NON_ZERO } -system flush logs; -select length(thread_ids) >= 8 from system.query_log where current_database = currentDatabase() and type != 'QueryStart' and query like '%insert into testX %' and Settings['parallel_view_processing'] = '1'; - -select count() from testX; -select count() from testXA; -select count() from testXB; -select count() from testXC; - -set parallel_view_processing=0; -insert into testX select number from numbers(10) settings log_queries=1; -- { serverError FUNCTION_THROW_IF_VALUE_IS_NON_ZERO } -system flush logs; -select length(thread_ids) >= 5 from system.query_log where current_database = currentDatabase() and type != 'QueryStart' and query like '%insert into testX %' and Settings['parallel_view_processing'] = '0'; - -select count() from testX; -select count() from testXA; -select count() from testXB; -select count() from testXC; --- { echoOff } - -drop table testX; -drop view testXA; -drop view testXB; -drop view testXC; diff --git a/tests/queries/0_stateless/01275_parallel_mv.sql.j2 b/tests/queries/0_stateless/01275_parallel_mv.sql.j2 new file mode 100644 index 00000000000..3b7c414a44e --- /dev/null +++ b/tests/queries/0_stateless/01275_parallel_mv.sql.j2 @@ -0,0 +1,47 @@ +-- avoid settings randomization by clickhouse-test +set max_threads = 0; + +drop table if exists testX; +drop table if exists testXA; +drop table if exists testXB; +drop table if exists testXC; + +create table testX (A Int64) engine=MergeTree order by tuple(); + +create materialized view testXA engine=MergeTree order by tuple() as select sleep(0.1) from testX; +create materialized view testXB engine=MergeTree order by tuple() as select sleep(0.2), throwIf(A=1) from testX; +create materialized view testXC engine=MergeTree order by tuple() as select sleep(0.1) from testX; + +-- { echoOn } +{% for parallel_view_processing in [0, 1] %} +{% for optimize_trivial_insert_select in [0, 1] %} +{% for max_insert_threads in [0, 16] %} +select 'optimize_trivial_insert_select={{ optimize_trivial_insert_select }}', 'max_insert_threads={{ max_insert_threads }}'; + +insert into testX select number from numbers(10) settings + log_queries=1, + parallel_view_processing={{ parallel_view_processing }}, + optimize_trivial_insert_select={{ optimize_trivial_insert_select }}, + max_insert_threads={{ max_insert_threads }}; -- { serverError FUNCTION_THROW_IF_VALUE_IS_NON_ZERO } +system flush logs; +select arrayUniq(thread_ids) from system.query_log where + current_database = currentDatabase() and + type != 'QueryStart' and + query like '%insert into testX %' and + Settings['parallel_view_processing'] = '{{ parallel_view_processing }}' and + Settings['optimize_trivial_insert_select'] = '{{ optimize_trivial_insert_select }}' and + Settings['max_insert_threads'] = '{{ max_insert_threads }}'; + +select count() from testX; +select count() from testXA; +select count() from testXB; +select count() from testXC; +{% endfor %} +{% endfor %} +{% endfor %} +-- { echoOff } + +drop table testX; +drop view testXA; +drop view testXB; +drop view testXC; From 7427adb600633008af5688e394eb287f370e1d41 Mon Sep 17 00:00:00 2001 From: Azat Khuzhin Date: Sun, 3 Jul 2022 09:24:09 +0300 Subject: [PATCH 4/4] tests: disable 01275_parallel_mv under S3 (since it has thread pool for writes) Signed-off-by: Azat Khuzhin --- tests/queries/0_stateless/01275_parallel_mv.sql.j2 | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tests/queries/0_stateless/01275_parallel_mv.sql.j2 b/tests/queries/0_stateless/01275_parallel_mv.sql.j2 index 3b7c414a44e..6b17a141d3e 100644 --- a/tests/queries/0_stateless/01275_parallel_mv.sql.j2 +++ b/tests/queries/0_stateless/01275_parallel_mv.sql.j2 @@ -1,3 +1,6 @@ +-- Tags: no-s3-storage +-- no-s3-storage: s3 has 20 more threads + -- avoid settings randomization by clickhouse-test set max_threads = 0;