Merge pull request #68291 from ClickHouse/add-cancelletion-reason-to-pipeline-executor

Add execution status to PipelineExecutor.
This commit is contained in:
Nikolai Kochetov 2024-08-15 14:31:26 +00:00 committed by GitHub
commit fc5a19949f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 91 additions and 30 deletions

View File

@ -96,7 +96,7 @@ bool ExecutingGraph::addEdges(uint64_t node)
return was_edge_added;
}
bool ExecutingGraph::expandPipeline(std::stack<uint64_t> & stack, uint64_t pid)
ExecutingGraph::UpdateNodeStatus ExecutingGraph::expandPipeline(std::stack<uint64_t> & stack, uint64_t pid)
{
auto & cur_node = *nodes[pid];
Processors new_processors;
@ -108,7 +108,7 @@ bool ExecutingGraph::expandPipeline(std::stack<uint64_t> & stack, uint64_t pid)
catch (...)
{
cur_node.exception = std::current_exception();
return false;
return UpdateNodeStatus::Exception;
}
{
@ -118,7 +118,7 @@ bool ExecutingGraph::expandPipeline(std::stack<uint64_t> & stack, uint64_t pid)
{
for (auto & processor : new_processors)
processor->cancel();
return false;
return UpdateNodeStatus::Cancelled;
}
processors->insert(processors->end(), new_processors.begin(), new_processors.end());
@ -178,7 +178,7 @@ bool ExecutingGraph::expandPipeline(std::stack<uint64_t> & stack, uint64_t pid)
}
}
return true;
return UpdateNodeStatus::Done;
}
void ExecutingGraph::initializeExecution(Queue & queue)
@ -213,7 +213,7 @@ void ExecutingGraph::initializeExecution(Queue & queue)
}
bool ExecutingGraph::updateNode(uint64_t pid, Queue & queue, Queue & async_queue)
ExecutingGraph::UpdateNodeStatus ExecutingGraph::updateNode(uint64_t pid, Queue & queue, Queue & async_queue)
{
std::stack<Edge *> updated_edges;
std::stack<uint64_t> updated_processors;
@ -309,7 +309,7 @@ bool ExecutingGraph::updateNode(uint64_t pid, Queue & queue, Queue & async_queue
catch (...)
{
node.exception = std::current_exception();
return false;
return UpdateNodeStatus::Exception;
}
#ifndef NDEBUG
@ -386,8 +386,9 @@ bool ExecutingGraph::updateNode(uint64_t pid, Queue & queue, Queue & async_queue
read_lock.unlock();
{
std::unique_lock lock(nodes_mutex);
if (!expandPipeline(updated_processors, pid))
return false;
auto status = expandPipeline(updated_processors, pid);
if (status != UpdateNodeStatus::Done)
return status;
}
read_lock.lock();
@ -397,7 +398,7 @@ bool ExecutingGraph::updateNode(uint64_t pid, Queue & queue, Queue & async_queue
}
}
return true;
return UpdateNodeStatus::Done;
}
void ExecutingGraph::cancel(bool cancel_all_processors)

View File

@ -138,10 +138,17 @@ public:
/// Traverse graph the first time to update all the childless nodes.
void initializeExecution(Queue & queue);
enum class UpdateNodeStatus
{
Done,
Exception,
Cancelled,
};
/// Update processor with pid number (call IProcessor::prepare).
/// Check parents and children of current processor and push them to stacks if they also need to be updated.
/// If processor wants to be expanded, lock will be upgraded to get write access to pipeline.
bool updateNode(uint64_t pid, Queue & queue, Queue & async_queue);
UpdateNodeStatus updateNode(uint64_t pid, Queue & queue, Queue & async_queue);
void cancel(bool cancel_all_processors = true);
@ -155,7 +162,7 @@ private:
/// Update graph after processor (pid) returned ExpandPipeline status.
/// All new nodes and nodes with updated ports are pushed into stack.
bool expandPipeline(std::stack<uint64_t> & stack, uint64_t pid);
UpdateNodeStatus expandPipeline(std::stack<uint64_t> & stack, uint64_t pid);
std::shared_ptr<Processors> processors;
std::vector<bool> source_processors;

View File

@ -77,9 +77,9 @@ const Processors & PipelineExecutor::getProcessors() const
return graph->getProcessors();
}
void PipelineExecutor::cancel()
void PipelineExecutor::cancel(ExecutionStatus reason)
{
cancelled = true;
tryUpdateExecutionStatus(ExecutionStatus::Executing, reason);
finish();
graph->cancel();
}
@ -98,6 +98,11 @@ void PipelineExecutor::finish()
tasks.finish();
}
bool PipelineExecutor::tryUpdateExecutionStatus(ExecutionStatus expected, ExecutionStatus desired)
{
return execution_status.compare_exchange_strong(expected, desired);
}
void PipelineExecutor::execute(size_t num_threads, bool concurrency_control)
{
checkTimeLimit();
@ -120,7 +125,7 @@ void PipelineExecutor::execute(size_t num_threads, bool concurrency_control)
}
catch (...)
{
span.addAttribute(ExecutionStatus::fromCurrentException());
span.addAttribute(DB::ExecutionStatus::fromCurrentException());
#ifndef NDEBUG
LOG_TRACE(log, "Exception while executing query. Current state:\n{}", dumpPipeline());
@ -169,7 +174,7 @@ bool PipelineExecutor::checkTimeLimitSoft()
// We call cancel here so that all processors are notified and tasks waken up
// so that the "break" is faster and doesn't wait for long events
if (!continuing)
cancel();
cancel(ExecutionStatus::CancelledByTimeout);
return continuing;
}
@ -195,7 +200,8 @@ void PipelineExecutor::finalizeExecution()
{
checkTimeLimit();
if (cancelled)
auto status = execution_status.load();
if (status == ExecutionStatus::CancelledByTimeout || status == ExecutionStatus::CancelledByUser)
return;
bool all_processors_finished = true;
@ -271,7 +277,7 @@ void PipelineExecutor::executeStepImpl(size_t thread_num, std::atomic_bool * yie
break;
if (!context.executeTask())
cancel();
cancel(ExecutionStatus::Exception);
if (tasks.isFinished())
break;
@ -289,11 +295,13 @@ void PipelineExecutor::executeStepImpl(size_t thread_num, std::atomic_bool * yie
Queue async_queue;
/// Prepare processor after execution.
if (!graph->updateNode(context.getProcessorID(), queue, async_queue))
cancel();
auto status = graph->updateNode(context.getProcessorID(), queue, async_queue);
if (status == ExecutingGraph::UpdateNodeStatus::Exception)
cancel(ExecutionStatus::Exception);
/// Push other tasks to global queue.
tasks.pushTasks(queue, async_queue, context);
if (status == ExecutingGraph::UpdateNodeStatus::Done)
tasks.pushTasks(queue, async_queue, context);
}
#ifndef NDEBUG
@ -309,7 +317,7 @@ void PipelineExecutor::executeStepImpl(size_t thread_num, std::atomic_bool * yie
{
/// spawnThreads can throw an exception, for example CANNOT_SCHEDULE_TASK.
/// We should cancel execution properly before rethrow.
cancel();
cancel(ExecutionStatus::Exception);
throw;
}
@ -328,6 +336,7 @@ void PipelineExecutor::executeStepImpl(size_t thread_num, std::atomic_bool * yie
void PipelineExecutor::initializeExecution(size_t num_threads, bool concurrency_control)
{
is_execution_initialized = true;
tryUpdateExecutionStatus(ExecutionStatus::NotStarted, ExecutionStatus::Executing);
size_t use_threads = num_threads;
@ -393,7 +402,7 @@ void PipelineExecutor::executeImpl(size_t num_threads, bool concurrency_control)
{
/// If finished_flag is not set, there was an exception.
/// Cancel execution in this case.
cancel();
cancel(ExecutionStatus::Exception);
if (pool)
pool->wait();
}

View File

@ -48,8 +48,20 @@ public:
const Processors & getProcessors() const;
enum class ExecutionStatus
{
NotStarted,
Executing,
Finished,
Exception,
CancelledByUser,
CancelledByTimeout,
};
/// Cancel execution. May be called from another thread.
void cancel();
void cancel() { cancel(ExecutionStatus::CancelledByUser); }
ExecutionStatus getExecutionStatus() const { return execution_status.load(); }
/// Cancel processors which only read data from source. May be called from another thread.
void cancelReading();
@ -81,7 +93,7 @@ private:
/// system.opentelemetry_span_log
bool trace_processors = false;
std::atomic_bool cancelled = false;
std::atomic<ExecutionStatus> execution_status = ExecutionStatus::NotStarted;
std::atomic_bool cancelled_reading = false;
LoggerPtr log = getLogger("PipelineExecutor");
@ -105,6 +117,10 @@ private:
void executeStepImpl(size_t thread_num, std::atomic_bool * yield_flag = nullptr);
void executeSingleThread(size_t thread_num);
void finish();
void cancel(ExecutionStatus reason);
/// If execution_status == from, change it to desired.
bool tryUpdateExecutionStatus(ExecutionStatus expected, ExecutionStatus desired);
String dumpPipeline() const;
};

View File

@ -15,6 +15,7 @@ namespace DB
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int QUERY_WAS_CANCELLED;
}
class PushingAsyncSource : public ISource
@ -176,6 +177,16 @@ void PushingAsyncPipelineExecutor::start()
data->thread = ThreadFromGlobalPool(std::move(func));
}
[[noreturn]] static void throwOnExecutionStatus(PipelineExecutor::ExecutionStatus status)
{
if (status == PipelineExecutor::ExecutionStatus::CancelledByTimeout
|| status == PipelineExecutor::ExecutionStatus::CancelledByUser)
throw Exception(ErrorCodes::QUERY_WAS_CANCELLED, "Query was cancelled");
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Pipeline for PushingPipelineExecutor was finished before all data was inserted");
}
void PushingAsyncPipelineExecutor::push(Chunk chunk)
{
if (!started)
@ -185,8 +196,7 @@ void PushingAsyncPipelineExecutor::push(Chunk chunk)
data->rethrowExceptionIfHas();
if (!is_pushed)
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Pipeline for PushingAsyncPipelineExecutor was finished before all data was inserted");
throwOnExecutionStatus(data->executor->getExecutionStatus());
}
void PushingAsyncPipelineExecutor::push(Block block)

View File

@ -11,6 +11,7 @@ namespace DB
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int QUERY_WAS_CANCELLED;
}
class PushingSource : public ISource
@ -80,6 +81,15 @@ const Block & PushingPipelineExecutor::getHeader() const
return pushing_source->getPort().getHeader();
}
[[noreturn]] static void throwOnExecutionStatus(PipelineExecutor::ExecutionStatus status)
{
if (status == PipelineExecutor::ExecutionStatus::CancelledByTimeout
|| status == PipelineExecutor::ExecutionStatus::CancelledByUser)
throw Exception(ErrorCodes::QUERY_WAS_CANCELLED, "Query was cancelled");
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Pipeline for PushingPipelineExecutor was finished before all data was inserted");
}
void PushingPipelineExecutor::start()
{
@ -91,8 +101,7 @@ void PushingPipelineExecutor::start()
executor->setReadProgressCallback(pipeline.getReadProgressCallback());
if (!executor->executeStep(&input_wait_flag))
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Pipeline for PushingPipelineExecutor was finished before all data was inserted");
throwOnExecutionStatus(executor->getExecutionStatus());
}
void PushingPipelineExecutor::push(Chunk chunk)
@ -103,8 +112,7 @@ void PushingPipelineExecutor::push(Chunk chunk)
pushing_source->setData(std::move(chunk));
if (!executor->executeStep(&input_wait_flag))
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Pipeline for PushingPipelineExecutor was finished before all data was inserted");
throwOnExecutionStatus(executor->getExecutionStatus());
}
void PushingPipelineExecutor::push(Block block)

View File

@ -0,0 +1,2 @@
QUERY_WAS_CANCELLED
QUERY_WAS_CANCELLED

View File

@ -0,0 +1,8 @@
#!/usr/bin/env bash
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh
${CLICKHOUSE_CLIENT} --query "create table null_t (number UInt64) engine = Null;"
${CLICKHOUSE_CLIENT} --query "select sleep(0.1) from system.numbers settings max_block_size = 1 format Native" 2>/dev/null | ${CLICKHOUSE_CLIENT} --max_execution_time 0.3 --timeout_overflow_mode break --query "insert into null_t format Native" 2>&1 | grep -o "QUERY_WAS_CANCELLED"