Small refactorings.

This commit is contained in:
Yarik Briukhovetskyi 2024-10-31 12:27:10 +01:00 committed by GitHub
parent 093ec02ba1
commit 6c7305e447
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 37 additions and 30 deletions

View File

@ -1,5 +1,6 @@
#include <Common/logger_useful.h>
#include "Interpreters/ProcessList.h"
#include "QueryPipeline/SizeLimits.h"
#include <Interpreters/CancellationChecker.h>
#include <__chrono/duration.h>

View File

@ -2675,7 +2675,7 @@ bool Context::isBackgroundOperationContext() const
void Context::killCurrentQuery() const
{
if (auto elem = getProcessListElement())
elem->cancelQuery(CancelReason::MANUAL_CANCEL);
elem->cancelQuery(CancelReason::CANCELLED_BY_USER);
}
bool Context::isCurrentQueryKilled() const

View File

@ -490,17 +490,26 @@ CancellationCode QueryStatus::cancelQuery(CancelReason reason)
return CancellationCode::CancelSent;
}
void QueryStatus::addPipelineExecutor(PipelineExecutor * e)
void QueryStatus::throwProperExceptionIfNeeded(const UInt64 & max_execution_time, const UInt64 & elapsed_ns)
{
if (is_killed.load())
{
String additional_error_part;
if (!elapsed_ns)
additional_error_part = fmt::format("elapsed {} ms,", static_cast<double>(elapsed_ns) / 1000000000ULL);
if (cancel_reason == CancelReason::TIMEOUT)
throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "Timeout exceeded: {} maximum: {} ms", additional_error_part, max_execution_time / 1000.0);
throw Exception(ErrorCodes::QUERY_WAS_CANCELLED, "Query was cancelled");
}
}
void QueryStatus::addPipelineExecutor(PipelineExecutor * e, UInt64 max_exec_time)
{
/// In case of asynchronous distributed queries it is possible to call
/// addPipelineExecutor() from the cancelQuery() context, and this will
/// lead to deadlock.
if (is_killed.load())
{
if (cancel_reason == CancelReason::TIMEOUT)
throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "Query was timed out");
throw Exception(ErrorCodes::QUERY_WAS_CANCELLED, "Query was cancelled");
}
throwProperExceptionIfNeeded(max_exec_time);
std::lock_guard lock(executors_mutex);
assert(!executors.contains(e));
@ -525,14 +534,10 @@ void QueryStatus::removePipelineExecutor(PipelineExecutor * e)
bool QueryStatus::checkTimeLimit()
{
if (is_killed.load())
{
if (cancel_reason == CancelReason::TIMEOUT)
throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "Query was timed out");
throw Exception(ErrorCodes::QUERY_WAS_CANCELLED, "Query was cancelled");
}
auto elapsed_ns = watch.elapsed();
throwProperExceptionIfNeeded(limits.max_execution_time.totalMilliseconds(), elapsed_ns);
return limits.checkTimeLimit(watch, overflow_mode);
return limits.checkTimeLimit(elapsed_ns, overflow_mode);
}
bool QueryStatus::checkTimeLimitSoft()
@ -540,10 +545,9 @@ bool QueryStatus::checkTimeLimitSoft()
if (is_killed.load())
return false;
return limits.checkTimeLimit(watch, OverflowMode::BREAK);
return limits.checkTimeLimit(watch.elapsedNanoseconds(), OverflowMode::BREAK);
}
void QueryStatus::setUserProcessList(ProcessListForUser * user_process_list_)
{
user_process_list = user_process_list_;
@ -622,7 +626,7 @@ CancellationCode ProcessList::sendCancelToQuery(const String & current_query_id,
cancelled_cv.notify_all();
});
return elem->cancelQuery(CancelReason::MANUAL_CANCEL);
return elem->cancelQuery(CancelReason::CANCELLED_BY_USER);
}
@ -644,7 +648,7 @@ CancellationCode ProcessList::sendCancelToQuery(QueryStatusPtr elem)
cancelled_cv.notify_all();
});
return elem->cancelQuery(CancelReason::MANUAL_CANCEL);
return elem->cancelQuery(CancelReason::CANCELLED_BY_USER);
}
@ -670,7 +674,7 @@ void ProcessList::killAllQueries()
}
for (auto & cancelled_process : cancelled_processes)
cancelled_process->cancelQuery(CancelReason::MANUAL_CANCEL);
cancelled_process->cancelQuery(CancelReason::CANCELLED_BY_USER);
}

