Merge pull request #29783 from azat/tcphandler-progress-race

Fix data-race between fatal error handler and progress packets
This commit is contained in:
alexey-milovidov 2021-10-07 01:47:29 +03:00 committed by GitHub
commit 16ad5953d6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -218,6 +218,8 @@ void TCPHandler::runImpl()
/// NOTE: these settings are applied only for current connection (not for distributed tables' connections)
state.timeout_setter = std::make_unique<TimeoutSetter>(socket(), receive_timeout, send_timeout);
std::mutex fatal_error_mutex;
/// Should we send internal logs to client?
const auto client_logs_level = query_context->getSettingsRef().send_logs_level;
if (client_tcp_protocol_version >= DBMS_MIN_REVISION_WITH_SERVER_LOGS
@ -226,7 +228,11 @@ void TCPHandler::runImpl()
state.logs_queue = std::make_shared<InternalTextLogsQueue>();
state.logs_queue->max_priority = Poco::Logger::parseLevel(client_logs_level.toString());
CurrentThread::attachInternalTextLogsQueue(state.logs_queue, client_logs_level);
CurrentThread::setFatalErrorCallback([this]{ sendLogs(); });
CurrentThread::setFatalErrorCallback([this, &fatal_error_mutex]
{
std::lock_guard lock(fatal_error_mutex);
sendLogs();
});
}
query_context->setExternalTablesInitializer([this] (ContextPtr context)
@ -310,8 +316,10 @@ void TCPHandler::runImpl()
/// Should not check for cancel in case of input.
if (!state.need_receive_data_for_input)
{
auto callback = [this]()
auto callback = [this, &fatal_error_mutex]()
{
std::lock_guard lock(fatal_error_mutex);
if (isQueryCancelled())
return true;