Merge pull request #45448 from azat/fix-query-hang

Fix possible (likely distributed) query hung
This commit is contained in:
Alexey Milovidov 2023-01-24 13:00:37 +03:00 committed by GitHub
commit 851f6bf910
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 48 additions and 9 deletions

View File

@ -392,10 +392,34 @@ bool ExecutingGraph::updateNode(uint64_t pid, Queue & queue, Queue & async_queue
void ExecutingGraph::cancel()
{
std::lock_guard guard(processors_mutex);
for (auto & processor : *processors)
processor->cancel();
cancelled = true;
std::exception_ptr exception_ptr;
{
std::lock_guard guard(processors_mutex);
for (auto & processor : *processors)
{
try
{
processor->cancel();
}
catch (...)
{
if (!exception_ptr)
exception_ptr = std::current_exception();
/// Log any exception since:
/// a) they are pretty rare (the only that I know is from
/// RemoteQueryExecutor)
/// b) there can be exception during query execution, and in this
/// case, this exception can be ignored (not showed to the user).
tryLogCurrentException("ExecutingGraph");
}
}
cancelled = true;
}
if (exception_ptr)
std::rethrow_exception(exception_ptr);
}
}

View File

@ -175,20 +175,35 @@ bool PullingAsyncPipelineExecutor::pull(Block & block, uint64_t milliseconds)
void PullingAsyncPipelineExecutor::cancel()
{
if (!data)
return;
/// Cancel execution if it wasn't finished.
if (data && !data->is_finished && data->executor)
data->executor->cancel();
try
{
if (!data->is_finished && data->executor)
data->executor->cancel();
}
catch (...)
{
/// Store exception only of during query execution there was no
/// exception, since only one exception can be re-thrown.
if (!data->has_exception)
{
data->exception = std::current_exception();
data->has_exception = true;
}
}
/// The following code is needed to rethrow exception from PipelineExecutor.
/// It could have been thrown from pull(), but we will not likely call it again.
/// Join thread here to wait for possible exception.
if (data && data->thread.joinable())
if (data->thread.joinable())
data->thread.join();
/// Rethrow exception to not swallow it in destructor.
if (data)
data->rethrowExceptionIfHas();
data->rethrowExceptionIfHas();
}
Chunk PullingAsyncPipelineExecutor::getTotals()