From bd124a24eaee49afb4c3456be65f3ba37e7a5a30 Mon Sep 17 00:00:00 2001 From: Alexander Tokmakov Date: Sat, 11 Mar 2023 17:24:38 +0100 Subject: [PATCH] Revert "Merge pull request #31965 from azat/connection-drain-pull" This reverts commit 6cb54b409277fe5534424583ca94e65ea65dc97c, reversing changes made to 42d938ffa092b50d6ed7b30c1ad5b9cd0ec88731. --- programs/server/config.xml | 4 --- src/Client/IConnections.cpp | 7 +---- src/Client/MultiplexedConnections.cpp | 2 +- src/Core/Settings.h | 3 +- src/QueryPipeline/ConnectionCollector.cpp | 10 ++----- src/QueryPipeline/ConnectionCollector.h | 2 +- src/QueryPipeline/RemoteQueryExecutor.cpp | 20 +++---------- .../02127_connection_drain.reference | 2 -- .../0_stateless/02127_connection_drain.sh | 30 ------------------- 9 files changed, 10 insertions(+), 70 deletions(-) delete mode 100644 tests/queries/0_stateless/02127_connection_drain.reference delete mode 100755 tests/queries/0_stateless/02127_connection_drain.sh diff --git a/programs/server/config.xml b/programs/server/config.xml index 85cb299e188..0ea2de18e22 100644 --- a/programs/server/config.xml +++ b/programs/server/config.xml @@ -348,10 +348,6 @@ 16 --> - - - 0.9 diff --git a/src/Client/IConnections.cpp b/src/Client/IConnections.cpp index 9cc5a62ce12..341089258f5 100644 --- a/src/Client/IConnections.cpp +++ b/src/Client/IConnections.cpp @@ -25,12 +25,7 @@ struct PocoSocketWrapper : public Poco::Net::SocketImpl void IConnections::DrainCallback::operator()(int fd, Poco::Timespan, const std::string & fd_description) const { if (!PocoSocketWrapper(fd).poll(drain_timeout, Poco::Net::Socket::SELECT_READ)) - { - throw Exception(ErrorCodes::SOCKET_TIMEOUT, - "Read timeout ({} ms) while draining from {}", - drain_timeout.totalMilliseconds(), - fd_description); - } + throw Exception(ErrorCodes::SOCKET_TIMEOUT, "Read timeout while draining from {}", fd_description); } } diff --git a/src/Client/MultiplexedConnections.cpp b/src/Client/MultiplexedConnections.cpp index cc260353339..084823d71ac 100644 --- a/src/Client/MultiplexedConnections.cpp +++ b/src/Client/MultiplexedConnections.cpp @@ -390,7 +390,7 @@ MultiplexedConnections::ReplicaState & MultiplexedConnections::getReplicaForRead Poco::Net::Socket::SocketList write_list; Poco::Net::Socket::SocketList except_list; - auto timeout = is_draining ? drain_timeout : receive_timeout; + auto timeout = receive_timeout; int n = 0; /// EINTR loop diff --git a/src/Core/Settings.h b/src/Core/Settings.h index f67ce6be9ed..6af47563c22 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -57,7 +57,6 @@ class IColumn; M(Milliseconds, connect_timeout_with_failover_secure_ms, 100, "Connection timeout for selecting first healthy replica (for secure connections).", 0) \ M(Seconds, receive_timeout, DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC, "Timeout for receiving data from network, in seconds. If no bytes were received in this interval, exception is thrown. If you set this setting on client, the 'send_timeout' for the socket will be also set on the corresponding connection end on the server.", 0) \ M(Seconds, send_timeout, DBMS_DEFAULT_SEND_TIMEOUT_SEC, "Timeout for sending data to network, in seconds. If client needs to sent some data, but it did not able to send any bytes in this interval, exception is thrown. If you set this setting on client, the 'receive_timeout' for the socket will be also set on the corresponding connection end on the server.", 0) \ - M(Seconds, drain_timeout, 3, "Timeout for draining remote connections, -1 means synchronous drain without ignoring errors", 0) \ M(Seconds, tcp_keep_alive_timeout, 290 /* less than DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC */, "The time in seconds the connection needs to remain idle before TCP starts sending keepalive probes", 0) \ M(Milliseconds, hedged_connection_timeout_ms, 100, "Connection timeout for establishing connection with replica for Hedged requests", 0) \ M(Milliseconds, receive_data_timeout_ms, 2000, "Connection timeout for receiving first packet of data or packet with positive progress from replica", 0) \ @@ -759,7 +758,7 @@ class IColumn; MAKE_OBSOLETE(M, Seconds, temporary_live_view_timeout, 1) \ MAKE_OBSOLETE(M, Milliseconds, async_insert_cleanup_timeout_ms, 1000) \ MAKE_OBSOLETE(M, Bool, optimize_fuse_sum_count_avg, 0) \ - + MAKE_OBSOLETE(M, Seconds, drain_timeout, 3) \ /** The section above is for obsolete settings. Do not add anything there. */ diff --git a/src/QueryPipeline/ConnectionCollector.cpp b/src/QueryPipeline/ConnectionCollector.cpp index 7c484dcd6e8..bcee26fa192 100644 --- a/src/QueryPipeline/ConnectionCollector.cpp +++ b/src/QueryPipeline/ConnectionCollector.cpp @@ -47,7 +47,7 @@ struct AsyncDrainTask std::shared_ptr shared_connections; void operator()() const { - ConnectionCollector::drainConnections(*shared_connections, /* throw_error= */ false); + ConnectionCollector::drainConnections(*shared_connections); } // We don't have std::unique_function yet. Wrap it in shared_ptr to make the functor copyable. @@ -72,7 +72,7 @@ std::shared_ptr ConnectionCollector::enqueueConnectionCleanup( return connections; } -void ConnectionCollector::drainConnections(IConnections & connections, bool throw_error) +void ConnectionCollector::drainConnections(IConnections & connections) noexcept { bool is_drained = false; try @@ -91,9 +91,6 @@ void ConnectionCollector::drainConnections(IConnections & connections, bool thro break; default: - /// Connection should be closed in case of unexpected packet, - /// since this means that the connection in some bad state. - is_drained = false; throw NetException( ErrorCodes::UNEXPECTED_PACKET_FROM_SERVER, "Unexpected packet {} from one of the following replicas: {}. (expected EndOfStream, Log, ProfileEvents or Exception)", @@ -115,9 +112,6 @@ void ConnectionCollector::drainConnections(IConnections & connections, bool thro tryLogCurrentException(&Poco::Logger::get("ConnectionCollector"), __PRETTY_FUNCTION__); } } - - if (throw_error) - throw; } } diff --git a/src/QueryPipeline/ConnectionCollector.h b/src/QueryPipeline/ConnectionCollector.h index 44482607277..5b6e82d000e 100644 --- a/src/QueryPipeline/ConnectionCollector.h +++ b/src/QueryPipeline/ConnectionCollector.h @@ -17,7 +17,7 @@ public: static ConnectionCollector & init(ContextMutablePtr global_context_, size_t max_threads); static std::shared_ptr enqueueConnectionCleanup(const ConnectionPoolWithFailoverPtr & pool, std::shared_ptr connections) noexcept; - static void drainConnections(IConnections & connections, bool throw_error); + static void drainConnections(IConnections & connections) noexcept; private: explicit ConnectionCollector(ContextMutablePtr global_context_, size_t max_threads); diff --git a/src/QueryPipeline/RemoteQueryExecutor.cpp b/src/QueryPipeline/RemoteQueryExecutor.cpp index a10d70d22e9..a93330dc3a0 100644 --- a/src/QueryPipeline/RemoteQueryExecutor.cpp +++ b/src/QueryPipeline/RemoteQueryExecutor.cpp @@ -534,26 +534,14 @@ void RemoteQueryExecutor::finish(std::unique_ptr * read_context) /// Send the request to abort the execution of the request, if not already sent. tryCancel("Cancelling query because enough data has been read", read_context); - - if (context->getSettingsRef().drain_timeout != Poco::Timespan(-1000000)) + /// Try to drain connections asynchronously. + if (auto conn = ConnectionCollector::enqueueConnectionCleanup(pool, connections)) { - auto connections_left = ConnectionCollector::enqueueConnectionCleanup(pool, connections); - if (connections_left) - { - /// Drain connections synchronously and suppress errors. - CurrentMetrics::Increment metric_increment(CurrentMetrics::ActiveSyncDrainedConnections); - ConnectionCollector::drainConnections(*connections_left, /* throw_error= */ false); - CurrentMetrics::add(CurrentMetrics::SyncDrainedConnections, 1); - } - } - else - { - /// Drain connections synchronously without suppressing errors. + /// Drain connections synchronously. CurrentMetrics::Increment metric_increment(CurrentMetrics::ActiveSyncDrainedConnections); - ConnectionCollector::drainConnections(*connections, /* throw_error= */ true); + ConnectionCollector::drainConnections(*conn); CurrentMetrics::add(CurrentMetrics::SyncDrainedConnections, 1); } - finished = true; } diff --git a/tests/queries/0_stateless/02127_connection_drain.reference b/tests/queries/0_stateless/02127_connection_drain.reference deleted file mode 100644 index c31f2f40f6d..00000000000 --- a/tests/queries/0_stateless/02127_connection_drain.reference +++ /dev/null @@ -1,2 +0,0 @@ -OK: sync drain -OK: async drain diff --git a/tests/queries/0_stateless/02127_connection_drain.sh b/tests/queries/0_stateless/02127_connection_drain.sh deleted file mode 100755 index 523b02d9bd5..00000000000 --- a/tests/queries/0_stateless/02127_connection_drain.sh +++ /dev/null @@ -1,30 +0,0 @@ -#!/usr/bin/env bash -# Tags: no-parallel - -CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) -# shellcheck source=../shell_config.sh -. "$CUR_DIR"/../shell_config.sh - -# sync drain -for _ in {1..100}; do - prev=$(curl -d@- -sS "${CLICKHOUSE_URL}" <<<"select value from system.metrics where metric = 'SyncDrainedConnections'") - curl -d@- -sS "${CLICKHOUSE_URL}" <<<"select * from remote('127.{2,3}', view(select * from numbers(1e6))) limit 100 settings drain_timeout=-1 format Null" - now=$(curl -d@- -sS "${CLICKHOUSE_URL}" <<<"select value from system.metrics where metric = 'SyncDrainedConnections'") - if [[ "$prev" != $(( now-2 )) ]]; then - continue - fi - echo "OK: sync drain" - break -done - -# async drain -for _ in {1..100}; do - prev=$(curl -d@- -sS "${CLICKHOUSE_URL}" <<<"select value from system.metrics where metric = 'AsyncDrainedConnections'") - curl -d@- -sS "${CLICKHOUSE_URL}" <<<"select * from remote('127.{2,3}', view(select * from numbers(1e6))) limit 100 settings drain_timeout=10 format Null" - now=$(curl -d@- -sS "${CLICKHOUSE_URL}" <<<"select value from system.metrics where metric = 'AsyncDrainedConnections'") - if [[ "$prev" != $(( now-2 )) ]]; then - continue - fi - echo "OK: async drain" - break -done