Try fix hung in void PullingAsyncPipelineExecutor::cancel()

This commit is contained in:
Nikolai Kochetov 2021-02-24 17:10:35 +03:00
parent e5cef576e5
commit d966725f33
2 changed files with 12 additions and 11 deletions

View File

@ -14,6 +14,7 @@ struct PullingAsyncPipelineExecutor::Data
{ {
PipelineExecutorPtr executor; PipelineExecutorPtr executor;
std::exception_ptr exception; std::exception_ptr exception;
LazyOutputFormat * lazy_format = nullptr;
std::atomic_bool is_finished = false; std::atomic_bool is_finished = false;
std::atomic_bool has_exception = false; std::atomic_bool has_exception = false;
ThreadFromGlobalPool thread; ThreadFromGlobalPool thread;
@ -82,6 +83,10 @@ static void threadFunction(PullingAsyncPipelineExecutor::Data & data, ThreadGrou
{ {
data.exception = std::current_exception(); data.exception = std::current_exception();
data.has_exception = true; data.has_exception = true;
/// Finish lazy format in case of exception. Otherwise thread.join() may hung.
if (data.lazy_format)
data.lazy_format->cancel();
} }
data.is_finished = true; data.is_finished = true;
@ -95,6 +100,7 @@ bool PullingAsyncPipelineExecutor::pull(Chunk & chunk, uint64_t milliseconds)
{ {
data = std::make_unique<Data>(); data = std::make_unique<Data>();
data->executor = pipeline.execute(); data->executor = pipeline.execute();
data->lazy_format = lazy_format.get();
auto func = [&, thread_group = CurrentThread::getGroup()]() auto func = [&, thread_group = CurrentThread::getGroup()]()
{ {
@ -105,14 +111,7 @@ bool PullingAsyncPipelineExecutor::pull(Chunk & chunk, uint64_t milliseconds)
} }
if (data->has_exception) if (data->has_exception)
{
/// Finish lazy format in case of exception. Otherwise thread.join() may hung.
if (lazy_format)
lazy_format->finish();
data->has_exception = false;
std::rethrow_exception(std::move(data->exception)); std::rethrow_exception(std::move(data->exception));
}
bool is_execution_finished = lazy_format ? lazy_format->isFinished() bool is_execution_finished = lazy_format ? lazy_format->isFinished()
: data->is_finished.load(); : data->is_finished.load();
@ -172,14 +171,14 @@ bool PullingAsyncPipelineExecutor::pull(Block & block, uint64_t milliseconds)
void PullingAsyncPipelineExecutor::cancel() void PullingAsyncPipelineExecutor::cancel()
{ {
/// Cancel execution if it wasn't finished.
if (data && !data->is_finished && data->executor)
data->executor->cancel();
/// Finish lazy format. Otherwise thread.join() may hung. /// Finish lazy format. Otherwise thread.join() may hung.
if (lazy_format && !lazy_format->isFinished()) if (lazy_format && !lazy_format->isFinished())
lazy_format->finish(); lazy_format->finish();
/// Cancel execution if it wasn't finished.
if (data && !data->is_finished && data->executor)
data->executor->cancel();
/// Join thread here to wait for possible exception. /// Join thread here to wait for possible exception.
if (data && data->thread.joinable()) if (data && data->thread.joinable())
data->thread.join(); data->thread.join();

View File

@ -36,6 +36,8 @@ public:
queue.clear(); queue.clear();
} }
void onCancel() override { finalize(); }
protected: protected:
void consume(Chunk chunk) override void consume(Chunk chunk) override
{ {