From bff832c7f0e47a2bd84da7b424a33d4b5a5ee3cf Mon Sep 17 00:00:00 2001 From: filimonov <1549571+filimonov@users.noreply.github.com> Date: Thu, 17 Aug 2023 08:19:20 +0200 Subject: [PATCH 1/7] Disable logic max_threads=max_distributed_connections when async_socket_for_remote=1 See #53287 --- src/Interpreters/InterpreterSelectQuery.cpp | 27 +- src/Planner/PlannerJoinTree.cpp | 27 +- tests/analyzer_tech_debt.txt | 1 + ...ads_count_in_distributed_queries.reference | 15 ++ ...hreads_count_in_distributed_queries.sql.j2 | 238 ++++++++++++++++++ 5 files changed, 286 insertions(+), 22 deletions(-) create mode 100644 tests/queries/0_stateless/02845_threads_count_in_distributed_queries.reference create mode 100644 tests/queries/0_stateless/02845_threads_count_in_distributed_queries.sql.j2 diff --git a/src/Interpreters/InterpreterSelectQuery.cpp b/src/Interpreters/InterpreterSelectQuery.cpp index e1faa8c8958..6037e205eca 100644 --- a/src/Interpreters/InterpreterSelectQuery.cpp +++ b/src/Interpreters/InterpreterSelectQuery.cpp @@ -2328,17 +2328,22 @@ void InterpreterSelectQuery::executeFetchColumns(QueryProcessingStage::Enum proc /// General limit for the number of threads. size_t max_threads_execute_query = settings.max_threads; - /** With distributed query processing, almost no computations are done in the threads, - * but wait and receive data from remote servers. - * If we have 20 remote servers, and max_threads = 8, then it would not be very good - * connect and ask only 8 servers at a time. - * To simultaneously query more remote servers, - * instead of max_threads, max_distributed_connections is used. + /** + * To simultaneously query more remote servers when async_socket_for_remote is off + * instead of max_threads, max_distributed_connections is used: + * since threads there mostly spend time waiting for data from remote servers, + * we can increase the degree of parallelism to avoid sequential querying of remote servers. 
+ * + * DANGER: that can lead to an insane number of threads working if there are a lot of streams and prefer_localhost_replica is used. + * + * That is not needed when async_socket_for_remote is on, because in that case + * threads are not blocked waiting for data from remote servers. + * */ - bool is_remote = false; - if (storage && storage->isRemote()) + bool is_sync_remote = false; + if (storage && storage->isRemote() && !settings.async_socket_for_remote) { - is_remote = true; + is_sync_remote = true; max_threads_execute_query = max_streams = settings.max_distributed_connections; } @@ -2415,7 +2420,7 @@ void InterpreterSelectQuery::executeFetchColumns(QueryProcessingStage::Enum proc max_streams = 1; /// If necessary, we request more sources than the number of threads - to distribute the work evenly over the threads. - if (max_streams > 1 && !is_remote) + if (max_streams > 1 && !is_sync_remote) max_streams = static_cast(max_streams * settings.max_streams_to_max_threads_ratio); auto & prewhere_info = analysis_result.prewhere_info; @@ -2513,7 +2518,7 @@ void InterpreterSelectQuery::executeFetchColumns(QueryProcessingStage::Enum proc /// The inner local query (that is done in the same process, without /// network interaction), it will setMaxThreads earlier and distributed /// query will not update it. 
- if (!query_plan.getMaxThreads() || is_remote) + if (!query_plan.getMaxThreads() || is_sync_remote) query_plan.setMaxThreads(max_threads_execute_query); query_plan.setConcurrencyControl(settings.use_concurrency_control); diff --git a/src/Planner/PlannerJoinTree.cpp b/src/Planner/PlannerJoinTree.cpp index c95671da6be..d70f901164f 100644 --- a/src/Planner/PlannerJoinTree.cpp +++ b/src/Planner/PlannerJoinTree.cpp @@ -557,15 +557,20 @@ JoinTreeQueryPlan buildQueryPlanForTableExpression(QueryTreeNodePtr table_expres size_t max_streams = settings.max_threads; size_t max_threads_execute_query = settings.max_threads; - /** With distributed query processing, almost no computations are done in the threads, - * but wait and receive data from remote servers. - * If we have 20 remote servers, and max_threads = 8, then it would not be efficient to - * connect and ask only 8 servers at a time. - * To simultaneously query more remote servers, - * instead of max_threads, max_distributed_connections is used. - */ - bool is_remote = table_expression_data.isRemote(); - if (is_remote) + /** + * To simultaneously query more remote servers when async_socket_for_remote is off + * instead of max_threads, max_distributed_connections is used: + * since threads there mostly spend time waiting for data from remote servers, + * we can increase the degree of parallelism to avoid sequential querying of remote servers. + * + * DANGER: that can lead to an insane number of threads working if there are a lot of streams and prefer_localhost_replica is used. + * + * That is not needed when async_socket_for_remote is on, because in that case + * threads are not blocked waiting for data from remote servers. 
+ * + */ + bool is_sync_remote = table_expression_data.isRemote() && !settings.async_socket_for_remote; + if (is_sync_remote) { max_streams = settings.max_distributed_connections; max_threads_execute_query = settings.max_distributed_connections; @@ -632,7 +637,7 @@ JoinTreeQueryPlan buildQueryPlanForTableExpression(QueryTreeNodePtr table_expres max_streams = 1; /// If necessary, we request more sources than the number of threads - to distribute the work evenly over the threads - if (max_streams > 1 && !is_remote) + if (max_streams > 1 && !is_sync_remote) max_streams = static_cast(max_streams * settings.max_streams_to_max_threads_ratio); if (table_node) @@ -792,7 +797,7 @@ JoinTreeQueryPlan buildQueryPlanForTableExpression(QueryTreeNodePtr table_expres * network interaction), it will setMaxThreads earlier and distributed * query will not update it. */ - if (!query_plan.getMaxThreads() || is_remote) + if (!query_plan.getMaxThreads() || is_sync_remote) query_plan.setMaxThreads(max_threads_execute_query); query_plan.setConcurrencyControl(settings.use_concurrency_control); diff --git a/tests/analyzer_tech_debt.txt b/tests/analyzer_tech_debt.txt index 652ab0b99de..352bb0a0c94 100644 --- a/tests/analyzer_tech_debt.txt +++ b/tests/analyzer_tech_debt.txt @@ -128,4 +128,5 @@ 02790_optimize_skip_unused_shards_join 01940_custom_tld_sharding_key 02815_range_dict_no_direct_join +02845_threads_count_in_distributed_queries 02861_join_on_nullsafe_compare diff --git a/tests/queries/0_stateless/02845_threads_count_in_distributed_queries.reference b/tests/queries/0_stateless/02845_threads_count_in_distributed_queries.reference new file mode 100644 index 00000000000..0e3f6336468 --- /dev/null +++ b/tests/queries/0_stateless/02845_threads_count_in_distributed_queries.reference @@ -0,0 +1,15 @@ +prefer_localhost_replica=1, remote query with a lot of union all +379 +ok +prefer_localhost_replica=0, remote query with a lot of union all +379 +ok +prefer_localhost_replica=1, 
async_socket_for_remote=0, remote query with a lot of union all (lot of threads) +379 +ok +prepare test schema +500 +prefer_localhost_replica=1, remote query with read in order +ok +prefer_localhost_replica=1 + async_socket_for_remote=0, remote query with read in order (lot of threads) +ok diff --git a/tests/queries/0_stateless/02845_threads_count_in_distributed_queries.sql.j2 b/tests/queries/0_stateless/02845_threads_count_in_distributed_queries.sql.j2 new file mode 100644 index 00000000000..6039e12071d --- /dev/null +++ b/tests/queries/0_stateless/02845_threads_count_in_distributed_queries.sql.j2 @@ -0,0 +1,238 @@ +-- enforce some defaults to be sure that the env settings will not affect the test +SET max_threads=5, async_socket_for_remote=1, async_socket_for_remote=1, optimize_read_in_order=1; + + +-- we use query_thread_log to check peak thread usage +-- after https://github.com/ClickHouse/ClickHouse/issues/53417 there is a simpler way to check it +-- but that will not allow to backport the test to older versions +SET log_query_threads=1; + + +-------------------- +SELECT 'prefer_localhost_replica=1, remote query with a lot of union all' AS testname; + +-- query with lot of dummy union all will create a lot of streams +-- let's check how many threads clickhouse will start for that + +select count() from remote('127.0.0.1:9000', view( +{% for n in range(379) -%} +SELECT * FROM system.one {{ "UNION ALL" if not loop.last }} +{% endfor -%} + )) SETTINGS log_comment='check_concurrency_in_remote_queries1'; + +SYSTEM FLUSH LOGS; + +WITH + maxIntersections( + toUnixTimestamp64Micro(query_start_time_microseconds), + toUnixTimestamp64Micro(event_time_microseconds) + ) as peak_threads +SELECT + if(peak_threads BETWEEN 1 AND toUInt64(getSetting('max_threads')) + 2, 'ok', 'too many threads: ' || toString(peak_threads) ) AS result +FROM system.query_thread_log +WHERE + event_time > now() - 60 + AND query_id = ( + SELECT query_id + FROM system.query_log + WHERE + type = 
'QueryFinish' + AND event_time > now() - 60 + AND log_comment = 'check_concurrency_in_remote_queries1' + AND current_database = currentDatabase() + ORDER BY event_time DESC LIMIT 1 + ); + +-------------------- +SELECT 'prefer_localhost_replica=0, remote query with a lot of union all' AS testname; + +select count() from remote('127.0.0.1:9000', view( +{% for n in range(379) -%} +SELECT * FROM system.one {{ "UNION ALL" if not loop.last }} +{% endfor -%} + )) SETTINGS log_comment='check_concurrency_in_remote_queries2', prefer_localhost_replica=0; + +SYSTEM FLUSH LOGS; + +WITH + maxIntersections( + toUnixTimestamp64Micro(query_start_time_microseconds), + toUnixTimestamp64Micro(event_time_microseconds) + ) as peak_threads +SELECT + if(peak_threads BETWEEN 1 AND toUInt64(getSetting('max_threads')) + 2, 'ok', 'too many threads: ' || toString(peak_threads) ) AS result +FROM system.query_thread_log +WHERE + event_time > now() - 60 + AND query_id = ( + SELECT query_id + FROM system.query_log + WHERE + type = 'QueryFinish' + AND event_time > now() - 60 + AND log_comment = 'check_concurrency_in_remote_queries2' + AND current_database = currentDatabase() + ORDER BY event_time DESC LIMIT 1 + ); + +-------------------- +SELECT 'prefer_localhost_replica=1, async_socket_for_remote=0, remote query with a lot of union all (lot of threads)' AS testname; + +-- that is actually a bad behaviour, but it used to work like that for a long time. 
+-- now it happens only for async_socket_for_remote=0 (while it is 1 by default) +-- see https://github.com/ClickHouse/ClickHouse/issues/53287 + +select count() from remote('127.0.0.1:9000', view( +{% for n in range(379) -%} +SELECT * FROM system.one {{ "UNION ALL" if not loop.last }} +{% endfor -%} + )) SETTINGS log_comment='check_concurrency_in_remote_queries3', async_socket_for_remote=0, prefer_localhost_replica=1; + +SYSTEM FLUSH LOGS; + +WITH + maxIntersections( + toUnixTimestamp64Micro(query_start_time_microseconds), + toUnixTimestamp64Micro(event_time_microseconds) + ) as peak_threads +SELECT + if(peak_threads >= 379, 'ok', 'too few threads: ' || toString(peak_threads) ) AS result +FROM system.query_thread_log +WHERE + event_time > now() - 60 + AND query_id = ( + SELECT query_id + FROM system.query_log + WHERE + type = 'QueryFinish' + AND event_time > now() - 60 + AND log_comment = 'check_concurrency_in_remote_queries3' + AND current_database = currentDatabase() + ORDER BY event_time DESC LIMIT 1 + ); + +-- less synthetic testcase from the issue https://github.com/ClickHouse/ClickHouse/issues/53287 +-- it creates lot of streams because of many parts (one per part) + optimize_read_in_order=1 feature + +SELECT 'prepare test schema' AS stage; + +DROP TABLE IF EXISTS test_lot_of_parts_distributed; +DROP TABLE IF EXISTS test_lot_of_parts; + +CREATE TABLE test_lot_of_parts +( + `a` String, + `b` LowCardinality(String), + `c` DateTime64(3), + `val` String, +) +ENGINE = MergeTree +ORDER BY (a, b, c) +SETTINGS parts_to_delay_insert=0; + +CREATE TABLE test_lot_of_parts_distributed +( + `a` String, + `b` LowCardinality(String), + `c` DateTime64(3), + `val` String, +) +ENGINE = Distributed(test_shard_localhost, currentDatabase(), 'test_lot_of_parts', rand()); + +-- we need a lot of parts to make sure that we will have a lot of streams +SYSTEM STOP MERGES test_lot_of_parts; +INSERT INTO test_lot_of_parts (a, b, c, val) + SELECT + 'foo' as a, + 'bar' as b, 
_CAST('1683504000', 'DateTime64') as c, + 'baz' as val + FROM numbers_mt(500) + SETTINGS max_block_size = 1, min_insert_block_size_bytes=1, min_insert_block_size_rows=1; --every row will be in separate part + +select count() from system.parts where table = 'test_lot_of_parts' and active and database = currentDatabase(); + +SELECT 'prefer_localhost_replica=1, remote query with read in order' AS testname; + +--- it seem like the number of threads here reported here is still higher than expected +--- max_threads=1 => peak_threads=7 +--- max_threads=2 => peak_threads=14 +--- max_threads=5 => peak_threads=25 +--- may be it's a matter of the way how we count threads in thread_log, or there is another problem here +--- anyway it still much better than before (see the next test in that file - it uses 500 threads). + +-- query which uses optimize_read_in_order=1 +SELECT DISTINCT + 'val' AS fieldType, + val AS value +FROM test_lot_of_parts_distributed +WHERE a = 'foo' AND value != '' AND positionCaseInsensitiveUTF8(value, 'baz') > 0 AND b = 'bar' AND c >= _CAST('1683504000', 'DateTime64') +ORDER BY c DESC +LIMIT 5 +SETTINGS log_comment='check_concurrency_in_remote_queries4' FORMAT Null; + +SYSTEM FLUSH LOGS; + +WITH + maxIntersections( + toUnixTimestamp64Micro(query_start_time_microseconds), + toUnixTimestamp64Micro(event_time_microseconds) + ) as peak_threads +SELECT + -- it should be max_threads+2 not max_threads*8 + if(peak_threads BETWEEN 1 AND toUInt64(getSetting('max_threads')) * 8, 'ok', 'too many threads: ' || toString(peak_threads) ) AS result +FROM system.query_thread_log +WHERE + event_time > now() - 60 + AND query_id = ( + SELECT query_id + FROM system.query_log + WHERE + type = 'QueryFinish' + AND event_time > now() - 60 + AND log_comment = 'check_concurrency_in_remote_queries4' + AND current_database = currentDatabase() + ORDER BY event_time DESC LIMIT 1 + ); + + +SELECT 'prefer_localhost_replica=1 + async_socket_for_remote=0, remote query with read in order 
(lot of threads)' AS testname; + +-- that is actually a bad behaviour, but it used to work like that for a long time. +-- now it happens only for async_socket_for_remote=0 (while it is 1 by default) + +SELECT DISTINCT + 'val' AS fieldType, + val AS value +FROM test_lot_of_parts_distributed +WHERE a = 'foo' AND value != '' AND positionCaseInsensitiveUTF8(value, 'baz') > 0 AND b = 'bar' AND c >= _CAST('1683504000', 'DateTime64') +ORDER BY c DESC +LIMIT 5 +SETTINGS log_comment='check_concurrency_in_remote_queries5', async_socket_for_remote=0 FORMAT Null; + +SYSTEM FLUSH LOGS; + +WITH + maxIntersections( + toUnixTimestamp64Micro(query_start_time_microseconds), + toUnixTimestamp64Micro(event_time_microseconds) + ) as peak_threads +SELECT + if(peak_threads >= 500, 'ok', 'too few threads: ' || toString(peak_threads) ) AS result + -- we have 500 parts +FROM system.query_thread_log +WHERE + event_time > now() - 60 + AND query_id = ( + SELECT query_id + FROM system.query_log + WHERE + type = 'QueryFinish' + AND event_time > now() - 60 + AND log_comment = 'check_concurrency_in_remote_queries5' + AND current_database = currentDatabase() + ORDER BY event_time DESC LIMIT 1 + ); + +DROP TABLE IF EXISTS test_lot_of_parts_distributed; +DROP TABLE IF EXISTS test_lot_of_parts; From bf1aa653d1711920329ac787a4e69ea6d3f950a0 Mon Sep 17 00:00:00 2001 From: Mikhail Filimonov Date: Wed, 27 Sep 2023 14:49:23 +0200 Subject: [PATCH 2/7] put limit on number of threads back, measure them via peak_threads_usage --- ...hreads_count_in_distributed_queries.sql.j2 | 137 +++++------------- 1 file changed, 36 insertions(+), 101 deletions(-) diff --git a/tests/queries/0_stateless/02845_threads_count_in_distributed_queries.sql.j2 b/tests/queries/0_stateless/02845_threads_count_in_distributed_queries.sql.j2 index 6039e12071d..5be37bbacf6 100644 --- a/tests/queries/0_stateless/02845_threads_count_in_distributed_queries.sql.j2 +++ 
b/tests/queries/0_stateless/02845_threads_count_in_distributed_queries.sql.j2 @@ -1,5 +1,5 @@ -- enforce some defaults to be sure that the env settings will not affect the test -SET max_threads=5, async_socket_for_remote=1, async_socket_for_remote=1, optimize_read_in_order=1; +SET max_threads=5, async_socket_for_remote=1, async_socket_for_remote=1, optimize_read_in_order=1, prefer_localhost_replica=1; -- we use query_thread_log to check peak thread usage @@ -22,26 +22,15 @@ SELECT * FROM system.one {{ "UNION ALL" if not loop.last }} SYSTEM FLUSH LOGS; -WITH - maxIntersections( - toUnixTimestamp64Micro(query_start_time_microseconds), - toUnixTimestamp64Micro(event_time_microseconds) - ) as peak_threads SELECT - if(peak_threads BETWEEN 1 AND toUInt64(getSetting('max_threads')) + 2, 'ok', 'too many threads: ' || toString(peak_threads) ) AS result -FROM system.query_thread_log + if(peak_threads_usage BETWEEN 1 AND toUInt64(getSetting('max_threads')) + 2, 'ok', 'too many threads: ' || toString(peak_threads_usage) ) AS result +FROM system.query_log WHERE - event_time > now() - 60 - AND query_id = ( - SELECT query_id - FROM system.query_log - WHERE - type = 'QueryFinish' - AND event_time > now() - 60 - AND log_comment = 'check_concurrency_in_remote_queries1' - AND current_database = currentDatabase() - ORDER BY event_time DESC LIMIT 1 - ); + type = 'QueryFinish' + AND event_time > now() - 60 + AND log_comment = 'check_concurrency_in_remote_queries1' + AND current_database = currentDatabase() + ORDER BY event_time DESC LIMIT 1; -------------------- SELECT 'prefer_localhost_replica=0, remote query with a lot of union all' AS testname; @@ -54,26 +43,15 @@ SELECT * FROM system.one {{ "UNION ALL" if not loop.last }} SYSTEM FLUSH LOGS; -WITH - maxIntersections( - toUnixTimestamp64Micro(query_start_time_microseconds), - toUnixTimestamp64Micro(event_time_microseconds) - ) as peak_threads SELECT - if(peak_threads BETWEEN 1 AND toUInt64(getSetting('max_threads')) + 2, 'ok', 'too 
many threads: ' || toString(peak_threads) ) AS result -FROM system.query_thread_log + if(peak_threads_usage BETWEEN 1 AND toUInt64(getSetting('max_threads')) + 2, 'ok', 'too many threads: ' || toString(peak_threads_usage) ) AS result +FROM system.query_log WHERE - event_time > now() - 60 - AND query_id = ( - SELECT query_id - FROM system.query_log - WHERE - type = 'QueryFinish' - AND event_time > now() - 60 - AND log_comment = 'check_concurrency_in_remote_queries2' - AND current_database = currentDatabase() - ORDER BY event_time DESC LIMIT 1 - ); + type = 'QueryFinish' + AND event_time > now() - 60 + AND log_comment = 'check_concurrency_in_remote_queries2' + AND current_database = currentDatabase() + ORDER BY event_time DESC LIMIT 1; -------------------- SELECT 'prefer_localhost_replica=1, async_socket_for_remote=0, remote query with a lot of union all (lot of threads)' AS testname; @@ -90,26 +68,15 @@ SELECT * FROM system.one {{ "UNION ALL" if not loop.last }} SYSTEM FLUSH LOGS; -WITH - maxIntersections( - toUnixTimestamp64Micro(query_start_time_microseconds), - toUnixTimestamp64Micro(event_time_microseconds) - ) as peak_threads SELECT - if(peak_threads >= 379, 'ok', 'too few threads: ' || toString(peak_threads) ) AS result -FROM system.query_thread_log + if(peak_threads_usage >= 379, 'ok', 'too few threads: ' || toString(peak_threads_usage) ) AS result +FROM system.query_log WHERE - event_time > now() - 60 - AND query_id = ( - SELECT query_id - FROM system.query_log - WHERE - type = 'QueryFinish' - AND event_time > now() - 60 - AND log_comment = 'check_concurrency_in_remote_queries3' - AND current_database = currentDatabase() - ORDER BY event_time DESC LIMIT 1 - ); + type = 'QueryFinish' + AND event_time > now() - 60 + AND log_comment = 'check_concurrency_in_remote_queries3' + AND current_database = currentDatabase() + ORDER BY event_time DESC LIMIT 1; -- less synthetic testcase from the issue https://github.com/ClickHouse/ClickHouse/issues/53287 -- it creates 
lot of streams because of many parts (one per part) + optimize_read_in_order=1 feature @@ -154,13 +121,6 @@ select count() from system.parts where table = 'test_lot_of_parts' and active an SELECT 'prefer_localhost_replica=1, remote query with read in order' AS testname; ---- it seem like the number of threads here reported here is still higher than expected ---- max_threads=1 => peak_threads=7 ---- max_threads=2 => peak_threads=14 ---- max_threads=5 => peak_threads=25 ---- may be it's a matter of the way how we count threads in thread_log, or there is another problem here ---- anyway it still much better than before (see the next test in that file - it uses 500 threads). - -- query which uses optimize_read_in_order=1 SELECT DISTINCT 'val' AS fieldType, @@ -173,28 +133,15 @@ SETTINGS log_comment='check_concurrency_in_remote_queries4' FORMAT Null; SYSTEM FLUSH LOGS; -WITH - maxIntersections( - toUnixTimestamp64Micro(query_start_time_microseconds), - toUnixTimestamp64Micro(event_time_microseconds) - ) as peak_threads SELECT - -- it should be max_threads+2 not max_threads*8 - if(peak_threads BETWEEN 1 AND toUInt64(getSetting('max_threads')) * 8, 'ok', 'too many threads: ' || toString(peak_threads) ) AS result -FROM system.query_thread_log + if(peak_threads_usage BETWEEN 1 AND toUInt64(getSetting('max_threads')) + 2, 'ok', 'too many threads: ' || toString(peak_threads_usage) ) AS result +FROM system.query_log WHERE - event_time > now() - 60 - AND query_id = ( - SELECT query_id - FROM system.query_log - WHERE - type = 'QueryFinish' - AND event_time > now() - 60 - AND log_comment = 'check_concurrency_in_remote_queries4' - AND current_database = currentDatabase() - ORDER BY event_time DESC LIMIT 1 - ); - + type = 'QueryFinish' + AND event_time > now() - 60 + AND log_comment = 'check_concurrency_in_remote_queries4' + AND current_database = currentDatabase() + ORDER BY event_time DESC LIMIT 1; SELECT 'prefer_localhost_replica=1 + async_socket_for_remote=0, remote query with 
read in order (lot of threads)' AS testname; @@ -212,27 +159,15 @@ SETTINGS log_comment='check_concurrency_in_remote_queries5', async_socket_for_re SYSTEM FLUSH LOGS; -WITH - maxIntersections( - toUnixTimestamp64Micro(query_start_time_microseconds), - toUnixTimestamp64Micro(event_time_microseconds) - ) as peak_threads SELECT - if(peak_threads >= 500, 'ok', 'too few threads: ' || toString(peak_threads) ) AS result - -- we have 500 parts -FROM system.query_thread_log + if(peak_threads_usage >= 500, 'ok', 'too few threads: ' || toString(peak_threads_usage) ) AS result +FROM system.query_log WHERE - event_time > now() - 60 - AND query_id = ( - SELECT query_id - FROM system.query_log - WHERE - type = 'QueryFinish' - AND event_time > now() - 60 - AND log_comment = 'check_concurrency_in_remote_queries5' - AND current_database = currentDatabase() - ORDER BY event_time DESC LIMIT 1 - ); + type = 'QueryFinish' + AND event_time > now() - 60 + AND log_comment = 'check_concurrency_in_remote_queries5' + AND current_database = currentDatabase() + ORDER BY event_time DESC LIMIT 1; DROP TABLE IF EXISTS test_lot_of_parts_distributed; DROP TABLE IF EXISTS test_lot_of_parts; From e699b4e93772f7a1cac50d32502726db7095a690 Mon Sep 17 00:00:00 2001 From: Mikhail Filimonov Date: Fri, 29 Sep 2023 11:07:25 +0200 Subject: [PATCH 3/7] Revert "put limit on number of threads back, measure them via peak_threads_usage" This reverts commit 4da2d7ca879749bb2930c0936cd33156c9c62ee0. 
--- ...hreads_count_in_distributed_queries.sql.j2 | 137 +++++++++++++----- 1 file changed, 101 insertions(+), 36 deletions(-) diff --git a/tests/queries/0_stateless/02845_threads_count_in_distributed_queries.sql.j2 b/tests/queries/0_stateless/02845_threads_count_in_distributed_queries.sql.j2 index 5be37bbacf6..6039e12071d 100644 --- a/tests/queries/0_stateless/02845_threads_count_in_distributed_queries.sql.j2 +++ b/tests/queries/0_stateless/02845_threads_count_in_distributed_queries.sql.j2 @@ -1,5 +1,5 @@ -- enforce some defaults to be sure that the env settings will not affect the test -SET max_threads=5, async_socket_for_remote=1, async_socket_for_remote=1, optimize_read_in_order=1, prefer_localhost_replica=1; +SET max_threads=5, async_socket_for_remote=1, async_socket_for_remote=1, optimize_read_in_order=1; -- we use query_thread_log to check peak thread usage @@ -22,15 +22,26 @@ SELECT * FROM system.one {{ "UNION ALL" if not loop.last }} SYSTEM FLUSH LOGS; +WITH + maxIntersections( + toUnixTimestamp64Micro(query_start_time_microseconds), + toUnixTimestamp64Micro(event_time_microseconds) + ) as peak_threads SELECT - if(peak_threads_usage BETWEEN 1 AND toUInt64(getSetting('max_threads')) + 2, 'ok', 'too many threads: ' || toString(peak_threads_usage) ) AS result -FROM system.query_log + if(peak_threads BETWEEN 1 AND toUInt64(getSetting('max_threads')) + 2, 'ok', 'too many threads: ' || toString(peak_threads) ) AS result +FROM system.query_thread_log WHERE - type = 'QueryFinish' - AND event_time > now() - 60 - AND log_comment = 'check_concurrency_in_remote_queries1' - AND current_database = currentDatabase() - ORDER BY event_time DESC LIMIT 1; + event_time > now() - 60 + AND query_id = ( + SELECT query_id + FROM system.query_log + WHERE + type = 'QueryFinish' + AND event_time > now() - 60 + AND log_comment = 'check_concurrency_in_remote_queries1' + AND current_database = currentDatabase() + ORDER BY event_time DESC LIMIT 1 + ); -------------------- SELECT 
'prefer_localhost_replica=0, remote query with a lot of union all' AS testname; @@ -43,15 +54,26 @@ SELECT * FROM system.one {{ "UNION ALL" if not loop.last }} SYSTEM FLUSH LOGS; +WITH + maxIntersections( + toUnixTimestamp64Micro(query_start_time_microseconds), + toUnixTimestamp64Micro(event_time_microseconds) + ) as peak_threads SELECT - if(peak_threads_usage BETWEEN 1 AND toUInt64(getSetting('max_threads')) + 2, 'ok', 'too many threads: ' || toString(peak_threads_usage) ) AS result -FROM system.query_log + if(peak_threads BETWEEN 1 AND toUInt64(getSetting('max_threads')) + 2, 'ok', 'too many threads: ' || toString(peak_threads) ) AS result +FROM system.query_thread_log WHERE - type = 'QueryFinish' - AND event_time > now() - 60 - AND log_comment = 'check_concurrency_in_remote_queries2' - AND current_database = currentDatabase() - ORDER BY event_time DESC LIMIT 1; + event_time > now() - 60 + AND query_id = ( + SELECT query_id + FROM system.query_log + WHERE + type = 'QueryFinish' + AND event_time > now() - 60 + AND log_comment = 'check_concurrency_in_remote_queries2' + AND current_database = currentDatabase() + ORDER BY event_time DESC LIMIT 1 + ); -------------------- SELECT 'prefer_localhost_replica=1, async_socket_for_remote=0, remote query with a lot of union all (lot of threads)' AS testname; @@ -68,15 +90,26 @@ SELECT * FROM system.one {{ "UNION ALL" if not loop.last }} SYSTEM FLUSH LOGS; +WITH + maxIntersections( + toUnixTimestamp64Micro(query_start_time_microseconds), + toUnixTimestamp64Micro(event_time_microseconds) + ) as peak_threads SELECT - if(peak_threads_usage >= 379, 'ok', 'too few threads: ' || toString(peak_threads_usage) ) AS result -FROM system.query_log + if(peak_threads >= 379, 'ok', 'too few threads: ' || toString(peak_threads) ) AS result +FROM system.query_thread_log WHERE - type = 'QueryFinish' - AND event_time > now() - 60 - AND log_comment = 'check_concurrency_in_remote_queries3' - AND current_database = currentDatabase() - ORDER BY 
event_time DESC LIMIT 1; + event_time > now() - 60 + AND query_id = ( + SELECT query_id + FROM system.query_log + WHERE + type = 'QueryFinish' + AND event_time > now() - 60 + AND log_comment = 'check_concurrency_in_remote_queries3' + AND current_database = currentDatabase() + ORDER BY event_time DESC LIMIT 1 + ); -- less synthetic testcase from the issue https://github.com/ClickHouse/ClickHouse/issues/53287 -- it creates lot of streams because of many parts (one per part) + optimize_read_in_order=1 feature @@ -121,6 +154,13 @@ select count() from system.parts where table = 'test_lot_of_parts' and active an SELECT 'prefer_localhost_replica=1, remote query with read in order' AS testname; +--- it seem like the number of threads here reported here is still higher than expected +--- max_threads=1 => peak_threads=7 +--- max_threads=2 => peak_threads=14 +--- max_threads=5 => peak_threads=25 +--- may be it's a matter of the way how we count threads in thread_log, or there is another problem here +--- anyway it still much better than before (see the next test in that file - it uses 500 threads). 
+ -- query which uses optimize_read_in_order=1 SELECT DISTINCT 'val' AS fieldType, @@ -133,15 +173,28 @@ SETTINGS log_comment='check_concurrency_in_remote_queries4' FORMAT Null; SYSTEM FLUSH LOGS; +WITH + maxIntersections( + toUnixTimestamp64Micro(query_start_time_microseconds), + toUnixTimestamp64Micro(event_time_microseconds) + ) as peak_threads SELECT - if(peak_threads_usage BETWEEN 1 AND toUInt64(getSetting('max_threads')) + 2, 'ok', 'too many threads: ' || toString(peak_threads_usage) ) AS result -FROM system.query_log + -- it should be max_threads+2 not max_threads*8 + if(peak_threads BETWEEN 1 AND toUInt64(getSetting('max_threads')) * 8, 'ok', 'too many threads: ' || toString(peak_threads) ) AS result +FROM system.query_thread_log WHERE - type = 'QueryFinish' - AND event_time > now() - 60 - AND log_comment = 'check_concurrency_in_remote_queries4' - AND current_database = currentDatabase() - ORDER BY event_time DESC LIMIT 1; + event_time > now() - 60 + AND query_id = ( + SELECT query_id + FROM system.query_log + WHERE + type = 'QueryFinish' + AND event_time > now() - 60 + AND log_comment = 'check_concurrency_in_remote_queries4' + AND current_database = currentDatabase() + ORDER BY event_time DESC LIMIT 1 + ); + SELECT 'prefer_localhost_replica=1 + async_socket_for_remote=0, remote query with read in order (lot of threads)' AS testname; @@ -159,15 +212,27 @@ SETTINGS log_comment='check_concurrency_in_remote_queries5', async_socket_for_re SYSTEM FLUSH LOGS; +WITH + maxIntersections( + toUnixTimestamp64Micro(query_start_time_microseconds), + toUnixTimestamp64Micro(event_time_microseconds) + ) as peak_threads SELECT - if(peak_threads_usage >= 500, 'ok', 'too few threads: ' || toString(peak_threads_usage) ) AS result -FROM system.query_log + if(peak_threads >= 500, 'ok', 'too few threads: ' || toString(peak_threads) ) AS result + -- we have 500 parts +FROM system.query_thread_log WHERE - type = 'QueryFinish' - AND event_time > now() - 60 - AND log_comment = 
'check_concurrency_in_remote_queries5' - AND current_database = currentDatabase() - ORDER BY event_time DESC LIMIT 1; + event_time > now() - 60 + AND query_id = ( + SELECT query_id + FROM system.query_log + WHERE + type = 'QueryFinish' + AND event_time > now() - 60 + AND log_comment = 'check_concurrency_in_remote_queries5' + AND current_database = currentDatabase() + ORDER BY event_time DESC LIMIT 1 + ); DROP TABLE IF EXISTS test_lot_of_parts_distributed; DROP TABLE IF EXISTS test_lot_of_parts; From d8cddeaf907f669221972c44707bc49871c3dd8a Mon Sep 17 00:00:00 2001 From: Mikhail Filimonov Date: Fri, 29 Sep 2023 17:31:35 +0200 Subject: [PATCH 4/7] Trying proposal by Azat --- ...02845_threads_count_in_distributed_queries.sql.j2 | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) diff --git a/tests/queries/0_stateless/02845_threads_count_in_distributed_queries.sql.j2 b/tests/queries/0_stateless/02845_threads_count_in_distributed_queries.sql.j2 index 6039e12071d..fe5744eccf1 100644 --- a/tests/queries/0_stateless/02845_threads_count_in_distributed_queries.sql.j2 +++ b/tests/queries/0_stateless/02845_threads_count_in_distributed_queries.sql.j2 @@ -1,5 +1,5 @@ -- enforce some defaults to be sure that the env settings will not affect the test -SET max_threads=5, async_socket_for_remote=1, async_socket_for_remote=1, optimize_read_in_order=1; +SET max_threads=5, async_socket_for_remote=1, optimize_read_in_order=1, load_marks_asynchronously=0, local_filesystem_read_method='pread'; -- we use query_thread_log to check peak thread usage @@ -154,13 +154,6 @@ select count() from system.parts where table = 'test_lot_of_parts' and active an SELECT 'prefer_localhost_replica=1, remote query with read in order' AS testname; ---- it seem like the number of threads here reported here is still higher than expected ---- max_threads=1 => peak_threads=7 ---- max_threads=2 => peak_threads=14 ---- max_threads=5 => peak_threads=25 ---- may be it's a matter of the way how we count 
threads in thread_log, or there is another problem here ---- anyway it still much better than before (see the next test in that file - it uses 500 threads). - -- query which uses optimize_read_in_order=1 SELECT DISTINCT 'val' AS fieldType, @@ -179,8 +172,7 @@ WITH toUnixTimestamp64Micro(event_time_microseconds) ) as peak_threads SELECT - -- it should be max_threads+2 not max_threads*8 - if(peak_threads BETWEEN 1 AND toUInt64(getSetting('max_threads')) * 8, 'ok', 'too many threads: ' || toString(peak_threads) ) AS result + if(peak_threads BETWEEN 1 AND toUInt64(getSetting('max_threads')) + 2, 'ok', 'too many threads: ' || toString(peak_threads) ) AS result FROM system.query_thread_log WHERE event_time > now() - 60 From bbba89977803200b83d7f35637304be886311b4d Mon Sep 17 00:00:00 2001 From: Mikhail Filimonov Date: Mon, 2 Oct 2023 12:44:54 +0200 Subject: [PATCH 5/7] missed setting --- .../02845_threads_count_in_distributed_queries.sql.j2 | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/queries/0_stateless/02845_threads_count_in_distributed_queries.sql.j2 b/tests/queries/0_stateless/02845_threads_count_in_distributed_queries.sql.j2 index fe5744eccf1..cd714fdc091 100644 --- a/tests/queries/0_stateless/02845_threads_count_in_distributed_queries.sql.j2 +++ b/tests/queries/0_stateless/02845_threads_count_in_distributed_queries.sql.j2 @@ -1,5 +1,5 @@ -- enforce some defaults to be sure that the env settings will not affect the test -SET max_threads=5, async_socket_for_remote=1, optimize_read_in_order=1, load_marks_asynchronously=0, local_filesystem_read_method='pread'; +SET max_threads=5, async_socket_for_remote=1, prefer_localhost_replica=1, optimize_read_in_order=1, load_marks_asynchronously=0, local_filesystem_read_method='pread'; -- we use query_thread_log to check peak thread usage From 1abe0013c0b2042c79b0dcd75c18ef2364f8aecf Mon Sep 17 00:00:00 2001 From: filimonov <1549571+filimonov@users.noreply.github.com> Date: Mon, 2 Oct 2023 22:49:23 
+0200 Subject: [PATCH 6/7] Update 02845_threads_count_in_distributed_queries.sql.j2 --- .../02845_threads_count_in_distributed_queries.sql.j2 | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/queries/0_stateless/02845_threads_count_in_distributed_queries.sql.j2 b/tests/queries/0_stateless/02845_threads_count_in_distributed_queries.sql.j2 index cd714fdc091..eb3fa6fadd7 100644 --- a/tests/queries/0_stateless/02845_threads_count_in_distributed_queries.sql.j2 +++ b/tests/queries/0_stateless/02845_threads_count_in_distributed_queries.sql.j2 @@ -1,5 +1,5 @@ -- enforce some defaults to be sure that the env settings will not affect the test -SET max_threads=5, async_socket_for_remote=1, prefer_localhost_replica=1, optimize_read_in_order=1, load_marks_asynchronously=0, local_filesystem_read_method='pread'; +SET max_threads=5, async_socket_for_remote=1, prefer_localhost_replica=1, optimize_read_in_order=1, load_marks_asynchronously=0, local_filesystem_read_method='pread', remote_filesystem_read_method='read'; -- we use query_thread_log to check peak thread usage From aa2bf5ac02ff079a8367959398bead42d411037f Mon Sep 17 00:00:00 2001 From: Mikhail Filimonov Date: Tue, 3 Oct 2023 10:03:17 +0200 Subject: [PATCH 7/7] make test faster --- ...threads_count_in_distributed_queries.reference | 8 ++++---- ...45_threads_count_in_distributed_queries.sql.j2 | 15 +++++++-------- 2 files changed, 11 insertions(+), 12 deletions(-) diff --git a/tests/queries/0_stateless/02845_threads_count_in_distributed_queries.reference b/tests/queries/0_stateless/02845_threads_count_in_distributed_queries.reference index 0e3f6336468..c8338ebaf7c 100644 --- a/tests/queries/0_stateless/02845_threads_count_in_distributed_queries.reference +++ b/tests/queries/0_stateless/02845_threads_count_in_distributed_queries.reference @@ -1,14 +1,14 @@ prefer_localhost_replica=1, remote query with a lot of union all -379 +77 ok prefer_localhost_replica=0, remote query with a lot of union all -379 +77 
ok prefer_localhost_replica=1, async_socket_for_remote=0, remote query with a lot of union all (lot of threads) -379 +77 ok prepare test schema -500 +95 prefer_localhost_replica=1, remote query with read in order ok prefer_localhost_replica=1 + async_socket_for_remote=0, remote query with read in order (lot of threads) diff --git a/tests/queries/0_stateless/02845_threads_count_in_distributed_queries.sql.j2 b/tests/queries/0_stateless/02845_threads_count_in_distributed_queries.sql.j2 index eb3fa6fadd7..ffdd4e3400e 100644 --- a/tests/queries/0_stateless/02845_threads_count_in_distributed_queries.sql.j2 +++ b/tests/queries/0_stateless/02845_threads_count_in_distributed_queries.sql.j2 @@ -1,7 +1,6 @@ -- enforce some defaults to be sure that the env settings will not affect the test SET max_threads=5, async_socket_for_remote=1, prefer_localhost_replica=1, optimize_read_in_order=1, load_marks_asynchronously=0, local_filesystem_read_method='pread', remote_filesystem_read_method='read'; - -- we use query_thread_log to check peak thread usage -- after https://github.com/ClickHouse/ClickHouse/issues/53417 there is a simpler way to check it -- but that will not allow to backport the test to older versions @@ -15,7 +14,7 @@ SELECT 'prefer_localhost_replica=1, remote query with a lot of union all' AS tes -- let's check how many threads clickhouse will start for that select count() from remote('127.0.0.1:9000', view( -{% for n in range(379) -%} +{% for n in range(77) -%} SELECT * FROM system.one {{ "UNION ALL" if not loop.last }} {% endfor -%} )) SETTINGS log_comment='check_concurrency_in_remote_queries1'; @@ -47,7 +46,7 @@ WHERE SELECT 'prefer_localhost_replica=0, remote query with a lot of union all' AS testname; select count() from remote('127.0.0.1:9000', view( -{% for n in range(379) -%} +{% for n in range(77) -%} SELECT * FROM system.one {{ "UNION ALL" if not loop.last }} {% endfor -%} )) SETTINGS log_comment='check_concurrency_in_remote_queries2', 
prefer_localhost_replica=0; @@ -83,7 +82,7 @@ SELECT 'prefer_localhost_replica=1, async_socket_for_remote=0, remote query with -- see https://github.com/ClickHouse/ClickHouse/issues/53287 select count() from remote('127.0.0.1:9000', view( -{% for n in range(379) -%} +{% for n in range(77) -%} SELECT * FROM system.one {{ "UNION ALL" if not loop.last }} {% endfor -%} )) SETTINGS log_comment='check_concurrency_in_remote_queries3', async_socket_for_remote=0, prefer_localhost_replica=1; @@ -96,7 +95,7 @@ WITH toUnixTimestamp64Micro(event_time_microseconds) ) as peak_threads SELECT - if(peak_threads >= 379, 'ok', 'too few threads: ' || toString(peak_threads) ) AS result + if(peak_threads >= 77, 'ok', 'too few threads: ' || toString(peak_threads) ) AS result FROM system.query_thread_log WHERE event_time > now() - 60 @@ -147,7 +146,7 @@ INSERT INTO test_lot_of_parts (a, b, c, val) 'bar' as b, _CAST('1683504000', 'DateTime64') as c, 'baz' as val - FROM numbers_mt(500) + FROM numbers_mt(95) SETTINGS max_block_size = 1, min_insert_block_size_bytes=1, min_insert_block_size_rows=1; --every row will be in separate part select count() from system.parts where table = 'test_lot_of_parts' and active and database = currentDatabase(); @@ -210,8 +209,8 @@ WITH toUnixTimestamp64Micro(event_time_microseconds) ) as peak_threads SELECT - if(peak_threads >= 500, 'ok', 'too few threads: ' || toString(peak_threads) ) AS result - -- we have 500 parts + if(peak_threads >= 95, 'ok', 'too few threads: ' || toString(peak_threads) ) AS result + -- we have 95 parts FROM system.query_thread_log WHERE event_time > now() - 60