Merge pull request #21938 from azat/connection-settings

Apply idle_connnection_timeout/poll_interval after each query
This commit is contained in:
alexey-milovidov 2021-04-19 22:58:21 +03:00 committed by GitHub
commit c87f846816
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -158,6 +158,8 @@ void TCPHandler::runImpl()
}
Settings connection_settings = connection_context->getSettings();
UInt64 idle_connection_timeout = connection_settings.idle_connection_timeout;
UInt64 poll_interval = connection_settings.poll_interval;
sendHello();
@ -168,10 +170,10 @@ void TCPHandler::runImpl()
/// We are waiting for a packet from the client. Thus, every `poll_interval` seconds check whether we need to shut down.
{
Stopwatch idle_time;
while (!server.isCancelled() && !static_cast<ReadBufferFromPocoSocket &>(*in).poll(
std::min(connection_settings.poll_interval, connection_settings.idle_connection_timeout) * 1000000))
UInt64 timeout_ms = std::min(poll_interval, idle_connection_timeout) * 1000000;
while (!server.isCancelled() && !static_cast<ReadBufferFromPocoSocket &>(*in).poll(timeout_ms))
{
if (idle_time.elapsedSeconds() > connection_settings.idle_connection_timeout)
if (idle_time.elapsedSeconds() > idle_connection_timeout)
{
LOG_TRACE(log, "Closing idle connection");
return;
@ -212,6 +214,15 @@ void TCPHandler::runImpl()
if (!receivePacket())
continue;
/** If Query received, then settings in query_context has been updated
* So, update some other connection settings, for flexibility.
*/
{
const Settings & settings = query_context->getSettingsRef();
idle_connection_timeout = settings.idle_connection_timeout;
poll_interval = settings.poll_interval;
}
/** If part_uuids got received in previous packet, trying to read again.
*/
if (state.empty() && state.part_uuids && !receivePacket())
@ -274,10 +285,10 @@ void TCPHandler::runImpl()
if (context != query_context)
throw Exception("Unexpected context in InputBlocksReader", ErrorCodes::LOGICAL_ERROR);
size_t poll_interval;
size_t poll_interval_ms;
int receive_timeout;
std::tie(poll_interval, receive_timeout) = getReadTimeouts(connection_settings);
if (!readDataNext(poll_interval, receive_timeout))
std::tie(poll_interval_ms, receive_timeout) = getReadTimeouts(connection_settings);
if (!readDataNext(poll_interval_ms, receive_timeout))
{
state.block_in.reset();
state.maybe_compressed_in.reset();