diff --git a/src/DataStreams/ConnectionCollector.cpp b/src/DataStreams/ConnectionCollector.cpp index 054075fcef5..3a411fd2e33 100644 --- a/src/DataStreams/ConnectionCollector.cpp +++ b/src/DataStreams/ConnectionCollector.cpp @@ -54,21 +54,20 @@ struct AsyncDrainTask }; std::shared_ptr ConnectionCollector::enqueueConnectionCleanup( - const ConnectionPoolWithFailoverPtr & pool, std::unique_ptr connections) noexcept + const ConnectionPoolWithFailoverPtr & pool, std::shared_ptr connections) noexcept { if (!connections) return nullptr; - std::shared_ptr shared_connections = std::move(connections); if (connection_collector) { - if (connection_collector->pool.trySchedule(AsyncDrainTask{pool, shared_connections})) + if (connection_collector->pool.trySchedule(AsyncDrainTask{pool, connections})) { CurrentMetrics::add(CurrentMetrics::AsyncDrainedConnections, 1); return nullptr; } } - return shared_connections; + return connections; } void ConnectionCollector::drainConnections(IConnections & connections) noexcept diff --git a/src/DataStreams/ConnectionCollector.h b/src/DataStreams/ConnectionCollector.h index 634f9d71cdf..5b6e82d000e 100644 --- a/src/DataStreams/ConnectionCollector.h +++ b/src/DataStreams/ConnectionCollector.h @@ -16,7 +16,7 @@ class ConnectionCollector : boost::noncopyable, WithMutableContext public: static ConnectionCollector & init(ContextMutablePtr global_context_, size_t max_threads); static std::shared_ptr - enqueueConnectionCleanup(const ConnectionPoolWithFailoverPtr & pool, std::unique_ptr connections) noexcept; + enqueueConnectionCleanup(const ConnectionPoolWithFailoverPtr & pool, std::shared_ptr connections) noexcept; static void drainConnections(IConnections & connections) noexcept; private: diff --git a/src/DataStreams/RemoteQueryExecutor.cpp b/src/DataStreams/RemoteQueryExecutor.cpp index 1522e0ee852..b8276d0d5c6 100644 --- a/src/DataStreams/RemoteQueryExecutor.cpp +++ b/src/DataStreams/RemoteQueryExecutor.cpp @@ -44,7 +44,7 @@ RemoteQueryExecutor::RemoteQueryExecutor( { create_connections = [this, &connection, throttler]() { - return std::make_unique(connection, context->getSettingsRef(), throttler); + return std::make_shared(connection, context->getSettingsRef(), throttler); }; } @@ -58,7 +58,7 @@ RemoteQueryExecutor::RemoteQueryExecutor( , scalars(scalars_), external_tables(external_tables_), stage(stage_), task_iterator(task_iterator_), pool(pool_) { create_connections = [this, connections_, throttler]() mutable { - return std::make_unique(std::move(connections_), context->getSettingsRef(), throttler); + return std::make_shared(std::move(connections_), context->getSettingsRef(), throttler); }; } @@ -70,7 +70,7 @@ RemoteQueryExecutor::RemoteQueryExecutor( : header(header_), query(query_), context(context_) , scalars(scalars_), external_tables(external_tables_), stage(stage_), task_iterator(task_iterator_), pool(pool_) { - create_connections = [this, throttler]()->std::unique_ptr + create_connections = [this, throttler]()->std::shared_ptr { const Settings & current_settings = context->getSettingsRef(); auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(current_settings); @@ -82,7 +82,7 @@ RemoteQueryExecutor::RemoteQueryExecutor( if (main_table) table_to_check = std::make_shared(main_table.getQualifiedName()); - return std::make_unique(pool, current_settings, timeouts, throttler, pool_mode, table_to_check); + return std::make_shared(pool, current_settings, timeouts, throttler, pool_mode, table_to_check); } #endif @@ -97,7 +97,7 @@ RemoteQueryExecutor::RemoteQueryExecutor( else connection_entries = pool->getMany(timeouts, ¤t_settings, pool_mode); - return std::make_unique(std::move(connection_entries), current_settings, throttler); + return std::make_shared(std::move(connection_entries), current_settings, throttler); }; } @@ -107,7 +107,7 @@ RemoteQueryExecutor::~RemoteQueryExecutor() * all connections, then read and skip the remaining packets to make sure * these connections did not remain hanging in the out-of-sync state. */ - if (connections && (established || isQueryPending())) + if (established || isQueryPending()) connections->disconnect(); } @@ -417,7 +417,7 @@ void RemoteQueryExecutor::finish(std::unique_ptr * read_context) { /// Finish might be called in multiple threads. Make sure we release connections in thread-safe way. std::lock_guard guard(connection_draining_mutex); - if (auto conn = ConnectionCollector::enqueueConnectionCleanup(pool, std::move(connections))) + if (auto conn = ConnectionCollector::enqueueConnectionCleanup(pool, connections)) { /// Drain connections synchronously. CurrentMetrics::Increment metric_increment(CurrentMetrics::ActiveSyncDrainedConnections); diff --git a/src/DataStreams/RemoteQueryExecutor.h b/src/DataStreams/RemoteQueryExecutor.h index 5404a126150..7b0a0fc0d2e 100644 --- a/src/DataStreams/RemoteQueryExecutor.h +++ b/src/DataStreams/RemoteQueryExecutor.h @@ -127,11 +127,11 @@ private: /// Drain connection synchronously when finishing. bool sync_draining = false; - std::function()> create_connections; + std::function()> create_connections; /// Hold a shared reference to the connection pool so that asynchronous connection draining will /// work safely. Make sure it's the first member so that we don't destruct it too early. const ConnectionPoolWithFailoverPtr pool; - std::unique_ptr connections; + std::shared_ptr connections; /// Streams for reading from temporary tables and following sending of data /// to remote servers for GLOBAL-subqueries