Revert "Merge pull request #31965 from azat/connection-drain-pull"

This reverts commit 6cb54b4092, reversing
changes made to 42d938ffa0.
Alexander Tokmakov 2023-03-11 17:24:38 +01:00
parent f085f96de5
commit bd124a24ea
9 changed files with 10 additions and 70 deletions

View File

@@ -348,10 +348,6 @@
<background_distributed_schedule_pool_size>16</background_distributed_schedule_pool_size>
-->
- <!-- Number of workers to recycle connections in background (see also drain_timeout).
- If the pool is full, connection will be drained synchronously. -->
- <!-- <max_threads_for_connection_collector>10</max_threads_for_connection_collector> -->
<!-- On memory constrained environments you may have to set this to value larger than 1.
-->
<max_server_memory_usage_to_ram_ratio>0.9</max_server_memory_usage_to_ram_ratio>

View File

@@ -25,12 +25,7 @@ struct PocoSocketWrapper : public Poco::Net::SocketImpl
void IConnections::DrainCallback::operator()(int fd, Poco::Timespan, const std::string & fd_description) const
{
if (!PocoSocketWrapper(fd).poll(drain_timeout, Poco::Net::Socket::SELECT_READ))
- {
- throw Exception(ErrorCodes::SOCKET_TIMEOUT,
- "Read timeout ({} ms) while draining from {}",
- drain_timeout.totalMilliseconds(),
- fd_description);
- }
+ throw Exception(ErrorCodes::SOCKET_TIMEOUT, "Read timeout while draining from {}", fd_description);
}
}

View File

@@ -390,7 +390,7 @@ MultiplexedConnections::ReplicaState & MultiplexedConnections::getReplicaForRead
Poco::Net::Socket::SocketList write_list;
Poco::Net::Socket::SocketList except_list;
- auto timeout = is_draining ? drain_timeout : receive_timeout;
+ auto timeout = receive_timeout;
int n = 0;
/// EINTR loop

View File

@@ -57,7 +57,6 @@ class IColumn;
M(Milliseconds, connect_timeout_with_failover_secure_ms, 100, "Connection timeout for selecting first healthy replica (for secure connections).", 0) \
M(Seconds, receive_timeout, DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC, "Timeout for receiving data from network, in seconds. If no bytes were received in this interval, exception is thrown. If you set this setting on client, the 'send_timeout' for the socket will be also set on the corresponding connection end on the server.", 0) \
M(Seconds, send_timeout, DBMS_DEFAULT_SEND_TIMEOUT_SEC, "Timeout for sending data to network, in seconds. If client needs to sent some data, but it did not able to send any bytes in this interval, exception is thrown. If you set this setting on client, the 'receive_timeout' for the socket will be also set on the corresponding connection end on the server.", 0) \
- M(Seconds, drain_timeout, 3, "Timeout for draining remote connections, -1 means synchronous drain without ignoring errors", 0) \
M(Seconds, tcp_keep_alive_timeout, 290 /* less than DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC */, "The time in seconds the connection needs to remain idle before TCP starts sending keepalive probes", 0) \
M(Milliseconds, hedged_connection_timeout_ms, 100, "Connection timeout for establishing connection with replica for Hedged requests", 0) \
M(Milliseconds, receive_data_timeout_ms, 2000, "Connection timeout for receiving first packet of data or packet with positive progress from replica", 0) \
@@ -759,7 +758,7 @@ class IColumn;
MAKE_OBSOLETE(M, Seconds, temporary_live_view_timeout, 1) \
MAKE_OBSOLETE(M, Milliseconds, async_insert_cleanup_timeout_ms, 1000) \
MAKE_OBSOLETE(M, Bool, optimize_fuse_sum_count_avg, 0) \
+ MAKE_OBSOLETE(M, Seconds, drain_timeout, 3) \
/** The section above is for obsolete settings. Do not add anything there. */

View File

