Move limits check to ProcessList

This commit is contained in:
Raúl Marín 2021-11-26 12:44:39 +01:00
parent be60759e68
commit c498b7ba59
5 changed files with 25 additions and 17 deletions

View File

@ -24,6 +24,7 @@ namespace ErrorCodes
extern const int TOO_MANY_SIMULTANEOUS_QUERIES;
extern const int QUERY_WITH_SAME_ID_IS_ALREADY_RUNNING;
extern const int LOGICAL_ERROR;
extern const int QUERY_WAS_CANCELLED;
}
@ -291,6 +292,9 @@ QueryStatus::QueryStatus(
, client_info(client_info_)
, priority_handle(std::move(priority_handle_))
{
auto settings = getContext()->getSettings();
limits.max_execution_time = settings.max_execution_time;
overflow_mode = settings.timeout_overflow_mode;
}
QueryStatus::~QueryStatus()
@ -326,6 +330,14 @@ void QueryStatus::removePipelineExecutor(PipelineExecutor * e)
std::erase_if(executors, [e](PipelineExecutor * x) { return x == e; });
}
bool QueryStatus::checkTimeLimit()
{
if (is_killed.load())
throw Exception("Query was cancelled", ErrorCodes::QUERY_WAS_CANCELLED);
return limits.checkTimeLimit(watch, overflow_mode);
}
void QueryStatus::setUserProcessList(ProcessListForUser * user_process_list_)
{

View File

@ -1,11 +1,12 @@
#pragma once
#include <Core/Defines.h>
#include <QueryPipeline/BlockIO.h>
#include <IO/Progress.h>
#include <Interpreters/CancellationCode.h>
#include <Interpreters/ClientInfo.h>
#include <Interpreters/QueryPriorities.h>
#include <QueryPipeline/BlockIO.h>
#include <QueryPipeline/ExecutionSpeedLimits.h>
#include <Storages/IStorage_fwd.h>
#include <Poco/Condition.h>
#include <Common/CurrentMetrics.h>
@ -74,7 +75,6 @@ protected:
friend class ThreadStatus;
friend class CurrentThread;
friend class ProcessListEntry;
friend class PipelineExecutor;
String query;
ClientInfo client_info;
@ -89,6 +89,11 @@ protected:
/// Progress of output stream
Progress progress_out;
/// Used to externally check for the query time limits
/// They are saved in the constructor to limit the overhead of each call to checkTimeLimit()
ExecutionSpeedLimits limits;
OverflowMode overflow_mode;
QueryPriorities::Handle priority_handle;
std::atomic<bool> is_killed { false };
@ -170,6 +175,9 @@ public:
/// Removes a pipeline to the QueryStatus
void removePipelineExecutor(PipelineExecutor * e);
/// Checks the query time limits (cancelled or timeout)
bool checkTimeLimit();
};

View File

@ -21,7 +21,6 @@ namespace DB
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int QUERY_WAS_CANCELLED;
}
@ -45,12 +44,9 @@ PipelineExecutor::PipelineExecutor(Processors & processors, QueryStatus * elem)
}
if (process_list_element)
{
auto settings = process_list_element->getContext()->getSettings();
// Add the pipeline to the QueryStatus at the end to avoid issues if other things throw
// as that would leave the executor "linked"
process_list_element->addPipelineExecutor(this);
limits.max_execution_time = settings.max_execution_time;
overflow_mode = settings.timeout_overflow_mode;
}
}
@ -136,15 +132,12 @@ bool PipelineExecutor::checkTimeLimit()
{
if (process_list_element)
{
if (process_list_element->isKilled())
throw Exception("Query was cancelled", ErrorCodes::QUERY_WAS_CANCELLED);
bool cont = limits.checkTimeLimit(process_list_element->watch, overflow_mode);
bool continuing = process_list_element->checkTimeLimit();
// We call cancel here so that all processors are notified and tasks waken up
// so that the "break" is faster and doesn't wait for long events
if (!cont)
if (!continuing)
cancel();
return cont;
return continuing;
}
return true;

View File

@ -2,7 +2,6 @@
#include <Processors/IProcessor.h>
#include <Processors/Executors/ExecutorTasks.h>
#include <QueryPipeline/ExecutionSpeedLimits.h>
#include <Common/EventCounter.h>
#include <base/logger_useful.h>
@ -75,9 +74,6 @@ private:
void finish();
String dumpPipeline() const;
ExecutionSpeedLimits limits;
OverflowMode overflow_mode;
};
using PipelineExecutorPtr = std::shared_ptr<PipelineExecutor>;

View File

@ -7,7 +7,6 @@
#include <QueryPipeline/Pipe.h>
#include <Processors/LimitTransform.h>
#include <Interpreters/Context.h>
namespace DB
{