diff --git a/src/Client/ConnectionPool.h b/src/Client/ConnectionPool.h index 1886a0431a5..8e707e8190f 100644 --- a/src/Client/ConnectionPool.h +++ b/src/Client/ConnectionPool.h @@ -27,6 +27,9 @@ class IConnectionPool : private boost::noncopyable public: using Entry = PoolBase::Entry; + IConnectionPool() = default; + IConnectionPool(String host_, UInt16 port_) : host(host_), port(port_), address(host + ":" + toString(port_)) {} + virtual ~IConnectionPool() = default; /// Selects the connection to work. @@ -36,7 +39,15 @@ public: const Settings & settings, bool force_connected = true) = 0; + const std::string & getHost() const { return host; } + UInt16 getPort() const { return port; } + const String & getAddress() const { return address; } virtual Priority getPriority() const { return Priority{1}; } + +protected: + const String host; + const UInt16 port = 0; + const String address; }; using ConnectionPoolPtr = std::shared_ptr; @@ -63,10 +74,9 @@ public: Protocol::Compression compression_, Protocol::Secure secure_, Priority priority_ = Priority{1}) - : Base(max_connections_, + : IConnectionPool(host_, port_), + Base(max_connections_, getLogger("ConnectionPool (" + host_ + ":" + toString(port_) + ")")), - host(host_), - port(port_), default_database(default_database_), user(user_), password(password_), @@ -99,10 +109,6 @@ public: return entry; } - const std::string & getHost() const - { - return host; - } std::string getDescription() const { return host + ":" + toString(port); @@ -125,8 +131,6 @@ protected: } private: - String host; - UInt16 port; String default_database; String user; String password; diff --git a/src/Common/ProfileEvents.cpp b/src/Common/ProfileEvents.cpp index 0218545c3a4..fc30a4e0794 100644 --- a/src/Common/ProfileEvents.cpp +++ b/src/Common/ProfileEvents.cpp @@ -617,7 +617,7 @@ The server successfully detected this situation and will download merged part fr \ M(ParallelReplicasUsedCount, "Number of replicas used to execute a query with task-based parallel replicas") \ M(ParallelReplicasAvailableCount, "Number of replicas available to execute a query with task-based parallel replicas") \ - M(ParallelReplicasUnavailableCount, "Number of replicas which was chosen, but unavailable, to execute a query with task-based parallel replicas") \ + M(ParallelReplicasUnavailableCount, "Number of replicas which was chosen, but found to be unavailable during query execution with task-based parallel replicas") \ #ifdef APPLY_FOR_EXTERNAL_EVENTS #define APPLY_FOR_EVENTS(M) APPLY_FOR_BUILTIN_EVENTS(M) APPLY_FOR_EXTERNAL_EVENTS(M) diff --git a/src/Processors/QueryPlan/ReadFromRemote.cpp b/src/Processors/QueryPlan/ReadFromRemote.cpp index 6764e095088..5707eb2e9c6 100644 --- a/src/Processors/QueryPlan/ReadFromRemote.cpp +++ b/src/Processors/QueryPlan/ReadFromRemote.cpp @@ -375,10 +375,11 @@ ReadFromParallelRemoteReplicasStep::ReadFromParallelRemoteReplicasStep( , storage_limits(std::move(storage_limits_)) , log(log_) { - std::vector description; + chassert(cluster->getShardCount() == 1); - for (const auto & address : cluster->getShardsAddresses()) - description.push_back(fmt::format("Replica: {}", address[0].host_name)); + std::vector description; + for (const auto & pool : cluster->getShardsInfo().front().per_replica_pools) + description.push_back(fmt::format("Replica: {}", pool->getHost())); setStepDescription(boost::algorithm::join(description, ", ")); } @@ -412,7 +413,6 @@ void ReadFromParallelRemoteReplicasStep::initializePipeline(QueryPipelineBuilder all_replicas_count = shard.getAllNodeCount(); } - chassert(cluster->getShardCount() == 1); auto shuffled_pool = shard.pool->getShuffledPools(current_settings); shuffled_pool.resize(all_replicas_count); diff --git a/src/QueryPipeline/RemoteQueryExecutor.cpp b/src/QueryPipeline/RemoteQueryExecutor.cpp index e44749dfb97..46616905bcb 100644 --- a/src/QueryPipeline/RemoteQueryExecutor.cpp +++ b/src/QueryPipeline/RemoteQueryExecutor.cpp @@ -105,7 +105,7 @@ RemoteQueryExecutor::RemoteQueryExecutor( connection_entries.emplace_back(std::move(result.entry)); } - auto res = std::make_unique(std::move(connection_entries), context->getSettingsRef(), throttler); + auto res = std::make_unique(std::move(connection_entries), current_settings, throttler); if (extension_ && extension_->replica_info) res->setReplicaInfo(*extension_->replica_info);