One more attempt to fix race in TCPHandler (#45240)

This commit is contained in:
Nikita Mikhaylov 2023-01-17 16:17:14 +01:00 committed by GitHub
parent 582aa8b770
commit 0fc755806e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -402,14 +402,10 @@ void TCPHandler::runImpl()
{ {
auto callback = [this]() auto callback = [this]()
{ {
{ std::scoped_lock lock(task_callback_mutex, fatal_error_mutex);
std::lock_guard task_callback_lock(task_callback_mutex);
if (isQueryCancelled()) if (isQueryCancelled())
return true; return true;
}
std::lock_guard lock(fatal_error_mutex);
sendProgress(); sendProgress();
sendSelectProfileEvents(); sendSelectProfileEvents();
@ -424,6 +420,9 @@ void TCPHandler::runImpl()
} }
state.io.onFinish(); state.io.onFinish();
std::lock_guard lock(task_callback_mutex);
/// Send final progress after calling onFinish(), since it will update the progress. /// Send final progress after calling onFinish(), since it will update the progress.
/// ///
/// NOTE: we cannot send Progress for regular INSERT (with VALUES) /// NOTE: we cannot send Progress for regular INSERT (with VALUES)
@ -446,8 +445,11 @@ void TCPHandler::runImpl()
if (state.is_connection_closed) if (state.is_connection_closed)
break; break;
sendLogs(); {
sendEndOfStream(); std::lock_guard lock(task_callback_mutex);
sendLogs();
sendEndOfStream();
}
/// QueryState should be cleared before QueryScope, since otherwise /// QueryState should be cleared before QueryScope, since otherwise
/// the MemoryTracker will be wrong for possible deallocations. /// the MemoryTracker will be wrong for possible deallocations.
@ -760,6 +762,9 @@ void TCPHandler::processOrdinaryQueryWithProcessors()
} }
} }
/// Defer locking to cover a part of the scope below and everything after it
std::unique_lock progress_lock(task_callback_mutex, std::defer_lock);
{ {
PullingAsyncPipelineExecutor executor(pipeline); PullingAsyncPipelineExecutor executor(pipeline);
CurrentMetrics::Increment query_thread_metric_increment{CurrentMetrics::QueryThread}; CurrentMetrics::Increment query_thread_metric_increment{CurrentMetrics::QueryThread};
@ -796,6 +801,11 @@ void TCPHandler::processOrdinaryQueryWithProcessors()
} }
} }
/// This lock wasn't acquired before and we make .lock() call here
/// so everything under this line is covered even together
/// with sendProgress() out of the scope
progress_lock.lock();
/** If data has run out, we will send the profiling data and total values to /** If data has run out, we will send the profiling data and total values to
* the last zero block to be able to use * the last zero block to be able to use
* this information in the suffix output of stream. * this information in the suffix output of stream.