From 0566f72d3659db4ca814a85306ddbc5829a62f00 Mon Sep 17 00:00:00 2001 From: Azat Khuzhin Date: Thu, 19 Jan 2023 19:57:16 +0100 Subject: [PATCH 1/3] Cleanup PullingAsyncPipelineExecutor::cancel() Signed-off-by: Azat Khuzhin --- .../Executors/PullingAsyncPipelineExecutor.cpp | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/src/Processors/Executors/PullingAsyncPipelineExecutor.cpp b/src/Processors/Executors/PullingAsyncPipelineExecutor.cpp index 5799fbcc5d8..3dc99397bd1 100644 --- a/src/Processors/Executors/PullingAsyncPipelineExecutor.cpp +++ b/src/Processors/Executors/PullingAsyncPipelineExecutor.cpp @@ -175,20 +175,22 @@ bool PullingAsyncPipelineExecutor::pull(Block & block, uint64_t milliseconds) void PullingAsyncPipelineExecutor::cancel() { + if (!data) + return; + /// Cancel execution if it wasn't finished. - if (data && !data->is_finished && data->executor) + if (!data->is_finished && data->executor) data->executor->cancel(); /// The following code is needed to rethrow exception from PipelineExecutor. /// It could have been thrown from pull(), but we will not likely call it again. /// Join thread here to wait for possible exception. - if (data && data->thread.joinable()) + if (data->thread.joinable()) data->thread.join(); /// Rethrow exception to not swallow it in destructor. - if (data) - data->rethrowExceptionIfHas(); + data->rethrowExceptionIfHas(); } Chunk PullingAsyncPipelineExecutor::getTotals() From e2fcf0f0727013068a95cca30c8382580ace6033 Mon Sep 17 00:00:00 2001 From: Azat Khuzhin Date: Thu, 19 Jan 2023 20:03:14 +0100 Subject: [PATCH 2/3] Catch exception on query cancellation Since we still want to join the thread, yes it will be done in dtor, but this looks better. 
Signed-off-by: Azat Khuzhin --- .../Executors/PullingAsyncPipelineExecutor.cpp | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/src/Processors/Executors/PullingAsyncPipelineExecutor.cpp b/src/Processors/Executors/PullingAsyncPipelineExecutor.cpp index 3dc99397bd1..fbbf8c119ce 100644 --- a/src/Processors/Executors/PullingAsyncPipelineExecutor.cpp +++ b/src/Processors/Executors/PullingAsyncPipelineExecutor.cpp @@ -179,8 +179,21 @@ void PullingAsyncPipelineExecutor::cancel() return; /// Cancel execution if it wasn't finished. - if (!data->is_finished && data->executor) - data->executor->cancel(); + try + { + if (!data->is_finished && data->executor) + data->executor->cancel(); + } + catch (...) + { + /// Store exception only of during query execution there was no + /// exception, since only one exception can be re-thrown. + if (!data->has_exception) + { + data->exception = std::current_exception(); + data->has_exception = true; + } + } /// The following code is needed to rethrow exception from PipelineExecutor. /// It could have been thrown from pull(), but we will not likely call it again. From a64f6b5f3eafefa9d4fc9d314b19e6c0086e37fa Mon Sep 17 00:00:00 2001 From: Azat Khuzhin Date: Thu, 19 Jan 2023 20:02:02 +0100 Subject: [PATCH 3/3] Fix possible (likely distributed) query hung MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Recently I saw the following: the client executed a long distributed query and terminated the connection. In this case query cancellation is done from the PullingAsyncPipelineExecutor dtor, but during cancellation one of the nodes sent ECONNRESET, which led to an exception from PullingAsyncPipelineExecutor::cancel(). This in turn led to a deadlock in which multiple threads wait for each other, because cancel() for LazyOutputFormat wasn't called. 
Here is the relevant portion of the logs: 2023.01.04 08:26:09.236208 [ 37968 ] {f2ed6149-146d-4a3d-874a-b0b751c7b567} executeQuery: (from 10.61.13.253:44266, user: default) TooLongDistributedQueryToPost ... 2023.01.04 08:26:09.262424 [ 37968 ] {f2ed6149-146d-4a3d-874a-b0b751c7b567} MergeTreeInOrderSelectProcessor: Reading 1 ranges in order from part 9_330_538_18, approx. 61440 rows starting from 0 2023.01.04 08:26:09.266399 [ 26788 ] {f2ed6149-146d-4a3d-874a-b0b751c7b567} Connection (s4.ch:9000): Connecting. Database: (not specified). User: default 2023.01.04 08:26:09.266849 [ 26788 ] {f2ed6149-146d-4a3d-874a-b0b751c7b567} Connection (s4.ch:9000): Connected to ClickHouse server version 22.10.1. 2023.01.04 08:26:09.267165 [ 26788 ] {f2ed6149-146d-4a3d-874a-b0b751c7b567} Connection (s4.ch:9000): Sent data for 2 scalars, total 2 rows in 3.1587e-05 sec., 62635 rows/sec., 68.00 B (2.03 MiB/sec.), compressed 0.4594594594594595 times to 148.00 B (4.41 MiB/sec.) 2023.01.04 08:39:13.047170 [ 37968 ] {f2ed6149-146d-4a3d-874a-b0b751c7b567} PullingAsyncPipelineExecutor: Code: 210. DB::NetException: Connection reset by peer, while writing to socket (10.7.142.115:9000). (NETWORK_ERROR), Stack trace (when copying this message, always include the lines below): 0. ./.build/./contrib/libcxx/include/exception:133: Poco::Exception::Exception(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, int) @ 0x1818234c in /usr/lib/debug/usr/bin/clickhouse.debug 1. ./.build/./src/Common/Exception.cpp:69: DB::Exception::Exception(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, int, bool) @ 0x1004fbda in /usr/lib/debug/usr/bin/clickhouse.debug 2. ./.build/./src/Common/NetException.h:12: DB::WriteBufferFromPocoSocket::nextImpl() @ 0x14e352f3 in /usr/lib/debug/usr/bin/clickhouse.debug 3. ./.build/./src/IO/BufferBase.h:39: DB::Connection::sendCancel() @ 0x15c21e6b in /usr/lib/debug/usr/bin/clickhouse.debug 4. 
./.build/./src/Client/MultiplexedConnections.cpp:0: DB::MultiplexedConnections::sendCancel() @ 0x15c4d5b7 in /usr/lib/debug/usr/bin/clickhouse.debug 5. ./.build/./src/QueryPipeline/RemoteQueryExecutor.cpp:627: DB::RemoteQueryExecutor::tryCancel(char const*, std::__1::unique_ptr >*) @ 0x14446c09 in /usr/lib/debug/usr/bin/clickhouse.debug 6. ./.build/./contrib/libcxx/include/__iterator/wrap_iter.h:100: DB::ExecutingGraph::cancel() @ 0x15d2c0de in /usr/lib/debug/usr/bin/clickhouse.debug 7. ./.build/./contrib/libcxx/include/__memory/unique_ptr.h:300: DB::PullingAsyncPipelineExecutor::cancel() @ 0x15d32055 in /usr/lib/debug/usr/bin/clickhouse.debug 8. ./.build/./contrib/libcxx/include/__memory/unique_ptr.h:312: DB::PullingAsyncPipelineExecutor::~PullingAsyncPipelineExecutor() @ 0x15d31f4f in /usr/lib/debug/usr/bin/clickhouse.debug 9. ./.build/./src/Server/TCPHandler.cpp:0: DB::TCPHandler::processOrdinaryQueryWithProcessors() @ 0x15cde919 in /usr/lib/debug/usr/bin/clickhouse.debug 10. ./.build/./src/Server/TCPHandler.cpp:0: DB::TCPHandler::runImpl() @ 0x15cd8554 in /usr/lib/debug/usr/bin/clickhouse.debug 11. ./.build/./src/Server/TCPHandler.cpp:1904: DB::TCPHandler::run() @ 0x15ce6479 in /usr/lib/debug/usr/bin/clickhouse.debug 12. ./.build/./contrib/poco/Net/src/TCPServerConnection.cpp:57: Poco::Net::TCPServerConnection::start() @ 0x18074f07 in /usr/lib/debug/usr/bin/clickhouse.debug 13. ./.build/./contrib/libcxx/include/__memory/unique_ptr.h:54: Poco::Net::TCPServerDispatcher::run() @ 0x180753ed in /usr/lib/debug/usr/bin/clickhouse.debug 14. ./.build/./contrib/poco/Foundation/src/ThreadPool.cpp:213: Poco::PooledThread::run() @ 0x181e3807 in /usr/lib/debug/usr/bin/clickhouse.debug 15. ./.build/./contrib/poco/Foundation/include/Poco/SharedPtr.h:156: Poco::ThreadImpl::runnableEntry(void*) @ 0x181e1483 in /usr/lib/debug/usr/bin/clickhouse.debug 16. ? @ 0x7ffff7e55fd4 in ? 17. ? @ 0x7ffff7ed666c in ? (version 22.10.1.1) And here is the state of the threads:
system.stack_trace ```sql SELECT arrayStringConcat(arrayMap(x -> demangle(addressToSymbol(x)), trace), '\n') AS sym FROM system.stack_trace WHERE query_id = 'f2ed6149-146d-4a3d-874a-b0b751c7b567' SETTINGS allow_introspection_functions=1 Row 1: ────── sym: pthread_cond_wait std::__1::condition_variable::wait(std::__1::unique_lock<std::__1::mutex>&) bool ConcurrentBoundedQueue<DB::Chunk>::emplaceImpl<DB::Chunk>(std::__1::optional<unsigned long>, DB::Chunk&&) DB::IOutputFormat::work() DB::ExecutionThreadContext::executeTask() DB::PipelineExecutor::executeStepImpl(unsigned long, std::__1::atomic<bool>*) Row 2: ────── sym: pthread_cond_wait Poco::EventImpl::waitImpl() DB::PipelineExecutor::joinThreads() DB::PipelineExecutor::executeImpl(unsigned long) DB::PipelineExecutor::execute(unsigned long) Row 3: ────── sym: pthread_cond_wait Poco::EventImpl::waitImpl() DB::PullingAsyncPipelineExecutor::Data::~Data() DB::PullingAsyncPipelineExecutor::~PullingAsyncPipelineExecutor() DB::TCPHandler::processOrdinaryQueryWithProcessors() DB::TCPHandler::runImpl() DB::TCPHandler::run() Poco::Net::TCPServerConnection::start() Poco::Net::TCPServerDispatcher::run() Poco::PooledThread::run() Poco::ThreadImpl::runnableEntry(void*) ```
Signed-off-by: Azat Khuzhin --- src/Processors/Executors/ExecutingGraph.cpp | 32 ++++++++++++++++++--- 1 file changed, 28 insertions(+), 4 deletions(-) diff --git a/src/Processors/Executors/ExecutingGraph.cpp b/src/Processors/Executors/ExecutingGraph.cpp index cd94ca7ceae..f84efabdee1 100644 --- a/src/Processors/Executors/ExecutingGraph.cpp +++ b/src/Processors/Executors/ExecutingGraph.cpp @@ -392,10 +392,34 @@ bool ExecutingGraph::updateNode(uint64_t pid, Queue & queue, Queue & async_queue void ExecutingGraph::cancel() { - std::lock_guard guard(processors_mutex); - for (auto & processor : *processors) - processor->cancel(); - cancelled = true; + std::exception_ptr exception_ptr; + + { + std::lock_guard guard(processors_mutex); + for (auto & processor : *processors) + { + try + { + processor->cancel(); + } + catch (...) + { + if (!exception_ptr) + exception_ptr = std::current_exception(); + + /// Log any exception since: + /// a) they are pretty rare (the only that I know is from + /// RemoteQueryExecutor) + /// b) there can be exception during query execution, and in this + /// case, this exception can be ignored (not showed to the user). + tryLogCurrentException("ExecutingGraph"); + } + } + cancelled = true; + } + + if (exception_ptr) + std::rethrow_exception(exception_ptr); } }