@@ -47,7 +47,7 @@ struct AsyncDrainTask
std::shared_ptr<IConnections> shared_connections;
void operator()() const
{
- ConnectionCollector::drainConnections(*shared_connections, /* throw_error= */ false);
+ ConnectionCollector::drainConnections(*shared_connections);
}
// We don't have std::unique_function yet. Wrap it in shared_ptr to make the functor copyable.
@@ -72,7 +72,7 @@ std::shared_ptr<IConnections> ConnectionCollector::enqueueConnectionCleanup(
return connections;
}
- void ConnectionCollector::drainConnections(IConnections & connections, bool throw_error)
+ void ConnectionCollector::drainConnections(IConnections & connections) noexcept
{
bool is_drained = false;
try
@@ -91,9 +91,6 @@ void ConnectionCollector::drainConnections(IConnections & connections, bool thro
break;
default:
- /// Connection should be closed in case of unexpected packet,
- /// since this means that the connection in some bad state.
- is_drained = false;
throw NetException(
ErrorCodes::UNEXPECTED_PACKET_FROM_SERVER,
"Unexpected packet {} from one of the following replicas: {}. (expected EndOfStream, Log, ProfileEvents or Exception)",
@@ -115,9 +112,6 @@ void ConnectionCollector::drainConnections(IConnections & connections, bool thro
tryLogCurrentException(&Poco::Logger::get("ConnectionCollector"), __PRETTY_FUNCTION__);
}
}
- if (throw_error)
- throw;
}
}

View File

@@ -17,7 +17,7 @@ public:
static ConnectionCollector & init(ContextMutablePtr global_context_, size_t max_threads);
static std::shared_ptr<IConnections>
enqueueConnectionCleanup(const ConnectionPoolWithFailoverPtr & pool, std::shared_ptr<IConnections> connections) noexcept;
- static void drainConnections(IConnections & connections, bool throw_error);
+ static void drainConnections(IConnections & connections) noexcept;
private:
explicit ConnectionCollector(ContextMutablePtr global_context_, size_t max_threads);

View File

@@ -534,26 +534,14 @@ void RemoteQueryExecutor::finish(std::unique_ptr<ReadContext> * read_context)
/// Send the request to abort the execution of the request, if not already sent.
tryCancel("Cancelling query because enough data has been read", read_context);
- if (context->getSettingsRef().drain_timeout != Poco::Timespan(-1000000))
+ /// Try to drain connections asynchronously.
+ if (auto conn = ConnectionCollector::enqueueConnectionCleanup(pool, connections))
{
- auto connections_left = ConnectionCollector::enqueueConnectionCleanup(pool, connections);
- if (connections_left)
- {
- /// Drain connections synchronously and suppress errors.
+ /// Drain connections synchronously.
CurrentMetrics::Increment metric_increment(CurrentMetrics::ActiveSyncDrainedConnections);
- ConnectionCollector::drainConnections(*connections_left, /* throw_error= */ false);
+ ConnectionCollector::drainConnections(*conn);
CurrentMetrics::add(CurrentMetrics::SyncDrainedConnections, 1);
}
- }
- else
- {
- /// Drain connections synchronously without suppressing errors.
- CurrentMetrics::Increment metric_increment(CurrentMetrics::ActiveSyncDrainedConnections);
- ConnectionCollector::drainConnections(*connections, /* throw_error= */ true);
- CurrentMetrics::add(CurrentMetrics::SyncDrainedConnections, 1);
- }
finished = true;
}

View File

@@ -1,2 +0,0 @@
- OK: sync drain
- OK: async drain

View File

@@ -1,30 +0,0 @@
- #!/usr/bin/env bash
- # Tags: no-parallel
- CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
- # shellcheck source=../shell_config.sh
- . "$CUR_DIR"/../shell_config.sh
- # sync drain
- for _ in {1..100}; do
-     prev=$(curl -d@- -sS "${CLICKHOUSE_URL}" <<<"select value from system.metrics where metric = 'SyncDrainedConnections'")
-     curl -d@- -sS "${CLICKHOUSE_URL}" <<<"select * from remote('127.{2,3}', view(select * from numbers(1e6))) limit 100 settings drain_timeout=-1 format Null"
-     now=$(curl -d@- -sS "${CLICKHOUSE_URL}" <<<"select value from system.metrics where metric = 'SyncDrainedConnections'")
-     if [[ "$prev" != $(( now-2 )) ]]; then
-         continue
-     fi
-     echo "OK: sync drain"
-     break
- done
- # async drain
- for _ in {1..100}; do
-     prev=$(curl -d@- -sS "${CLICKHOUSE_URL}" <<<"select value from system.metrics where metric = 'AsyncDrainedConnections'")
-     curl -d@- -sS "${CLICKHOUSE_URL}" <<<"select * from remote('127.{2,3}', view(select * from numbers(1e6))) limit 100 settings drain_timeout=10 format Null"
-     now=$(curl -d@- -sS "${CLICKHOUSE_URL}" <<<"select value from system.metrics where metric = 'AsyncDrainedConnections'")
-     if [[ "$prev" != $(( now-2 )) ]]; then
-         continue
-     fi
-     echo "OK: async drain"
-     break
- done