mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-10 01:25:21 +00:00
Merge pull request #65744 from ClickHouse/try-fix-race-in-tcp-handler
Try fix data race in TCPHandler between `processOrdinaryQuery -> sendProgress` and `fatal_error_callback -> sendLogs`
This commit is contained in:
commit
7892e77d9d
@ -387,7 +387,7 @@ void TCPHandler::runImpl()
|
||||
|
||||
query_scope.emplace(query_context, /* fatal_error_callback */ [this]
|
||||
{
|
||||
std::lock_guard lock(fatal_error_mutex);
|
||||
std::lock_guard lock(out_mutex);
|
||||
sendLogs();
|
||||
});
|
||||
|
||||
@ -475,7 +475,7 @@ void TCPHandler::runImpl()
|
||||
Stopwatch watch;
|
||||
CurrentMetrics::Increment callback_metric_increment(CurrentMetrics::ReadTaskRequestsSent);
|
||||
|
||||
std::lock_guard lock(task_callback_mutex);
|
||||
std::scoped_lock lock(out_mutex, task_callback_mutex);
|
||||
|
||||
if (state.cancellation_status == CancellationStatus::FULLY_CANCELLED)
|
||||
return {};
|
||||
@ -491,7 +491,7 @@ void TCPHandler::runImpl()
|
||||
{
|
||||
Stopwatch watch;
|
||||
CurrentMetrics::Increment callback_metric_increment(CurrentMetrics::MergeTreeAllRangesAnnouncementsSent);
|
||||
std::lock_guard lock(task_callback_mutex);
|
||||
std::scoped_lock lock(out_mutex, task_callback_mutex);
|
||||
|
||||
if (state.cancellation_status == CancellationStatus::FULLY_CANCELLED)
|
||||
return;
|
||||
@ -505,7 +505,7 @@ void TCPHandler::runImpl()
|
||||
{
|
||||
Stopwatch watch;
|
||||
CurrentMetrics::Increment callback_metric_increment(CurrentMetrics::MergeTreeReadTaskRequestsSent);
|
||||
std::lock_guard lock(task_callback_mutex);
|
||||
std::scoped_lock lock(out_mutex, task_callback_mutex);
|
||||
|
||||
if (state.cancellation_status == CancellationStatus::FULLY_CANCELLED)
|
||||
return std::nullopt;
|
||||
@ -553,7 +553,7 @@ void TCPHandler::runImpl()
|
||||
{
|
||||
auto callback = [this]()
|
||||
{
|
||||
std::scoped_lock lock(task_callback_mutex, fatal_error_mutex);
|
||||
std::scoped_lock lock(out_mutex, task_callback_mutex);
|
||||
|
||||
if (getQueryCancellationStatus() == CancellationStatus::FULLY_CANCELLED)
|
||||
return true;
|
||||
@ -572,7 +572,7 @@ void TCPHandler::runImpl()
|
||||
|
||||
finish_or_cancel();
|
||||
|
||||
std::lock_guard lock(task_callback_mutex);
|
||||
std::lock_guard lock(out_mutex);
|
||||
|
||||
/// Send final progress after calling onFinish(), since it will update the progress.
|
||||
///
|
||||
@ -595,7 +595,7 @@ void TCPHandler::runImpl()
|
||||
break;
|
||||
|
||||
{
|
||||
std::lock_guard lock(task_callback_mutex);
|
||||
std::lock_guard lock(out_mutex);
|
||||
sendLogs();
|
||||
sendEndOfStream();
|
||||
}
|
||||
@ -1014,7 +1014,7 @@ void TCPHandler::processOrdinaryQuery()
|
||||
|
||||
if (query_context->getSettingsRef().allow_experimental_query_deduplication)
|
||||
{
|
||||
std::lock_guard lock(task_callback_mutex);
|
||||
std::lock_guard lock(out_mutex);
|
||||
sendPartUUIDs();
|
||||
}
|
||||
|
||||
@ -1024,13 +1024,13 @@ void TCPHandler::processOrdinaryQuery()
|
||||
|
||||
if (header)
|
||||
{
|
||||
std::lock_guard lock(task_callback_mutex);
|
||||
std::lock_guard lock(out_mutex);
|
||||
sendData(header);
|
||||
}
|
||||
}
|
||||
|
||||
/// Defer locking to cover a part of the scope below and everything after it
|
||||
std::unique_lock progress_lock(task_callback_mutex, std::defer_lock);
|
||||
std::unique_lock out_lock(out_mutex, std::defer_lock);
|
||||
|
||||
{
|
||||
PullingAsyncPipelineExecutor executor(pipeline);
|
||||
@ -1056,6 +1056,9 @@ void TCPHandler::processOrdinaryQuery()
|
||||
executor.cancelReading();
|
||||
}
|
||||
|
||||
lock.unlock();
|
||||
out_lock.lock();
|
||||
|
||||
if (after_send_progress.elapsed() / 1000 >= interactive_delay)
|
||||
{
|
||||
/// Some time passed and there is a progress.
|
||||
@ -1071,12 +1074,14 @@ void TCPHandler::processOrdinaryQuery()
|
||||
if (!state.io.null_format)
|
||||
sendData(block);
|
||||
}
|
||||
|
||||
out_lock.unlock();
|
||||
}
|
||||
|
||||
/// This lock wasn't acquired before and we make .lock() call here
|
||||
/// so everything under this line is covered even together
|
||||
/// with sendProgress() out of the scope
|
||||
progress_lock.lock();
|
||||
out_lock.lock();
|
||||
|
||||
/** If data has run out, we will send the profiling data and total values to
|
||||
* the last zero block to be able to use
|
||||
|
@ -226,8 +226,13 @@ private:
|
||||
std::optional<UInt64> nonce;
|
||||
String cluster;
|
||||
|
||||
/// `out_mutex` protects `out` (WriteBuffer).
|
||||
/// So it is used for method sendData(), sendProgress(), sendLogs(), etc.
|
||||
std::mutex out_mutex;
|
||||
/// `task_callback_mutex` protects tasks callbacks.
|
||||
/// Inside these callbacks we might also change cancellation status,
|
||||
/// so it also protects cancellation status checks.
|
||||
std::mutex task_callback_mutex;
|
||||
std::mutex fatal_error_mutex;
|
||||
|
||||
/// At the moment, only one ongoing query in the connection is supported at a time.
|
||||
QueryState state;
|
||||
|
Loading…
Reference in New Issue
Block a user