This commit is contained in:
Nikita Taranov 2024-07-23 00:14:19 +01:00
parent ab43fe979b
commit ea61af961a

View File

@ -1,48 +1,48 @@
#include <Interpreters/AsynchronousInsertQueue.h>
#include <Interpreters/Squashing.h>
#include <Parsers/ASTInsertQuery.h>
#include <algorithm>
#include <exception>
#include <memory>
#include <mutex>
#include <vector>
#include <string_view>
#include <Poco/Net/NetException.h>
#include <Poco/Net/SocketAddress.h>
#include <Poco/Util/LayeredConfiguration.h>
#include <Common/CurrentThread.h>
#include <Common/Stopwatch.h>
#include <Common/NetException.h>
#include <Common/setThreadName.h>
#include <Common/OpenSSLHelpers.h>
#include <IO/Progress.h>
#include <vector>
#include <Access/AccessControl.h>
#include <Access/Credentials.h>
#include <Compression/CompressedReadBuffer.h>
#include <Compression/CompressedWriteBuffer.h>
#include <IO/ReadBufferFromPocoSocket.h>
#include <IO/WriteBufferFromPocoSocket.h>
#include <IO/LimitReadBuffer.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <Compression/CompressionFactory.h>
#include <Core/ExternalTable.h>
#include <Core/ServerSettings.h>
#include <Formats/NativeReader.h>
#include <Formats/NativeWriter.h>
#include <Interpreters/executeQuery.h>
#include <Interpreters/TablesStatus.h>
#include <IO/LimitReadBuffer.h>
#include <IO/Progress.h>
#include <IO/ReadBufferFromPocoSocket.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteBufferFromPocoSocket.h>
#include <IO/WriteHelpers.h>
#include <Interpreters/AsynchronousInsertQueue.h>
#include <Interpreters/InternalTextLogsQueue.h>
#include <Interpreters/OpenTelemetrySpanLog.h>
#include <Interpreters/Session.h>
#include <Interpreters/Squashing.h>
#include <Interpreters/TablesStatus.h>
#include <Interpreters/executeQuery.h>
#include <Parsers/ASTInsertQuery.h>
#include <Server/TCPServer.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/MergeTree/MergeTreeDataPartUUID.h>
#include <Storages/ObjectStorage/StorageObjectStorageCluster.h>
#include <Core/ExternalTable.h>
#include <Core/ServerSettings.h>
#include <Access/AccessControl.h>
#include <Access/Credentials.h>
#include <Compression/CompressionFactory.h>
#include <Common/logger_useful.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Poco/Net/NetException.h>
#include <Poco/Net/SocketAddress.h>
#include <Poco/Util/LayeredConfiguration.h>
#include <Common/CurrentMetrics.h>
#include <Common/CurrentThread.h>
#include <Common/NetException.h>
#include <Common/OpenSSLHelpers.h>
#include <Common/Stopwatch.h>
#include <Common/logger_useful.h>
#include <Common/scope_guard_safe.h>
#include <Common/setThreadName.h>
#include <Common/thread_local_rng.h>
#include <fmt/format.h>
#include <Processors/Executors/PullingAsyncPipelineExecutor.h>
#include <Processors/Executors/PushingPipelineExecutor.h>
@ -61,6 +61,8 @@
#include <Common/config_version.h>
#include <fmt/format.h>
using namespace std::literals;
using namespace DB;
@ -1036,6 +1038,17 @@ void TCPHandler::processOrdinaryQuery()
PullingAsyncPipelineExecutor executor(pipeline);
CurrentMetrics::Increment query_thread_metric_increment{CurrentMetrics::QueryThread};
/// The following may happen:
/// * current thread is holding the lock
/// * because of the exception we unwind the stack and call the destructor of `executor`
/// * the destructor calls cancel() and waits for all query threads to finish
/// * at the same time one of the query threads is trying to acquire the lock, e.g. inside `merge_tree_read_task_callback`
/// * deadlock
SCOPE_EXIT({
if (out_lock.owns_lock())
out_lock.unlock();
});
Block block;
while (executor.pull(block, interactive_delay / 1000))
{
@ -1079,8 +1092,7 @@ void TCPHandler::processOrdinaryQuery()
}
/// This lock wasn't acquired before and we make .lock() call here
/// so everything under this line is covered even together
/// with sendProgress() out of the scope
/// so everything under this line is covered.
out_lock.lock();
/** If data has run out, we will send the profiling data and total values to
@ -1107,6 +1119,7 @@ void TCPHandler::processOrdinaryQuery()
last_sent_snapshots.clear();
}
out_lock.lock();
sendProgress();
}