diff --git a/src/Client/Connection.h b/src/Client/Connection.h index e93a7539d15..5d0411027a1 100644 --- a/src/Client/Connection.h +++ b/src/Client/Connection.h @@ -19,7 +19,6 @@ #include -#include #include #include "config.h" diff --git a/src/Client/ConnectionPool.h b/src/Client/ConnectionPool.h index 1886a0431a5..8e707e8190f 100644 --- a/src/Client/ConnectionPool.h +++ b/src/Client/ConnectionPool.h @@ -27,6 +27,9 @@ class IConnectionPool : private boost::noncopyable public: using Entry = PoolBase::Entry; + IConnectionPool() = default; + IConnectionPool(String host_, UInt16 port_) : host(host_), port(port_), address(host + ":" + toString(port_)) {} + virtual ~IConnectionPool() = default; /// Selects the connection to work. @@ -36,7 +39,15 @@ public: const Settings & settings, bool force_connected = true) = 0; + const std::string & getHost() const { return host; } + UInt16 getPort() const { return port; } + const String & getAddress() const { return address; } virtual Priority getPriority() const { return Priority{1}; } + +protected: + const String host; + const UInt16 port = 0; + const String address; }; using ConnectionPoolPtr = std::shared_ptr; @@ -63,10 +74,9 @@ public: Protocol::Compression compression_, Protocol::Secure secure_, Priority priority_ = Priority{1}) - : Base(max_connections_, + : IConnectionPool(host_, port_), + Base(max_connections_, getLogger("ConnectionPool (" + host_ + ":" + toString(port_) + ")")), - host(host_), - port(port_), default_database(default_database_), user(user_), password(password_), @@ -99,10 +109,6 @@ public: return entry; } - const std::string & getHost() const - { - return host; - } std::string getDescription() const { return host + ":" + toString(port); @@ -125,8 +131,6 @@ protected: } private: - String host; - UInt16 port; String default_database; String user; String password; diff --git a/src/Client/IConnections.h b/src/Client/IConnections.h index ee17d198fc3..ebc71511834 100644 --- a/src/Client/IConnections.h +++ b/src/Client/IConnections.h @@ -1,7 +1,5 @@ #pragma once -#include - #include #include diff --git a/src/Common/ErrorCodes.cpp b/src/Common/ErrorCodes.cpp index 2e154ddb32d..8e81a626b41 100644 --- a/src/Common/ErrorCodes.cpp +++ b/src/Common/ErrorCodes.cpp @@ -593,6 +593,7 @@ M(711, FILECACHE_ACCESS_DENIED) \ M(712, TOO_MANY_MATERIALIZED_VIEWS) \ M(713, BROKEN_PROJECTION) \ + M(714, UNEXPECTED_CLUSTER) \ \ M(999, KEEPER_EXCEPTION) \ M(1000, POCO_EXCEPTION) \ diff --git a/src/Common/ProfileEvents.cpp b/src/Common/ProfileEvents.cpp index e6b3449b0c9..2476a69a513 100644 --- a/src/Common/ProfileEvents.cpp +++ b/src/Common/ProfileEvents.cpp @@ -629,6 +629,8 @@ The server successfully detected this situation and will download merged part fr M(InterfacePostgreSQLReceiveBytes, "Number of bytes received through PostgreSQL interfaces") \ \ M(ParallelReplicasUsedCount, "Number of replicas used to execute a query with task-based parallel replicas") \ + M(ParallelReplicasAvailableCount, "Number of replicas available to execute a query with task-based parallel replicas") \ + M(ParallelReplicasUnavailableCount, "Number of replicas which was chosen, but found to be unavailable during query execution with task-based parallel replicas") \ #ifdef APPLY_FOR_EXTERNAL_EVENTS #define APPLY_FOR_EVENTS(M) APPLY_FOR_BUILTIN_EVENTS(M) APPLY_FOR_EXTERNAL_EVENTS(M) diff --git a/src/Interpreters/ClusterProxy/executeQuery.cpp b/src/Interpreters/ClusterProxy/executeQuery.cpp index 35451e1d774..33b86854ba9 100644 --- a/src/Interpreters/ClusterProxy/executeQuery.cpp +++ b/src/Interpreters/ClusterProxy/executeQuery.cpp @@ -32,6 +32,7 @@ namespace ErrorCodes extern const int TOO_LARGE_DISTRIBUTED_DEPTH; extern const int LOGICAL_ERROR; extern const int CLUSTER_DOESNT_EXIST; + extern const int UNEXPECTED_CLUSTER; } namespace ClusterProxy @@ -374,12 +375,12 @@ void executeQueryWithParallelReplicas( shard_num = column->getUInt(0); } - ClusterPtr new_cluster; + const auto shard_count = not_optimized_cluster->getShardCount(); + ClusterPtr new_cluster = not_optimized_cluster; /// if got valid shard_num from query initiator, then parallel replicas scope is the specified shard /// shards are numbered in order of appearance in the cluster config if (shard_num > 0) { - const auto shard_count = not_optimized_cluster->getShardCount(); if (shard_num > shard_count) throw Exception( ErrorCodes::LOGICAL_ERROR, @@ -395,17 +396,18 @@ void executeQueryWithParallelReplicas( // get cluster for shard specified by shard_num // shard_num is 1-based, but getClusterWithSingleShard expects 0-based index - auto single_shard_cluster = not_optimized_cluster->getClusterWithSingleShard(shard_num - 1); - // convert cluster to representation expected by parallel replicas - new_cluster = single_shard_cluster->getClusterWithReplicasAsShards(settings, settings.max_parallel_replicas); + new_cluster = not_optimized_cluster->getClusterWithSingleShard(shard_num - 1); } else { - new_cluster = not_optimized_cluster->getClusterWithReplicasAsShards(settings, settings.max_parallel_replicas); + if (not_optimized_cluster->getShardCount() > 1) + throw DB::Exception( + ErrorCodes::UNEXPECTED_CLUSTER, + "`cluster_for_parallel_replicas` setting refers to cluster with several shards. Expected a cluster with one shard"); } - auto coordinator - = std::make_shared(new_cluster->getShardCount(), settings.parallel_replicas_mark_segment_size); + auto coordinator = std::make_shared( + new_cluster->getShardsInfo().begin()->getAllNodeCount(), settings.parallel_replicas_mark_segment_size); auto external_tables = new_context->getExternalTables(); auto read_from_remote = std::make_unique( query_ast, diff --git a/src/Processors/QueryPlan/ReadFromRemote.cpp b/src/Processors/QueryPlan/ReadFromRemote.cpp index f92dc06fa7e..93c73a66b78 100644 --- a/src/Processors/QueryPlan/ReadFromRemote.cpp +++ b/src/Processors/QueryPlan/ReadFromRemote.cpp @@ -12,7 +12,7 @@ #include #include #include -#include "Common/logger_useful.h" +#include #include #include #include @@ -375,10 +375,11 @@ ReadFromParallelRemoteReplicasStep::ReadFromParallelRemoteReplicasStep( , storage_limits(std::move(storage_limits_)) , log(log_) { - std::vector description; + chassert(cluster->getShardCount() == 1); - for (const auto & address : cluster->getShardsAddresses()) - description.push_back(fmt::format("Replica: {}", address[0].host_name)); + std::vector description; + for (const auto & pool : cluster->getShardsInfo().front().per_replica_pools) + description.push_back(fmt::format("Replica: {}", pool->getHost())); setStepDescription(boost::algorithm::join(description, ", ")); } @@ -399,51 +400,44 @@ void ReadFromParallelRemoteReplicasStep::initializePipeline(QueryPipelineBuilder const Settings & current_settings = context->getSettingsRef(); auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(current_settings); + const auto & shard = cluster->getShardsInfo().at(0); size_t all_replicas_count = current_settings.max_parallel_replicas; - if (all_replicas_count > cluster->getShardsInfo().size()) + if (all_replicas_count > shard.getAllNodeCount()) { - LOG_INFO(getLogger("ReadFromParallelRemoteReplicasStep"), - "The number of replicas requested ({}) is bigger than the real number available in the cluster ({}). "\ - "Will use the latter number to execute the query.", current_settings.max_parallel_replicas, cluster->getShardsInfo().size()); - all_replicas_count = cluster->getShardsInfo().size(); + LOG_INFO( + getLogger("ReadFromParallelRemoteReplicasStep"), + "The number of replicas requested ({}) is bigger than the real number available in the cluster ({}). " + "Will use the latter number to execute the query.", + current_settings.max_parallel_replicas, + shard.getAllNodeCount()); + all_replicas_count = shard.getAllNodeCount(); } - /// Find local shard. It might happen that there is no local shard, but that's fine - for (const auto & shard: cluster->getShardsInfo()) - { - if (shard.isLocal()) - { - IConnections::ReplicaInfo replica_info - { - .all_replicas_count = all_replicas_count, - /// `shard_num` will be equal to the number of the given replica in the cluster (set by `Cluster::getClusterWithReplicasAsShards`). - /// we should use this number specifically because efficiency of data distribution by consistent hash depends on it. - .number_of_current_replica = shard.shard_num - 1, - }; - addPipeForSingeReplica(pipes, shard.pool, replica_info); - } + std::vector shuffled_pool; + if (all_replicas_count < shard.getAllNodeCount()) + { + shuffled_pool = shard.pool->getShuffledPools(current_settings); + shuffled_pool.resize(all_replicas_count); + } + else + { + /// try to preserve replicas order if all replicas in cluster are used for query execution + /// it's important for data locality during query execution + auto priority_func = [](size_t i) { return Priority{static_cast(i)}; }; + shuffled_pool = shard.pool->getShuffledPools(current_settings, priority_func); } - auto current_shard = cluster->getShardsInfo().begin(); - while (pipes.size() != all_replicas_count) + for (size_t i=0; i < all_replicas_count; ++i) { - if (current_shard->isLocal()) - { - ++current_shard; - continue; - } - IConnections::ReplicaInfo replica_info { .all_replicas_count = all_replicas_count, - /// `shard_num` will be equal to the number of the given replica in the cluster (set by `Cluster::getClusterWithReplicasAsShards`). /// we should use this number specifically because efficiency of data distribution by consistent hash depends on it. - .number_of_current_replica = current_shard->shard_num - 1, + .number_of_current_replica = i, }; - addPipeForSingeReplica(pipes, current_shard->pool, replica_info); - ++current_shard; + addPipeForSingeReplica(pipes, shuffled_pool[i].pool, replica_info); } auto pipe = Pipe::unitePipes(std::move(pipes)); @@ -456,7 +450,8 @@ void ReadFromParallelRemoteReplicasStep::initializePipeline(QueryPipelineBuilder } -void ReadFromParallelRemoteReplicasStep::addPipeForSingeReplica(Pipes & pipes, std::shared_ptr pool, IConnections::ReplicaInfo replica_info) +void ReadFromParallelRemoteReplicasStep::addPipeForSingeReplica( + Pipes & pipes, const ConnectionPoolPtr & pool, IConnections::ReplicaInfo replica_info) { bool add_agg_info = stage == QueryProcessingStage::WithMergeableState; bool add_totals = false; @@ -476,7 +471,14 @@ void ReadFromParallelRemoteReplicasStep::addPipeForSingeReplica(Pipes & pipes, s assert(output_stream); auto remote_query_executor = std::make_shared( - pool, query_string, output_stream->header, context, throttler, scalars, external_tables, stage, + pool, + query_string, + output_stream->header, + context, + throttler, + scalars, + external_tables, + stage, RemoteQueryExecutor::Extension{.parallel_reading_coordinator = coordinator, .replica_info = std::move(replica_info)}); remote_query_executor->setLogger(log); diff --git a/src/Processors/QueryPlan/ReadFromRemote.h b/src/Processors/QueryPlan/ReadFromRemote.h index f853a12910b..498d584e85a 100644 --- a/src/Processors/QueryPlan/ReadFromRemote.h +++ b/src/Processors/QueryPlan/ReadFromRemote.h @@ -9,10 +9,6 @@ namespace DB { - -class ConnectionPoolWithFailover; -using ConnectionPoolWithFailoverPtr = std::shared_ptr; - class Throttler; using ThrottlerPtr = std::shared_ptr; @@ -91,8 +87,7 @@ public: void enforceAggregationInOrder(); private: - - void addPipeForSingeReplica(Pipes & pipes, std::shared_ptr pool, IConnections::ReplicaInfo replica_info); + void addPipeForSingeReplica(Pipes & pipes, const ConnectionPoolPtr & pool, IConnections::ReplicaInfo replica_info); ClusterPtr cluster; ASTPtr query_ast; diff --git a/src/QueryPipeline/RemoteQueryExecutor.cpp b/src/QueryPipeline/RemoteQueryExecutor.cpp index 1caedfc8511..46616905bcb 100644 --- a/src/QueryPipeline/RemoteQueryExecutor.cpp +++ b/src/QueryPipeline/RemoteQueryExecutor.cpp @@ -4,7 +4,7 @@ #include #include -#include "Core/Protocol.h" +#include #include #include #include @@ -17,6 +17,7 @@ #include #include #include +#include #include #include #include @@ -29,6 +30,7 @@ namespace ProfileEvents extern const Event SuspendSendingQueryToShard; extern const Event ReadTaskRequestsReceived; extern const Event MergeTreeReadTaskRequestsReceived; + extern const Event ParallelReplicasAvailableCount; } namespace DB @@ -62,6 +64,55 @@ RemoteQueryExecutor::RemoteQueryExecutor( { } +RemoteQueryExecutor::RemoteQueryExecutor( + ConnectionPoolPtr pool, + const String & query_, + const Block & header_, + ContextPtr context_, + ThrottlerPtr throttler, + const Scalars & scalars_, + const Tables & external_tables_, + QueryProcessingStage::Enum stage_, + std::optional extension_) + : RemoteQueryExecutor(query_, header_, context_, scalars_, external_tables_, stage_, extension_) +{ + create_connections = [this, pool, throttler, extension_](AsyncCallback) + { + const Settings & current_settings = context->getSettingsRef(); + auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(current_settings); + + ConnectionPoolWithFailover::TryResult result; + std::string fail_message; + if (main_table) + { + auto table_name = main_table.getQualifiedName(); + + ConnectionEstablisher connection_establisher(pool, &timeouts, current_settings, log, &table_name); + connection_establisher.run(result, fail_message); + } + else + { + ConnectionEstablisher connection_establisher(pool, &timeouts, current_settings, log, nullptr); + connection_establisher.run(result, fail_message); + } + + std::vector connection_entries; + if (!result.entry.isNull() && result.is_usable) + { + if (extension_ && extension_->parallel_reading_coordinator) + ProfileEvents::increment(ProfileEvents::ParallelReplicasAvailableCount); + + connection_entries.emplace_back(std::move(result.entry)); + } + + auto res = std::make_unique(std::move(connection_entries), current_settings, throttler); + if (extension_ && extension_->replica_info) + res->setReplicaInfo(*extension_->replica_info); + + return res; + }; +} + RemoteQueryExecutor::RemoteQueryExecutor( Connection & connection, const String & query_, diff --git a/src/QueryPipeline/RemoteQueryExecutor.h b/src/QueryPipeline/RemoteQueryExecutor.h index 3fac2065d02..6b1539bd08e 100644 --- a/src/QueryPipeline/RemoteQueryExecutor.h +++ b/src/QueryPipeline/RemoteQueryExecutor.h @@ -52,6 +52,18 @@ public: std::optional replica_info = {}; }; + /// Takes a connection pool for a node (not cluster) + RemoteQueryExecutor( + ConnectionPoolPtr pool, + const String & query_, + const Block & header_, + ContextPtr context_, + ThrottlerPtr throttler = nullptr, + const Scalars & scalars_ = Scalars(), + const Tables & external_tables_ = Tables(), + QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete, + std::optional extension_ = std::nullopt); + /// Takes already set connection. RemoteQueryExecutor( Connection & connection, diff --git a/src/Storages/MergeTree/ParallelReplicasReadingCoordinator.cpp b/src/Storages/MergeTree/ParallelReplicasReadingCoordinator.cpp index abc51bde3fb..2fe237efdc7 100644 --- a/src/Storages/MergeTree/ParallelReplicasReadingCoordinator.cpp +++ b/src/Storages/MergeTree/ParallelReplicasReadingCoordinator.cpp @@ -97,11 +97,9 @@ extern const Event ParallelReplicasCollectingOwnedSegmentsMicroseconds; extern const Event ParallelReplicasReadAssignedMarks; extern const Event ParallelReplicasReadUnassignedMarks; extern const Event ParallelReplicasReadAssignedForStealingMarks; -} -namespace ProfileEvents -{ - extern const Event ParallelReplicasUsedCount; +extern const Event ParallelReplicasUsedCount; +extern const Event ParallelReplicasUnavailableCount; } namespace DB @@ -1025,6 +1023,8 @@ ParallelReadResponse ParallelReplicasReadingCoordinator::handleRequest(ParallelR void ParallelReplicasReadingCoordinator::markReplicaAsUnavailable(size_t replica_number) { + ProfileEvents::increment(ProfileEvents::ParallelReplicasUnavailableCount); + std::lock_guard lock(mutex); if (!pimpl) diff --git a/tests/queries/0_stateless/02769_parallel_replicas_unavailable_shards.sql b/tests/queries/0_stateless/02769_parallel_replicas_unavailable_shards.sql index 38d592201e3..1a75e000349 100644 --- a/tests/queries/0_stateless/02769_parallel_replicas_unavailable_shards.sql +++ b/tests/queries/0_stateless/02769_parallel_replicas_unavailable_shards.sql @@ -2,14 +2,13 @@ DROP TABLE IF EXISTS test_parallel_replicas_unavailable_shards; CREATE TABLE test_parallel_replicas_unavailable_shards (n UInt64) ENGINE=MergeTree() ORDER BY tuple(); INSERT INTO test_parallel_replicas_unavailable_shards SELECT * FROM numbers(10); -SYSTEM FLUSH LOGS; - SET allow_experimental_parallel_reading_from_replicas=2, max_parallel_replicas=11, cluster_for_parallel_replicas='parallel_replicas', parallel_replicas_for_non_replicated_merge_tree=1; SET send_logs_level='error'; -SELECT count() FROM test_parallel_replicas_unavailable_shards WHERE NOT ignore(*); +SELECT count() FROM test_parallel_replicas_unavailable_shards WHERE NOT ignore(*) SETTINGS log_comment = '02769_7b513191-5082-4073-8568-53b86a49da79'; SYSTEM FLUSH LOGS; -SELECT count() > 0 FROM system.text_log WHERE yesterday() <= event_date AND message LIKE '%Replica number 10 is unavailable%'; +SET allow_experimental_parallel_reading_from_replicas=0; +SELECT ProfileEvents['ParallelReplicasUnavailableCount'] FROM system.query_log WHERE yesterday() <= event_date AND query_id in (select query_id from system.query_log where log_comment = '02769_7b513191-5082-4073-8568-53b86a49da79' and current_database = currentDatabase()) and type = 'QueryFinish' and query_id == initial_query_id; DROP TABLE test_parallel_replicas_unavailable_shards; diff --git a/tests/queries/0_stateless/02972_parallel_replicas_cte.sql b/tests/queries/0_stateless/02972_parallel_replicas_cte.sql index c39ad172a27..3702184e336 100644 --- a/tests/queries/0_stateless/02972_parallel_replicas_cte.sql +++ b/tests/queries/0_stateless/02972_parallel_replicas_cte.sql @@ -12,16 +12,16 @@ SELECT count() FROM pr_2 INNER JOIN filtered_groups ON pr_2.a = filtered_groups. WITH filtered_groups AS (SELECT a FROM pr_1 WHERE a >= 10000) SELECT count() FROM pr_2 INNER JOIN filtered_groups ON pr_2.a = filtered_groups.a -SETTINGS allow_experimental_parallel_reading_from_replicas = 1, parallel_replicas_for_non_replicated_merge_tree = 1, cluster_for_parallel_replicas = 'test_cluster_two_shards', max_parallel_replicas = 3; +SETTINGS allow_experimental_parallel_reading_from_replicas = 1, parallel_replicas_for_non_replicated_merge_tree = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', max_parallel_replicas = 3; -- Testing that it is disabled for allow_experimental_analyzer=0. With analyzer it will be supported (with correct result) WITH filtered_groups AS (SELECT a FROM pr_1 WHERE a >= 10000) SELECT count() FROM pr_2 INNER JOIN filtered_groups ON pr_2.a = filtered_groups.a -SETTINGS allow_experimental_analyzer = 0, allow_experimental_parallel_reading_from_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, cluster_for_parallel_replicas = 'test_cluster_two_shards', max_parallel_replicas = 3; -- { serverError SUPPORT_IS_DISABLED } +SETTINGS allow_experimental_analyzer = 0, allow_experimental_parallel_reading_from_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', max_parallel_replicas = 3; -- { serverError SUPPORT_IS_DISABLED } -- Sanitizer SELECT count() FROM pr_2 JOIN numbers(10) as pr_1 ON pr_2.a = pr_1.number -SETTINGS allow_experimental_parallel_reading_from_replicas = 1, parallel_replicas_for_non_replicated_merge_tree = 1, cluster_for_parallel_replicas = 'test_cluster_two_shards', max_parallel_replicas = 3; +SETTINGS allow_experimental_parallel_reading_from_replicas = 1, parallel_replicas_for_non_replicated_merge_tree = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', max_parallel_replicas = 3; DROP TABLE IF EXISTS pr_1; DROP TABLE IF EXISTS pr_2; diff --git a/tests/queries/0_stateless/02982_parallel_replicas_unexpected_cluster.reference b/tests/queries/0_stateless/02982_parallel_replicas_unexpected_cluster.reference new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/queries/0_stateless/02982_parallel_replicas_unexpected_cluster.sql b/tests/queries/0_stateless/02982_parallel_replicas_unexpected_cluster.sql new file mode 100644 index 00000000000..210b7d2a18a --- /dev/null +++ b/tests/queries/0_stateless/02982_parallel_replicas_unexpected_cluster.sql @@ -0,0 +1,8 @@ +DROP TABLE IF EXISTS test_unexpected_cluster; +CREATE TABLE test_unexpected_cluster (n UInt64) ENGINE=MergeTree() ORDER BY tuple(); +INSERT INTO test_unexpected_cluster SELECT * FROM numbers(10); + +SET allow_experimental_parallel_reading_from_replicas=2, max_parallel_replicas=2, cluster_for_parallel_replicas='test_cluster_two_shards', parallel_replicas_for_non_replicated_merge_tree=1; +SELECT count() FROM test_unexpected_cluster WHERE NOT ignore(*); -- { serverError UNEXPECTED_CLUSTER } + +DROP TABLE test_unexpected_cluster;