Try fix data race in TCPHandler

kssenii 2024-06-26 19:49:14 +02:00
parent c8a2490be7
commit 2e7c23b82c
2 changed files with 22 additions and 12 deletions

src/Server/TCPHandler.cpp

@@ -387,7 +387,7 @@ void TCPHandler::runImpl()
     query_scope.emplace(query_context, /* fatal_error_callback */ [this]
     {
-        std::lock_guard lock(fatal_error_mutex);
+        std::lock_guard lock(out_mutex);
         sendLogs();
     });
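
Before this change, sendLogs() in the fatal-error callback was serialized by fatal_error_mutex, while other writers to the same socket buffer held task_callback_mutex, so two threads could write to `out` concurrently. A minimal standalone sketch of the fix, one mutex per shared buffer (Connection, sendLogs and sendData here are hypothetical stand-ins, not ClickHouse code):

#include <iostream>
#include <mutex>
#include <sstream>
#include <thread>

struct Connection
{
    std::ostringstream out;   /// stands in for the socket WriteBuffer
    std::mutex out_mutex;     /// the fix: one mutex guarding the one shared buffer

    void sendLogs()           /// previously guarded by fatal_error_mutex
    {
        std::lock_guard lock(out_mutex);
        out << "[log packet]";
    }

    void sendData()           /// previously guarded by task_callback_mutex
    {
        std::lock_guard lock(out_mutex);
        out << "[data packet]";
    }
};

int main()
{
    Connection conn;
    /// With two different mutexes these writers would not exclude each other
    /// and packets could interleave on the wire; one mutex serializes them.
    std::thread t1([&] { for (int i = 0; i < 1000; ++i) conn.sendLogs(); });
    std::thread t2([&] { for (int i = 0; i < 1000; ++i) conn.sendData(); });
    t1.join();
    t2.join();
    std::cout << conn.out.str().size() << '\n';
}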
@@ -475,7 +475,7 @@ void TCPHandler::runImpl()
     Stopwatch watch;
     CurrentMetrics::Increment callback_metric_increment(CurrentMetrics::ReadTaskRequestsSent);
-    std::lock_guard lock(task_callback_mutex);
+    std::scoped_lock lock(out_mutex, task_callback_mutex);
     if (state.cancellation_status == CancellationStatus::FULLY_CANCELLED)
         return {};
@@ -491,7 +491,7 @@ void TCPHandler::runImpl()
 {
     Stopwatch watch;
     CurrentMetrics::Increment callback_metric_increment(CurrentMetrics::MergeTreeAllRangesAnnouncementsSent);
-    std::lock_guard lock(task_callback_mutex);
+    std::scoped_lock lock(out_mutex, task_callback_mutex);
     if (state.cancellation_status == CancellationStatus::FULLY_CANCELLED)
         return;
@@ -505,7 +505,7 @@ void TCPHandler::runImpl()
 {
     Stopwatch watch;
     CurrentMetrics::Increment callback_metric_increment(CurrentMetrics::MergeTreeReadTaskRequestsSent);
-    std::lock_guard lock(task_callback_mutex);
+    std::scoped_lock lock(out_mutex, task_callback_mutex);
     if (state.cancellation_status == CancellationStatus::FULLY_CANCELLED)
         return std::nullopt;
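
These task callbacks now both write packets (guarded by out_mutex) and inspect cancellation state (guarded by task_callback_mutex). std::scoped_lock locks all of its mutexes using the std::lock deadlock-avoidance algorithm, so call sites naming the pair in different orders cannot deadlock, unlike two nested std::lock_guards. A minimal sketch, independent of the ClickHouse code:

#include <mutex>
#include <thread>

std::mutex out_mutex;
std::mutex task_callback_mutex;

void writer_a()
{
    /// Both mutexes acquired together; deadlock-free by the std::lock protocol.
    std::scoped_lock lock(out_mutex, task_callback_mutex);
    /// ... send a packet, check cancellation ...
}

void writer_b()
{
    /// Opposite textual order; still safe with std::scoped_lock, whereas
    /// nested lock_guards in this order could deadlock against writer_a.
    std::scoped_lock lock(task_callback_mutex, out_mutex);
    /// ...
}

int main()
{
    std::thread a([] { for (int i = 0; i < 10000; ++i) writer_a(); });
    std::thread b([] { for (int i = 0; i < 10000; ++i) writer_b(); });
    a.join();
    b.join();
}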
@@ -553,7 +553,7 @@ void TCPHandler::runImpl()
 {
     auto callback = [this]()
     {
-        std::scoped_lock lock(task_callback_mutex, fatal_error_mutex);
+        std::scoped_lock lock(out_mutex, task_callback_mutex);
         if (getQueryCancellationStatus() == CancellationStatus::FULLY_CANCELLED)
             return true;
@@ -572,7 +572,7 @@ void TCPHandler::runImpl()
     finish_or_cancel();
-    std::lock_guard lock(task_callback_mutex);
+    std::lock_guard lock(out_mutex);
     /// Send final progress after calling onFinish(), since it will update the progress.
     ///
@@ -595,7 +595,7 @@ void TCPHandler::runImpl()
         break;
     {
-        std::lock_guard lock(task_callback_mutex);
+        std::lock_guard lock(out_mutex);
         sendLogs();
         sendEndOfStream();
     }
@@ -1014,7 +1014,7 @@ void TCPHandler::processOrdinaryQuery()
     if (query_context->getSettingsRef().allow_experimental_query_deduplication)
     {
-        std::lock_guard lock(task_callback_mutex);
+        std::lock_guard lock(out_mutex);
         sendPartUUIDs();
     }
@@ -1024,13 +1024,13 @@ void TCPHandler::processOrdinaryQuery()
         if (header)
         {
-            std::lock_guard lock(task_callback_mutex);
+            std::lock_guard lock(out_mutex);
             sendData(header);
         }
     }

     /// Defer locking to cover a part of the scope below and everything after it
-    std::unique_lock progress_lock(task_callback_mutex, std::defer_lock);
+    std::unique_lock out_lock(out_mutex, std::defer_lock);

     {
         PullingAsyncPipelineExecutor executor(pipeline);
@@ -1056,6 +1056,9 @@ void TCPHandler::processOrdinaryQuery()
                 executor.cancelReading();
             }

+            lock.unlock();
+            out_lock.lock();
+
             if (after_send_progress.elapsed() / 1000 >= interactive_delay)
             {
                 /// Some time passed and there is a progress.
@@ -1071,12 +1074,14 @@ void TCPHandler::processOrdinaryQuery()
                 if (!state.io.null_format)
                     sendData(block);
             }
+
+            out_lock.unlock();
         }

         /// This lock wasn't acquired before and we make .lock() call here
         /// so everything under this line is covered even together
         /// with sendProgress() out of the scope
-        progress_lock.lock();
+        out_lock.lock();

         /** If data has run out, we will send the profiling data and total values to
           * the last zero block to be able to use
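
The pattern in processOrdinaryQuery(): a std::unique_lock constructed with std::defer_lock starts unlocked, the pull loop takes it only around the sends and drops it while the executor produces the next block, and a final .lock() keeps the epilogue covered until the scope ends. A minimal sketch of that pattern under assumed names (pullBlock, sendData and processQuery are illustrative stand-ins, not the real API):

#include <iostream>
#include <mutex>
#include <optional>

std::mutex out_mutex;

/// Hypothetical stand-ins for the pipeline executor and the socket writes.
std::optional<int> pullBlock()
{
    static int produced = 0;
    if (produced < 3)
        return produced++;
    return std::nullopt;
}

void sendData(int block) { std::cout << "block " << block << '\n'; }

void processQuery()
{
    /// Deferred: constructed unlocked, so pulling blocks runs without the mutex held.
    std::unique_lock out_lock(out_mutex, std::defer_lock);

    while (auto block = pullBlock())
    {
        out_lock.lock();      /// hold the mutex only while writing to the connection
        sendData(*block);
        out_lock.unlock();    /// drop it before pulling the next block
    }

    /// Re-acquired once at the end: everything below (progress, totals,
    /// end-of-stream) stays covered until the lock goes out of scope.
    out_lock.lock();
    std::cout << "end of stream\n";
}

int main() { processQuery(); }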

src/Server/TCPHandler.h

@@ -225,8 +225,13 @@ private:
     std::optional<UInt64> nonce;
     String cluster;

+    /// `out_mutex` protects `out` (the WriteBuffer),
+    /// so it is taken around sendData(), sendProgress(), sendLogs(), etc.
+    std::mutex out_mutex;
+    /// `task_callback_mutex` protects the task callbacks.
+    /// Inside these callbacks we might also change the cancellation status,
+    /// so it also protects cancellation status checks.
     std::mutex task_callback_mutex;
-    std::mutex fatal_error_mutex;

     /// At the moment, only one ongoing query in the connection is supported at a time.
     QueryState state;
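
Per the comment above, cancellation status is only read or written under task_callback_mutex; an unguarded read would race with the callbacks that change it. A minimal sketch of that protocol (QueryStateSketch and both functions are hypothetical, not the real QueryState API):

#include <mutex>

enum class CancellationStatus { NOT_CANCELLED, FULLY_CANCELLED };

struct QueryStateSketch
{
    CancellationStatus cancellation_status = CancellationStatus::NOT_CANCELLED;
};

std::mutex task_callback_mutex;
QueryStateSketch state;

/// Writers (the task callbacks) flip the flag under the mutex...
void cancelQuery()
{
    std::lock_guard lock(task_callback_mutex);
    state.cancellation_status = CancellationStatus::FULLY_CANCELLED;
}

/// ...and readers check it under the same mutex, so there is no data race.
bool isFullyCancelled()
{
    std::lock_guard lock(task_callback_mutex);
    return state.cancellation_status == CancellationStatus::FULLY_CANCELLED;
}

int main()
{
    cancelQuery();
    return isFullyCancelled() ? 0 : 1;
}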