Minor review fixes

This commit is contained in:
Igor Nikonov 2024-02-07 00:52:25 +00:00
parent dab078f7d2
commit 10082399d5
4 changed files with 19 additions and 15 deletions

View File

@ -27,6 +27,9 @@ class IConnectionPool : private boost::noncopyable
public:
using Entry = PoolBase<Connection>::Entry;
IConnectionPool() = default;
IConnectionPool(String host_, UInt16 port_) : host(host_), port(port_), address(host + ":" + toString(port_)) {}
virtual ~IConnectionPool() = default;
/// Selects the connection to work.
@ -36,7 +39,15 @@ public:
const Settings & settings,
bool force_connected = true) = 0;
const std::string & getHost() const { return host; }
UInt16 getPort() const { return port; }
const String & getAddress() const { return address; }
virtual Priority getPriority() const { return Priority{1}; }
protected:
const String host;
const UInt16 port = 0;
const String address;
};
using ConnectionPoolPtr = std::shared_ptr<IConnectionPool>;
@ -63,10 +74,9 @@ public:
Protocol::Compression compression_,
Protocol::Secure secure_,
Priority priority_ = Priority{1})
: Base(max_connections_,
: IConnectionPool(host_, port_),
Base(max_connections_,
getLogger("ConnectionPool (" + host_ + ":" + toString(port_) + ")")),
host(host_),
port(port_),
default_database(default_database_),
user(user_),
password(password_),
@ -99,10 +109,6 @@ public:
return entry;
}
const std::string & getHost() const
{
return host;
}
std::string getDescription() const
{
return host + ":" + toString(port);
@ -125,8 +131,6 @@ protected:
}
private:
String host;
UInt16 port;
String default_database;
String user;
String password;

View File

@ -617,7 +617,7 @@ The server successfully detected this situation and will download merged part fr
\
M(ParallelReplicasUsedCount, "Number of replicas used to execute a query with task-based parallel replicas") \
M(ParallelReplicasAvailableCount, "Number of replicas available to execute a query with task-based parallel replicas") \
M(ParallelReplicasUnavailableCount, "Number of replicas which was chosen, but unavailable, to execute a query with task-based parallel replicas") \
M(ParallelReplicasUnavailableCount, "Number of replicas which was chosen, but found to be unavailable during query execution with task-based parallel replicas") \
#ifdef APPLY_FOR_EXTERNAL_EVENTS
#define APPLY_FOR_EVENTS(M) APPLY_FOR_BUILTIN_EVENTS(M) APPLY_FOR_EXTERNAL_EVENTS(M)

View File

@ -375,10 +375,11 @@ ReadFromParallelRemoteReplicasStep::ReadFromParallelRemoteReplicasStep(
, storage_limits(std::move(storage_limits_))
, log(log_)
{
std::vector<String> description;
chassert(cluster->getShardCount() == 1);
for (const auto & address : cluster->getShardsAddresses())
description.push_back(fmt::format("Replica: {}", address[0].host_name));
std::vector<String> description;
for (const auto & pool : cluster->getShardsInfo().front().per_replica_pools)
description.push_back(fmt::format("Replica: {}", pool->getHost()));
setStepDescription(boost::algorithm::join(description, ", "));
}
@ -412,7 +413,6 @@ void ReadFromParallelRemoteReplicasStep::initializePipeline(QueryPipelineBuilder
all_replicas_count = shard.getAllNodeCount();
}
chassert(cluster->getShardCount() == 1);
auto shuffled_pool = shard.pool->getShuffledPools(current_settings);
shuffled_pool.resize(all_replicas_count);

View File

@ -105,7 +105,7 @@ RemoteQueryExecutor::RemoteQueryExecutor(
connection_entries.emplace_back(std::move(result.entry));
}
auto res = std::make_unique<MultiplexedConnections>(std::move(connection_entries), context->getSettingsRef(), throttler);
auto res = std::make_unique<MultiplexedConnections>(std::move(connection_entries), current_settings, throttler);
if (extension_ && extension_->replica_info)
res->setReplicaInfo(*extension_->replica_info);