Use shared_ptr for connections

This commit is contained in:
Amos Bird 2021-07-21 19:56:32 +08:00
parent 20b2837b27
commit 205edec80d
No known key found for this signature in database
GPG Key ID: 80D430DCBECFEDB4
4 changed files with 13 additions and 14 deletions

View File

@ -54,21 +54,20 @@ struct AsyncDrainTask
};
std::shared_ptr<IConnections> ConnectionCollector::enqueueConnectionCleanup(
const ConnectionPoolWithFailoverPtr & pool, std::unique_ptr<IConnections> connections) noexcept
const ConnectionPoolWithFailoverPtr & pool, std::shared_ptr<IConnections> connections) noexcept
{
if (!connections)
return nullptr;
std::shared_ptr<IConnections> shared_connections = std::move(connections);
if (connection_collector)
{
if (connection_collector->pool.trySchedule(AsyncDrainTask{pool, shared_connections}))
if (connection_collector->pool.trySchedule(AsyncDrainTask{pool, connections}))
{
CurrentMetrics::add(CurrentMetrics::AsyncDrainedConnections, 1);
return nullptr;
}
}
return shared_connections;
return connections;
}
void ConnectionCollector::drainConnections(IConnections & connections) noexcept

View File

@ -16,7 +16,7 @@ class ConnectionCollector : boost::noncopyable, WithMutableContext
public:
static ConnectionCollector & init(ContextMutablePtr global_context_, size_t max_threads);
static std::shared_ptr<IConnections>
enqueueConnectionCleanup(const ConnectionPoolWithFailoverPtr & pool, std::unique_ptr<IConnections> connections) noexcept;
enqueueConnectionCleanup(const ConnectionPoolWithFailoverPtr & pool, std::shared_ptr<IConnections> connections) noexcept;
static void drainConnections(IConnections & connections) noexcept;
private:

View File

@ -44,7 +44,7 @@ RemoteQueryExecutor::RemoteQueryExecutor(
{
create_connections = [this, &connection, throttler]()
{
return std::make_unique<MultiplexedConnections>(connection, context->getSettingsRef(), throttler);
return std::make_shared<MultiplexedConnections>(connection, context->getSettingsRef(), throttler);
};
}
@ -58,7 +58,7 @@ RemoteQueryExecutor::RemoteQueryExecutor(
, scalars(scalars_), external_tables(external_tables_), stage(stage_), task_iterator(task_iterator_), pool(pool_)
{
create_connections = [this, connections_, throttler]() mutable {
return std::make_unique<MultiplexedConnections>(std::move(connections_), context->getSettingsRef(), throttler);
return std::make_shared<MultiplexedConnections>(std::move(connections_), context->getSettingsRef(), throttler);
};
}
@ -70,7 +70,7 @@ RemoteQueryExecutor::RemoteQueryExecutor(
: header(header_), query(query_), context(context_)
, scalars(scalars_), external_tables(external_tables_), stage(stage_), task_iterator(task_iterator_), pool(pool_)
{
create_connections = [this, throttler]()->std::unique_ptr<IConnections>
create_connections = [this, throttler]()->std::shared_ptr<IConnections>
{
const Settings & current_settings = context->getSettingsRef();
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(current_settings);
@ -82,7 +82,7 @@ RemoteQueryExecutor::RemoteQueryExecutor(
if (main_table)
table_to_check = std::make_shared<QualifiedTableName>(main_table.getQualifiedName());
return std::make_unique<HedgedConnections>(pool, current_settings, timeouts, throttler, pool_mode, table_to_check);
return std::make_shared<HedgedConnections>(pool, current_settings, timeouts, throttler, pool_mode, table_to_check);
}
#endif
@ -97,7 +97,7 @@ RemoteQueryExecutor::RemoteQueryExecutor(
else
connection_entries = pool->getMany(timeouts, &current_settings, pool_mode);
return std::make_unique<MultiplexedConnections>(std::move(connection_entries), current_settings, throttler);
return std::make_shared<MultiplexedConnections>(std::move(connection_entries), current_settings, throttler);
};
}
@ -107,7 +107,7 @@ RemoteQueryExecutor::~RemoteQueryExecutor()
* all connections, then read and skip the remaining packets to make sure
* these connections did not remain hanging in the out-of-sync state.
*/
if (connections && (established || isQueryPending()))
if (established || isQueryPending())
connections->disconnect();
}
@ -417,7 +417,7 @@ void RemoteQueryExecutor::finish(std::unique_ptr<ReadContext> * read_context)
{
/// Finish might be called in multiple threads. Make sure we release connections in thread-safe way.
std::lock_guard guard(connection_draining_mutex);
if (auto conn = ConnectionCollector::enqueueConnectionCleanup(pool, std::move(connections)))
if (auto conn = ConnectionCollector::enqueueConnectionCleanup(pool, connections))
{
/// Drain connections synchronously.
CurrentMetrics::Increment metric_increment(CurrentMetrics::ActiveSyncDrainedConnections);

View File

@ -127,11 +127,11 @@ private:
/// Drain connection synchronously when finishing.
bool sync_draining = false;
std::function<std::unique_ptr<IConnections>()> create_connections;
std::function<std::shared_ptr<IConnections>()> create_connections;
/// Hold a shared reference to the connection pool so that asynchronous connection draining will
/// work safely. Make sure it's the first member so that we don't destruct it too early.
const ConnectionPoolWithFailoverPtr pool;
std::unique_ptr<IConnections> connections;
std::shared_ptr<IConnections> connections;
/// Streams for reading from temporary tables and following sending of data
/// to remote servers for GLOBAL-subqueries