Merge pull request #67341 from ClickHouse/fix-race-tcphandler

Fix: data race in TCPHandler on socket timeouts settings
commit 3319a5e6f8
Author: Yakov Olkhovskiy (committed by GitHub)
Date:   2024-08-08 02:50:24 +00:00
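The change removes the SCOPE_EXIT guard that reset state.timeout_setter at the end of each iteration of the query loop in runImpl() and instead ties the restore to the query state itself: state.reset() clears it on the success path, and the exception handlers clear it explicitly. For context, TimeoutSetter is an RAII helper that applies query-local send/receive timeouts to the connection socket and restores the previous values when it is destroyed. Below is a minimal standalone sketch of that save/apply/restore pattern; FakeSocket and the member names are illustrative assumptions, the real class operates on a Poco::Net::StreamSocket.

```cpp
#include <chrono>
#include <iostream>

/// Illustrative stand-in for a socket with configurable timeouts.
/// (The real TimeoutSetter operates on a Poco::Net::StreamSocket.)
struct FakeSocket
{
    std::chrono::milliseconds send_timeout{300000};
    std::chrono::milliseconds receive_timeout{300000};
};

/// Simplified analogue of the save/apply/restore pattern: remember the
/// socket's current timeouts, apply query-local ones, and restore the
/// originals on destruction (or when an owning unique_ptr is reset).
class TimeoutSetterSketch
{
public:
    TimeoutSetterSketch(FakeSocket & socket_, std::chrono::milliseconds send, std::chrono::milliseconds receive)
        : socket(socket_), old_send(socket_.send_timeout), old_receive(socket_.receive_timeout)
    {
        socket.send_timeout = send;
        socket.receive_timeout = receive;
    }

    ~TimeoutSetterSketch()
    {
        socket.send_timeout = old_send;
        socket.receive_timeout = old_receive;
    }

private:
    FakeSocket & socket;
    std::chrono::milliseconds old_send;
    std::chrono::milliseconds old_receive;
};

int main()
{
    FakeSocket socket;
    {
        /// Query-local timeouts are in effect only while the setter is alive.
        TimeoutSetterSketch setter(socket, std::chrono::milliseconds(1000), std::chrono::milliseconds(2000));
        std::cout << "during query: receive_timeout = " << socket.receive_timeout.count() << " ms\n";
    }
    /// Restored once the setter is destroyed.
    std::cout << "after query:  receive_timeout = " << socket.receive_timeout.count() << " ms\n";
}
```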


@@ -365,9 +365,6 @@ void TCPHandler::runImpl()
         try
         {
-            /// If a user passed query-local timeouts, reset socket to initial state at the end of the query
-            SCOPE_EXIT({state.timeout_setter.reset();});
-
             /** If Query - process it. If Ping or Cancel - go back to the beginning.
              *  There may come settings for a separate query that modify `query_context`.
              *  It's possible to receive part uuids packet before the query, so then receivePacket has to be called twice.
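For context on the removed line: SCOPE_EXIT installs a scope guard, i.e. an object whose destructor runs the given block when control leaves the enclosing scope, whether normally or via an exception. A simplified standalone sketch of that mechanism follows; the ScopeGuard class is a hypothetical reduction, not ClickHouse's actual macro implementation.

```cpp
#include <iostream>
#include <utility>

/// Hypothetical reduction of a scope guard: runs the stored callback when the
/// enclosing block is left, whether normally or by an exception.
template <typename F>
class ScopeGuard
{
public:
    explicit ScopeGuard(F func_) : func(std::move(func_)) {}
    ~ScopeGuard() { func(); }

    ScopeGuard(const ScopeGuard &) = delete;
    ScopeGuard & operator=(const ScopeGuard &) = delete;

private:
    F func;
};

int main()
{
    std::cout << "query starts\n";
    {
        ScopeGuard guard([] { std::cout << "guard fired: this is where the timeouts were reset\n"; });
        std::cout << "query runs\n";
    }   /// The guard fires here, at the end of the block that created it.
    std::cout << "next query\n";
}
```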
@@ -397,7 +394,8 @@ void TCPHandler::runImpl()
             /// So it's better to update the connection settings for flexibility.
             extractConnectionSettingsFromContext(query_context);
 
-            /// Sync timeouts on client and server during current query to avoid dangling queries on server
+            /// Sync timeouts on client and server during current query to avoid dangling queries on server.
+            /// It should be reset at the end of query.
             state.timeout_setter = std::make_unique<TimeoutSetter>(socket(), send_timeout, receive_timeout);
 
             /// Should we send internal logs to client?
@@ -607,6 +605,7 @@ void TCPHandler::runImpl()
             /// QueryState should be cleared before QueryScope, since otherwise
             /// the MemoryTracker will be wrong for possible deallocations.
             /// (i.e. deallocations from the Aggregator with two-level aggregation)
+            /// Also it resets socket's timeouts.
             state.reset();
             last_sent_snapshots = ProfileEvents::ThreadIdToCountersSnapshot{};
             query_scope.reset();
@@ -632,6 +631,9 @@ void TCPHandler::runImpl()
             state.io.onException();
             exception.reset(e.clone());
 
+            /// In case of exception the state was not reset, so the socket's timeouts must be reset explicitly
+            state.timeout_setter.reset();
+
             if (e.code() == ErrorCodes::UNKNOWN_PACKET_FROM_CLIENT)
                 throw;
@@ -690,6 +692,9 @@ void TCPHandler::runImpl()
                 exception = std::make_unique<DB::Exception>(Exception(ErrorCodes::UNKNOWN_EXCEPTION, "Unknown exception"));
             }
 
+            /// In case of exception the state was not reset, so the socket's timeouts must be reset explicitly
+            state.timeout_setter.reset();
+
             try
             {
                 if (exception)
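Both exception handlers clear state.timeout_setter explicitly because the state.reset() call on the success path is never reached once an exception has been caught. A hypothetical reduction of that control flow, with invented stand-in names (QueryStateStub, TimeoutSetterStub, runQuery), illustrates why the explicit reset in the catch block is needed.

```cpp
#include <iostream>
#include <memory>
#include <stdexcept>

/// Invented stand-in: destroying the stub is where the real TimeoutSetter
/// would restore the socket's original timeouts.
struct TimeoutSetterStub
{
    ~TimeoutSetterStub() { std::cout << "socket timeouts restored\n"; }
};

struct QueryStateStub
{
    std::unique_ptr<TimeoutSetterStub> timeout_setter;

    void reset()
    {
        /// Success path: tearing down the query state releases the setter.
        timeout_setter.reset();
    }
};

void runQuery(QueryStateStub & state, bool fail)
{
    state.timeout_setter = std::make_unique<TimeoutSetterStub>();
    if (fail)
        throw std::runtime_error("query failed");
    state.reset();
}

int main()
{
    QueryStateStub state;
    try
    {
        runQuery(state, /*fail=*/ true);
    }
    catch (const std::exception & e)
    {
        /// Exception path: state.reset() was never reached, so release explicitly,
        /// mirroring the explicit state.timeout_setter.reset() in the handlers above.
        state.timeout_setter.reset();
        std::cout << "caught: " << e.what() << "\n";
    }
}
```

Releasing the unique_ptr in either branch runs the setter's destructor, which is where the original socket timeouts are restored.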