mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-24 10:40:49 +00:00
Use softer checks
This commit is contained in:
parent
a7ae715950
commit
5662d0aa59
@ -338,6 +338,14 @@ bool QueryStatus::checkTimeLimit()
|
||||
return limits.checkTimeLimit(watch, overflow_mode);
|
||||
}
|
||||
|
||||
bool QueryStatus::checkTimeLimitSoft()
|
||||
{
|
||||
if (is_killed.load())
|
||||
return false;
|
||||
|
||||
return limits.checkTimeLimit(watch, OverflowMode::BREAK);
|
||||
}
|
||||
|
||||
|
||||
void QueryStatus::setUserProcessList(ProcessListForUser * user_process_list_)
|
||||
{
|
||||
|
@ -178,6 +178,8 @@ public:
|
||||
|
||||
/// Checks the query time limits (cancelled or timeout)
|
||||
bool checkTimeLimit();
|
||||
/// Same as checkTimeLimit but it never throws
|
||||
bool checkTimeLimitSoft();
|
||||
};
|
||||
|
||||
|
||||
|
@ -104,7 +104,7 @@ void PipelineExecutor::execute(size_t num_threads)
|
||||
|
||||
bool PipelineExecutor::executeStep(std::atomic_bool * yield_flag)
|
||||
{
|
||||
checkTimeLimit();
|
||||
checkTimeLimitSoft();
|
||||
if (!is_execution_initialized)
|
||||
{
|
||||
initializeExecution(1);
|
||||
@ -128,11 +128,11 @@ bool PipelineExecutor::executeStep(std::atomic_bool * yield_flag)
|
||||
return false;
|
||||
}
|
||||
|
||||
bool PipelineExecutor::checkTimeLimit()
|
||||
bool PipelineExecutor::checkTimeLimitSoft()
|
||||
{
|
||||
if (process_list_element)
|
||||
{
|
||||
bool continuing = process_list_element->checkTimeLimit();
|
||||
bool continuing = process_list_element->checkTimeLimitSoft();
|
||||
// We call cancel here so that all processors are notified and tasks waken up
|
||||
// so that the "break" is faster and doesn't wait for long events
|
||||
if (!continuing)
|
||||
@ -143,6 +143,15 @@ bool PipelineExecutor::checkTimeLimit()
|
||||
return true;
|
||||
}
|
||||
|
||||
bool PipelineExecutor::checkTimeLimit()
|
||||
{
|
||||
bool continuing = checkTimeLimitSoft();
|
||||
if (!continuing)
|
||||
process_list_element->checkTimeLimit(); // Will throw if needed
|
||||
|
||||
return continuing;
|
||||
}
|
||||
|
||||
void PipelineExecutor::finalizeExecution()
|
||||
{
|
||||
checkTimeLimit();
|
||||
@ -208,7 +217,8 @@ void PipelineExecutor::executeStepImpl(size_t thread_num, std::atomic_bool * yie
|
||||
if (tasks.isFinished())
|
||||
break;
|
||||
|
||||
checkTimeLimit();
|
||||
if (!checkTimeLimitSoft())
|
||||
break;
|
||||
|
||||
#ifndef NDEBUG
|
||||
Stopwatch processing_time_watch;
|
||||
|
@ -43,8 +43,10 @@ public:
|
||||
/// Cancel execution. May be called from another thread.
|
||||
void cancel();
|
||||
|
||||
/// Checks the query time limits (cancelled or timeout)
|
||||
/// Checks the query time limits (cancelled or timeout). Throws on cancellation or when time limit is reached and the query uses "break"
|
||||
bool checkTimeLimit();
|
||||
/// Same as checkTimeLimit but it never throws. It returns false on cancellation or time limit reached
|
||||
bool checkTimeLimitSoft();
|
||||
|
||||
private:
|
||||
ExecutingGraphPtr graph;
|
||||
|
@ -117,7 +117,8 @@ bool PullingAsyncPipelineExecutor::pull(Chunk & chunk, uint64_t milliseconds)
|
||||
|
||||
data->rethrowExceptionIfHas();
|
||||
|
||||
bool is_execution_finished = !data->executor->checkTimeLimit() || lazy_format ? lazy_format->isFinished() : data->is_finished.load();
|
||||
bool is_execution_finished
|
||||
= !data->executor->checkTimeLimitSoft() || lazy_format ? lazy_format->isFinished() : data->is_finished.load();
|
||||
|
||||
if (is_execution_finished)
|
||||
{
|
||||
|
@ -44,7 +44,7 @@ bool PullingPipelineExecutor::pull(Chunk & chunk)
|
||||
if (!executor)
|
||||
executor = std::make_shared<PipelineExecutor>(pipeline.processors, pipeline.process_list_element);
|
||||
|
||||
if (!executor->checkTimeLimit())
|
||||
if (!executor->checkTimeLimitSoft())
|
||||
return false;
|
||||
|
||||
if (!executor->executeStep(&has_data_flag))
|
||||
|
Loading…
Reference in New Issue
Block a user