diff --git a/src/Processors/Executors/ExecutingGraph.cpp b/src/Processors/Executors/ExecutingGraph.cpp index 97895c8a39d..eb06d45c81b 100644 --- a/src/Processors/Executors/ExecutingGraph.cpp +++ b/src/Processors/Executors/ExecutingGraph.cpp @@ -16,6 +16,7 @@ ExecutingGraph::ExecutingGraph(std::shared_ptr processors_, bool pro { uint64_t num_processors = processors->size(); nodes.reserve(num_processors); + source_processors.reserve(num_processors); /// Create nodes. for (uint64_t node = 0; node < num_processors; ++node) @@ -23,6 +24,9 @@ ExecutingGraph::ExecutingGraph(std::shared_ptr processors_, bool pro IProcessor * proc = processors->at(node).get(); processors_map[proc] = node; nodes.emplace_back(std::make_unique(proc, node)); + + bool is_source = proc->getInputs().empty(); + source_processors.emplace_back(is_source); } /// Create edges. @@ -117,6 +121,14 @@ bool ExecutingGraph::expandPipeline(std::stack & stack, uint64_t pid) return false; } processors->insert(processors->end(), new_processors.begin(), new_processors.end()); + + source_processors.reserve(source_processors.size() + new_processors.size()); + + for (auto& proc: new_processors) + { + bool is_source = proc->getInputs().empty(); + source_processors.emplace_back(is_source); + } } uint64_t num_processors = processors->size(); @@ -396,13 +408,16 @@ void ExecutingGraph::cancel(bool hard_cancel) { std::lock_guard guard(processors_mutex); - for (auto & processor : *processors) + uint64_t num_processors = processors->size(); + for (uint64_t proc = 0; proc < num_processors; ++proc) { try { - bool is_source = processor->getInputs().empty(); - if (hard_cancel || is_source) + if (hard_cancel || source_processors.at(proc)) + { + IProcessor * processor = processors->at(proc).get(); processor->cancel(); + } } catch (...) { diff --git a/src/Processors/Executors/ExecutingGraph.h b/src/Processors/Executors/ExecutingGraph.h index 69e3525d5c7..971c1f0e128 100644 --- a/src/Processors/Executors/ExecutingGraph.h +++ b/src/Processors/Executors/ExecutingGraph.h @@ -6,6 +6,7 @@ #include #include #include +#include namespace DB @@ -152,6 +153,7 @@ private: bool expandPipeline(std::stack & stack, uint64_t pid); std::shared_ptr processors; + std::vector source_processors; std::mutex processors_mutex; UpgradableMutex nodes_mutex;