From b466855e04f78b2b6c678350be9f0c2d6c8b203c Mon Sep 17 00:00:00 2001 From: alexX512 Date: Tue, 14 Feb 2023 21:09:15 +0000 Subject: [PATCH] Add hard_cancel mod --- src/Processors/Executors/ExecutingGraph.cpp | 4 ++-- src/Processors/Executors/ExecutingGraph.h | 2 +- src/Processors/Executors/PipelineExecutor.cpp | 13 +++++-------- src/Processors/Executors/PipelineExecutor.h | 2 +- .../Executors/PullingAsyncPipelineExecutor.cpp | 4 +++- 5 files changed, 12 insertions(+), 13 deletions(-) diff --git a/src/Processors/Executors/ExecutingGraph.cpp b/src/Processors/Executors/ExecutingGraph.cpp index f43f0ce8cff..97895c8a39d 100644 --- a/src/Processors/Executors/ExecutingGraph.cpp +++ b/src/Processors/Executors/ExecutingGraph.cpp @@ -390,7 +390,7 @@ bool ExecutingGraph::updateNode(uint64_t pid, Queue & queue, Queue & async_queue return true; } -void ExecutingGraph::cancel() +void ExecutingGraph::cancel(bool hard_cancel) { std::exception_ptr exception_ptr; @@ -401,7 +401,7 @@ void ExecutingGraph::cancel() try { bool is_source = processor->getInputs().empty(); - if (is_source) + if (hard_cancel || is_source) processor->cancel(); } catch (...) diff --git a/src/Processors/Executors/ExecutingGraph.h b/src/Processors/Executors/ExecutingGraph.h index 834ef5d4d9d..69e3525d5c7 100644 --- a/src/Processors/Executors/ExecutingGraph.h +++ b/src/Processors/Executors/ExecutingGraph.h @@ -137,7 +137,7 @@ public: /// If processor wants to be expanded, lock will be upgraded to get write access to pipeline. bool updateNode(uint64_t pid, Queue & queue, Queue & async_queue); - void cancel(); + void cancel(bool hard_cancel = false); private: /// Add single edge to edges list. Check processor is known. diff --git a/src/Processors/Executors/PipelineExecutor.cpp b/src/Processors/Executors/PipelineExecutor.cpp index 736098b267e..8d25589315c 100644 --- a/src/Processors/Executors/PipelineExecutor.cpp +++ b/src/Processors/Executors/PipelineExecutor.cpp @@ -67,10 +67,12 @@ const Processors & PipelineExecutor::getProcessors() const return graph->getProcessors(); } -void PipelineExecutor::cancel() +void PipelineExecutor::cancel(bool hard_cancel) { cancelled = true; - graph->cancel(); + if (hard_cancel) + finish(); + graph->cancel(hard_cancel); } void PipelineExecutor::finish() @@ -146,10 +148,8 @@ bool PipelineExecutor::checkTimeLimitSoft() // We call cancel here so that all processors are notified and tasks waken up // so that the "break" is faster and doesn't wait for long events if (!continuing) - { cancel(); - finish(); - } + return continuing; } @@ -229,10 +229,7 @@ void PipelineExecutor::executeStepImpl(size_t thread_num, std::atomic_bool * yie break; if (!context.executeTask()) - { cancel(); - finish(); - } if (tasks.isFinished()) break; diff --git a/src/Processors/Executors/PipelineExecutor.h b/src/Processors/Executors/PipelineExecutor.h index 21bde312cbc..a45a5b6a830 100644 --- a/src/Processors/Executors/PipelineExecutor.h +++ b/src/Processors/Executors/PipelineExecutor.h @@ -48,7 +48,7 @@ public: const Processors & getProcessors() const; /// Cancel execution. May be called from another thread. - void cancel(); + void cancel(bool hard_cancel = false); /// Checks the query time limits (cancelled or timeout). Throws on cancellation or when time limit is reached and the query uses "break" bool checkTimeLimit(); diff --git a/src/Processors/Executors/PullingAsyncPipelineExecutor.cpp b/src/Processors/Executors/PullingAsyncPipelineExecutor.cpp index 0a7a9025b30..cd23c818887 100644 --- a/src/Processors/Executors/PullingAsyncPipelineExecutor.cpp +++ b/src/Processors/Executors/PullingAsyncPipelineExecutor.cpp @@ -121,6 +121,7 @@ bool PullingAsyncPipelineExecutor::pull(Chunk & chunk, uint64_t milliseconds) if (is_execution_finished) { + LOG_DEBUG(&Poco::Logger::get("PullingAsyncPipelineExecutor::pull"), "execution_finished"); /// If lazy format is finished, we don't cancel pipeline but wait for main thread to be finished. data->is_finished = true; /// Wait thread and rethrow exception if any. @@ -130,6 +131,7 @@ bool PullingAsyncPipelineExecutor::pull(Chunk & chunk, uint64_t milliseconds) if (lazy_format) { + LOG_DEBUG(&Poco::Logger::get("PullingAsyncPipelineExecutor::pull"), "Get chunk. Ms: {}", milliseconds); chunk = lazy_format->getChunk(milliseconds); data->rethrowExceptionIfHas(); return true; @@ -182,7 +184,7 @@ void PullingAsyncPipelineExecutor::cancel() try { if (!data->is_finished && data->executor) - data->executor->cancel(); + data->executor->cancel(/*hard_cancel*/ true); } catch (...) {