From dbfb699690f923abe32bab1c23258411948ede89 Mon Sep 17 00:00:00 2001 From: Amos Bird Date: Wed, 14 Jul 2021 21:17:30 +0800 Subject: [PATCH] Asynchronously drain connections. --- programs/copier/ClusterCopier.cpp | 17 ++- programs/server/Server.cpp | 3 + src/Client/HedgedConnections.cpp | 10 +- src/Client/HedgedConnections.h | 8 +- src/Client/IConnections.cpp | 31 +++++ src/Client/IConnections.h | 8 +- src/Client/MultiplexedConnections.cpp | 40 ++++-- src/Client/MultiplexedConnections.h | 9 +- src/Common/CurrentMetrics.cpp | 4 + src/Core/Defines.h | 1 + src/Core/Settings.h | 3 + src/DataStreams/ConnectionCollector.cpp | 116 ++++++++++++++++++ src/DataStreams/ConnectionCollector.h | 30 +++++ src/DataStreams/RemoteBlockInputStream.cpp | 21 ++-- src/DataStreams/RemoteBlockInputStream.h | 25 ++-- src/DataStreams/RemoteQueryExecutor.cpp | 72 +++++------ src/DataStreams/RemoteQueryExecutor.h | 18 ++- .../RemoteQueryExecutorReadContext.cpp | 4 +- src/Processors/QueryPlan/ReadFromRemote.cpp | 2 +- src/Server/TCPHandler.cpp | 27 ++++ src/Storages/StorageDistributed.cpp | 8 +- src/Storages/StorageS3Cluster.cpp | 11 +- .../test_async_drain_connection/__init__.py | 0 .../configs/config.xml | 4 + .../test_async_drain_connection/test.py | 36 ++++++ .../01920_async_drain_connections.reference | 2 + .../01920_async_drain_connections.sql | 6 + 27 files changed, 414 insertions(+), 102 deletions(-) create mode 100644 src/Client/IConnections.cpp create mode 100644 src/DataStreams/ConnectionCollector.cpp create mode 100644 src/DataStreams/ConnectionCollector.h create mode 100644 tests/integration/test_async_drain_connection/__init__.py create mode 100644 tests/integration/test_async_drain_connection/configs/config.xml create mode 100644 tests/integration/test_async_drain_connection/test.py create mode 100644 tests/queries/0_stateless/01920_async_drain_connections.reference create mode 100644 tests/queries/0_stateless/01920_async_drain_connections.sql diff --git a/programs/copier/ClusterCopier.cpp b/programs/copier/ClusterCopier.cpp index 128f8bc1cdd..cf0b6cc76a4 100644 --- a/programs/copier/ClusterCopier.cpp +++ b/programs/copier/ClusterCopier.cpp @@ -1702,14 +1702,15 @@ void ClusterCopier::dropParticularPartitionPieceFromAllHelpingTables(const TaskT LOG_INFO(log, "All helping tables dropped partition {}", partition_name); } -String ClusterCopier::getRemoteCreateTable(const DatabaseAndTableName & table, Connection & connection, const Settings & settings) +String ClusterCopier::getRemoteCreateTable( + const DatabaseAndTableName & table, Connection & connection, const Settings & settings) { auto remote_context = Context::createCopy(context); remote_context->setSettings(settings); String query = "SHOW CREATE TABLE " + getQuotedTable(table); - Block block = getBlockWithAllStreamData(std::make_shared( - connection, query, InterpreterShowCreateQuery::getSampleBlock(), remote_context)); + Block block = getBlockWithAllStreamData( + std::make_shared(connection, query, InterpreterShowCreateQuery::getSampleBlock(), remote_context)); return typeid_cast(*block.safeGetByPosition(0).column).getDataAt(0).toString(); } @@ -1719,10 +1720,8 @@ ASTPtr ClusterCopier::getCreateTableForPullShard(const ConnectionTimeouts & time { /// Fetch and parse (possibly) new definition auto connection_entry = task_shard.info.pool->get(timeouts, &task_cluster->settings_pull, true); - String create_query_pull_str = getRemoteCreateTable( - task_shard.task_table.table_pull, - *connection_entry, - task_cluster->settings_pull); + String create_query_pull_str + = getRemoteCreateTable(task_shard.task_table.table_pull, *connection_entry, task_cluster->settings_pull); ParserCreateQuery parser_create_query; const auto & settings = getContext()->getSettingsRef(); @@ -1953,8 +1952,8 @@ UInt64 ClusterCopier::executeQueryOnCluster( /// For unknown reason global context is passed to IStorage::read() method /// So, task_identifier is passed as constructor argument. It is more obvious. auto remote_query_executor = std::make_shared( - *connections.back(), query, header, getContext(), - /*throttler=*/nullptr, Scalars(), Tables(), QueryProcessingStage::Complete); + *connections.back(), query, header, getContext(), + /*throttler=*/nullptr, Scalars(), Tables(), QueryProcessingStage::Complete); try { diff --git a/programs/server/Server.cpp b/programs/server/Server.cpp index d4f830e5a0c..9ba7650bc71 100644 --- a/programs/server/Server.cpp +++ b/programs/server/Server.cpp @@ -59,6 +59,7 @@ #include #include #include +#include #include #include #include @@ -503,6 +504,8 @@ if (ThreadFuzzer::instance().isEffective()) // ignore `max_thread_pool_size` in configs we fetch from ZK, but oh well. GlobalThreadPool::initialize(config().getUInt("max_thread_pool_size", 10000)); + ConnectionCollector::init(global_context, config().getUInt("max_threads_for_connection_collector", 10)); + bool has_zookeeper = config().has("zookeeper"); zkutil::ZooKeeperNodeCache main_config_zk_node_cache([&] { return global_context->getZooKeeper(); }); diff --git a/src/Client/HedgedConnections.cpp b/src/Client/HedgedConnections.cpp index 0c461d2f399..68ca8b107bf 100644 --- a/src/Client/HedgedConnections.cpp +++ b/src/Client/HedgedConnections.cpp @@ -28,6 +28,8 @@ HedgedConnections::HedgedConnections( std::shared_ptr table_to_check_) : hedged_connections_factory(pool_, &settings_, timeouts_, table_to_check_) , settings(settings_) + , drain_timeout(settings.drain_timeout) + , allow_changing_replica_until_first_data_packet(settings.allow_changing_replica_until_first_data_packet) , throttler(throttler_) { std::vector connections = hedged_connections_factory.getManyConnections(pool_mode); @@ -251,7 +253,7 @@ Packet HedgedConnections::drain() while (!epoll.empty()) { - ReplicaLocation location = getReadyReplicaLocation(); + ReplicaLocation location = getReadyReplicaLocation(DrainCallback{drain_timeout}); Packet packet = receivePacketFromReplica(location); switch (packet.type) { @@ -278,10 +280,10 @@ Packet HedgedConnections::drain() Packet HedgedConnections::receivePacket() { std::lock_guard lock(cancel_mutex); - return receivePacketUnlocked({}); + return receivePacketUnlocked({}, false /* is_draining */); } -Packet HedgedConnections::receivePacketUnlocked(AsyncCallback async_callback) +Packet HedgedConnections::receivePacketUnlocked(AsyncCallback async_callback, bool /* is_draining */) { if (!sent_query) throw Exception("Cannot receive packets: no query sent.", ErrorCodes::LOGICAL_ERROR); @@ -396,7 +398,7 @@ Packet HedgedConnections::receivePacketFromReplica(const ReplicaLocation & repli { /// If we are allowed to change replica until the first data packet, /// just restart timeout (if it hasn't expired yet). Otherwise disable changing replica with this offset. - if (settings.allow_changing_replica_until_first_data_packet && !replica.is_change_replica_timeout_expired) + if (allow_changing_replica_until_first_data_packet && !replica.is_change_replica_timeout_expired) replica.change_replica_timeout.setRelative(hedged_connections_factory.getConnectionTimeouts().receive_data_timeout); else disableChangingReplica(replica_location); diff --git a/src/Client/HedgedConnections.h b/src/Client/HedgedConnections.h index 9f7d8837536..f41142346ed 100644 --- a/src/Client/HedgedConnections.h +++ b/src/Client/HedgedConnections.h @@ -97,7 +97,7 @@ public: Packet receivePacket() override; - Packet receivePacketUnlocked(AsyncCallback async_callback) override; + Packet receivePacketUnlocked(AsyncCallback async_callback, bool is_draining) override; void disconnect() override; @@ -189,6 +189,12 @@ private: Epoll epoll; const Settings & settings; + + /// The following two fields are from settings but can be referenced outside the lifetime of + /// settings when connection is drained asynchronously. + Poco::Timespan drain_timeout; + bool allow_changing_replica_until_first_data_packet; + ThrottlerPtr throttler; bool sent_query = false; bool cancelled = false; diff --git a/src/Client/IConnections.cpp b/src/Client/IConnections.cpp new file mode 100644 index 00000000000..39c4e0d19e3 --- /dev/null +++ b/src/Client/IConnections.cpp @@ -0,0 +1,31 @@ +#include +#include + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int SOCKET_TIMEOUT; +} + +/// This wrapper struct allows us to use Poco's socket polling code with a raw fd. +/// The only difference from Poco::Net::SocketImpl is that we don't close the fd in the destructor. +struct PocoSocketWrapper : public Poco::Net::SocketImpl +{ + explicit PocoSocketWrapper(int fd) + { + reset(fd); + } + + // Do not close fd. + ~PocoSocketWrapper() override = default; +}; + +void IConnections::DrainCallback::operator()(int fd, Poco::Timespan, const std::string fd_description) const +{ + if (!PocoSocketWrapper(fd).poll(drain_timeout, Poco::Net::Socket::SELECT_READ)) + throw Exception(ErrorCodes::SOCKET_TIMEOUT, "Read timeout while draining from {}", fd_description); +} + +} diff --git a/src/Client/IConnections.h b/src/Client/IConnections.h index d251a5fb3ab..53267cbbb3e 100644 --- a/src/Client/IConnections.h +++ b/src/Client/IConnections.h @@ -10,6 +10,12 @@ namespace DB class IConnections : boost::noncopyable { public: + struct DrainCallback + { + Poco::Timespan drain_timeout; + void operator()(int fd, Poco::Timespan, const std::string fd_description = "") const; + }; + /// Send all scalars to replicas. virtual void sendScalarsData(Scalars & data) = 0; /// Send all content of external tables to replicas. @@ -30,7 +36,7 @@ public: virtual Packet receivePacket() = 0; /// Version of `receivePacket` function without locking. - virtual Packet receivePacketUnlocked(AsyncCallback async_callback) = 0; + virtual Packet receivePacketUnlocked(AsyncCallback async_callback, bool is_draining) = 0; /// Break all active connections. virtual void disconnect() = 0; diff --git a/src/Client/MultiplexedConnections.cpp b/src/Client/MultiplexedConnections.cpp index 350beffce28..e298849ad54 100644 --- a/src/Client/MultiplexedConnections.cpp +++ b/src/Client/MultiplexedConnections.cpp @@ -18,7 +18,7 @@ namespace ErrorCodes MultiplexedConnections::MultiplexedConnections(Connection & connection, const Settings & settings_, const ThrottlerPtr & throttler) - : settings(settings_) + : settings(settings_), drain_timeout(settings.drain_timeout), receive_timeout(settings.receive_timeout) { connection.setThrottler(throttler); @@ -30,9 +30,8 @@ MultiplexedConnections::MultiplexedConnections(Connection & connection, const Se } MultiplexedConnections::MultiplexedConnections( - std::vector && connections, - const Settings & settings_, const ThrottlerPtr & throttler) - : settings(settings_) + std::vector && connections, const Settings & settings_, const ThrottlerPtr & throttler) + : settings(settings_), drain_timeout(settings.drain_timeout), receive_timeout(settings.receive_timeout) { /// If we didn't get any connections from pool and getMany() did not throw exceptions, this means that /// `skip_unavailable_shards` was set. Then just return. @@ -168,7 +167,7 @@ void MultiplexedConnections::sendReadTaskResponse(const String & response) Packet MultiplexedConnections::receivePacket() { std::lock_guard lock(cancel_mutex); - Packet packet = receivePacketUnlocked({}); + Packet packet = receivePacketUnlocked({}, false /* is_draining */); return packet; } @@ -216,7 +215,7 @@ Packet MultiplexedConnections::drain() while (hasActiveConnections()) { - Packet packet = receivePacketUnlocked({}); + Packet packet = receivePacketUnlocked(DrainCallback{drain_timeout}, true /* is_draining */); switch (packet.type) { @@ -264,14 +263,14 @@ std::string MultiplexedConnections::dumpAddressesUnlocked() const return buf.str(); } -Packet MultiplexedConnections::receivePacketUnlocked(AsyncCallback async_callback) +Packet MultiplexedConnections::receivePacketUnlocked(AsyncCallback async_callback, bool is_draining) { if (!sent_query) throw Exception("Cannot receive packets: no query sent.", ErrorCodes::LOGICAL_ERROR); if (!hasActiveConnections()) throw Exception("No more packets are available.", ErrorCodes::LOGICAL_ERROR); - ReplicaState & state = getReplicaForReading(); + ReplicaState & state = getReplicaForReading(is_draining); current_connection = state.connection; if (current_connection == nullptr) throw Exception("Logical error: no available replica", ErrorCodes::NO_AVAILABLE_REPLICA); @@ -323,9 +322,10 @@ Packet MultiplexedConnections::receivePacketUnlocked(AsyncCallback async_callbac return packet; } -MultiplexedConnections::ReplicaState & MultiplexedConnections::getReplicaForReading() +MultiplexedConnections::ReplicaState & MultiplexedConnections::getReplicaForReading(bool is_draining) { - if (replica_states.size() == 1) + /// Fast path when we only focus on one replica and are not draining the connection. + if (replica_states.size() == 1 && !is_draining) return replica_states[0]; Poco::Net::Socket::SocketList read_list; @@ -353,10 +353,26 @@ MultiplexedConnections::ReplicaState & MultiplexedConnections::getReplicaForRead read_list.push_back(*connection->socket); } - int n = Poco::Net::Socket::select(read_list, write_list, except_list, settings.receive_timeout); + int n = Poco::Net::Socket::select( + read_list, + write_list, + except_list, + is_draining ? drain_timeout : receive_timeout); if (n == 0) - throw Exception("Timeout exceeded while reading from " + dumpAddressesUnlocked(), ErrorCodes::TIMEOUT_EXCEEDED); + { + auto err_msg = fmt::format("Timeout exceeded while reading from {}", dumpAddressesUnlocked()); + for (ReplicaState & state : replica_states) + { + Connection * connection = state.connection; + if (connection != nullptr) + { + connection->disconnect(); + invalidateReplica(state); + } + } + throw Exception(err_msg, ErrorCodes::TIMEOUT_EXCEEDED); + } } /// TODO Absolutely wrong code: read_list could be empty; motivation of rand is unclear. diff --git a/src/Client/MultiplexedConnections.h b/src/Client/MultiplexedConnections.h index f642db1c4cd..c42653b5f02 100644 --- a/src/Client/MultiplexedConnections.h +++ b/src/Client/MultiplexedConnections.h @@ -61,7 +61,7 @@ public: bool hasActiveConnections() const override { return active_connection_count > 0; } private: - Packet receivePacketUnlocked(AsyncCallback async_callback) override; + Packet receivePacketUnlocked(AsyncCallback async_callback, bool is_draining) override; /// Internal version of `dumpAddresses` function without locking. std::string dumpAddressesUnlocked() const; @@ -74,7 +74,7 @@ private: }; /// Get a replica where you can read the data. - ReplicaState & getReplicaForReading(); + ReplicaState & getReplicaForReading(bool is_draining); /// Mark the replica as invalid. void invalidateReplica(ReplicaState & replica_state); @@ -82,6 +82,11 @@ private: private: const Settings & settings; + /// The following two fields are from settings but can be referenced outside the lifetime of + /// settings when connection is drained asynchronously. + Poco::Timespan drain_timeout; + Poco::Timespan receive_timeout; + /// The current number of valid connections to the replicas of this shard. size_t active_connection_count = 0; diff --git a/src/Common/CurrentMetrics.cpp b/src/Common/CurrentMetrics.cpp index e9fa13e11e6..f94c3421107 100644 --- a/src/Common/CurrentMetrics.cpp +++ b/src/Common/CurrentMetrics.cpp @@ -71,6 +71,10 @@ M(PartsInMemory, "In-memory parts.") \ M(MMappedFiles, "Total number of mmapped files.") \ M(MMappedFileBytes, "Sum size of mmapped file regions.") \ + M(AsyncDrainedConnections, "Number of connections drained asynchronously.") \ + M(ActiveAsyncDrainedConnections, "Number of active connections drained asynchronously.") \ + M(SyncDrainedConnections, "Number of connections drained synchronously.") \ + M(ActiveSyncDrainedConnections, "Number of active connections drained synchronously.") \ namespace CurrentMetrics { diff --git a/src/Core/Defines.h b/src/Core/Defines.h index 5751f4beeb7..fa4eb9d386c 100644 --- a/src/Core/Defines.h +++ b/src/Core/Defines.h @@ -11,6 +11,7 @@ #define DBMS_DEFAULT_CONNECT_TIMEOUT_WITH_FAILOVER_SECURE_MS 100 #define DBMS_DEFAULT_SEND_TIMEOUT_SEC 300 #define DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC 300 +#define DBMS_DEFAULT_DRAIN_TIMEOUT_SEC 3 /// Timeouts for hedged requests. #define DBMS_DEFAULT_HEDGED_CONNECTION_TIMEOUT_MS 100 #define DBMS_DEFAULT_RECEIVE_DATA_TIMEOUT_MS 2000 diff --git a/src/Core/Settings.h b/src/Core/Settings.h index 55566e2e7a4..c8befbb81b2 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -54,6 +54,7 @@ class IColumn; M(Milliseconds, connect_timeout_with_failover_secure_ms, DBMS_DEFAULT_CONNECT_TIMEOUT_WITH_FAILOVER_SECURE_MS, "Connection timeout for selecting first healthy replica (for secure connections).", 0) \ M(Seconds, receive_timeout, DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC, "", 0) \ M(Seconds, send_timeout, DBMS_DEFAULT_SEND_TIMEOUT_SEC, "", 0) \ + M(Seconds, drain_timeout, DBMS_DEFAULT_DRAIN_TIMEOUT_SEC, "", 0) \ M(Seconds, tcp_keep_alive_timeout, 0, "The time in seconds the connection needs to remain idle before TCP starts sending keepalive probes", 0) \ M(Milliseconds, hedged_connection_timeout_ms, DBMS_DEFAULT_HEDGED_CONNECTION_TIMEOUT_MS, "Connection timeout for establishing connection with replica for Hedged requests", 0) \ M(Milliseconds, receive_data_timeout_ms, DBMS_DEFAULT_RECEIVE_DATA_TIMEOUT_MS, "Connection timeout for receiving first packet of data or packet with positive progress from replica", 0) \ @@ -233,6 +234,8 @@ class IColumn; M(Milliseconds, sleep_in_send_tables_status_ms, 0, "Time to sleep in sending tables status response in TCPHandler", 0) \ M(Milliseconds, sleep_in_send_data_ms, 0, "Time to sleep in sending data in TCPHandler", 0) \ M(UInt64, unknown_packet_in_send_data, 0, "Send unknown packet instead of data Nth data packet", 0) \ + /** Settings for testing connection collector */ \ + M(Milliseconds, sleep_in_receive_cancel_ms, 0, "Time to sleep in receiving cancel in TCPHandler", 0) \ \ M(Bool, insert_allow_materialized_columns, 0, "If setting is enabled, Allow materialized columns in INSERT.", 0) \ M(Seconds, http_connection_timeout, DEFAULT_HTTP_READ_BUFFER_CONNECTION_TIMEOUT, "HTTP connection timeout.", 0) \ diff --git a/src/DataStreams/ConnectionCollector.cpp b/src/DataStreams/ConnectionCollector.cpp new file mode 100644 index 00000000000..054075fcef5 --- /dev/null +++ b/src/DataStreams/ConnectionCollector.cpp @@ -0,0 +1,116 @@ +#include + +#include +#include +#include +#include + +namespace CurrentMetrics +{ +extern const Metric AsyncDrainedConnections; +extern const Metric ActiveAsyncDrainedConnections; +} + +namespace DB +{ +namespace ErrorCodes +{ + extern const int LOGICAL_ERROR; + extern const int UNKNOWN_PACKET_FROM_SERVER; +} + +std::unique_ptr ConnectionCollector::connection_collector; + +static constexpr UInt64 max_connection_draining_tasks_per_thread = 20; + +ConnectionCollector::ConnectionCollector(ContextMutablePtr global_context_, size_t max_threads) + : WithMutableContext(global_context_), pool(max_threads, max_threads, max_threads * max_connection_draining_tasks_per_thread) +{ +} + +ConnectionCollector & ConnectionCollector::init(ContextMutablePtr global_context_, size_t max_threads) +{ + if (connection_collector) + { + throw Exception("Connection collector is initialized twice. This is a bug.", ErrorCodes::LOGICAL_ERROR); + } + + connection_collector.reset(new ConnectionCollector(global_context_, max_threads)); + return *connection_collector; +} + +struct AsyncDrainTask +{ + const ConnectionPoolWithFailoverPtr pool; + std::shared_ptr shared_connections; + void operator()() const + { + ConnectionCollector::drainConnections(*shared_connections); + } + + // We don't have std::unique_function yet. Wrap it in shared_ptr to make the functor copyable. + std::shared_ptr metric_increment + = std::make_shared(CurrentMetrics::ActiveAsyncDrainedConnections); +}; + +std::shared_ptr ConnectionCollector::enqueueConnectionCleanup( + const ConnectionPoolWithFailoverPtr & pool, std::unique_ptr connections) noexcept +{ + if (!connections) + return nullptr; + + std::shared_ptr shared_connections = std::move(connections); + if (connection_collector) + { + if (connection_collector->pool.trySchedule(AsyncDrainTask{pool, shared_connections})) + { + CurrentMetrics::add(CurrentMetrics::AsyncDrainedConnections, 1); + return nullptr; + } + } + return shared_connections; +} + +void ConnectionCollector::drainConnections(IConnections & connections) noexcept +{ + bool is_drained = false; + try + { + Packet packet = connections.drain(); + is_drained = true; + switch (packet.type) + { + case Protocol::Server::EndOfStream: + case Protocol::Server::Log: + break; + + case Protocol::Server::Exception: + packet.exception->rethrow(); + break; + + default: + throw Exception( + ErrorCodes::UNKNOWN_PACKET_FROM_SERVER, + "Unknown packet {} from one of the following replicas: {}", + toString(packet.type), + connections.dumpAddresses()); + } + } + catch (...) + { + tryLogCurrentException(&Poco::Logger::get("ConnectionCollector"), __PRETTY_FUNCTION__); + if (!is_drained) + { + try + { + connections.disconnect(); + } + catch (...) + { + tryLogCurrentException(&Poco::Logger::get("ConnectionCollector"), __PRETTY_FUNCTION__); + } + } + } +} + +} diff --git a/src/DataStreams/ConnectionCollector.h b/src/DataStreams/ConnectionCollector.h new file mode 100644 index 00000000000..634f9d71cdf --- /dev/null +++ b/src/DataStreams/ConnectionCollector.h @@ -0,0 +1,30 @@ +#pragma once + +#include +#include +#include +#include + +namespace DB +{ + +class ConnectionPoolWithFailover; +using ConnectionPoolWithFailoverPtr = std::shared_ptr; + +class ConnectionCollector : boost::noncopyable, WithMutableContext +{ +public: + static ConnectionCollector & init(ContextMutablePtr global_context_, size_t max_threads); + static std::shared_ptr + enqueueConnectionCleanup(const ConnectionPoolWithFailoverPtr & pool, std::unique_ptr connections) noexcept; + static void drainConnections(IConnections & connections) noexcept; + +private: + explicit ConnectionCollector(ContextMutablePtr global_context_, size_t max_threads); + + static constexpr size_t reschedule_time_ms = 1000; + ThreadPool pool; + static std::unique_ptr connection_collector; +}; + +} diff --git a/src/DataStreams/RemoteBlockInputStream.cpp b/src/DataStreams/RemoteBlockInputStream.cpp index c633600d37f..32acd34ec30 100644 --- a/src/DataStreams/RemoteBlockInputStream.cpp +++ b/src/DataStreams/RemoteBlockInputStream.cpp @@ -5,27 +5,28 @@ namespace DB { RemoteBlockInputStream::RemoteBlockInputStream( - Connection & connection, - const String & query_, const Block & header_, ContextPtr context_, - const ThrottlerPtr & throttler, const Scalars & scalars_, const Tables & external_tables_, QueryProcessingStage::Enum stage_) + Connection & connection, + const String & query_, const Block & header_, ContextPtr context_, + const ThrottlerPtr & throttler, const Scalars & scalars_, const Tables & external_tables_, QueryProcessingStage::Enum stage_) : query_executor(connection, query_, header_, context_, throttler, scalars_, external_tables_, stage_) { init(); } RemoteBlockInputStream::RemoteBlockInputStream( - std::vector && connections, - const String & query_, const Block & header_, ContextPtr context_, - const ThrottlerPtr & throttler, const Scalars & scalars_, const Tables & external_tables_, QueryProcessingStage::Enum stage_) - : query_executor(std::move(connections), query_, header_, context_, throttler, scalars_, external_tables_, stage_) + const ConnectionPoolWithFailoverPtr & pool, + std::vector && connections, + const String & query_, const Block & header_, ContextPtr context_, + const ThrottlerPtr & throttler, const Scalars & scalars_, const Tables & external_tables_, QueryProcessingStage::Enum stage_) + : query_executor(pool, std::move(connections), query_, header_, context_, throttler, scalars_, external_tables_, stage_) { init(); } RemoteBlockInputStream::RemoteBlockInputStream( - const ConnectionPoolWithFailoverPtr & pool, - const String & query_, const Block & header_, ContextPtr context_, - const ThrottlerPtr & throttler, const Scalars & scalars_, const Tables & external_tables_, QueryProcessingStage::Enum stage_) + const ConnectionPoolWithFailoverPtr & pool, + const String & query_, const Block & header_, ContextPtr context_, + const ThrottlerPtr & throttler, const Scalars & scalars_, const Tables & external_tables_, QueryProcessingStage::Enum stage_) : query_executor(pool, query_, header_, context_, throttler, scalars_, external_tables_, stage_) { init(); diff --git a/src/DataStreams/RemoteBlockInputStream.h b/src/DataStreams/RemoteBlockInputStream.h index b0029da91bb..9d70005a2e2 100644 --- a/src/DataStreams/RemoteBlockInputStream.h +++ b/src/DataStreams/RemoteBlockInputStream.h @@ -24,24 +24,25 @@ class RemoteBlockInputStream : public IBlockInputStream public: /// Takes already set connection. RemoteBlockInputStream( - Connection & connection, - const String & query_, const Block & header_, ContextPtr context_, - const ThrottlerPtr & throttler = nullptr, const Scalars & scalars_ = Scalars(), const Tables & external_tables_ = Tables(), - QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete); + Connection & connection, + const String & query_, const Block & header_, ContextPtr context_, + const ThrottlerPtr & throttler = nullptr, const Scalars & scalars_ = Scalars(), const Tables & external_tables_ = Tables(), + QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete); /// Accepts several connections already taken from pool. RemoteBlockInputStream( - std::vector && connections, - const String & query_, const Block & header_, ContextPtr context_, - const ThrottlerPtr & throttler = nullptr, const Scalars & scalars_ = Scalars(), const Tables & external_tables_ = Tables(), - QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete); + const ConnectionPoolWithFailoverPtr & pool, + std::vector && connections, + const String & query_, const Block & header_, ContextPtr context_, + const ThrottlerPtr & throttler = nullptr, const Scalars & scalars_ = Scalars(), const Tables & external_tables_ = Tables(), + QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete); /// Takes a pool and gets one or several connections from it. RemoteBlockInputStream( - const ConnectionPoolWithFailoverPtr & pool, - const String & query_, const Block & header_, ContextPtr context_, - const ThrottlerPtr & throttler = nullptr, const Scalars & scalars_ = Scalars(), const Tables & external_tables_ = Tables(), - QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete); + const ConnectionPoolWithFailoverPtr & pool, + const String & query_, const Block & header_, ContextPtr context_, + const ThrottlerPtr & throttler = nullptr, const Scalars & scalars_ = Scalars(), const Tables & external_tables_ = Tables(), + QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete); /// Set the query_id. For now, used by performance test to later find the query /// in the server query_log. Must be called before sending the query to the server. diff --git a/src/DataStreams/RemoteQueryExecutor.cpp b/src/DataStreams/RemoteQueryExecutor.cpp index 0c60bfdbfdb..1522e0ee852 100644 --- a/src/DataStreams/RemoteQueryExecutor.cpp +++ b/src/DataStreams/RemoteQueryExecutor.cpp @@ -1,3 +1,4 @@ +#include #include #include @@ -17,6 +18,12 @@ #include #include +namespace CurrentMetrics +{ +extern const Metric SyncDrainedConnections; +extern const Metric ActiveSyncDrainedConnections; +} + namespace DB { @@ -33,7 +40,7 @@ RemoteQueryExecutor::RemoteQueryExecutor( ThrottlerPtr throttler, const Scalars & scalars_, const Tables & external_tables_, QueryProcessingStage::Enum stage_, std::shared_ptr task_iterator_) : header(header_), query(query_), context(context_) - , scalars(scalars_), external_tables(external_tables_), stage(stage_), task_iterator(task_iterator_) + , scalars(scalars_), external_tables(external_tables_), stage(stage_), task_iterator(task_iterator_), sync_draining(true) { create_connections = [this, &connection, throttler]() { @@ -42,12 +49,13 @@ RemoteQueryExecutor::RemoteQueryExecutor( } RemoteQueryExecutor::RemoteQueryExecutor( + const ConnectionPoolWithFailoverPtr & pool_, std::vector && connections_, const String & query_, const Block & header_, ContextPtr context_, const ThrottlerPtr & throttler, const Scalars & scalars_, const Tables & external_tables_, QueryProcessingStage::Enum stage_, std::shared_ptr task_iterator_) : header(header_), query(query_), context(context_) - , scalars(scalars_), external_tables(external_tables_), stage(stage_), task_iterator(task_iterator_) + , scalars(scalars_), external_tables(external_tables_), stage(stage_), task_iterator(task_iterator_), pool(pool_) { create_connections = [this, connections_, throttler]() mutable { return std::make_unique(std::move(connections_), context->getSettingsRef(), throttler); @@ -55,14 +63,14 @@ RemoteQueryExecutor::RemoteQueryExecutor( } RemoteQueryExecutor::RemoteQueryExecutor( - const ConnectionPoolWithFailoverPtr & pool, + const ConnectionPoolWithFailoverPtr & pool_, const String & query_, const Block & header_, ContextPtr context_, const ThrottlerPtr & throttler, const Scalars & scalars_, const Tables & external_tables_, QueryProcessingStage::Enum stage_, std::shared_ptr task_iterator_) : header(header_), query(query_), context(context_) - , scalars(scalars_), external_tables(external_tables_), stage(stage_), task_iterator(task_iterator_) + , scalars(scalars_), external_tables(external_tables_), stage(stage_), task_iterator(task_iterator_), pool(pool_) { - create_connections = [this, pool, throttler]()->std::unique_ptr + create_connections = [this, throttler]()->std::unique_ptr { const Settings & current_settings = context->getSettingsRef(); auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(current_settings); @@ -99,7 +107,7 @@ RemoteQueryExecutor::~RemoteQueryExecutor() * all connections, then read and skip the remaining packets to make sure * these connections did not remain hanging in the out-of-sync state. */ - if (established || isQueryPending()) + if (connections && (established || isQueryPending())) connections->disconnect(); } @@ -406,32 +414,18 @@ void RemoteQueryExecutor::finish(std::unique_ptr * read_context) /// Send the request to abort the execution of the request, if not already sent. tryCancel("Cancelling query because enough data has been read", read_context); - - /// Get the remaining packets so that there is no out of sync in the connections to the replicas. - Packet packet = connections->drain(); - switch (packet.type) { - case Protocol::Server::EndOfStream: - finished = true; - break; - - case Protocol::Server::Log: - /// Pass logs from remote server to client - if (auto log_queue = CurrentThread::getInternalTextLogsQueue()) - log_queue->pushBlock(std::move(packet.block)); - break; - - case Protocol::Server::Exception: - got_exception_from_replica = true; - packet.exception->rethrow(); - break; - - default: - got_unknown_packet_from_replica = true; - throw Exception(ErrorCodes::UNKNOWN_PACKET_FROM_SERVER, "Unknown packet {} from one of the following replicas: {}", - toString(packet.type), - connections->dumpAddresses()); + /// Finish might be called in multiple threads. Make sure we release connections in thread-safe way. + std::lock_guard guard(connection_draining_mutex); + if (auto conn = ConnectionCollector::enqueueConnectionCleanup(pool, std::move(connections))) + { + /// Drain connections synchronously. + CurrentMetrics::Increment metric_increment(CurrentMetrics::ActiveSyncDrainedConnections); + ConnectionCollector::drainConnections(*conn); + CurrentMetrics::add(CurrentMetrics::SyncDrainedConnections, 1); + } } + finished = true; } void RemoteQueryExecutor::cancel(std::unique_ptr * read_context) @@ -506,20 +500,18 @@ void RemoteQueryExecutor::sendExternalTables() void RemoteQueryExecutor::tryCancel(const char * reason, std::unique_ptr * read_context) { - { - /// Flag was_cancelled is atomic because it is checked in read(). - std::lock_guard guard(was_cancelled_mutex); + /// Flag was_cancelled is atomic because it is checked in read(). + std::lock_guard guard(was_cancelled_mutex); - if (was_cancelled) - return; + if (was_cancelled) + return; - was_cancelled = true; + was_cancelled = true; - if (read_context && *read_context) - (*read_context)->cancel(); + if (read_context && *read_context) + (*read_context)->cancel(); - connections->sendCancel(); - } + connections->sendCancel(); if (log) LOG_TRACE(log, "({}) {}", connections->dumpAddresses(), reason); diff --git a/src/DataStreams/RemoteQueryExecutor.h b/src/DataStreams/RemoteQueryExecutor.h index a9cffd9cf97..5404a126150 100644 --- a/src/DataStreams/RemoteQueryExecutor.h +++ b/src/DataStreams/RemoteQueryExecutor.h @@ -36,6 +36,7 @@ public: using ReadContext = RemoteQueryExecutorReadContext; /// Takes already set connection. + /// We don't own connection, thus we have to drain it synchronously. RemoteQueryExecutor( Connection & connection, const String & query_, const Block & header_, ContextPtr context_, @@ -44,6 +45,7 @@ public: /// Accepts several connections already taken from pool. RemoteQueryExecutor( + const ConnectionPoolWithFailoverPtr & pool, std::vector && connections_, const String & query_, const Block & header_, ContextPtr context_, const ThrottlerPtr & throttler = nullptr, const Scalars & scalars_ = Scalars(), const Tables & external_tables_ = Tables(), @@ -107,9 +109,6 @@ private: Block totals; Block extremes; - std::function()> create_connections; - std::unique_ptr connections; - const String query; String query_id; ContextPtr context; @@ -125,6 +124,15 @@ private: /// Initiator identifier for distributed task processing std::shared_ptr task_iterator; + /// Drain connection synchronously when finishing. + bool sync_draining = false; + + std::function()> create_connections; + /// Hold a shared reference to the connection pool so that asynchronous connection draining will + /// work safely. Make sure it's the first member so that we don't destruct it too early. + const ConnectionPoolWithFailoverPtr pool; + std::unique_ptr connections; + /// Streams for reading from temporary tables and following sending of data /// to remote servers for GLOBAL-subqueries std::vector external_tables_data; @@ -151,6 +159,10 @@ private: std::atomic was_cancelled { false }; std::mutex was_cancelled_mutex; + /** Thread-safe connection draining. + */ + std::mutex connection_draining_mutex; + /** An exception from replica was received. No need in receiving more packets or * requesting to cancel query execution */ diff --git a/src/DataStreams/RemoteQueryExecutorReadContext.cpp b/src/DataStreams/RemoteQueryExecutorReadContext.cpp index eb89fd9398f..c1f415bb597 100644 --- a/src/DataStreams/RemoteQueryExecutorReadContext.cpp +++ b/src/DataStreams/RemoteQueryExecutorReadContext.cpp @@ -43,7 +43,7 @@ struct RemoteQueryExecutorRoutine { while (true) { - read_context.packet = connections.receivePacketUnlocked(ReadCallback{read_context, sink}); + read_context.packet = connections.receivePacketUnlocked(ReadCallback{read_context, sink}, false /* is_draining */); sink = std::move(sink).resume(); } } @@ -144,7 +144,7 @@ bool RemoteQueryExecutorReadContext::checkTimeoutImpl(bool blocking) if (is_timer_alarmed && !is_socket_ready) { - /// Socket receive timeout. Drain it in case or error, or it may be hide by timeout exception. + /// Socket receive timeout. Drain it in case of error, or it may be hide by timeout exception. timer.drain(); throw NetException("Timeout exceeded", ErrorCodes::SOCKET_TIMEOUT); } diff --git a/src/Processors/QueryPlan/ReadFromRemote.cpp b/src/Processors/QueryPlan/ReadFromRemote.cpp index 8c0a7050397..63270237e44 100644 --- a/src/Processors/QueryPlan/ReadFromRemote.cpp +++ b/src/Processors/QueryPlan/ReadFromRemote.cpp @@ -172,7 +172,7 @@ void ReadFromRemote::addLazyPipe(Pipes & pipes, const ClusterProxy::IStreamFacto String query_string = formattedAST(query); auto remote_query_executor = std::make_shared( - std::move(connections), query_string, header, context, throttler, scalars, external_tables, stage); + pool, std::move(connections), query_string, header, context, throttler, scalars, external_tables, stage); return createRemoteSourcePipe(remote_query_executor, add_agg_info, add_totals, add_extremes, async_read); } diff --git a/src/Server/TCPHandler.cpp b/src/Server/TCPHandler.cpp index c6319620899..18c73918127 100644 --- a/src/Server/TCPHandler.cpp +++ b/src/Server/TCPHandler.cpp @@ -1026,7 +1026,17 @@ bool TCPHandler::receivePacket() return false; case Protocol::Client::Cancel: + { + /// For testing connection collector. + const Settings & settings = query_context->getSettingsRef(); + if (settings.sleep_in_receive_cancel_ms.totalMilliseconds()) + { + std::chrono::milliseconds ms(settings.sleep_in_receive_cancel_ms.totalMilliseconds()); + std::this_thread::sleep_for(ms); + } + return false; + } case Protocol::Client::Hello: receiveUnexpectedHello(); @@ -1063,6 +1073,13 @@ String TCPHandler::receiveReadTaskResponseAssumeLocked() if (packet_type == Protocol::Client::Cancel) { state.is_cancelled = true; + /// For testing connection collector. + const Settings & settings = query_context->getSettingsRef(); + if (settings.sleep_in_receive_cancel_ms.totalMilliseconds()) + { + std::chrono::milliseconds ms(settings.sleep_in_receive_cancel_ms.totalMilliseconds()); + std::this_thread::sleep_for(ms); + } return {}; } else @@ -1461,6 +1478,16 @@ bool TCPHandler::isQueryCancelled() throw NetException("Unexpected packet Cancel received from client", ErrorCodes::UNEXPECTED_PACKET_FROM_CLIENT); LOG_INFO(log, "Query was cancelled."); state.is_cancelled = true; + /// For testing connection collector. + { + const Settings & settings = query_context->getSettingsRef(); + if (settings.sleep_in_receive_cancel_ms.totalMilliseconds()) + { + std::chrono::milliseconds ms(settings.sleep_in_receive_cancel_ms.totalMilliseconds()); + std::this_thread::sleep_for(ms); + } + } + return true; default: diff --git a/src/Storages/StorageDistributed.cpp b/src/Storages/StorageDistributed.cpp index 5d87fd25f93..b5e4c457d93 100644 --- a/src/Storages/StorageDistributed.cpp +++ b/src/Storages/StorageDistributed.cpp @@ -4,7 +4,7 @@ #include -#include +#include #include #include @@ -57,6 +57,7 @@ #include #include #include +#include #include #include @@ -739,9 +740,10 @@ QueryPipelinePtr StorageDistributed::distributedWrite(const ASTInsertQuery & que "Expected exactly one connection for shard " + toString(shard_info.shard_num), ErrorCodes::LOGICAL_ERROR); /// INSERT SELECT query returns empty block - auto in_stream = std::make_shared(std::move(connections), new_query_str, Block{}, local_context); + auto remote_query_executor + = std::make_shared(shard_info.pool, std::move(connections), new_query_str, Block{}, local_context); pipelines.emplace_back(std::make_unique()); - pipelines.back()->init(Pipe(std::make_shared(std::move(in_stream)))); + pipelines.back()->init(Pipe(std::make_shared(remote_query_executor, false, settings.async_socket_for_remote))); pipelines.back()->setSinks([](const Block & header, QueryPipeline::StreamType) -> ProcessorPtr { return std::make_shared(header); diff --git a/src/Storages/StorageS3Cluster.cpp b/src/Storages/StorageS3Cluster.cpp index 8a320190036..726e3a3465f 100644 --- a/src/Storages/StorageS3Cluster.cpp +++ b/src/Storages/StorageS3Cluster.cpp @@ -126,8 +126,15 @@ Pipe StorageS3Cluster::read( /// For unknown reason global context is passed to IStorage::read() method /// So, task_identifier is passed as constructor argument. It is more obvious. auto remote_query_executor = std::make_shared( - *connections.back(), queryToString(query_info.query), header, context, - /*throttler=*/nullptr, scalars, Tables(), processed_stage, callback); + *connections.back(), + queryToString(query_info.query), + header, + context, + /*throttler=*/nullptr, + scalars, + Tables(), + processed_stage, + callback); pipes.emplace_back(std::make_shared(remote_query_executor, add_agg_info, false)); } diff --git a/tests/integration/test_async_drain_connection/__init__.py b/tests/integration/test_async_drain_connection/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/integration/test_async_drain_connection/configs/config.xml b/tests/integration/test_async_drain_connection/configs/config.xml new file mode 100644 index 00000000000..0c42ac84d31 --- /dev/null +++ b/tests/integration/test_async_drain_connection/configs/config.xml @@ -0,0 +1,4 @@ + + + 10000 + diff --git a/tests/integration/test_async_drain_connection/test.py b/tests/integration/test_async_drain_connection/test.py new file mode 100644 index 00000000000..21f9b142e7a --- /dev/null +++ b/tests/integration/test_async_drain_connection/test.py @@ -0,0 +1,36 @@ +import os +import sys +import time +from multiprocessing.dummy import Pool +import pytest +from helpers.cluster import ClickHouseCluster + +cluster = ClickHouseCluster(__file__) +node = cluster.add_instance("node", main_configs=["configs/config.xml"]) + + +@pytest.fixture(scope="module") +def started_cluster(): + try: + cluster.start() + node.query( + 'create table t (number UInt64) engine = Distributed(test_cluster_two_shards, system, numbers);' + ) + yield cluster + + finally: + cluster.shutdown() + + +def test_filled_async_drain_connection_pool(started_cluster): + busy_pool = Pool(10) + + def execute_query(i): + for _ in range(100): + node.query('select * from t where number = 0 limit 2;', + settings={ + "sleep_in_receive_cancel_ms": 10000000, + "max_execution_time": 5 + }) + + p = busy_pool.map(execute_query, range(10)) diff --git a/tests/queries/0_stateless/01920_async_drain_connections.reference b/tests/queries/0_stateless/01920_async_drain_connections.reference new file mode 100644 index 00000000000..aa47d0d46d4 --- /dev/null +++ b/tests/queries/0_stateless/01920_async_drain_connections.reference @@ -0,0 +1,2 @@ +0 +0 diff --git a/tests/queries/0_stateless/01920_async_drain_connections.sql b/tests/queries/0_stateless/01920_async_drain_connections.sql new file mode 100644 index 00000000000..827ca13fc1a --- /dev/null +++ b/tests/queries/0_stateless/01920_async_drain_connections.sql @@ -0,0 +1,6 @@ +drop table if exists t; + +create table t (number UInt64) engine = Distributed(test_cluster_two_shards, system, numbers); +select * from t where number = 0 limit 2 settings sleep_in_receive_cancel_ms = 10000, max_execution_time = 5; + +drop table t;