Add hard_cancel mod

This commit is contained in:
alexX512 2023-02-14 21:09:15 +00:00
parent 3482960f90
commit b466855e04
5 changed files with 12 additions and 13 deletions

View File

@ -390,7 +390,7 @@ bool ExecutingGraph::updateNode(uint64_t pid, Queue & queue, Queue & async_queue
return true;
}
void ExecutingGraph::cancel()
void ExecutingGraph::cancel(bool hard_cancel)
{
std::exception_ptr exception_ptr;
@ -401,7 +401,7 @@ void ExecutingGraph::cancel()
try
{
bool is_source = processor->getInputs().empty();
if (is_source)
if (hard_cancel || is_source)
processor->cancel();
}
catch (...)

View File

@ -137,7 +137,7 @@ public:
/// If processor wants to be expanded, lock will be upgraded to get write access to pipeline.
bool updateNode(uint64_t pid, Queue & queue, Queue & async_queue);
void cancel();
void cancel(bool hard_cancel = false);
private:
/// Add single edge to edges list. Check processor is known.

View File

@ -67,10 +67,12 @@ const Processors & PipelineExecutor::getProcessors() const
return graph->getProcessors();
}
void PipelineExecutor::cancel()
void PipelineExecutor::cancel(bool hard_cancel)
{
cancelled = true;
graph->cancel();
if (hard_cancel)
finish();
graph->cancel(hard_cancel);
}
void PipelineExecutor::finish()
@ -146,10 +148,8 @@ bool PipelineExecutor::checkTimeLimitSoft()
// We call cancel here so that all processors are notified and tasks waken up
// so that the "break" is faster and doesn't wait for long events
if (!continuing)
{
cancel();
finish();
}
return continuing;
}
@ -229,10 +229,7 @@ void PipelineExecutor::executeStepImpl(size_t thread_num, std::atomic_bool * yie
break;
if (!context.executeTask())
{
cancel();
finish();
}
if (tasks.isFinished())
break;

View File

@ -48,7 +48,7 @@ public:
const Processors & getProcessors() const;
/// Cancel execution. May be called from another thread.
void cancel();
void cancel(bool hard_cancel = false);
/// Checks the query time limits (cancelled or timeout). Throws on cancellation or when time limit is reached and the query uses "break"
bool checkTimeLimit();

View File

@ -121,6 +121,7 @@ bool PullingAsyncPipelineExecutor::pull(Chunk & chunk, uint64_t milliseconds)
if (is_execution_finished)
{
LOG_DEBUG(&Poco::Logger::get("PullingAsyncPipelineExecutor::pull"), "execution_finished");
/// If lazy format is finished, we don't cancel pipeline but wait for main thread to be finished.
data->is_finished = true;
/// Wait thread and rethrow exception if any.
@ -130,6 +131,7 @@ bool PullingAsyncPipelineExecutor::pull(Chunk & chunk, uint64_t milliseconds)
if (lazy_format)
{
LOG_DEBUG(&Poco::Logger::get("PullingAsyncPipelineExecutor::pull"), "Get chunk. Ms: {}", milliseconds);
chunk = lazy_format->getChunk(milliseconds);
data->rethrowExceptionIfHas();
return true;
@ -182,7 +184,7 @@ void PullingAsyncPipelineExecutor::cancel()
try
{
if (!data->is_finished && data->executor)
data->executor->cancel();
data->executor->cancel(/*hard_cancel*/ true);
}
catch (...)
{