Merge pull request #53504 from filimonov/ignore_max_distributed_connections_for_async_remote

Disable the max_threads = max_distributed_connections logic when async_socket_for_remote=1
Commit 62b154930a by Kruglov Pavel, 2023-10-24 14:15:21 +02:00 (committed via GitHub)
5 changed files with 277 additions and 22 deletions
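In user-visible terms: with async_socket_for_remote=1 (the default) a query against a remote table is now sized by max_threads, and only async_socket_for_remote=0 keeps the old max_distributed_connections-based sizing. A minimal sketch of how the difference can be observed; the remote() address and the exact thread counts you would see are illustrative, not part of this PR:

-- with the async socket (default), roughly max_threads worker threads are expected locally
SELECT count() FROM remote('127.0.0.1:9000', system.one)
SETTINGS max_threads = 4, async_socket_for_remote = 1;

-- with the async socket disabled, sizing falls back to max_distributed_connections
SELECT count() FROM remote('127.0.0.1:9000', system.one)
SETTINGS max_threads = 4, async_socket_for_remote = 0;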


@@ -2428,17 +2428,22 @@ void InterpreterSelectQuery::executeFetchColumns(QueryProcessingStage::Enum proc
/// General limit for the number of threads.
size_t max_threads_execute_query = settings.max_threads;
/** With distributed query processing, almost no computations are done in the threads;
* they mostly wait for and receive data from remote servers.
* If we have 20 remote servers, and max_threads = 8, then it would not be very good
* to connect and ask only 8 servers at a time.
* To simultaneously query more remote servers,
* instead of max_threads, max_distributed_connections is used.
/**
* To simultaneously query more remote servers when async_socket_for_remote is off,
* max_distributed_connections is used instead of max_threads:
* since threads there mostly spend time waiting for data from remote servers,
* we can increase the degree of parallelism to avoid querying remote servers sequentially.
*
* DANGER: this can lead to an insane number of working threads if there are a lot of streams and prefer_localhost_replica is used.
*
* That is not needed when async_socket_for_remote is on, because in that case
* threads are not blocked waiting for data from remote servers.
*/
bool is_remote = false;
if (storage && storage->isRemote())
bool is_sync_remote = false;
if (storage && storage->isRemote() && !settings.async_socket_for_remote)
{
is_remote = true;
is_sync_remote = true;
max_threads_execute_query = max_streams = settings.max_distributed_connections;
}
@@ -2494,7 +2499,7 @@ void InterpreterSelectQuery::executeFetchColumns(QueryProcessingStage::Enum proc
max_streams = 1;
/// If necessary, we request more sources than the number of threads - to distribute the work evenly over the threads.
if (max_streams > 1 && !is_remote)
if (max_streams > 1 && !is_sync_remote)
max_streams = static_cast<size_t>(max_streams * settings.max_streams_to_max_threads_ratio);
auto & prewhere_info = analysis_result.prewhere_info;
@@ -2592,7 +2597,7 @@ void InterpreterSelectQuery::executeFetchColumns(QueryProcessingStage::Enum proc
/// The inner local query (that is done in the same process, without
/// network interaction), it will setMaxThreads earlier and distributed
/// query will not update it.
if (!query_plan.getMaxThreads() || is_remote)
if (!query_plan.getMaxThreads() || is_sync_remote)
query_plan.setMaxThreads(max_threads_execute_query);
query_plan.setConcurrencyControl(settings.use_concurrency_control);
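The comment above names four interacting settings; a quick way to check their current values on a given server is a plain system.settings lookup (nothing specific to this PR):

SELECT name, value, changed
FROM system.settings
WHERE name IN ('max_threads', 'max_distributed_connections', 'async_socket_for_remote', 'prefer_localhost_replica');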


@@ -599,15 +599,20 @@ JoinTreeQueryPlan buildQueryPlanForTableExpression(QueryTreeNodePtr table_expres
size_t max_streams = settings.max_threads;
size_t max_threads_execute_query = settings.max_threads;
/** With distributed query processing, almost no computations are done in the threads;
* they mostly wait for and receive data from remote servers.
* If we have 20 remote servers, and max_threads = 8, then it would not be efficient to
* connect and ask only 8 servers at a time.
* To simultaneously query more remote servers,
* instead of max_threads, max_distributed_connections is used.
*/
bool is_remote = table_expression_data.isRemote();
if (is_remote)
/**
* To simultaneously query more remote servers when async_socket_for_remote is off,
* max_distributed_connections is used instead of max_threads:
* since threads there mostly spend time waiting for data from remote servers,
* we can increase the degree of parallelism to avoid querying remote servers sequentially.
*
* DANGER: this can lead to an insane number of working threads if there are a lot of streams and prefer_localhost_replica is used.
*
* That is not needed when async_socket_for_remote is on, because in that case
* threads are not blocked waiting for data from remote servers.
*/
bool is_sync_remote = table_expression_data.isRemote() && !settings.async_socket_for_remote;
if (is_sync_remote)
{
max_streams = settings.max_distributed_connections;
max_threads_execute_query = settings.max_distributed_connections;
@@ -647,7 +652,7 @@ JoinTreeQueryPlan buildQueryPlanForTableExpression(QueryTreeNodePtr table_expres
max_streams = 1;
/// If necessary, we request more sources than the number of threads - to distribute the work evenly over the threads
if (max_streams > 1 && !is_remote)
if (max_streams > 1 && !is_sync_remote)
max_streams = static_cast<size_t>(max_streams * settings.max_streams_to_max_threads_ratio);
if (table_node)
@@ -841,7 +846,7 @@ JoinTreeQueryPlan buildQueryPlanForTableExpression(QueryTreeNodePtr table_expres
* network interaction), it will setMaxThreads earlier and distributed
* query will not update it.
*/
if (!query_plan.getMaxThreads() || is_remote)
if (!query_plan.getMaxThreads() || is_sync_remote)
query_plan.setMaxThreads(max_threads_execute_query);
query_plan.setConcurrencyControl(settings.use_concurrency_control);
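This second set of hunks applies the same sizing change to buildQueryPlanForTableExpression. One rough way to eyeball the width of the local pipeline that results from these settings is EXPLAIN PIPELINE; whether its output reflects every remote-specific sizing detail is not guaranteed, so treat this as a hedged sanity check rather than an exact measurement (the remote() address is illustrative):

EXPLAIN PIPELINE
SELECT count() FROM remote('127.0.0.1:9000', system.one)
SETTINGS max_threads = 4, async_socket_for_remote = 1;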


@@ -84,5 +84,6 @@
02818_parameterized_view_with_cte_multiple_usage
01940_custom_tld_sharding_key
02815_range_dict_no_direct_join
02845_threads_count_in_distributed_queries
02861_join_on_nullsafe_compare
01019_alter_materialized_view_consistent


@@ -0,0 +1,15 @@
prefer_localhost_replica=1, remote query with a lot of union all
77
ok
prefer_localhost_replica=0, remote query with a lot of union all
77
ok
prefer_localhost_replica=1, async_socket_for_remote=0, remote query with a lot of union all (lot of threads)
77
ok
prepare test schema
95
prefer_localhost_replica=1, remote query with read in order
ok
prefer_localhost_replica=1 + async_socket_for_remote=0, remote query with read in order (lot of threads)
ok


@@ -0,0 +1,229 @@
-- enforce some defaults to be sure that the env settings will not affect the test
SET max_threads=5, async_socket_for_remote=1, prefer_localhost_replica=1, optimize_read_in_order=1, load_marks_asynchronously=0, local_filesystem_read_method='pread', remote_filesystem_read_method='read';
-- we use query_thread_log to check peak thread usage
-- after https://github.com/ClickHouse/ClickHouse/issues/53417 there is a simpler way to check it,
-- but using it would not allow backporting the test to older versions
SET log_query_threads=1;
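-- (side note, not used by this test: the "simpler way" referenced above is presumably the
--  peak_threads_usage column that newer releases add to system.query_log; that column name is
--  an assumption here, and a check based on it could look like:
--    SELECT peak_threads_usage FROM system.query_log
--    WHERE type = 'QueryFinish' AND log_comment = 'check_concurrency_in_remote_queries1'
--      AND current_database = currentDatabase()
--    ORDER BY event_time DESC LIMIT 1;
-- )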
--------------------
SELECT 'prefer_localhost_replica=1, remote query with a lot of union all' AS testname;
-- a query with a lot of dummy UNION ALL branches will create a lot of streams;
-- let's check how many threads ClickHouse will start for it
select count() from remote('127.0.0.1:9000', view(
{% for n in range(77) -%}
SELECT * FROM system.one {{ "UNION ALL" if not loop.last }}
{% endfor -%}
)) SETTINGS log_comment='check_concurrency_in_remote_queries1';
SYSTEM FLUSH LOGS;
WITH
maxIntersections(
toUnixTimestamp64Micro(query_start_time_microseconds),
toUnixTimestamp64Micro(event_time_microseconds)
) as peak_threads
SELECT
if(peak_threads BETWEEN 1 AND toUInt64(getSetting('max_threads')) + 2, 'ok', 'too many threads: ' || toString(peak_threads) ) AS result
FROM system.query_thread_log
WHERE
event_time > now() - 60
AND query_id = (
SELECT query_id
FROM system.query_log
WHERE
type = 'QueryFinish'
AND event_time > now() - 60
AND log_comment = 'check_concurrency_in_remote_queries1'
AND current_database = currentDatabase()
ORDER BY event_time DESC LIMIT 1
);
--------------------
SELECT 'prefer_localhost_replica=0, remote query with a lot of union all' AS testname;
select count() from remote('127.0.0.1:9000', view(
{% for n in range(77) -%}
SELECT * FROM system.one {{ "UNION ALL" if not loop.last }}
{% endfor -%}
)) SETTINGS log_comment='check_concurrency_in_remote_queries2', prefer_localhost_replica=0;
SYSTEM FLUSH LOGS;
WITH
maxIntersections(
toUnixTimestamp64Micro(query_start_time_microseconds),
toUnixTimestamp64Micro(event_time_microseconds)
) as peak_threads
SELECT
if(peak_threads BETWEEN 1 AND toUInt64(getSetting('max_threads')) + 2, 'ok', 'too many threads: ' || toString(peak_threads) ) AS result
FROM system.query_thread_log
WHERE
event_time > now() - 60
AND query_id = (
SELECT query_id
FROM system.query_log
WHERE
type = 'QueryFinish'
AND event_time > now() - 60
AND log_comment = 'check_concurrency_in_remote_queries2'
AND current_database = currentDatabase()
ORDER BY event_time DESC LIMIT 1
);
--------------------
SELECT 'prefer_localhost_replica=1, async_socket_for_remote=0, remote query with a lot of union all (lot of threads)' AS testname;
-- that is actually bad behaviour, but it used to work like that for a long time.
-- now it happens only for async_socket_for_remote=0 (while it is 1 by default)
-- see https://github.com/ClickHouse/ClickHouse/issues/53287
select count() from remote('127.0.0.1:9000', view(
{% for n in range(77) -%}
SELECT * FROM system.one {{ "UNION ALL" if not loop.last }}
{% endfor -%}
)) SETTINGS log_comment='check_concurrency_in_remote_queries3', async_socket_for_remote=0, prefer_localhost_replica=1;
SYSTEM FLUSH LOGS;
WITH
maxIntersections(
toUnixTimestamp64Micro(query_start_time_microseconds),
toUnixTimestamp64Micro(event_time_microseconds)
) as peak_threads
SELECT
if(peak_threads >= 77, 'ok', 'too few threads: ' || toString(peak_threads) ) AS result
FROM system.query_thread_log
WHERE
event_time > now() - 60
AND query_id = (
SELECT query_id
FROM system.query_log
WHERE
type = 'QueryFinish'
AND event_time > now() - 60
AND log_comment = 'check_concurrency_in_remote_queries3'
AND current_database = currentDatabase()
ORDER BY event_time DESC LIMIT 1
);
-- a less synthetic test case from the issue https://github.com/ClickHouse/ClickHouse/issues/53287
-- it creates a lot of streams because of many parts (one stream per part) + the optimize_read_in_order=1 feature
SELECT 'prepare test schema' AS stage;
DROP TABLE IF EXISTS test_lot_of_parts_distributed;
DROP TABLE IF EXISTS test_lot_of_parts;
CREATE TABLE test_lot_of_parts
(
`a` String,
`b` LowCardinality(String),
`c` DateTime64(3),
`val` String,
)
ENGINE = MergeTree
ORDER BY (a, b, c)
SETTINGS parts_to_delay_insert=0;
CREATE TABLE test_lot_of_parts_distributed
(
`a` String,
`b` LowCardinality(String),
`c` DateTime64(3),
`val` String,
)
ENGINE = Distributed(test_shard_localhost, currentDatabase(), 'test_lot_of_parts', rand());
-- we need a lot of parts to make sure that we will have a lot of streams
SYSTEM STOP MERGES test_lot_of_parts;
INSERT INTO test_lot_of_parts (a, b, c, val)
SELECT
'foo' as a,
'bar' as b,
_CAST('1683504000', 'DateTime64') as c,
'baz' as val
FROM numbers_mt(95)
SETTINGS max_block_size = 1, min_insert_block_size_bytes=1, min_insert_block_size_rows=1; -- every row will be in a separate part
select count() from system.parts where table = 'test_lot_of_parts' and active and database = currentDatabase();
SELECT 'prefer_localhost_replica=1, remote query with read in order' AS testname;
-- query which uses optimize_read_in_order=1
SELECT DISTINCT
'val' AS fieldType,
val AS value
FROM test_lot_of_parts_distributed
WHERE a = 'foo' AND value != '' AND positionCaseInsensitiveUTF8(value, 'baz') > 0 AND b = 'bar' AND c >= _CAST('1683504000', 'DateTime64')
ORDER BY c DESC
LIMIT 5
SETTINGS log_comment='check_concurrency_in_remote_queries4' FORMAT Null;
SYSTEM FLUSH LOGS;
WITH
maxIntersections(
toUnixTimestamp64Micro(query_start_time_microseconds),
toUnixTimestamp64Micro(event_time_microseconds)
) as peak_threads
SELECT
if(peak_threads BETWEEN 1 AND toUInt64(getSetting('max_threads')) + 2, 'ok', 'too many threads: ' || toString(peak_threads) ) AS result
FROM system.query_thread_log
WHERE
event_time > now() - 60
AND query_id = (
SELECT query_id
FROM system.query_log
WHERE
type = 'QueryFinish'
AND event_time > now() - 60
AND log_comment = 'check_concurrency_in_remote_queries4'
AND current_database = currentDatabase()
ORDER BY event_time DESC LIMIT 1
);
SELECT 'prefer_localhost_replica=1 + async_socket_for_remote=0, remote query with read in order (lot of threads)' AS testname;
-- that is actually bad behaviour, but it used to work like that for a long time.
-- now it happens only for async_socket_for_remote=0 (while it is 1 by default)
SELECT DISTINCT
'val' AS fieldType,
val AS value
FROM test_lot_of_parts_distributed
WHERE a = 'foo' AND value != '' AND positionCaseInsensitiveUTF8(value, 'baz') > 0 AND b = 'bar' AND c >= _CAST('1683504000', 'DateTime64')
ORDER BY c DESC
LIMIT 5
SETTINGS log_comment='check_concurrency_in_remote_queries5', async_socket_for_remote=0 FORMAT Null;
SYSTEM FLUSH LOGS;
WITH
maxIntersections(
toUnixTimestamp64Micro(query_start_time_microseconds),
toUnixTimestamp64Micro(event_time_microseconds)
) as peak_threads
SELECT
if(peak_threads >= 95, 'ok', 'too few threads: ' || toString(peak_threads) ) AS result
-- we have 95 parts
FROM system.query_thread_log
WHERE
event_time > now() - 60
AND query_id = (
SELECT query_id
FROM system.query_log
WHERE
type = 'QueryFinish'
AND event_time > now() - 60
AND log_comment = 'check_concurrency_in_remote_queries5'
AND current_database = currentDatabase()
ORDER BY event_time DESC LIMIT 1
);
DROP TABLE IF EXISTS test_lot_of_parts_distributed;
DROP TABLE IF EXISTS test_lot_of_parts;