mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-23 08:02:02 +00:00
Merge pull request #59534 from ClickHouse/use-connection-pool-ptr
Use ConnectionPoolPtr instead of raw pointer
This commit is contained in:
commit
966f7eeabe
@ -22,12 +22,12 @@ namespace ErrorCodes
|
||||
}
|
||||
|
||||
ConnectionEstablisher::ConnectionEstablisher(
|
||||
IConnectionPool * pool_,
|
||||
ConnectionPoolPtr pool_,
|
||||
const ConnectionTimeouts * timeouts_,
|
||||
const Settings & settings_,
|
||||
LoggerPtr log_,
|
||||
const QualifiedTableName * table_to_check_)
|
||||
: pool(pool_), timeouts(timeouts_), settings(settings_), log(log_), table_to_check(table_to_check_), is_finished(false)
|
||||
: pool(std::move(pool_)), timeouts(timeouts_), settings(settings_), log(log_), table_to_check(table_to_check_), is_finished(false)
|
||||
{
|
||||
}
|
||||
|
||||
@ -111,12 +111,13 @@ void ConnectionEstablisher::run(ConnectionEstablisher::TryResult & result, std::
|
||||
#if defined(OS_LINUX)
|
||||
|
||||
ConnectionEstablisherAsync::ConnectionEstablisherAsync(
|
||||
IConnectionPool * pool_,
|
||||
ConnectionPoolPtr pool_,
|
||||
const ConnectionTimeouts * timeouts_,
|
||||
const Settings & settings_,
|
||||
LoggerPtr log_,
|
||||
const QualifiedTableName * table_to_check_)
|
||||
: AsyncTaskExecutor(std::make_unique<Task>(*this)), connection_establisher(pool_, timeouts_, settings_, log_, table_to_check_)
|
||||
: AsyncTaskExecutor(std::make_unique<Task>(*this))
|
||||
, connection_establisher(std::move(pool_), timeouts_, settings_, log_, table_to_check_)
|
||||
{
|
||||
epoll.add(timeout_descriptor.getDescriptor());
|
||||
}
|
||||
|
@ -20,7 +20,7 @@ class ConnectionEstablisher
|
||||
public:
|
||||
using TryResult = PoolWithFailoverBase<IConnectionPool>::TryResult;
|
||||
|
||||
ConnectionEstablisher(IConnectionPool * pool_,
|
||||
ConnectionEstablisher(ConnectionPoolPtr pool_,
|
||||
const ConnectionTimeouts * timeouts_,
|
||||
const Settings & settings_,
|
||||
LoggerPtr log,
|
||||
@ -35,7 +35,7 @@ public:
|
||||
bool isFinished() const { return is_finished; }
|
||||
|
||||
private:
|
||||
IConnectionPool * pool;
|
||||
ConnectionPoolPtr pool;
|
||||
const ConnectionTimeouts * timeouts;
|
||||
const Settings & settings;
|
||||
LoggerPtr log;
|
||||
@ -58,7 +58,7 @@ class ConnectionEstablisherAsync : public AsyncTaskExecutor
|
||||
public:
|
||||
using TryResult = PoolWithFailoverBase<IConnectionPool>::TryResult;
|
||||
|
||||
ConnectionEstablisherAsync(IConnectionPool * pool_,
|
||||
ConnectionEstablisherAsync(ConnectionPoolPtr pool_,
|
||||
const ConnectionTimeouts * timeouts_,
|
||||
const Settings & settings_,
|
||||
LoggerPtr log_,
|
||||
|
@ -63,7 +63,7 @@ IConnectionPool::Entry ConnectionPoolWithFailover::get(const ConnectionTimeouts
|
||||
throw DB::Exception(DB::ErrorCodes::ALL_CONNECTION_TRIES_FAILED,
|
||||
"Cannot get connection from ConnectionPoolWithFailover cause nested pools are empty");
|
||||
|
||||
TryGetEntryFunc try_get_entry = [&](NestedPool & pool, std::string & fail_message)
|
||||
TryGetEntryFunc try_get_entry = [&](const NestedPoolPtr & pool, std::string & fail_message)
|
||||
{
|
||||
return tryGetEntry(pool, timeouts, fail_message, settings, {});
|
||||
};
|
||||
@ -126,7 +126,7 @@ std::vector<IConnectionPool::Entry> ConnectionPoolWithFailover::getMany(
|
||||
std::optional<bool> skip_unavailable_endpoints,
|
||||
GetPriorityForLoadBalancing::Func priority_func)
|
||||
{
|
||||
TryGetEntryFunc try_get_entry = [&](NestedPool & pool, std::string & fail_message)
|
||||
TryGetEntryFunc try_get_entry = [&](const NestedPoolPtr & pool, std::string & fail_message)
|
||||
{ return tryGetEntry(pool, timeouts, fail_message, settings, nullptr, async_callback); };
|
||||
|
||||
std::vector<TryResult> results = getManyImpl(settings, pool_mode, try_get_entry, skip_unavailable_endpoints, priority_func);
|
||||
@ -143,7 +143,7 @@ std::vector<ConnectionPoolWithFailover::TryResult> ConnectionPoolWithFailover::g
|
||||
const Settings & settings,
|
||||
PoolMode pool_mode)
|
||||
{
|
||||
TryGetEntryFunc try_get_entry = [&](NestedPool & pool, std::string & fail_message)
|
||||
TryGetEntryFunc try_get_entry = [&](const NestedPoolPtr & pool, std::string & fail_message)
|
||||
{
|
||||
return tryGetEntry(pool, timeouts, fail_message, settings);
|
||||
};
|
||||
@ -160,7 +160,7 @@ std::vector<ConnectionPoolWithFailover::TryResult> ConnectionPoolWithFailover::g
|
||||
std::optional<bool> skip_unavailable_endpoints,
|
||||
GetPriorityForLoadBalancing::Func priority_func)
|
||||
{
|
||||
TryGetEntryFunc try_get_entry = [&](NestedPool & pool, std::string & fail_message)
|
||||
TryGetEntryFunc try_get_entry = [&](const NestedPoolPtr & pool, std::string & fail_message)
|
||||
{ return tryGetEntry(pool, timeouts, fail_message, settings, &table_to_check, async_callback); };
|
||||
|
||||
return getManyImpl(settings, pool_mode, try_get_entry, skip_unavailable_endpoints, priority_func);
|
||||
@ -216,7 +216,7 @@ std::vector<ConnectionPoolWithFailover::TryResult> ConnectionPoolWithFailover::g
|
||||
|
||||
ConnectionPoolWithFailover::TryResult
|
||||
ConnectionPoolWithFailover::tryGetEntry(
|
||||
IConnectionPool & pool,
|
||||
const ConnectionPoolPtr & pool,
|
||||
const ConnectionTimeouts & timeouts,
|
||||
std::string & fail_message,
|
||||
const Settings & settings,
|
||||
@ -226,7 +226,7 @@ ConnectionPoolWithFailover::tryGetEntry(
|
||||
#if defined(OS_LINUX)
|
||||
if (async_callback)
|
||||
{
|
||||
ConnectionEstablisherAsync connection_establisher_async(&pool, &timeouts, settings, log, table_to_check);
|
||||
ConnectionEstablisherAsync connection_establisher_async(pool, &timeouts, settings, log, table_to_check);
|
||||
while (true)
|
||||
{
|
||||
connection_establisher_async.resume();
|
||||
@ -246,7 +246,7 @@ ConnectionPoolWithFailover::tryGetEntry(
|
||||
}
|
||||
#endif
|
||||
|
||||
ConnectionEstablisher connection_establisher(&pool, &timeouts, settings, log, table_to_check);
|
||||
ConnectionEstablisher connection_establisher(pool, &timeouts, settings, log, table_to_check);
|
||||
TryResult result;
|
||||
connection_establisher.run(result, fail_message);
|
||||
return result;
|
||||
|
@ -115,7 +115,7 @@ private:
|
||||
/// If table_to_check is not null and the check is enabled in settings, check that replication delay
|
||||
/// for this table is not too large.
|
||||
TryResult tryGetEntry(
|
||||
IConnectionPool & pool,
|
||||
const ConnectionPoolPtr & pool,
|
||||
const ConnectionTimeouts & timeouts,
|
||||
std::string & fail_message,
|
||||
const Settings & settings,
|
||||
|
@ -41,8 +41,9 @@ HedgedConnectionsFactory::HedgedConnectionsFactory(
|
||||
, skip_unavailable_shards(skip_unavailable_shards_)
|
||||
{
|
||||
shuffled_pools = pool->getShuffledPools(settings_, priority_func);
|
||||
for (auto shuffled_pool : shuffled_pools)
|
||||
replicas.emplace_back(std::make_unique<ConnectionEstablisherAsync>(shuffled_pool.pool, &timeouts, settings_, log, table_to_check.get()));
|
||||
for (const auto & shuffled_pool : shuffled_pools)
|
||||
replicas.emplace_back(
|
||||
std::make_unique<ConnectionEstablisherAsync>(shuffled_pool.pool, &timeouts, settings_, log, table_to_check.get()));
|
||||
}
|
||||
|
||||
HedgedConnectionsFactory::~HedgedConnectionsFactory()
|
||||
|
@ -101,7 +101,7 @@ public:
|
||||
|
||||
struct ShuffledPool
|
||||
{
|
||||
NestedPool * pool{};
|
||||
NestedPoolPtr pool{};
|
||||
const PoolState * state{}; // WARNING: valid only during initial ordering, dangling
|
||||
size_t index = 0;
|
||||
size_t error_count = 0;
|
||||
@ -110,7 +110,7 @@ public:
|
||||
|
||||
/// This functor must be provided by a client. It must perform a single try that takes a connection
|
||||
/// from the provided pool and checks that it is good.
|
||||
using TryGetEntryFunc = std::function<TryResult(NestedPool & pool, std::string & fail_message)>;
|
||||
using TryGetEntryFunc = std::function<TryResult(const NestedPoolPtr & pool, std::string & fail_message)>;
|
||||
|
||||
/// The client can provide this functor to affect load balancing - the index of a pool is passed to
|
||||
/// this functor. The pools with lower result value will be tried first.
|
||||
@ -181,7 +181,7 @@ PoolWithFailoverBase<TNestedPool>::getShuffledPools(
|
||||
std::vector<ShuffledPool> shuffled_pools;
|
||||
shuffled_pools.reserve(nested_pools.size());
|
||||
for (size_t i = 0; i < nested_pools.size(); ++i)
|
||||
shuffled_pools.push_back(ShuffledPool{nested_pools[i].get(), &pool_states[i], i, /* error_count = */ 0, /* slowdown_count = */ 0});
|
||||
shuffled_pools.push_back(ShuffledPool{nested_pools[i], &pool_states[i], i, /* error_count = */ 0, /* slowdown_count = */ 0});
|
||||
|
||||
::sort(
|
||||
shuffled_pools.begin(), shuffled_pools.end(),
|
||||
@ -267,7 +267,7 @@ PoolWithFailoverBase<TNestedPool>::getMany(
|
||||
continue;
|
||||
|
||||
std::string fail_message;
|
||||
result = try_get_entry(*shuffled_pool.pool, fail_message);
|
||||
result = try_get_entry(shuffled_pool.pool, fail_message);
|
||||
|
||||
if (!fail_message.empty())
|
||||
fail_messages += fail_message + '\n';
|
||||
|
Loading…
Reference in New Issue
Block a user