diff --git a/src/Interpreters/InterpreterSelectQuery.cpp b/src/Interpreters/InterpreterSelectQuery.cpp
index 03b70dd8764..3194e753d0d 100644
--- a/src/Interpreters/InterpreterSelectQuery.cpp
+++ b/src/Interpreters/InterpreterSelectQuery.cpp
@@ -2428,17 +2428,22 @@ void InterpreterSelectQuery::executeFetchColumns(QueryProcessingStage::Enum proc
     /// General limit for the number of threads.
     size_t max_threads_execute_query = settings.max_threads;
 
-    /** With distributed query processing, almost no computations are done in the threads,
-      * but wait and receive data from remote servers.
-      * If we have 20 remote servers, and max_threads = 8, then it would not be very good
-      * connect and ask only 8 servers at a time.
-      * To simultaneously query more remote servers,
-      * instead of max_threads, max_distributed_connections is used.
+    /**
+      * To simultaneously query more remote servers when async_socket_for_remote is off,
+      * max_distributed_connections is used instead of max_threads:
+      * since threads there mostly spend time waiting for data from remote servers,
+      * we can increase the degree of parallelism to avoid sequential querying of remote servers.
+      *
+      * DANGER: that can lead to an insane number of threads working if there are a lot of streams and prefer_localhost_replica is used.
+      *
+      * That is not needed when async_socket_for_remote is on, because in that case
+      * threads are not blocked waiting for data from remote servers.
+      *
       */
-    bool is_remote = false;
-    if (storage && storage->isRemote())
+    bool is_sync_remote = false;
+    if (storage && storage->isRemote() && !settings.async_socket_for_remote)
     {
-        is_remote = true;
+        is_sync_remote = true;
         max_threads_execute_query = max_streams = settings.max_distributed_connections;
     }
 
@@ -2494,7 +2499,7 @@ void InterpreterSelectQuery::executeFetchColumns(QueryProcessingStage::Enum proc
         max_streams = 1;
 
     /// If necessary, we request more sources than the number of threads - to distribute the work evenly over the threads.
-    if (max_streams > 1 && !is_remote)
+    if (max_streams > 1 && !is_sync_remote)
         max_streams = static_cast<size_t>(max_streams * settings.max_streams_to_max_threads_ratio);
 
     auto & prewhere_info = analysis_result.prewhere_info;
@@ -2592,7 +2597,7 @@ void InterpreterSelectQuery::executeFetchColumns(QueryProcessingStage::Enum proc
     /// The inner local query (that is done in the same process, without
     /// network interaction), it will setMaxThreads earlier and distributed
     /// query will not update it.
-    if (!query_plan.getMaxThreads() || is_remote)
+    if (!query_plan.getMaxThreads() || is_sync_remote)
         query_plan.setMaxThreads(max_threads_execute_query);
 
     query_plan.setConcurrencyControl(settings.use_concurrency_control);
diff --git a/src/Planner/PlannerJoinTree.cpp b/src/Planner/PlannerJoinTree.cpp
index 6f83414fc20..d4a0b7bdc7b 100644
--- a/src/Planner/PlannerJoinTree.cpp
+++ b/src/Planner/PlannerJoinTree.cpp
@@ -599,15 +599,20 @@ JoinTreeQueryPlan buildQueryPlanForTableExpression(QueryTreeNodePtr table_expres
     size_t max_streams = settings.max_threads;
     size_t max_threads_execute_query = settings.max_threads;
 
-    /** With distributed query processing, almost no computations are done in the threads,
-      * but wait and receive data from remote servers.
-      * If we have 20 remote servers, and max_threads = 8, then it would not be efficient to
-      * connect and ask only 8 servers at a time.
-      * To simultaneously query more remote servers,
-      * instead of max_threads, max_distributed_connections is used.
-      */
-    bool is_remote = table_expression_data.isRemote();
-    if (is_remote)
+    /**
+      * To simultaneously query more remote servers when async_socket_for_remote is off,
+      * max_distributed_connections is used instead of max_threads:
+      * since threads there mostly spend time waiting for data from remote servers,
+      * we can increase the degree of parallelism to avoid sequential querying of remote servers.
+      *
+      * DANGER: that can lead to an insane number of threads working if there are a lot of streams and prefer_localhost_replica is used.
+      *
+      * That is not needed when async_socket_for_remote is on, because in that case
+      * threads are not blocked waiting for data from remote servers.
+      *
+      */
+    bool is_sync_remote = table_expression_data.isRemote() && !settings.async_socket_for_remote;
+    if (is_sync_remote)
     {
         max_streams = settings.max_distributed_connections;
         max_threads_execute_query = settings.max_distributed_connections;
@@ -647,7 +652,7 @@ JoinTreeQueryPlan buildQueryPlanForTableExpression(QueryTreeNodePtr table_expres
         max_streams = 1;
 
     /// If necessary, we request more sources than the number of threads - to distribute the work evenly over the threads
-    if (max_streams > 1 && !is_remote)
+    if (max_streams > 1 && !is_sync_remote)
         max_streams = static_cast<size_t>(max_streams * settings.max_streams_to_max_threads_ratio);
 
     if (table_node)
@@ -841,7 +846,7 @@ JoinTreeQueryPlan buildQueryPlanForTableExpression(QueryTreeNodePtr table_expres
      * network interaction), it will setMaxThreads earlier and distributed
      * query will not update it.
      */
-    if (!query_plan.getMaxThreads() || is_remote)
+    if (!query_plan.getMaxThreads() || is_sync_remote)
         query_plan.setMaxThreads(max_threads_execute_query);
 
     query_plan.setConcurrencyControl(settings.use_concurrency_control);
diff --git a/tests/analyzer_tech_debt.txt b/tests/analyzer_tech_debt.txt
index e5394307f3b..de04f8adc4b 100644
--- a/tests/analyzer_tech_debt.txt
+++ b/tests/analyzer_tech_debt.txt
@@ -84,5 +84,6 @@
 02818_parameterized_view_with_cte_multiple_usage
 01940_custom_tld_sharding_key
 02815_range_dict_no_direct_join
+02845_threads_count_in_distributed_queries
 02861_join_on_nullsafe_compare
 01019_alter_materialized_view_consistent
diff --git a/tests/queries/0_stateless/02845_threads_count_in_distributed_queries.reference b/tests/queries/0_stateless/02845_threads_count_in_distributed_queries.reference
new file mode 100644
index 00000000000..c8338ebaf7c
--- /dev/null
+++ b/tests/queries/0_stateless/02845_threads_count_in_distributed_queries.reference
@@ -0,0 +1,15 @@
+prefer_localhost_replica=1, remote query with a lot of union all
+77
+ok
+prefer_localhost_replica=0, remote query with a lot of union all
+77
+ok
+prefer_localhost_replica=1, async_socket_for_remote=0, remote query with a lot of union all (lot of threads)
+77
+ok
+prepare test schema
+95
+prefer_localhost_replica=1, remote query with read in order
+ok
+prefer_localhost_replica=1 + async_socket_for_remote=0, remote query with read in order (lot of threads)
+ok
diff --git a/tests/queries/0_stateless/02845_threads_count_in_distributed_queries.sql.j2 b/tests/queries/0_stateless/02845_threads_count_in_distributed_queries.sql.j2
new file mode 100644
index 00000000000..ffdd4e3400e
--- /dev/null
+++ b/tests/queries/0_stateless/02845_threads_count_in_distributed_queries.sql.j2
@@ -0,0 +1,229 @@
+-- enforce some defaults to be sure that the env settings will not affect the test
+SET max_threads=5, async_socket_for_remote=1, prefer_localhost_replica=1, optimize_read_in_order=1, load_marks_asynchronously=0,
+    local_filesystem_read_method='pread', remote_filesystem_read_method='read';
+
+-- we use query_thread_log to check peak thread usage
+-- after https://github.com/ClickHouse/ClickHouse/issues/53417 there is a simpler way to check it
+-- but that would not allow backporting the test to older versions
+SET log_query_threads=1;
+
+
+--------------------
+SELECT 'prefer_localhost_replica=1, remote query with a lot of union all' AS testname;
+
+-- a query with a lot of dummy UNION ALL branches will create a lot of streams
+-- let's check how many threads ClickHouse will start for that
+
+select count() from remote('127.0.0.1:9000', view(
+{% for n in range(77) -%}
+SELECT * FROM system.one {{ "UNION ALL" if not loop.last }}
+{% endfor -%}
+ )) SETTINGS log_comment='check_concurrency_in_remote_queries1';
+
+SYSTEM FLUSH LOGS;
+
+WITH
+    maxIntersections(
+        toUnixTimestamp64Micro(query_start_time_microseconds),
+        toUnixTimestamp64Micro(event_time_microseconds)
+    ) as peak_threads
+SELECT
+    if(peak_threads BETWEEN 1 AND toUInt64(getSetting('max_threads')) + 2, 'ok', 'too many threads: ' || toString(peak_threads) ) AS result
+FROM system.query_thread_log
+WHERE
+    event_time > now() - 60
+    AND query_id = (
+        SELECT query_id
+        FROM system.query_log
+        WHERE
+            type = 'QueryFinish'
+            AND event_time > now() - 60
+            AND log_comment = 'check_concurrency_in_remote_queries1'
+            AND current_database = currentDatabase()
+        ORDER BY event_time DESC LIMIT 1
+    );
+
+--------------------
+SELECT 'prefer_localhost_replica=0, remote query with a lot of union all' AS testname;
+
+select count() from remote('127.0.0.1:9000', view(
+{% for n in range(77) -%}
+SELECT * FROM system.one {{ "UNION ALL" if not loop.last }}
+{% endfor -%}
+ )) SETTINGS log_comment='check_concurrency_in_remote_queries2', prefer_localhost_replica=0;
+
+SYSTEM FLUSH LOGS;
+
+WITH
+    maxIntersections(
+        toUnixTimestamp64Micro(query_start_time_microseconds),
+        toUnixTimestamp64Micro(event_time_microseconds)
+    ) as peak_threads
+SELECT
+    if(peak_threads BETWEEN 1 AND toUInt64(getSetting('max_threads')) + 2, 'ok', 'too many threads: ' || toString(peak_threads) ) AS result
+FROM system.query_thread_log
+WHERE
+    event_time > now() - 60
+    AND query_id = (
+        SELECT query_id
+        FROM system.query_log
+        WHERE
+            type = 'QueryFinish'
+            AND event_time > now() - 60
+            AND log_comment = 'check_concurrency_in_remote_queries2'
+            AND current_database = currentDatabase()
+        ORDER BY event_time DESC LIMIT 1
+    );
+
+--------------------
+SELECT 'prefer_localhost_replica=1, async_socket_for_remote=0, remote query with a lot of union all (lot of threads)' AS testname;
+
+-- that is actually bad behaviour, but it used to work like that for a long time.
+-- now it happens only for async_socket_for_remote=0 (while it is 1 by default)
+-- see https://github.com/ClickHouse/ClickHouse/issues/53287
+
+select count() from remote('127.0.0.1:9000', view(
+{% for n in range(77) -%}
+SELECT * FROM system.one {{ "UNION ALL" if not loop.last }}
+{% endfor -%}
+ )) SETTINGS log_comment='check_concurrency_in_remote_queries3', async_socket_for_remote=0, prefer_localhost_replica=1;
+
+SYSTEM FLUSH LOGS;
+
+WITH
+    maxIntersections(
+        toUnixTimestamp64Micro(query_start_time_microseconds),
+        toUnixTimestamp64Micro(event_time_microseconds)
+    ) as peak_threads
+SELECT
+    if(peak_threads >= 77, 'ok', 'too few threads: ' || toString(peak_threads) ) AS result
+FROM system.query_thread_log
+WHERE
+    event_time > now() - 60
+    AND query_id = (
+        SELECT query_id
+        FROM system.query_log
+        WHERE
+            type = 'QueryFinish'
+            AND event_time > now() - 60
+            AND log_comment = 'check_concurrency_in_remote_queries3'
+            AND current_database = currentDatabase()
+        ORDER BY event_time DESC LIMIT 1
+    );
+
+-- a less synthetic test case from the issue https://github.com/ClickHouse/ClickHouse/issues/53287
+-- it creates a lot of streams because of many parts (one stream per part) + the optimize_read_in_order=1 feature
+
+SELECT 'prepare test schema' AS stage;
+
+DROP TABLE IF EXISTS test_lot_of_parts_distributed;
+DROP TABLE IF EXISTS test_lot_of_parts;
+
+CREATE TABLE test_lot_of_parts
+(
+    `a` String,
+    `b` LowCardinality(String),
+    `c` DateTime64(3),
+    `val` String,
+)
+ENGINE = MergeTree
+ORDER BY (a, b, c)
+SETTINGS parts_to_delay_insert=0;
+
+CREATE TABLE test_lot_of_parts_distributed
+(
+    `a` String,
+    `b` LowCardinality(String),
+    `c` DateTime64(3),
+    `val` String,
+)
+ENGINE = Distributed(test_shard_localhost, currentDatabase(), 'test_lot_of_parts', rand());
+
+-- we need a lot of parts to make sure that we will have a lot of streams
+SYSTEM STOP MERGES test_lot_of_parts;
+INSERT INTO test_lot_of_parts (a, b, c, val)
+    SELECT
+        'foo' as a,
+        'bar' as b,
+        _CAST('1683504000', 'DateTime64') as c,
+        'baz' as val
+    FROM numbers_mt(95)
+    SETTINGS max_block_size = 1, min_insert_block_size_bytes=1, min_insert_block_size_rows=1; -- every row will be in a separate part
+
+select count() from system.parts where table = 'test_lot_of_parts' and active and database = currentDatabase();
+
+SELECT 'prefer_localhost_replica=1, remote query with read in order' AS testname;
+
+-- a query which uses optimize_read_in_order=1
+SELECT DISTINCT
+    'val' AS fieldType,
+    val AS value
+FROM test_lot_of_parts_distributed
+WHERE a = 'foo' AND value != '' AND positionCaseInsensitiveUTF8(value, 'baz') > 0 AND b = 'bar' AND c >= _CAST('1683504000', 'DateTime64')
+ORDER BY c DESC
+LIMIT 5
+SETTINGS log_comment='check_concurrency_in_remote_queries4' FORMAT Null;
+
+SYSTEM FLUSH LOGS;
+
+WITH
+    maxIntersections(
+        toUnixTimestamp64Micro(query_start_time_microseconds),
+        toUnixTimestamp64Micro(event_time_microseconds)
+    ) as peak_threads
+SELECT
+    if(peak_threads BETWEEN 1 AND toUInt64(getSetting('max_threads')) + 2, 'ok', 'too many threads: ' || toString(peak_threads) ) AS result
+FROM system.query_thread_log
+WHERE
+    event_time > now() - 60
+    AND query_id = (
+        SELECT query_id
+        FROM system.query_log
+        WHERE
+            type = 'QueryFinish'
+            AND event_time > now() - 60
+            AND log_comment = 'check_concurrency_in_remote_queries4'
+            AND current_database = currentDatabase()
+        ORDER BY event_time DESC LIMIT 1
+    );
+
+
+SELECT 'prefer_localhost_replica=1 + async_socket_for_remote=0, remote query with read in order (lot of threads)' AS testname;
+
+-- that is actually bad behaviour, but it used to work like that for a long time.
+-- now it happens only for async_socket_for_remote=0 (while it is 1 by default)
+
+SELECT DISTINCT
+    'val' AS fieldType,
+    val AS value
+FROM test_lot_of_parts_distributed
+WHERE a = 'foo' AND value != '' AND positionCaseInsensitiveUTF8(value, 'baz') > 0 AND b = 'bar' AND c >= _CAST('1683504000', 'DateTime64')
+ORDER BY c DESC
+LIMIT 5
+SETTINGS log_comment='check_concurrency_in_remote_queries5', async_socket_for_remote=0 FORMAT Null;
+
+SYSTEM FLUSH LOGS;
+
+WITH
+    maxIntersections(
+        toUnixTimestamp64Micro(query_start_time_microseconds),
+        toUnixTimestamp64Micro(event_time_microseconds)
+    ) as peak_threads
+SELECT
+    if(peak_threads >= 95, 'ok', 'too few threads: ' || toString(peak_threads) ) AS result
+    -- we have 95 parts
+FROM system.query_thread_log
+WHERE
+    event_time > now() - 60
+    AND query_id = (
+        SELECT query_id
+        FROM system.query_log
+        WHERE
+            type = 'QueryFinish'
+            AND event_time > now() - 60
+            AND log_comment = 'check_concurrency_in_remote_queries5'
+            AND current_database = currentDatabase()
+        ORDER BY event_time DESC LIMIT 1
+    );
+
+DROP TABLE IF EXISTS test_lot_of_parts_distributed;
+DROP TABLE IF EXISTS test_lot_of_parts;
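
How the new test measures concurrency: below is a standalone sketch (not part of the patch) of the peak-thread check that 02845_threads_count_in_distributed_queries.sql.j2 relies on, for running interactively against a single server. It reuses only functions and tables that appear in the test above (maxIntersections, toUnixTimestamp64Micro, system.query_thread_log, system.query_log); the log_comment value 'my_query_tag' and the sample remote() query are placeholders, not something from the patch.

-- tag any query of interest with a log_comment, with per-thread logging enabled
SET log_query_threads = 1;

SELECT count() FROM remote('127.0.0.1:9000', system.one)
SETTINGS log_comment = 'my_query_tag';

SYSTEM FLUSH LOGS;

-- maxIntersections returns the maximum number of overlapping [start, end] intervals;
-- as in the test, it is applied to the per-thread rows of query_thread_log to estimate
-- how many threads the tagged query had running at its busiest moment.
WITH
    maxIntersections(
        toUnixTimestamp64Micro(query_start_time_microseconds),
        toUnixTimestamp64Micro(event_time_microseconds)
    ) AS peak_threads
SELECT peak_threads
FROM system.query_thread_log
WHERE
    event_time > now() - 60
    AND query_id = (
        SELECT query_id
        FROM system.query_log
        WHERE
            type = 'QueryFinish'
            AND event_time > now() - 60
            AND log_comment = 'my_query_tag'
            AND current_database = currentDatabase()
        ORDER BY event_time DESC LIMIT 1
    );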