From 44b621fb488f3b77b1e0eb4016ed21db2c8b5f08 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Mar=C3=ADn?= Date: Fri, 19 Jan 2024 12:44:55 +0000 Subject: [PATCH 1/2] Revert "Merge pull request #58909 from ClickHouse/fix-test-custom-key-failover" This reverts commit deeb09ff37a95974aac3a4ea554fb12376f92874, reversing changes made to a4cab68a06e34a2d2c1e8465be3b30d6d7d7e0bf. --- .../test.py | 18 ++- .../__init__.py | 0 .../configs/remote_servers.xml | 26 ---- .../test.py | 116 ------------------ 4 files changed, 16 insertions(+), 144 deletions(-) delete mode 100644 tests/integration/test_parallel_replicas_custom_key_load_balancing/__init__.py delete mode 100644 tests/integration/test_parallel_replicas_custom_key_load_balancing/configs/remote_servers.xml delete mode 100644 tests/integration/test_parallel_replicas_custom_key_load_balancing/test.py diff --git a/tests/integration/test_parallel_replicas_custom_key_failover/test.py b/tests/integration/test_parallel_replicas_custom_key_failover/test.py index e1cbef236cf..2b5aa2682d5 100644 --- a/tests/integration/test_parallel_replicas_custom_key_failover/test.py +++ b/tests/integration/test_parallel_replicas_custom_key_failover/test.py @@ -34,6 +34,20 @@ def create_tables(cluster, table_name): f"CREATE TABLE IF NOT EXISTS {table_name} (key Int64, value String) Engine=ReplicatedMergeTree('/test_parallel_replicas/shard1/{table_name}', 'r3') ORDER BY (key)" ) + # create distributed table + node1.query(f"DROP TABLE IF EXISTS {table_name}_d SYNC") + node1.query( + f""" + CREATE TABLE {table_name}_d AS {table_name} + Engine=Distributed( + {cluster}, + currentDatabase(), + {table_name}, + key + ) + """ + ) + # populate data node1.query( f"INSERT INTO {table_name} SELECT number % 4, number FROM numbers(1000)" @@ -73,7 +87,7 @@ def test_parallel_replicas_custom_key_failover( log_comment = uuid.uuid4() assert ( node1.query( - f"SELECT key, count() FROM cluster('{cluster}', currentDatabase(), test_table) GROUP BY key ORDER BY key", + f"SELECT key, count() FROM {table}_d GROUP BY key ORDER BY key", settings={ "log_comment": log_comment, "prefer_localhost_replica": prefer_localhost_replica, @@ -108,7 +122,7 @@ def test_parallel_replicas_custom_key_failover( assert ( node1.query( - f"SELECT h, count() FROM clusterAllReplicas({cluster}, system.query_log) WHERE initial_query_id = '{query_id}' AND type ='QueryFinish' GROUP BY hostname() as h ORDER BY h SETTINGS skip_unavailable_shards=1" + f"SELECT h, count() FROM clusterAllReplicas({cluster}, system.query_log) WHERE initial_query_id = '{query_id}' AND type ='QueryFinish' GROUP BY hostname() as h SETTINGS skip_unavailable_shards=1" ) == "n1\t3\nn3\t2\n" ) diff --git a/tests/integration/test_parallel_replicas_custom_key_load_balancing/__init__.py b/tests/integration/test_parallel_replicas_custom_key_load_balancing/__init__.py deleted file mode 100644 index e69de29bb2d..00000000000 diff --git a/tests/integration/test_parallel_replicas_custom_key_load_balancing/configs/remote_servers.xml b/tests/integration/test_parallel_replicas_custom_key_load_balancing/configs/remote_servers.xml deleted file mode 100644 index 8b050571c3f..00000000000 --- a/tests/integration/test_parallel_replicas_custom_key_load_balancing/configs/remote_servers.xml +++ /dev/null @@ -1,26 +0,0 @@ - - - - - false - - n1 - 9000 - - - n2 - 9000 - - - n3 - 9000 - - - n4 - 9000 - - - - - - diff --git a/tests/integration/test_parallel_replicas_custom_key_load_balancing/test.py 
b/tests/integration/test_parallel_replicas_custom_key_load_balancing/test.py deleted file mode 100644 index 6ee7ebeeb8d..00000000000 --- a/tests/integration/test_parallel_replicas_custom_key_load_balancing/test.py +++ /dev/null @@ -1,116 +0,0 @@ -import pytest -import uuid -from helpers.cluster import ClickHouseCluster - -cluster = ClickHouseCluster(__file__) - -node1 = cluster.add_instance( - "n1", main_configs=["configs/remote_servers.xml"], with_zookeeper=True -) -node2 = cluster.add_instance( - "n2", main_configs=["configs/remote_servers.xml"], with_zookeeper=True -) -node3 = cluster.add_instance( - "n3", main_configs=["configs/remote_servers.xml"], with_zookeeper=True -) -node4 = cluster.add_instance( - "n4", main_configs=["configs/remote_servers.xml"], with_zookeeper=True -) - -nodes = [node1, node2, node3, node4] - - -@pytest.fixture(scope="module", autouse=True) -def start_cluster(): - try: - cluster.start() - yield cluster - finally: - cluster.shutdown() - - -def create_tables(table_name): - for i in range(0, 4): - nodes[i].query(f"DROP TABLE IF EXISTS {table_name} SYNC") - nodes[i].query( - f"CREATE TABLE IF NOT EXISTS {table_name} (key Int64, value String) Engine=ReplicatedMergeTree('/test_parallel_replicas/shard1/{table_name}', 'r{i+1}') ORDER BY (key)" - ) - - # populate data - node1.query( - f"INSERT INTO {table_name} SELECT number % 4, number FROM numbers(1000)" - ) - node1.query( - f"INSERT INTO {table_name} SELECT number % 4, number FROM numbers(1000, 1000)" - ) - node1.query( - f"INSERT INTO {table_name} SELECT number % 4, number FROM numbers(2000, 1000)" - ) - node1.query( - f"INSERT INTO {table_name} SELECT number % 4, number FROM numbers(3000, 1000)" - ) - node2.query(f"SYSTEM SYNC REPLICA {table_name}") - node3.query(f"SYSTEM SYNC REPLICA {table_name}") - node4.query(f"SYSTEM SYNC REPLICA {table_name}") - - -@pytest.mark.parametrize("use_hedged_requests", [1, 0]) -@pytest.mark.parametrize("custom_key", ["sipHash64(key)", "key"]) -@pytest.mark.parametrize("filter_type", ["default", "range"]) -def test_parallel_replicas_custom_key_load_balancing( - start_cluster, - use_hedged_requests, - custom_key, - filter_type, -): - cluster = "test_single_shard_multiple_replicas" - table = "test_table" - - create_tables(table) - - expected_result = "" - for i in range(4): - expected_result += f"{i}\t1000\n" - - log_comment = uuid.uuid4() - assert ( - node1.query( - f"SELECT key, count() FROM cluster('{cluster}', currentDatabase(), test_table) GROUP BY key ORDER BY key", - settings={ - "log_comment": log_comment, - "prefer_localhost_replica": 0, - "max_parallel_replicas": 4, - "parallel_replicas_custom_key": custom_key, - "parallel_replicas_custom_key_filter_type": filter_type, - "use_hedged_requests": use_hedged_requests, - # "async_socket_for_remote": 0, - # "async_query_sending_for_remote": 0, - }, - ) - == expected_result - ) - - for node in nodes: - node.query("system flush logs") - - # the subqueries should be spread over available nodes - query_id = node1.query( - f"SELECT query_id FROM system.query_log WHERE current_database = currentDatabase() AND log_comment = '{log_comment}' AND type = 'QueryFinish' AND initial_query_id = query_id" - ) - assert query_id != "" - query_id = query_id[:-1] - - assert ( - node1.query( - f"SELECT 'subqueries', count() FROM clusterAllReplicas({cluster}, system.query_log) WHERE initial_query_id = '{query_id}' AND type ='QueryFinish' AND query_id != initial_query_id SETTINGS skip_unavailable_shards=1" - ) - == "subqueries\t4\n" - ) - - # check 
queries per node - assert ( - node1.query( - f"SELECT h, count() FROM clusterAllReplicas({cluster}, system.query_log) WHERE initial_query_id = '{query_id}' AND type ='QueryFinish' GROUP BY hostname() as h ORDER BY h SETTINGS skip_unavailable_shards=1" - ) - == "n1\t2\nn2\t1\nn3\t1\nn4\t1\n" - ) From 477489a05d3a307095f3a810380f75b085161122 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Mar=C3=ADn?= Date: Fri, 19 Jan 2024 12:46:05 +0000 Subject: [PATCH 2/2] Revert "Merge pull request #57235 from ClickHouse/pr-custom-key-failover" This reverts commit 8ba9b4a7ef90b888f734a23c9868379e59c77978, reversing changes made to 0f39245cb8485f4f034ac5af39e6e641758bd5ec. --- src/Client/ConnectionPoolWithFailover.cpp | 60 ++++---- src/Client/ConnectionPoolWithFailover.h | 39 +++--- src/Client/HedgedConnections.cpp | 20 ++- src/Client/HedgedConnections.h | 16 +-- src/Client/HedgedConnectionsFactory.cpp | 8 +- src/Client/HedgedConnectionsFactory.h | 3 +- src/Common/GetPriorityForLoadBalancing.cpp | 32 ++--- src/Common/GetPriorityForLoadBalancing.h | 9 +- src/Common/PoolWithFailoverBase.h | 6 +- .../ClusterProxy/SelectStreamFactory.cpp | 5 +- .../ClusterProxy/SelectStreamFactory.h | 5 +- .../ClusterProxy/executeQuery.cpp | 25 ++-- src/Interpreters/ClusterProxy/executeQuery.h | 2 +- src/Interpreters/Context.cpp | 6 - src/Interpreters/Context.h | 1 - src/Interpreters/InterpreterSelectQuery.cpp | 3 +- .../getCustomKeyFilterForParallelReplicas.cpp | 8 +- .../getCustomKeyFilterForParallelReplicas.h | 3 + src/Planner/PlannerJoinTree.cpp | 3 +- src/Processors/QueryPlan/ReadFromRemote.cpp | 91 +++---------- src/Processors/QueryPlan/ReadFromRemote.h | 1 - src/QueryPipeline/RemoteQueryExecutor.cpp | 46 ++----- src/QueryPipeline/RemoteQueryExecutor.h | 26 +--- src/Storages/SelectQueryInfo.h | 2 + src/Storages/StorageDistributed.cpp | 35 +++-- tests/config/config.d/clusters.xml | 18 --- .../test_parallel_replicas_custom_key/test.py | 5 + .../__init__.py | 0 .../configs/remote_servers.xml | 26 ---- .../test.py | 128 ------------------ .../01361_fover_remote_num_tries.reference | 2 +- .../01361_fover_remote_num_tries.sh | 2 +- ...s_custom_key_unavailable_replica.reference | 29 ---- ...eplicas_custom_key_unavailable_replica.sql | 30 ---- 34 files changed, 187 insertions(+), 508 deletions(-) delete mode 100644 tests/integration/test_parallel_replicas_custom_key_failover/__init__.py delete mode 100644 tests/integration/test_parallel_replicas_custom_key_failover/configs/remote_servers.xml delete mode 100644 tests/integration/test_parallel_replicas_custom_key_failover/test.py delete mode 100644 tests/queries/0_stateless/02918_parallel_replicas_custom_key_unavailable_replica.reference delete mode 100644 tests/queries/0_stateless/02918_parallel_replicas_custom_key_unavailable_replica.sql diff --git a/src/Client/ConnectionPoolWithFailover.cpp b/src/Client/ConnectionPoolWithFailover.cpp index 43166659b18..4406114a955 100644 --- a/src/Client/ConnectionPoolWithFailover.cpp +++ b/src/Client/ConnectionPoolWithFailover.cpp @@ -118,18 +118,18 @@ ConnectionPoolWithFailover::Status ConnectionPoolWithFailover::getStatus() const return result; } -std::vector ConnectionPoolWithFailover::getMany( - const ConnectionTimeouts & timeouts, - const Settings & settings, - PoolMode pool_mode, - AsyncCallback async_callback, - std::optional skip_unavailable_endpoints, - GetPriorityForLoadBalancing::Func priority_func) +std::vector ConnectionPoolWithFailover::getMany(const ConnectionTimeouts & timeouts, + const Settings & settings, + PoolMode 
pool_mode, + AsyncCallback async_callback, + std::optional skip_unavailable_endpoints) { TryGetEntryFunc try_get_entry = [&](NestedPool & pool, std::string & fail_message) - { return tryGetEntry(pool, timeouts, fail_message, settings, nullptr, async_callback); }; + { + return tryGetEntry(pool, timeouts, fail_message, settings, nullptr, async_callback); + }; - std::vector results = getManyImpl(settings, pool_mode, try_get_entry, skip_unavailable_endpoints, priority_func); + std::vector results = getManyImpl(settings, pool_mode, try_get_entry, skip_unavailable_endpoints); std::vector entries; entries.reserve(results.size()); @@ -153,17 +153,17 @@ std::vector ConnectionPoolWithFailover::g std::vector ConnectionPoolWithFailover::getManyChecked( const ConnectionTimeouts & timeouts, - const Settings & settings, - PoolMode pool_mode, + const Settings & settings, PoolMode pool_mode, const QualifiedTableName & table_to_check, AsyncCallback async_callback, - std::optional skip_unavailable_endpoints, - GetPriorityForLoadBalancing::Func priority_func) + std::optional skip_unavailable_endpoints) { TryGetEntryFunc try_get_entry = [&](NestedPool & pool, std::string & fail_message) - { return tryGetEntry(pool, timeouts, fail_message, settings, &table_to_check, async_callback); }; + { + return tryGetEntry(pool, timeouts, fail_message, settings, &table_to_check, async_callback); + }; - return getManyImpl(settings, pool_mode, try_get_entry, skip_unavailable_endpoints, priority_func); + return getManyImpl(settings, pool_mode, try_get_entry, skip_unavailable_endpoints); } ConnectionPoolWithFailover::Base::GetPriorityFunc ConnectionPoolWithFailover::makeGetPriorityFunc(const Settings & settings) @@ -175,16 +175,14 @@ ConnectionPoolWithFailover::Base::GetPriorityFunc ConnectionPoolWithFailover::ma } std::vector ConnectionPoolWithFailover::getManyImpl( - const Settings & settings, - PoolMode pool_mode, - const TryGetEntryFunc & try_get_entry, - std::optional skip_unavailable_endpoints, - GetPriorityForLoadBalancing::Func priority_func) + const Settings & settings, + PoolMode pool_mode, + const TryGetEntryFunc & try_get_entry, + std::optional skip_unavailable_endpoints) { if (nested_pools.empty()) - throw DB::Exception( - DB::ErrorCodes::ALL_CONNECTION_TRIES_FAILED, - "Cannot get connection from ConnectionPoolWithFailover cause nested pools are empty"); + throw DB::Exception(DB::ErrorCodes::ALL_CONNECTION_TRIES_FAILED, + "Cannot get connection from ConnectionPoolWithFailover cause nested pools are empty"); if (!skip_unavailable_endpoints.has_value()) skip_unavailable_endpoints = settings.skip_unavailable_shards; @@ -205,13 +203,14 @@ std::vector ConnectionPoolWithFailover::g else throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Unknown pool allocation mode"); - if (!priority_func) - priority_func = makeGetPriorityFunc(settings); + GetPriorityFunc get_priority = makeGetPriorityFunc(settings); UInt64 max_ignored_errors = settings.distributed_replica_max_ignored_errors.value; bool fallback_to_stale_replicas = settings.fallback_to_stale_replicas_for_distributed_queries.value; - return Base::getMany(min_entries, max_entries, max_tries, max_ignored_errors, fallback_to_stale_replicas, try_get_entry, priority_func); + return Base::getMany(min_entries, max_entries, max_tries, + max_ignored_errors, fallback_to_stale_replicas, + try_get_entry, get_priority); } ConnectionPoolWithFailover::TryResult @@ -252,14 +251,11 @@ ConnectionPoolWithFailover::tryGetEntry( return result; } -std::vector 
-ConnectionPoolWithFailover::getShuffledPools(const Settings & settings, GetPriorityForLoadBalancing::Func priority_func) +std::vector ConnectionPoolWithFailover::getShuffledPools(const Settings & settings) { - if (!priority_func) - priority_func = makeGetPriorityFunc(settings); - + GetPriorityFunc get_priority = makeGetPriorityFunc(settings); UInt64 max_ignored_errors = settings.distributed_replica_max_ignored_errors.value; - return Base::getShuffledPools(max_ignored_errors, priority_func); + return Base::getShuffledPools(max_ignored_errors, get_priority); } } diff --git a/src/Client/ConnectionPoolWithFailover.h b/src/Client/ConnectionPoolWithFailover.h index eaef717a2d6..208a003edb8 100644 --- a/src/Client/ConnectionPoolWithFailover.h +++ b/src/Client/ConnectionPoolWithFailover.h @@ -54,13 +54,10 @@ public: /** Allocates up to the specified number of connections to work. * Connections provide access to different replicas of one shard. */ - std::vector getMany( - const ConnectionTimeouts & timeouts, - const Settings & settings, - PoolMode pool_mode, - AsyncCallback async_callback = {}, - std::optional skip_unavailable_endpoints = std::nullopt, - GetPriorityForLoadBalancing::Func priority_func = {}); + std::vector getMany(const ConnectionTimeouts & timeouts, + const Settings & settings, PoolMode pool_mode, + AsyncCallback async_callback = {}, + std::optional skip_unavailable_endpoints = std::nullopt); /// The same as getMany(), but return std::vector. std::vector getManyForTableFunction(const ConnectionTimeouts & timeouts, @@ -72,13 +69,12 @@ public: /// The same as getMany(), but check that replication delay for table_to_check is acceptable. /// Delay threshold is taken from settings. std::vector getManyChecked( - const ConnectionTimeouts & timeouts, - const Settings & settings, - PoolMode pool_mode, - const QualifiedTableName & table_to_check, - AsyncCallback async_callback = {}, - std::optional skip_unavailable_endpoints = std::nullopt, - GetPriorityForLoadBalancing::Func priority_func = {}); + const ConnectionTimeouts & timeouts, + const Settings & settings, + PoolMode pool_mode, + const QualifiedTableName & table_to_check, + AsyncCallback async_callback = {}, + std::optional skip_unavailable_endpoints = std::nullopt); struct NestedPoolStatus { @@ -91,7 +87,7 @@ public: using Status = std::vector; Status getStatus() const; - std::vector getShuffledPools(const Settings & settings, GetPriorityFunc priority_func = {}); + std::vector getShuffledPools(const Settings & settings); size_t getMaxErrorCup() const { return Base::max_error_cap; } @@ -100,16 +96,13 @@ public: Base::updateSharedErrorCounts(shuffled_pools); } - size_t getPoolSize() const { return Base::getPoolSize(); } - private: /// Get the values of relevant settings and call Base::getMany() std::vector getManyImpl( - const Settings & settings, - PoolMode pool_mode, - const TryGetEntryFunc & try_get_entry, - std::optional skip_unavailable_endpoints = std::nullopt, - GetPriorityForLoadBalancing::Func priority_func = {}); + const Settings & settings, + PoolMode pool_mode, + const TryGetEntryFunc & try_get_entry, + std::optional skip_unavailable_endpoints = std::nullopt); /// Try to get a connection from the pool and check that it is good. 
/// If table_to_check is not null and the check is enabled in settings, check that replication delay @@ -122,7 +115,7 @@ private: const QualifiedTableName * table_to_check = nullptr, AsyncCallback async_callback = {}); - GetPriorityForLoadBalancing::Func makeGetPriorityFunc(const Settings & settings); + GetPriorityFunc makeGetPriorityFunc(const Settings & settings); GetPriorityForLoadBalancing get_priority_load_balancing; }; diff --git a/src/Client/HedgedConnections.cpp b/src/Client/HedgedConnections.cpp index 7ea13a7dffc..0c69d7712ea 100644 --- a/src/Client/HedgedConnections.cpp +++ b/src/Client/HedgedConnections.cpp @@ -28,18 +28,16 @@ HedgedConnections::HedgedConnections( const ThrottlerPtr & throttler_, PoolMode pool_mode, std::shared_ptr table_to_check_, - AsyncCallback async_callback, - GetPriorityForLoadBalancing::Func priority_func) + AsyncCallback async_callback) : hedged_connections_factory( - pool_, - context_->getSettingsRef(), - timeouts_, - context_->getSettingsRef().connections_with_failover_max_tries.value, - context_->getSettingsRef().fallback_to_stale_replicas_for_distributed_queries.value, - context_->getSettingsRef().max_parallel_replicas.value, - context_->getSettingsRef().skip_unavailable_shards.value, - table_to_check_, - priority_func) + pool_, + context_->getSettingsRef(), + timeouts_, + context_->getSettingsRef().connections_with_failover_max_tries.value, + context_->getSettingsRef().fallback_to_stale_replicas_for_distributed_queries.value, + context_->getSettingsRef().max_parallel_replicas.value, + context_->getSettingsRef().skip_unavailable_shards.value, + table_to_check_) , context(std::move(context_)) , settings(context->getSettingsRef()) , throttler(throttler_) diff --git a/src/Client/HedgedConnections.h b/src/Client/HedgedConnections.h index 5bc274332db..ccdc59965e2 100644 --- a/src/Client/HedgedConnections.h +++ b/src/Client/HedgedConnections.h @@ -70,15 +70,13 @@ public: size_t index; }; - HedgedConnections( - const ConnectionPoolWithFailoverPtr & pool_, - ContextPtr context_, - const ConnectionTimeouts & timeouts_, - const ThrottlerPtr & throttler, - PoolMode pool_mode, - std::shared_ptr table_to_check_ = nullptr, - AsyncCallback async_callback = {}, - GetPriorityForLoadBalancing::Func priority_func = {}); + HedgedConnections(const ConnectionPoolWithFailoverPtr & pool_, + ContextPtr context_, + const ConnectionTimeouts & timeouts_, + const ThrottlerPtr & throttler, + PoolMode pool_mode, + std::shared_ptr table_to_check_ = nullptr, + AsyncCallback async_callback = {}); void sendScalarsData(Scalars & data) override; diff --git a/src/Client/HedgedConnectionsFactory.cpp b/src/Client/HedgedConnectionsFactory.cpp index f7b5ceedc96..6ac504772e2 100644 --- a/src/Client/HedgedConnectionsFactory.cpp +++ b/src/Client/HedgedConnectionsFactory.cpp @@ -29,8 +29,7 @@ HedgedConnectionsFactory::HedgedConnectionsFactory( bool fallback_to_stale_replicas_, UInt64 max_parallel_replicas_, bool skip_unavailable_shards_, - std::shared_ptr table_to_check_, - GetPriorityForLoadBalancing::Func priority_func) + std::shared_ptr table_to_check_) : pool(pool_) , timeouts(timeouts_) , table_to_check(table_to_check_) @@ -40,7 +39,7 @@ HedgedConnectionsFactory::HedgedConnectionsFactory( , max_parallel_replicas(max_parallel_replicas_) , skip_unavailable_shards(skip_unavailable_shards_) { - shuffled_pools = pool->getShuffledPools(settings_, priority_func); + shuffled_pools = pool->getShuffledPools(settings_); for (auto shuffled_pool : shuffled_pools) 
replicas.emplace_back(std::make_unique(shuffled_pool.pool, &timeouts, settings_, log, table_to_check.get())); } @@ -324,7 +323,8 @@ HedgedConnectionsFactory::State HedgedConnectionsFactory::processFinishedConnect else { ShuffledPool & shuffled_pool = shuffled_pools[index]; - LOG_INFO(log, "Connection failed at try №{}, reason: {}", (shuffled_pool.error_count + 1), fail_message); + LOG_WARNING( + log, "Connection failed at try №{}, reason: {}", (shuffled_pool.error_count + 1), fail_message); ProfileEvents::increment(ProfileEvents::DistributedConnectionFailTry); shuffled_pool.error_count = std::min(pool->getMaxErrorCup(), shuffled_pool.error_count + 1); diff --git a/src/Client/HedgedConnectionsFactory.h b/src/Client/HedgedConnectionsFactory.h index f187e9b2abb..e41ac9767a5 100644 --- a/src/Client/HedgedConnectionsFactory.h +++ b/src/Client/HedgedConnectionsFactory.h @@ -53,8 +53,7 @@ public: bool fallback_to_stale_replicas_, UInt64 max_parallel_replicas_, bool skip_unavailable_shards_, - std::shared_ptr table_to_check_ = nullptr, - GetPriorityForLoadBalancing::Func priority_func = {}); + std::shared_ptr table_to_check_ = nullptr); /// Create and return active connections according to pool_mode. std::vector getManyConnections(PoolMode pool_mode, AsyncCallback async_callback = {}); diff --git a/src/Common/GetPriorityForLoadBalancing.cpp b/src/Common/GetPriorityForLoadBalancing.cpp index d4c6f89ff92..bc00e047a88 100644 --- a/src/Common/GetPriorityForLoadBalancing.cpp +++ b/src/Common/GetPriorityForLoadBalancing.cpp @@ -9,8 +9,7 @@ namespace ErrorCodes extern const int LOGICAL_ERROR; } -GetPriorityForLoadBalancing::Func -GetPriorityForLoadBalancing::getPriorityFunc(LoadBalancing load_balance, size_t offset, size_t pool_size) const +std::function GetPriorityForLoadBalancing::getPriorityFunc(LoadBalancing load_balance, size_t offset, size_t pool_size) const { std::function get_priority; switch (load_balance) @@ -34,26 +33,19 @@ GetPriorityForLoadBalancing::getPriorityFunc(LoadBalancing load_balance, size_t get_priority = [offset](size_t i) { return i != offset ? Priority{1} : Priority{0}; }; break; case LoadBalancing::ROUND_ROBIN: - auto local_last_used = last_used % pool_size; + if (last_used >= pool_size) + last_used = 0; ++last_used; - - // Example: pool_size = 5 - // | local_last_used | i=0 | i=1 | i=2 | i=3 | i=4 | - // | 0 | 4 | 0 | 1 | 2 | 3 | - // | 1 | 3 | 4 | 0 | 1 | 2 | - // | 2 | 2 | 3 | 4 | 0 | 1 | - // | 3 | 1 | 2 | 3 | 4 | 0 | - // | 4 | 0 | 1 | 2 | 3 | 4 | - - get_priority = [pool_size, local_last_used](size_t i) + /* Consider pool_size equals to 5 + * last_used = 1 -> get_priority: 0 1 2 3 4 + * last_used = 2 -> get_priority: 4 0 1 2 3 + * last_used = 3 -> get_priority: 4 3 0 1 2 + * ... + * */ + get_priority = [this, pool_size](size_t i) { - size_t priority = pool_size - 1; - if (i < local_last_used) - priority = pool_size - 1 - (local_last_used - i); - if (i > local_last_used) - priority = i - local_last_used - 1; - - return Priority{static_cast(priority)}; + ++i; // To make `i` indexing start with 1 instead of 0 as `last_used` does + return Priority{static_cast(i < last_used ? 
pool_size - i : i - last_used)}; }; break; } diff --git a/src/Common/GetPriorityForLoadBalancing.h b/src/Common/GetPriorityForLoadBalancing.h index 0de99730977..c60d180eca0 100644 --- a/src/Common/GetPriorityForLoadBalancing.h +++ b/src/Common/GetPriorityForLoadBalancing.h @@ -8,12 +8,7 @@ namespace DB class GetPriorityForLoadBalancing { public: - using Func = std::function; - - explicit GetPriorityForLoadBalancing(LoadBalancing load_balancing_, size_t last_used_ = 0) - : load_balancing(load_balancing_), last_used(last_used_) - { - } + explicit GetPriorityForLoadBalancing(LoadBalancing load_balancing_) : load_balancing(load_balancing_) {} GetPriorityForLoadBalancing() = default; bool operator == (const GetPriorityForLoadBalancing & other) const @@ -28,7 +23,7 @@ public: return !(*this == other); } - Func getPriorityFunc(LoadBalancing load_balance, size_t offset, size_t pool_size) const; + std::function getPriorityFunc(LoadBalancing load_balance, size_t offset, size_t pool_size) const; std::vector hostname_prefix_distance; /// Prefix distances from name of this host to the names of hosts of pools. std::vector hostname_levenshtein_distance; /// Levenshtein Distances from name of this host to the names of hosts of pools. diff --git a/src/Common/PoolWithFailoverBase.h b/src/Common/PoolWithFailoverBase.h index f960d551996..543a39fbc39 100644 --- a/src/Common/PoolWithFailoverBase.h +++ b/src/Common/PoolWithFailoverBase.h @@ -124,9 +124,7 @@ public: size_t max_ignored_errors, bool fallback_to_stale_replicas, const TryGetEntryFunc & try_get_entry, - const GetPriorityFunc & get_priority); - - size_t getPoolSize() const { return nested_pools.size(); } + const GetPriorityFunc & get_priority = GetPriorityFunc()); protected: @@ -149,7 +147,7 @@ protected: return std::make_tuple(shared_pool_states, nested_pools, last_error_decrease_time); } - const NestedPools nested_pools; + NestedPools nested_pools; const time_t decrease_error_period; const size_t max_error_cap; diff --git a/src/Interpreters/ClusterProxy/SelectStreamFactory.cpp b/src/Interpreters/ClusterProxy/SelectStreamFactory.cpp index f8a070a6fde..4edc9d4d4e5 100644 --- a/src/Interpreters/ClusterProxy/SelectStreamFactory.cpp +++ b/src/Interpreters/ClusterProxy/SelectStreamFactory.cpp @@ -117,13 +117,13 @@ void SelectStreamFactory::createForShard( std::vector & local_plans, Shards & remote_shards, UInt32 shard_count, - bool parallel_replicas_enabled, - AdditionalShardFilterGenerator shard_filter_generator) + bool parallel_replicas_enabled) { auto it = objects_by_shard.find(shard_info.shard_num); if (it != objects_by_shard.end()) replaceMissedSubcolumnsByConstants(storage_snapshot->object_columns, it->second, query_ast); + auto emplace_local_stream = [&]() { local_plans.emplace_back(createLocalPlan( @@ -139,7 +139,6 @@ void SelectStreamFactory::createForShard( .shard_info = shard_info, .lazy = lazy, .local_delay = local_delay, - .shard_filter_generator = std::move(shard_filter_generator), }); }; diff --git a/src/Interpreters/ClusterProxy/SelectStreamFactory.h b/src/Interpreters/ClusterProxy/SelectStreamFactory.h index 9993ea7028d..511b0dfaadb 100644 --- a/src/Interpreters/ClusterProxy/SelectStreamFactory.h +++ b/src/Interpreters/ClusterProxy/SelectStreamFactory.h @@ -40,7 +40,6 @@ ASTPtr rewriteSelectQuery( ASTPtr table_function_ptr = nullptr); using ColumnsDescriptionByShardNum = std::unordered_map; -using AdditionalShardFilterGenerator = std::function; class SelectStreamFactory { @@ -60,7 +59,6 @@ public: /// (When there is a local replica with 
big delay). bool lazy = false; time_t local_delay = 0; - AdditionalShardFilterGenerator shard_filter_generator{}; }; using Shards = std::vector; @@ -80,8 +78,7 @@ public: std::vector & local_plans, Shards & remote_shards, UInt32 shard_count, - bool parallel_replicas_enabled, - AdditionalShardFilterGenerator shard_filter_generator); + bool parallel_replicas_enabled); const Block header; const ColumnsDescriptionByShardNum objects_by_shard; diff --git a/src/Interpreters/ClusterProxy/executeQuery.cpp b/src/Interpreters/ClusterProxy/executeQuery.cpp index 5865e669e47..99453f224ff 100644 --- a/src/Interpreters/ClusterProxy/executeQuery.cpp +++ b/src/Interpreters/ClusterProxy/executeQuery.cpp @@ -158,13 +158,6 @@ ContextMutablePtr updateSettingsForCluster(const Cluster & cluster, new_settings.timeout_overflow_mode = settings.timeout_overflow_mode_leaf; } - /// in case of parallel replicas custom key use round robing load balancing - /// so custom key partitions will be spread over nodes in round-robin fashion - if (context->canUseParallelReplicasCustomKey(cluster) && !settings.load_balancing.changed) - { - new_settings.load_balancing = LoadBalancing::ROUND_ROBIN; - } - auto new_context = Context::createCopy(context); new_context->setSettings(new_settings); return new_context; @@ -254,6 +247,21 @@ void executeQuery( visitor.visit(query_ast_for_shard); } + if (shard_filter_generator) + { + auto shard_filter = shard_filter_generator(shard_info.shard_num); + if (shard_filter) + { + auto & select_query = query_ast_for_shard->as(); + + auto where_expression = select_query.where(); + if (where_expression) + shard_filter = makeASTFunction("and", where_expression, shard_filter); + + select_query.setExpression(ASTSelectQuery::Expression::WHERE, std::move(shard_filter)); + } + } + // decide for each shard if parallel reading from replicas should be enabled // according to settings and number of replicas declared per shard const auto & addresses = cluster->getShardsAddresses().at(i); @@ -268,8 +276,7 @@ void executeQuery( plans, remote_shards, static_cast(shards), - parallel_replicas_enabled, - shard_filter_generator); + parallel_replicas_enabled); } if (!remote_shards.empty()) diff --git a/src/Interpreters/ClusterProxy/executeQuery.h b/src/Interpreters/ClusterProxy/executeQuery.h index a19ece0bbdc..dd48b6e10ad 100644 --- a/src/Interpreters/ClusterProxy/executeQuery.h +++ b/src/Interpreters/ClusterProxy/executeQuery.h @@ -65,7 +65,7 @@ void executeQuery( const std::string & sharding_key_column_name, const ClusterPtr & not_optimized_cluster, const DistributedSettings & distributed_settings, - AdditionalShardFilterGenerator shard_filter_generator); + AdditionalShardFilterGenerator shard_filter_generator = {}); void executeQueryWithParallelReplicas( diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp index a6e9a6347a7..13a7618e461 100644 --- a/src/Interpreters/Context.cpp +++ b/src/Interpreters/Context.cpp @@ -5113,12 +5113,6 @@ bool Context::canUseParallelReplicasOnFollower() const return canUseTaskBasedParallelReplicas() && getClientInfo().collaborate_with_initiator; } -bool Context::canUseParallelReplicasCustomKey(const Cluster & cluster) const -{ - return settings.max_parallel_replicas > 1 && getParallelReplicasMode() == Context::ParallelReplicasMode::CUSTOM_KEY - && cluster.getShardCount() == 1 && cluster.getShardsInfo()[0].getAllNodeCount() > 1; -} - void Context::setPreparedSetsCache(const PreparedSetsCachePtr & cache) { prepared_sets_cache = cache; diff --git 
a/src/Interpreters/Context.h b/src/Interpreters/Context.h index 75c09cfe873..e91db7faa7b 100644 --- a/src/Interpreters/Context.h +++ b/src/Interpreters/Context.h @@ -1246,7 +1246,6 @@ public: bool canUseTaskBasedParallelReplicas() const; bool canUseParallelReplicasOnInitiator() const; bool canUseParallelReplicasOnFollower() const; - bool canUseParallelReplicasCustomKey(const Cluster & cluster) const; enum class ParallelReplicasMode : uint8_t { diff --git a/src/Interpreters/InterpreterSelectQuery.cpp b/src/Interpreters/InterpreterSelectQuery.cpp index c0e9aeaae1d..c5790b08a24 100644 --- a/src/Interpreters/InterpreterSelectQuery.cpp +++ b/src/Interpreters/InterpreterSelectQuery.cpp @@ -589,8 +589,9 @@ InterpreterSelectQuery::InterpreterSelectQuery( } } else if (auto * distributed = dynamic_cast(storage.get()); - distributed && context->canUseParallelReplicasCustomKey(*distributed->getCluster())) + distributed && canUseCustomKey(settings, *distributed->getCluster(), *context)) { + query_info.use_custom_key = true; context->setSetting("distributed_group_by_no_merge", 2); } } diff --git a/src/Interpreters/getCustomKeyFilterForParallelReplicas.cpp b/src/Interpreters/getCustomKeyFilterForParallelReplicas.cpp index 1295a4d5a75..2e9ee0af724 100644 --- a/src/Interpreters/getCustomKeyFilterForParallelReplicas.cpp +++ b/src/Interpreters/getCustomKeyFilterForParallelReplicas.cpp @@ -20,6 +20,12 @@ namespace ErrorCodes extern const int ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER; } +bool canUseCustomKey(const Settings & settings, const Cluster & cluster, const Context & context) +{ + return settings.max_parallel_replicas > 1 && context.getParallelReplicasMode() == Context::ParallelReplicasMode::CUSTOM_KEY + && cluster.getShardCount() == 1 && cluster.getShardsInfo()[0].getAllNodeCount() > 1; +} + ASTPtr getCustomKeyFilterForParallelReplica( size_t replicas_count, size_t replica_num, @@ -28,7 +34,7 @@ ASTPtr getCustomKeyFilterForParallelReplica( const ColumnsDescription & columns, const ContextPtr & context) { - chassert(replicas_count > 1); + assert(replicas_count > 1); if (filter_type == ParallelReplicasCustomKeyFilterType::DEFAULT) { // first we do modulo with replica count diff --git a/src/Interpreters/getCustomKeyFilterForParallelReplicas.h b/src/Interpreters/getCustomKeyFilterForParallelReplicas.h index 1506c1992c0..c35f00f3dfd 100644 --- a/src/Interpreters/getCustomKeyFilterForParallelReplicas.h +++ b/src/Interpreters/getCustomKeyFilterForParallelReplicas.h @@ -9,6 +9,9 @@ namespace DB { + +bool canUseCustomKey(const Settings & settings, const Cluster & cluster, const Context & context); + /// Get AST for filter created from custom_key /// replica_num is the number of the replica for which we are generating filter starting from 0 ASTPtr getCustomKeyFilterForParallelReplica( diff --git a/src/Planner/PlannerJoinTree.cpp b/src/Planner/PlannerJoinTree.cpp index 552f25d7035..394cd8a0669 100644 --- a/src/Planner/PlannerJoinTree.cpp +++ b/src/Planner/PlannerJoinTree.cpp @@ -809,8 +809,9 @@ JoinTreeQueryPlan buildQueryPlanForTableExpression(QueryTreeNodePtr table_expres else { if (auto * distributed = typeid_cast(storage.get()); - distributed && query_context->canUseParallelReplicasCustomKey(*distributed->getCluster())) + distributed && canUseCustomKey(settings, *distributed->getCluster(), *query_context)) { + table_expression_query_info.use_custom_key = true; planner_context->getMutableQueryContext()->setSetting("distributed_group_by_no_merge", 2); } } diff --git a/src/Processors/QueryPlan/ReadFromRemote.cpp 
b/src/Processors/QueryPlan/ReadFromRemote.cpp index 4de5ee09ffb..496ec083cac 100644 --- a/src/Processors/QueryPlan/ReadFromRemote.cpp +++ b/src/Processors/QueryPlan/ReadFromRemote.cpp @@ -18,7 +18,6 @@ #include #include #include -#include #include @@ -232,6 +231,8 @@ void ReadFromRemote::addPipe(Pipes & pipes, const ClusterProxy::SelectStreamFact add_extremes = context->getSettingsRef().extremes; } + String query_string = formattedAST(shard.query); + scalars["_shard_num"] = Block{{DataTypeUInt32().createColumnConst(1, shard.shard_info.shard_num), std::make_shared(), "_shard_num"}}; @@ -253,81 +254,29 @@ void ReadFromRemote::addPipe(Pipes & pipes, const ClusterProxy::SelectStreamFact context->setSetting("cluster_for_parallel_replicas", cluster_name); } - /// parallel replicas custom key case - if (shard.shard_filter_generator) + auto remote_query_executor = std::make_shared( + shard.shard_info.pool, query_string, output_stream->header, context, throttler, scalars, external_tables, stage); + remote_query_executor->setLogger(log); + + if (context->canUseTaskBasedParallelReplicas()) { - for (size_t i = 0; i < shard.shard_info.per_replica_pools.size(); ++i) - { - auto query = shard.query->clone(); - auto & select_query = query->as(); - auto shard_filter = shard.shard_filter_generator(i + 1); - if (shard_filter) - { - auto where_expression = select_query.where(); - if (where_expression) - shard_filter = makeASTFunction("and", where_expression, shard_filter); - - select_query.setExpression(ASTSelectQuery::Expression::WHERE, std::move(shard_filter)); - } - - const String query_string = formattedAST(query); - - if (!priority_func_factory.has_value()) - priority_func_factory = GetPriorityForLoadBalancing(LoadBalancing::ROUND_ROBIN, randomSeed()); - - GetPriorityForLoadBalancing::Func priority_func - = priority_func_factory->getPriorityFunc(LoadBalancing::ROUND_ROBIN, 0, shard.shard_info.pool->getPoolSize()); - - auto remote_query_executor = std::make_shared( - shard.shard_info.pool, - query_string, - output_stream->header, - context, - throttler, - scalars, - external_tables, - stage, - std::nullopt, - priority_func); - remote_query_executor->setLogger(log); - remote_query_executor->setPoolMode(PoolMode::GET_ONE); - - if (!table_func_ptr) - remote_query_executor->setMainTable(shard.main_table ? shard.main_table : main_table); - - pipes.emplace_back( - createRemoteSourcePipe(remote_query_executor, add_agg_info, add_totals, add_extremes, async_read, async_query_sending)); - addConvertingActions(pipes.back(), output_stream->header); - } + // when doing parallel reading from replicas (ParallelReplicasMode::READ_TASKS) on a shard: + // establish a connection to a replica on the shard, the replica will instantiate coordinator to manage parallel reading from replicas on the shard. + // The coordinator will return query result from the shard. + // Only one coordinator per shard is necessary. Therefore using PoolMode::GET_ONE to establish only one connection per shard. 
+ // Using PoolMode::GET_MANY for this mode will(can) lead to instantiation of several coordinators (depends on max_parallel_replicas setting) + // each will execute parallel reading from replicas, so the query result will be multiplied by the number of created coordinators + remote_query_executor->setPoolMode(PoolMode::GET_ONE); } else - { - const String query_string = formattedAST(shard.query); + remote_query_executor->setPoolMode(PoolMode::GET_MANY); - auto remote_query_executor = std::make_shared( - shard.shard_info.pool, query_string, output_stream->header, context, throttler, scalars, external_tables, stage); - remote_query_executor->setLogger(log); + if (!table_func_ptr) + remote_query_executor->setMainTable(shard.main_table ? shard.main_table : main_table); - if (context->canUseTaskBasedParallelReplicas()) - { - // when doing parallel reading from replicas (ParallelReplicasMode::READ_TASKS) on a shard: - // establish a connection to a replica on the shard, the replica will instantiate coordinator to manage parallel reading from replicas on the shard. - // The coordinator will return query result from the shard. - // Only one coordinator per shard is necessary. Therefore using PoolMode::GET_ONE to establish only one connection per shard. - // Using PoolMode::GET_MANY for this mode will(can) lead to instantiation of several coordinators (depends on max_parallel_replicas setting) - // each will execute parallel reading from replicas, so the query result will be multiplied by the number of created coordinators - remote_query_executor->setPoolMode(PoolMode::GET_ONE); - } - else - remote_query_executor->setPoolMode(PoolMode::GET_MANY); - - if (!table_func_ptr) - remote_query_executor->setMainTable(shard.main_table ? shard.main_table : main_table); - - pipes.emplace_back( - createRemoteSourcePipe(remote_query_executor, add_agg_info, add_totals, add_extremes, async_read, async_query_sending)); - addConvertingActions(pipes.back(), output_stream->header); - } + pipes.emplace_back( + createRemoteSourcePipe(remote_query_executor, add_agg_info, add_totals, add_extremes, async_read, async_query_sending)); + addConvertingActions(pipes.back(), output_stream->header); } void ReadFromRemote::initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) diff --git a/src/Processors/QueryPlan/ReadFromRemote.h b/src/Processors/QueryPlan/ReadFromRemote.h index 82ef45d6bbf..cbdc3e2f542 100644 --- a/src/Processors/QueryPlan/ReadFromRemote.h +++ b/src/Processors/QueryPlan/ReadFromRemote.h @@ -60,7 +60,6 @@ private: Poco::Logger * log; UInt32 shard_count; const String cluster_name; - std::optional priority_func_factory; void addLazyPipe(Pipes & pipes, const ClusterProxy::SelectStreamFactory::Shard & shard); void addPipe(Pipes & pipes, const ClusterProxy::SelectStreamFactory::Shard & shard); diff --git a/src/QueryPipeline/RemoteQueryExecutor.cpp b/src/QueryPipeline/RemoteQueryExecutor.cpp index a43571c8114..0a8df5d1d34 100644 --- a/src/QueryPipeline/RemoteQueryExecutor.cpp +++ b/src/QueryPipeline/RemoteQueryExecutor.cpp @@ -43,24 +43,13 @@ namespace ErrorCodes } RemoteQueryExecutor::RemoteQueryExecutor( - const String & query_, - const Block & header_, - ContextPtr context_, - const Scalars & scalars_, - const Tables & external_tables_, - QueryProcessingStage::Enum stage_, - std::optional extension_, - GetPriorityForLoadBalancing::Func priority_func_) - : header(header_) - , query(query_) - , context(context_) - , scalars(scalars_) - , external_tables(external_tables_) - , 
stage(stage_) + const String & query_, const Block & header_, ContextPtr context_, + const Scalars & scalars_, const Tables & external_tables_, + QueryProcessingStage::Enum stage_, std::optional extension_) + : header(header_), query(query_), context(context_), scalars(scalars_) + , external_tables(external_tables_), stage(stage_) , extension(extension_) - , priority_func(priority_func_) -{ -} +{} RemoteQueryExecutor::RemoteQueryExecutor( Connection & connection, @@ -111,16 +100,10 @@ RemoteQueryExecutor::RemoteQueryExecutor( RemoteQueryExecutor::RemoteQueryExecutor( const ConnectionPoolWithFailoverPtr & pool, - const String & query_, - const Block & header_, - ContextPtr context_, - const ThrottlerPtr & throttler, - const Scalars & scalars_, - const Tables & external_tables_, - QueryProcessingStage::Enum stage_, - std::optional extension_, - GetPriorityForLoadBalancing::Func priority_func_) - : RemoteQueryExecutor(query_, header_, context_, scalars_, external_tables_, stage_, extension_, priority_func_) + const String & query_, const Block & header_, ContextPtr context_, + const ThrottlerPtr & throttler, const Scalars & scalars_, const Tables & external_tables_, + QueryProcessingStage::Enum stage_, std::optional extension_) + : RemoteQueryExecutor(query_, header_, context_, scalars_, external_tables_, stage_, extension_) { create_connections = [this, pool, throttler](AsyncCallback async_callback)->std::unique_ptr { @@ -134,8 +117,7 @@ RemoteQueryExecutor::RemoteQueryExecutor( if (main_table) table_to_check = std::make_shared(main_table.getQualifiedName()); - auto res = std::make_unique( - pool, context, timeouts, throttler, pool_mode, table_to_check, std::move(async_callback), priority_func); + auto res = std::make_unique(pool, context, timeouts, throttler, pool_mode, table_to_check, std::move(async_callback)); if (extension && extension->replica_info) res->setReplicaInfo(*extension->replica_info); return res; @@ -155,16 +137,14 @@ RemoteQueryExecutor::RemoteQueryExecutor( pool_mode, main_table.getQualifiedName(), std::move(async_callback), - skip_unavailable_endpoints, - priority_func); + skip_unavailable_endpoints); connection_entries.reserve(try_results.size()); for (auto & try_result : try_results) connection_entries.emplace_back(std::move(try_result.entry)); } else { - connection_entries = pool->getMany( - timeouts, current_settings, pool_mode, std::move(async_callback), skip_unavailable_endpoints, priority_func); + connection_entries = pool->getMany(timeouts, current_settings, pool_mode, std::move(async_callback), skip_unavailable_endpoints); } auto res = std::make_unique(std::move(connection_entries), current_settings, throttler); diff --git a/src/QueryPipeline/RemoteQueryExecutor.h b/src/QueryPipeline/RemoteQueryExecutor.h index 5a8ccc2592b..49ea40bf4b6 100644 --- a/src/QueryPipeline/RemoteQueryExecutor.h +++ b/src/QueryPipeline/RemoteQueryExecutor.h @@ -50,7 +50,6 @@ public: std::shared_ptr task_iterator = nullptr; std::shared_ptr parallel_reading_coordinator = nullptr; std::optional replica_info = {}; - GetPriorityForLoadBalancing::Func priority_func; }; /// Takes already set connection. @@ -77,15 +76,9 @@ public: /// Takes a pool and gets one or several connections from it. 
RemoteQueryExecutor( const ConnectionPoolWithFailoverPtr & pool, - const String & query_, - const Block & header_, - ContextPtr context_, - const ThrottlerPtr & throttler = nullptr, - const Scalars & scalars_ = Scalars(), - const Tables & external_tables_ = Tables(), - QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete, - std::optional extension_ = std::nullopt, - GetPriorityForLoadBalancing::Func priority_func = {}); + const String & query_, const Block & header_, ContextPtr context_, + const ThrottlerPtr & throttler = nullptr, const Scalars & scalars_ = Scalars(), const Tables & external_tables_ = Tables(), + QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete, std::optional extension_ = std::nullopt); ~RemoteQueryExecutor(); @@ -198,14 +191,9 @@ public: private: RemoteQueryExecutor( - const String & query_, - const Block & header_, - ContextPtr context_, - const Scalars & scalars_, - const Tables & external_tables_, - QueryProcessingStage::Enum stage_, - std::optional extension_, - GetPriorityForLoadBalancing::Func priority_func = {}); + const String & query_, const Block & header_, ContextPtr context_, + const Scalars & scalars_, const Tables & external_tables_, + QueryProcessingStage::Enum stage_, std::optional extension_); Block header; Block totals; @@ -285,8 +273,6 @@ private: Poco::Logger * log = nullptr; - GetPriorityForLoadBalancing::Func priority_func; - /// Send all scalars to remote servers void sendScalars(); diff --git a/src/Storages/SelectQueryInfo.h b/src/Storages/SelectQueryInfo.h index 662a5c0ef5a..c3be07b6572 100644 --- a/src/Storages/SelectQueryInfo.h +++ b/src/Storages/SelectQueryInfo.h @@ -176,6 +176,8 @@ struct SelectQueryInfo /// /// Configured in StorageDistributed::getQueryProcessingStage() ClusterPtr optimized_cluster; + /// should we use custom key with the cluster + bool use_custom_key = false; TreeRewriterResultPtr syntax_analyzer_result; diff --git a/src/Storages/StorageDistributed.cpp b/src/Storages/StorageDistributed.cpp index a829002187b..7ef2ff08827 100644 --- a/src/Storages/StorageDistributed.cpp +++ b/src/Storages/StorageDistributed.cpp @@ -429,10 +429,15 @@ QueryProcessingStage::Enum StorageDistributed::getQueryProcessingStage( size_t nodes = getClusterQueriedNodes(settings, cluster); - query_info.cluster = cluster; - - if (!local_context->canUseParallelReplicasCustomKey(*cluster)) + if (query_info.use_custom_key) { + LOG_INFO(log, "Single shard cluster used with custom_key, transforming replicas into virtual shards"); + query_info.cluster = cluster->getClusterWithReplicasAsShards(settings, settings.max_parallel_replicas); + } + else + { + query_info.cluster = cluster; + if (nodes > 1 && settings.optimize_skip_unused_shards) { /// Always calculate optimized cluster here, to avoid conditions during read() @@ -875,22 +880,30 @@ void StorageDistributed::read( storage_snapshot, processed_stage); - const auto & settings = local_context->getSettingsRef(); + auto settings = local_context->getSettingsRef(); ClusterProxy::AdditionalShardFilterGenerator additional_shard_filter_generator; - if (local_context->canUseParallelReplicasCustomKey(*query_info.getCluster())) + if (query_info.use_custom_key) { if (auto custom_key_ast = parseCustomKeyForTable(settings.parallel_replicas_custom_key, *local_context)) { + if (query_info.getCluster()->getShardCount() == 1) + { + // we are reading from single shard with multiple replicas but didn't transform replicas + // into virtual shards with custom_key set + throw 
Exception(ErrorCodes::LOGICAL_ERROR, "Replicas weren't transformed into virtual shards"); + } + additional_shard_filter_generator = - [my_custom_key_ast = std::move(custom_key_ast), - column_description = this->getInMemoryMetadataPtr()->columns, - custom_key_type = settings.parallel_replicas_custom_key_filter_type.value, - context = local_context, - replica_count = query_info.getCluster()->getShardsInfo().front().per_replica_pools.size()](uint64_t replica_num) -> ASTPtr + [&, my_custom_key_ast = std::move(custom_key_ast), shard_count = query_info.cluster->getShardCount()](uint64_t shard_num) -> ASTPtr { return getCustomKeyFilterForParallelReplica( - replica_count, replica_num - 1, my_custom_key_ast, custom_key_type, column_description, context); + shard_count, + shard_num - 1, + my_custom_key_ast, + settings.parallel_replicas_custom_key_filter_type, + this->getInMemoryMetadataPtr()->columns, + local_context); }; } } diff --git a/tests/config/config.d/clusters.xml b/tests/config/config.d/clusters.xml index 7ade716902c..cfd4868f1dc 100644 --- a/tests/config/config.d/clusters.xml +++ b/tests/config/config.d/clusters.xml @@ -144,24 +144,6 @@ - - - false - - 127.0.0.1 - 9000 - - - 127.0.0.2 - 9000 - - - - 127.0.0.3 - 1234 - - - false diff --git a/tests/integration/test_parallel_replicas_custom_key/test.py b/tests/integration/test_parallel_replicas_custom_key/test.py index c646a678512..baac2661506 100644 --- a/tests/integration/test_parallel_replicas_custom_key/test.py +++ b/tests/integration/test_parallel_replicas_custom_key/test.py @@ -87,3 +87,8 @@ def test_parallel_replicas_custom_key(start_cluster, cluster, custom_key, filter node.contains_in_log("Processing query on a replica using custom_key") for node in nodes ) + else: + # we first transform all replicas into shards and then append for each shard filter + assert n1.contains_in_log( + "Single shard cluster used with custom_key, transforming replicas into virtual shards" + ) diff --git a/tests/integration/test_parallel_replicas_custom_key_failover/__init__.py b/tests/integration/test_parallel_replicas_custom_key_failover/__init__.py deleted file mode 100644 index e69de29bb2d..00000000000 diff --git a/tests/integration/test_parallel_replicas_custom_key_failover/configs/remote_servers.xml b/tests/integration/test_parallel_replicas_custom_key_failover/configs/remote_servers.xml deleted file mode 100644 index da4e2517a44..00000000000 --- a/tests/integration/test_parallel_replicas_custom_key_failover/configs/remote_servers.xml +++ /dev/null @@ -1,26 +0,0 @@ - - - - - false - - n1 - 9000 - - - n2 - 1234 - - - n3 - 9000 - - - n4 - 1234 - - - - - - diff --git a/tests/integration/test_parallel_replicas_custom_key_failover/test.py b/tests/integration/test_parallel_replicas_custom_key_failover/test.py deleted file mode 100644 index 2b5aa2682d5..00000000000 --- a/tests/integration/test_parallel_replicas_custom_key_failover/test.py +++ /dev/null @@ -1,128 +0,0 @@ -import pytest -import uuid -from helpers.cluster import ClickHouseCluster - -cluster = ClickHouseCluster(__file__) - -node1 = cluster.add_instance( - "n1", main_configs=["configs/remote_servers.xml"], with_zookeeper=True -) -node3 = cluster.add_instance( - "n3", main_configs=["configs/remote_servers.xml"], with_zookeeper=True -) - -nodes = [node1, node3] - - -@pytest.fixture(scope="module", autouse=True) -def start_cluster(): - try: - cluster.start() - yield cluster - finally: - cluster.shutdown() - - -def create_tables(cluster, table_name): - node1.query(f"DROP TABLE IF EXISTS {table_name} 
SYNC") - node3.query(f"DROP TABLE IF EXISTS {table_name} SYNC") - - node1.query( - f"CREATE TABLE IF NOT EXISTS {table_name} (key Int64, value String) Engine=ReplicatedMergeTree('/test_parallel_replicas/shard1/{table_name}', 'r1') ORDER BY (key)" - ) - node3.query( - f"CREATE TABLE IF NOT EXISTS {table_name} (key Int64, value String) Engine=ReplicatedMergeTree('/test_parallel_replicas/shard1/{table_name}', 'r3') ORDER BY (key)" - ) - - # create distributed table - node1.query(f"DROP TABLE IF EXISTS {table_name}_d SYNC") - node1.query( - f""" - CREATE TABLE {table_name}_d AS {table_name} - Engine=Distributed( - {cluster}, - currentDatabase(), - {table_name}, - key - ) - """ - ) - - # populate data - node1.query( - f"INSERT INTO {table_name} SELECT number % 4, number FROM numbers(1000)" - ) - node1.query( - f"INSERT INTO {table_name} SELECT number % 4, number FROM numbers(1000, 1000)" - ) - node1.query( - f"INSERT INTO {table_name} SELECT number % 4, number FROM numbers(2000, 1000)" - ) - node1.query( - f"INSERT INTO {table_name} SELECT number % 4, number FROM numbers(3000, 1000)" - ) - node3.query(f"SYSTEM SYNC REPLICA {table_name}") - - -@pytest.mark.parametrize("use_hedged_requests", [1, 0]) -@pytest.mark.parametrize("custom_key", ["sipHash64(key)", "key"]) -@pytest.mark.parametrize("filter_type", ["default", "range"]) -@pytest.mark.parametrize("prefer_localhost_replica", [0, 1]) -def test_parallel_replicas_custom_key_failover( - start_cluster, - use_hedged_requests, - custom_key, - filter_type, - prefer_localhost_replica, -): - cluster = "test_single_shard_multiple_replicas" - table = "test_table" - - create_tables(cluster, table) - - expected_result = "" - for i in range(4): - expected_result += f"{i}\t1000\n" - - log_comment = uuid.uuid4() - assert ( - node1.query( - f"SELECT key, count() FROM {table}_d GROUP BY key ORDER BY key", - settings={ - "log_comment": log_comment, - "prefer_localhost_replica": prefer_localhost_replica, - "max_parallel_replicas": 4, - "parallel_replicas_custom_key": custom_key, - "parallel_replicas_custom_key_filter_type": filter_type, - "use_hedged_requests": use_hedged_requests, - # "async_socket_for_remote": 0, - # "async_query_sending_for_remote": 0, - }, - ) - == expected_result - ) - - for node in nodes: - node.query("system flush logs") - - # the subqueries should be spread over available nodes - query_id = node1.query( - f"SELECT query_id FROM system.query_log WHERE current_database = currentDatabase() AND log_comment = '{log_comment}' AND type = 'QueryFinish' AND initial_query_id = query_id" - ) - assert query_id != "" - query_id = query_id[:-1] - - if prefer_localhost_replica == 0: - assert ( - node1.query( - f"SELECT 'subqueries', count() FROM clusterAllReplicas({cluster}, system.query_log) WHERE initial_query_id = '{query_id}' AND type ='QueryFinish' AND query_id != initial_query_id SETTINGS skip_unavailable_shards=1" - ) - == "subqueries\t4\n" - ) - - assert ( - node1.query( - f"SELECT h, count() FROM clusterAllReplicas({cluster}, system.query_log) WHERE initial_query_id = '{query_id}' AND type ='QueryFinish' GROUP BY hostname() as h SETTINGS skip_unavailable_shards=1" - ) - == "n1\t3\nn3\t2\n" - ) diff --git a/tests/queries/0_stateless/01361_fover_remote_num_tries.reference b/tests/queries/0_stateless/01361_fover_remote_num_tries.reference index 209e3ef4b62..64bb6b746dc 100644 --- a/tests/queries/0_stateless/01361_fover_remote_num_tries.reference +++ b/tests/queries/0_stateless/01361_fover_remote_num_tries.reference @@ -1 +1 @@ -20 +30 diff --git 
a/tests/queries/0_stateless/01361_fover_remote_num_tries.sh b/tests/queries/0_stateless/01361_fover_remote_num_tries.sh index 9d9c6b920b6..f07ffc02e4f 100755 --- a/tests/queries/0_stateless/01361_fover_remote_num_tries.sh +++ b/tests/queries/0_stateless/01361_fover_remote_num_tries.sh @@ -5,4 +5,4 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) # shellcheck source=../shell_config.sh . "$CURDIR"/../shell_config.sh -$CLICKHOUSE_CLIENT --connections_with_failover_max_tries 10 --connect_timeout_with_failover_ms 1 --query "SELECT hostName() FROM remote('128.1.2.3', default.tmp)" 2>&1 | grep -o -P 'Timeout exceeded while connecting to socket|Network is unreachable|Timeout: connect timed out' | wc -l +$CLICKHOUSE_CLIENT --connections_with_failover_max_tries 10 --query "SELECT hostName() FROM remote('128.1.2.3', default.tmp)" 2>&1 | grep -o -P 'Timeout exceeded while connecting to socket|Network is unreachable|Timeout: connect timed out' | wc -l diff --git a/tests/queries/0_stateless/02918_parallel_replicas_custom_key_unavailable_replica.reference b/tests/queries/0_stateless/02918_parallel_replicas_custom_key_unavailable_replica.reference deleted file mode 100644 index 2d97dd0e12e..00000000000 --- a/tests/queries/0_stateless/02918_parallel_replicas_custom_key_unavailable_replica.reference +++ /dev/null @@ -1,29 +0,0 @@ --- { echoOn } -SELECT y, count() -FROM cluster(test_cluster_1_shard_3_replicas_1_unavailable, currentDatabase(), 02918_parallel_replicas) -GROUP BY y -ORDER BY y -SETTINGS max_parallel_replicas=3, parallel_replicas_custom_key='cityHash64(y)', parallel_replicas_custom_key_filter_type='default'; -0 250 -1 250 -2 250 -3 250 -SELECT y, count() -FROM cluster(test_cluster_1_shard_3_replicas_1_unavailable, currentDatabase(), 02918_parallel_replicas) -GROUP BY y -ORDER BY y -SETTINGS max_parallel_replicas=3, parallel_replicas_custom_key='cityHash64(y)', parallel_replicas_custom_key_filter_type='range'; -0 250 -1 250 -2 250 -3 250 -SET use_hedged_requests=0; -SELECT y, count() -FROM cluster(test_cluster_1_shard_3_replicas_1_unavailable, currentDatabase(), 02918_parallel_replicas) -GROUP BY y -ORDER BY y -SETTINGS max_parallel_replicas=3, parallel_replicas_custom_key='cityHash64(y)', parallel_replicas_custom_key_filter_type='default'; -0 250 -1 250 -2 250 -3 250 diff --git a/tests/queries/0_stateless/02918_parallel_replicas_custom_key_unavailable_replica.sql b/tests/queries/0_stateless/02918_parallel_replicas_custom_key_unavailable_replica.sql deleted file mode 100644 index b9bc6974c47..00000000000 --- a/tests/queries/0_stateless/02918_parallel_replicas_custom_key_unavailable_replica.sql +++ /dev/null @@ -1,30 +0,0 @@ -DROP TABLE IF EXISTS 02918_parallel_replicas; - -CREATE TABLE 02918_parallel_replicas (x String, y Int32) ENGINE = MergeTree ORDER BY cityHash64(x); - -INSERT INTO 02918_parallel_replicas SELECT toString(number), number % 4 FROM numbers(1000); - -SET prefer_localhost_replica=0; - --- { echoOn } -SELECT y, count() -FROM cluster(test_cluster_1_shard_3_replicas_1_unavailable, currentDatabase(), 02918_parallel_replicas) -GROUP BY y -ORDER BY y -SETTINGS max_parallel_replicas=3, parallel_replicas_custom_key='cityHash64(y)', parallel_replicas_custom_key_filter_type='default'; - -SELECT y, count() -FROM cluster(test_cluster_1_shard_3_replicas_1_unavailable, currentDatabase(), 02918_parallel_replicas) -GROUP BY y -ORDER BY y -SETTINGS max_parallel_replicas=3, parallel_replicas_custom_key='cityHash64(y)', parallel_replicas_custom_key_filter_type='range'; - -SET 
use_hedged_requests=0; -SELECT y, count() -FROM cluster(test_cluster_1_shard_3_replicas_1_unavailable, currentDatabase(), 02918_parallel_replicas) -GROUP BY y -ORDER BY y -SETTINGS max_parallel_replicas=3, parallel_replicas_custom_key='cityHash64(y)', parallel_replicas_custom_key_filter_type='default'; --- { echoOff } - -DROP TABLE 02918_parallel_replicas;
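-- 
Notes on the mechanics above (Python sketches; helper names are illustrative, not ClickHouse APIs):

Patch 2 swaps GetPriorityForLoadBalancing::getPriorityFunc back to the pre-#57235 round-robin mapping and drops the `Func` alias plus the priority_func plumbing through the connection pools. Lower Priority values are preferred, and each call rotates which replica gets priority 0. A minimal sketch — the C++ lambda in the restored variant reads the member `last_used` lazily through the captured `this`, which this sketch evaluates eagerly — reproduces both comment tables from the hunk:

    def restored_round_robin(pool_size, last_used):
        # mapping kept after the revert; last_used is advanced first,
        # so it is effectively 1-based when priorities are handed out
        if last_used >= pool_size:
            last_used = 0
        last_used += 1
        prios = [pool_size - (i + 1) if (i + 1) < last_used else (i + 1) - last_used
                 for i in range(pool_size)]
        return last_used, prios

    def removed_round_robin(pool_size, last_used):
        # mapping removed by the revert: snapshot last_used % pool_size,
        # then advance the shared counter
        local = last_used % pool_size
        prios = [pool_size - 1 - (local - i) if i < local
                 else (i - local - 1 if i > local else pool_size - 1)
                 for i in range(pool_size)]
        return last_used + 1, prios

    last_used = 0
    for _ in range(3):
        last_used, prios = restored_round_robin(5, last_used)
        print(prios)  # [0,1,2,3,4] then [4,0,1,2,3] then [4,3,0,1,2]

Both variants yield a rotating permutation of priorities; they differ in phase and in how the counter wraps, which is exactly what the two comment tables in the diff document.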
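The custom-key machinery itself keeps the same semantics across the reverts: getCustomKeyFilterForParallelReplica turns parallel_replicas_custom_key into one predicate per replica, either by modulo ('default') or by contiguous slices of the key space ('range', for keys assumed to be unsigned integers). A rough sketch of the generated predicates, assuming an even range split — the exact boundary arithmetic in getCustomKeyFilterForParallelReplicas.cpp may differ:

    def custom_key_filter(custom_key, replicas, replica_num, filter_type,
                          key_max=2**64 - 1):
        # approximate the per-replica WHERE predicate, as a SQL string
        if filter_type == "default":
            # modulo split: replica k reads rows where key % replicas == k
            return f"({custom_key}) % {replicas} = {replica_num}"
        if filter_type == "range":
            # range split: replica k reads one contiguous slice of the key space
            step = key_max // replicas + 1
            lo = step * replica_num
            cond = f"({custom_key}) >= {lo}"
            if replica_num + 1 < replicas:
                cond += f" AND ({custom_key}) < {lo + step}"
            return cond
        raise ValueError(filter_type)

    print(custom_key_filter("sipHash64(key)", 4, 1, "default"))
    # (sipHash64(key)) % 4 = 1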
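Where that predicate gets attached is what patch 2 relocates: instead of ReadFromRemote::addPipe cloning the query once per replica pool, the restored path has StorageDistributed turn the single shard's replicas into virtual shards (getClusterWithReplicasAsShards, gated by query_info.use_custom_key), and ClusterProxy::executeQuery then ANDs the generated filter into each virtual shard's WHERE — in AST terms, makeASTFunction("and", where_expression, shard_filter). On SQL strings the combination is simply (using custom_key_filter from the sketch above):

    def with_shard_filter(where, shard_filter):
        # mirrors makeASTFunction("and", where_expression, shard_filter)
        return f"and({where}, {shard_filter})" if where else shard_filter

    # one rewritten query per virtual shard (illustrative):
    for shard_num in range(1, 5):
        f = custom_key_filter("sipHash64(key)", 4, shard_num - 1, "default")
        print(with_shard_filter("value != ''", f))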
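The restored failover test then checks where the work lands. Its cluster declares four replicas, of which n2 and n4 point at an unreachable port, and it inserts 4 x 1000 rows with key = number % 4, so every key holds exactly 1000 rows (the expected_result loop). The asserted per-host counts "n1\t3\nn3\t2" are the initial query on n1 plus four virtual-shard subqueries spread over the two live replicas. A toy simulation — purely illustrative; the real selection goes through PoolWithFailoverBase with error counters and, optionally, hedged requests — reproduces the 3/2 split:

    from collections import Counter

    def spread(replicas, alive, num_subqueries):
        # rotate the preferred replica per subquery; fail over past dead ones
        picks, start = [], 0
        for _ in range(num_subqueries):
            for off in range(len(replicas)):
                cand = replicas[(start + off) % len(replicas)]
                if cand in alive:
                    picks.append(cand)
                    break
            start += 1
        return picks

    counts = Counter(spread(["n1", "n2", "n3", "n4"], {"n1", "n3"}, 4))
    counts["n1"] += 1  # the initiator itself logs the initial query on n1
    print(sorted(counts.items()))  # [('n1', 3), ('n3', 2)]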