Add processlist to PipelineExecutor. Throw exception if query was killed.

This commit is contained in:
Nikolai Kochetov 2020-01-28 15:59:34 +03:00
parent a19c8b3d37
commit 5a8a367228
4 changed files with 18 additions and 3 deletions

View File

@ -10,6 +10,7 @@
#include <Common/Stopwatch.h> #include <Common/Stopwatch.h>
#include <Processors/ISource.h> #include <Processors/ISource.h>
#include <Common/setThreadName.h> #include <Common/setThreadName.h>
#include <Interpreters/ProcessList.h>
#if !defined(__APPLE__) && !defined(__FreeBSD__) #if !defined(__APPLE__) && !defined(__FreeBSD__)
#include <sched.h> #include <sched.h>
@ -33,12 +34,13 @@ static bool checkCanAddAdditionalInfoToException(const DB::Exception & exception
&& exception.code() != ErrorCodes::QUERY_WAS_CANCELLED; && exception.code() != ErrorCodes::QUERY_WAS_CANCELLED;
} }
PipelineExecutor::PipelineExecutor(Processors & processors_) PipelineExecutor::PipelineExecutor(Processors & processors_, QueryStatus * elem)
: processors(processors_) : processors(processors_)
, cancelled(false) , cancelled(false)
, finished(false) , finished(false)
, num_processing_executors(0) , num_processing_executors(0)
, expand_pipeline_task(nullptr) , expand_pipeline_task(nullptr)
, process_list_element(elem)
{ {
buildGraph(); buildGraph();
} }
@ -473,7 +475,12 @@ void PipelineExecutor::execute(size_t num_threads)
} }
if (cancelled) if (cancelled)
{
if (process_list_element && process_list_element->isKilled())
throw Exception("Query was cancelled", ErrorCodes::QUERY_WAS_CANCELLED);
return; return;
}
bool all_processors_finished = true; bool all_processors_finished = true;
for (auto & node : graph) for (auto & node : graph)

View File

@ -13,6 +13,7 @@
namespace DB namespace DB
{ {
class QueryStatus;
/// Executes query pipeline. /// Executes query pipeline.
class PipelineExecutor class PipelineExecutor
@ -24,7 +25,7 @@ public:
/// During pipeline execution new processors can appear. They will be added to existing set. /// During pipeline execution new processors can appear. They will be added to existing set.
/// ///
/// Explicit graph representation is built in constructor. Throws if graph is not correct. /// Explicit graph representation is built in constructor. Throws if graph is not correct.
explicit PipelineExecutor(Processors & processors_); explicit PipelineExecutor(Processors & processors_, QueryStatus * elem = nullptr);
/// Execute pipeline in multiple threads. Must be called once. /// Execute pipeline in multiple threads. Must be called once.
/// In case of exception during execution throws any occurred. /// In case of exception during execution throws any occurred.
@ -242,6 +243,9 @@ private:
using ProcessorsMap = std::unordered_map<const IProcessor *, UInt64>; using ProcessorsMap = std::unordered_map<const IProcessor *, UInt64>;
ProcessorsMap processors_map; ProcessorsMap processors_map;
/// Now it's used to check if query was killed.
QueryStatus * process_list_element = nullptr;
/// Graph related methods. /// Graph related methods.
bool addEdges(UInt64 node); bool addEdges(UInt64 node);
void buildGraph(); void buildGraph();

View File

@ -523,6 +523,8 @@ void QueryPipeline::setProgressCallback(const ProgressCallback & callback)
void QueryPipeline::setProcessListElement(QueryStatus * elem) void QueryPipeline::setProcessListElement(QueryStatus * elem)
{ {
process_list_element = elem;
for (auto & processor : processors) for (auto & processor : processors)
{ {
if (auto * source = dynamic_cast<ISourceWithProgress *>(processor.get())) if (auto * source = dynamic_cast<ISourceWithProgress *>(processor.get()))
@ -630,7 +632,7 @@ PipelineExecutorPtr QueryPipeline::execute()
if (!output_format) if (!output_format)
throw Exception("Cannot execute pipeline because it doesn't have output.", ErrorCodes::LOGICAL_ERROR); throw Exception("Cannot execute pipeline because it doesn't have output.", ErrorCodes::LOGICAL_ERROR);
return std::make_shared<PipelineExecutor>(processors); return std::make_shared<PipelineExecutor>(processors, process_list_element);
} }
} }

View File

@ -120,6 +120,8 @@ private:
size_t max_threads = 0; size_t max_threads = 0;
QueryStatus * process_list_element = nullptr;
void checkInitialized(); void checkInitialized();
void checkSource(const ProcessorPtr & source, bool can_have_totals); void checkSource(const ProcessorPtr & source, bool can_have_totals);