mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-22 15:42:02 +00:00
Merge pull request #66905 from ClickHouse/fix_deadlock_on_cancel
Fix possible deadlock on query cancel with parallel replicas
This commit is contained in:
commit
408c5bbbd1
@ -1,48 +1,48 @@
|
||||
#include <Interpreters/AsynchronousInsertQueue.h>
|
||||
#include <Interpreters/Squashing.h>
|
||||
#include <Parsers/ASTInsertQuery.h>
|
||||
#include <algorithm>
|
||||
#include <exception>
|
||||
#include <memory>
|
||||
#include <mutex>
|
||||
#include <vector>
|
||||
#include <string_view>
|
||||
#include <Poco/Net/NetException.h>
|
||||
#include <Poco/Net/SocketAddress.h>
|
||||
#include <Poco/Util/LayeredConfiguration.h>
|
||||
#include <Common/CurrentThread.h>
|
||||
#include <Common/Stopwatch.h>
|
||||
#include <Common/NetException.h>
|
||||
#include <Common/setThreadName.h>
|
||||
#include <Common/OpenSSLHelpers.h>
|
||||
#include <IO/Progress.h>
|
||||
#include <vector>
|
||||
#include <Access/AccessControl.h>
|
||||
#include <Access/Credentials.h>
|
||||
#include <Compression/CompressedReadBuffer.h>
|
||||
#include <Compression/CompressedWriteBuffer.h>
|
||||
#include <IO/ReadBufferFromPocoSocket.h>
|
||||
#include <IO/WriteBufferFromPocoSocket.h>
|
||||
#include <IO/LimitReadBuffer.h>
|
||||
#include <IO/ReadHelpers.h>
|
||||
#include <IO/WriteHelpers.h>
|
||||
#include <Compression/CompressionFactory.h>
|
||||
#include <Core/ExternalTable.h>
|
||||
#include <Core/ServerSettings.h>
|
||||
#include <Formats/NativeReader.h>
|
||||
#include <Formats/NativeWriter.h>
|
||||
#include <Interpreters/executeQuery.h>
|
||||
#include <Interpreters/TablesStatus.h>
|
||||
#include <IO/LimitReadBuffer.h>
|
||||
#include <IO/Progress.h>
|
||||
#include <IO/ReadBufferFromPocoSocket.h>
|
||||
#include <IO/ReadHelpers.h>
|
||||
#include <IO/WriteBufferFromPocoSocket.h>
|
||||
#include <IO/WriteHelpers.h>
|
||||
#include <Interpreters/AsynchronousInsertQueue.h>
|
||||
#include <Interpreters/InternalTextLogsQueue.h>
|
||||
#include <Interpreters/OpenTelemetrySpanLog.h>
|
||||
#include <Interpreters/Session.h>
|
||||
#include <Interpreters/Squashing.h>
|
||||
#include <Interpreters/TablesStatus.h>
|
||||
#include <Interpreters/executeQuery.h>
|
||||
#include <Parsers/ASTInsertQuery.h>
|
||||
#include <Server/TCPServer.h>
|
||||
#include <Storages/StorageReplicatedMergeTree.h>
|
||||
#include <Storages/MergeTree/MergeTreeDataPartUUID.h>
|
||||
#include <Storages/ObjectStorage/StorageObjectStorageCluster.h>
|
||||
#include <Core/ExternalTable.h>
|
||||
#include <Core/ServerSettings.h>
|
||||
#include <Access/AccessControl.h>
|
||||
#include <Access/Credentials.h>
|
||||
#include <Compression/CompressionFactory.h>
|
||||
#include <Common/logger_useful.h>
|
||||
#include <Storages/StorageReplicatedMergeTree.h>
|
||||
#include <Poco/Net/NetException.h>
|
||||
#include <Poco/Net/SocketAddress.h>
|
||||
#include <Poco/Util/LayeredConfiguration.h>
|
||||
#include <Common/CurrentMetrics.h>
|
||||
#include <Common/CurrentThread.h>
|
||||
#include <Common/NetException.h>
|
||||
#include <Common/OpenSSLHelpers.h>
|
||||
#include <Common/Stopwatch.h>
|
||||
#include <Common/logger_useful.h>
|
||||
#include <Common/scope_guard_safe.h>
|
||||
#include <Common/setThreadName.h>
|
||||
#include <Common/thread_local_rng.h>
|
||||
#include <fmt/format.h>
|
||||
|
||||
#include <Processors/Executors/PullingAsyncPipelineExecutor.h>
|
||||
#include <Processors/Executors/PushingPipelineExecutor.h>
|
||||
@ -61,6 +61,8 @@
|
||||
|
||||
#include <Common/config_version.h>
|
||||
|
||||
#include <fmt/format.h>
|
||||
|
||||
using namespace std::literals;
|
||||
using namespace DB;
|
||||
|
||||
@ -1036,6 +1038,17 @@ void TCPHandler::processOrdinaryQuery()
|
||||
PullingAsyncPipelineExecutor executor(pipeline);
|
||||
CurrentMetrics::Increment query_thread_metric_increment{CurrentMetrics::QueryThread};
|
||||
|
||||
/// The following may happen:
|
||||
/// * current thread is holding the lock
|
||||
/// * because of the exception we unwind the stack and call the destructor of `executor`
|
||||
/// * the destructor calls cancel() and waits for all query threads to finish
|
||||
/// * at the same time one of the query threads is trying to acquire the lock, e.g. inside `merge_tree_read_task_callback`
|
||||
/// * deadlock
|
||||
SCOPE_EXIT({
|
||||
if (out_lock.owns_lock())
|
||||
out_lock.unlock();
|
||||
});
|
||||
|
||||
Block block;
|
||||
while (executor.pull(block, interactive_delay / 1000))
|
||||
{
|
||||
@ -1079,8 +1092,7 @@ void TCPHandler::processOrdinaryQuery()
|
||||
}
|
||||
|
||||
/// This lock wasn't acquired before and we make .lock() call here
|
||||
/// so everything under this line is covered even together
|
||||
/// with sendProgress() out of the scope
|
||||
/// so everything under this line is covered.
|
||||
out_lock.lock();
|
||||
|
||||
/** If data has run out, we will send the profiling data and total values to
|
||||
@ -1107,6 +1119,7 @@ void TCPHandler::processOrdinaryQuery()
|
||||
last_sent_snapshots.clear();
|
||||
}
|
||||
|
||||
out_lock.lock();
|
||||
sendProgress();
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user