Add checking for source processors in executing graph

This commit is contained in:
alexX512 2023-02-15 15:40:42 +00:00
parent 5aabfe8644
commit 26fd12e0c7
2 changed files with 20 additions and 3 deletions

View File

@ -16,6 +16,7 @@ ExecutingGraph::ExecutingGraph(std::shared_ptr<Processors> processors_, bool pro
{
uint64_t num_processors = processors->size();
nodes.reserve(num_processors);
source_processors.reserve(num_processors);
/// Create nodes.
for (uint64_t node = 0; node < num_processors; ++node)
@ -23,6 +24,9 @@ ExecutingGraph::ExecutingGraph(std::shared_ptr<Processors> processors_, bool pro
IProcessor * proc = processors->at(node).get();
processors_map[proc] = node;
nodes.emplace_back(std::make_unique<Node>(proc, node));
bool is_source = proc->getInputs().empty();
source_processors.emplace_back(is_source);
}
/// Create edges.
@ -117,6 +121,14 @@ bool ExecutingGraph::expandPipeline(std::stack<uint64_t> & stack, uint64_t pid)
return false;
}
processors->insert(processors->end(), new_processors.begin(), new_processors.end());
source_processors.reserve(source_processors.size() + new_processors.size());
for (auto& proc: new_processors)
{
bool is_source = proc->getInputs().empty();
source_processors.emplace_back(is_source);
}
}
uint64_t num_processors = processors->size();
@ -396,13 +408,16 @@ void ExecutingGraph::cancel(bool hard_cancel)
{
std::lock_guard guard(processors_mutex);
for (auto & processor : *processors)
uint64_t num_processors = processors->size();
for (uint64_t proc = 0; proc < num_processors; ++proc)
{
try
{
bool is_source = processor->getInputs().empty();
if (hard_cancel || is_source)
if (hard_cancel || source_processors.at(proc))
{
IProcessor * processor = processors->at(proc).get();
processor->cancel();
}
}
catch (...)
{

View File

@ -6,6 +6,7 @@
#include <mutex>
#include <queue>
#include <stack>
#include <vector>
namespace DB
@ -152,6 +153,7 @@ private:
bool expandPipeline(std::stack<uint64_t> & stack, uint64_t pid);
std::shared_ptr<Processors> processors;
std::vector<bool> source_processors;
std::mutex processors_mutex;
UpgradableMutex nodes_mutex;