View File

@ -44,9 +44,9 @@ class ProcessListEntry;
enum CancelReason
{
NOT_CANCELLED,
UNDEFINED,
TIMEOUT,
MANUAL_CANCEL,
CANCELLED_BY_USER,
};
/** Information of process list element.
@ -113,7 +113,7 @@ protected:
bool is_cancelling { false };
/// KILL was send to the query
std::atomic<bool> is_killed { false };
std::atomic<CancelReason> cancel_reason { CancelReason::NOT_CANCELLED };
std::atomic<CancelReason> cancel_reason { CancelReason::UNDEFINED };
/// All data to the client already had been sent.
/// Including EndOfStream or Exception.
@ -234,6 +234,8 @@ public:
CancellationCode cancelQuery(CancelReason reason);
void throwProperExceptionIfNeeded(UInt64 max_execution_time);
bool isKilled() const { return is_killed; }
/// Returns an entry in the ProcessList associated with this QueryStatus. The function can return nullptr.

View File

@ -32,6 +32,7 @@ namespace Setting
{
extern const SettingsBool log_processors_profiles;
extern const SettingsBool opentelemetry_trace_processors;
extern const SettingsSeconds max_execution_time;
}
namespace ErrorCodes
@ -67,7 +68,8 @@ PipelineExecutor::PipelineExecutor(std::shared_ptr<Processors> & processors, Que
{
// Add the pipeline to the QueryStatus at the end to avoid issues if other things throw
// as that would leave the executor "linked"
process_list_element->addPipelineExecutor(this);
UInt64 max_exec_time = process_list_element->getContext()->getSettingsRef()[Setting::max_execution_time].totalMilliseconds();
process_list_element->addPipelineExecutor(this, max_exec_time);
}
}

View File

@ -32,7 +32,7 @@ void LimitsCheckingTransform::transform(Chunk & chunk)
info.started = true;
}
if (!limits.speed_limits.checkTimeLimit(info.total_stopwatch, limits.timeout_overflow_mode))
if (!limits.speed_limits.checkTimeLimit(info.total_stopwatch.elapsed(), limits.timeout_overflow_mode))
{
stopReading();
return;

View File

@ -117,17 +117,15 @@ static bool handleOverflowMode(OverflowMode mode, int code, FormatStringHelper<A
}
}
bool ExecutionSpeedLimits::checkTimeLimit(const Stopwatch & stopwatch, OverflowMode overflow_mode) const
bool ExecutionSpeedLimits::checkTimeLimit(const UInt64 & elapsed_ns, OverflowMode overflow_mode) const
{
if (max_execution_time != 0)
{
auto elapsed_ns = stopwatch.elapsed();
if (elapsed_ns > static_cast<UInt64>(max_execution_time.totalMicroseconds()) * 1000)
return handleOverflowMode(
overflow_mode,
ErrorCodes::TIMEOUT_EXCEEDED,
"Timeout exceeded: elapsed {} seconds, maximum: {}",
"Timeout exceeded: elapsed {} seconds, maximum: {} seconds",
static_cast<double>(elapsed_ns) / 1000000000ULL,
max_execution_time.totalMicroseconds() / 1000000.0);
}

View File

@ -47,7 +47,7 @@ bool ReadProgressCallback::onProgress(uint64_t read_rows, uint64_t read_bytes, c
{
for (const auto & limits : storage_limits)
{
if (!limits.local_limits.speed_limits.checkTimeLimit(total_stopwatch, limits.local_limits.timeout_overflow_mode))
if (!limits.local_limits.speed_limits.checkTimeLimit(total_stopwatch.elapsed(), limits.local_limits.timeout_overflow_mode))
return false;
}