ClickHouse/src/DataStreams/ExecutionSpeedLimits.cpp

120 lines
5.0 KiB
C++
Raw Normal View History

2019-10-03 18:27:11 +00:00
#include <DataStreams/ExecutionSpeedLimits.h>

#include <Common/ProfileEvents.h>
#include <Common/CurrentThread.h>
#include <IO/WriteHelpers.h>
#include <common/sleep.h>

#include <algorithm>
#include <limits>
/// Profile event counters referenced in this file (declared here, defined elsewhere).
namespace ProfileEvents
{
/// Microseconds spent sleeping in throttlers: incremented by limitProgressingSpeed() after each sleep
/// and read back in ExecutionSpeedLimits::throttle() to exclude that time from the speed calculation.
extern const Event ThrottlerSleepMicroseconds;
}
namespace DB
{
/// Error codes thrown from this file (declared here, defined elsewhere).
namespace ErrorCodes
{
    /// Thrown by throttle() when the query is below a minimum speed or its estimated time is too long.
    extern const int TOO_SLOW;
    /// Thrown by handleOverflowMode() for an overflow mode it does not handle.
    extern const int LOGICAL_ERROR;
    /// Thrown (in THROW overflow mode) by checkTimeLimit() when `max_execution_time` is exceeded.
    extern const int TIMEOUT_EXCEEDED;
}
static void limitProgressingSpeed(size_t total_progress_size, size_t max_speed_in_seconds, UInt64 total_elapsed_microseconds)
{
/// How much time to wait for the average speed to become `max_speed_in_seconds`.
UInt64 desired_microseconds = total_progress_size * 1000000 / max_speed_in_seconds;
if (desired_microseconds > total_elapsed_microseconds)
{
UInt64 sleep_microseconds = desired_microseconds - total_elapsed_microseconds;
2020-05-29 20:17:38 +00:00
/// Never sleep more than one second (it should be enough to limit speed for a reasonable amount,
/// and otherwise it's too easy to make query hang).
2019-10-03 18:27:11 +00:00
sleep_microseconds = std::min(UInt64(1000000), sleep_microseconds);
sleepForMicroseconds(sleep_microseconds);
ProfileEvents::increment(ProfileEvents::ThrottlerSleepMicroseconds, sleep_microseconds);
}
}
2019-10-21 16:26:29 +00:00
void ExecutionSpeedLimits::throttle(
size_t read_rows, size_t read_bytes,
2020-04-22 05:39:31 +00:00
size_t total_rows_to_read, UInt64 total_elapsed_microseconds) const
2019-10-03 18:27:11 +00:00
{
2019-10-21 16:26:29 +00:00
if ((min_execution_rps != 0 || max_execution_rps != 0
|| min_execution_bps != 0 || max_execution_bps != 0
2020-05-29 19:54:59 +00:00
|| (total_rows_to_read != 0 && timeout_before_checking_execution_speed != 0))
&& (static_cast<Int64>(total_elapsed_microseconds) > timeout_before_checking_execution_speed.totalMicroseconds()))
2019-10-03 18:27:11 +00:00
{
/// Do not count sleeps in throttlers
UInt64 throttler_sleep_microseconds = CurrentThread::getProfileEvents()[ProfileEvents::ThrottlerSleepMicroseconds];
double elapsed_seconds = 0;
if (total_elapsed_microseconds > throttler_sleep_microseconds)
2019-10-03 18:27:11 +00:00
elapsed_seconds = static_cast<double>(total_elapsed_microseconds - throttler_sleep_microseconds) / 1000000.0;
if (elapsed_seconds > 0)
{
2019-10-21 16:26:29 +00:00
auto rows_per_second = read_rows / elapsed_seconds;
if (min_execution_rps && rows_per_second < min_execution_rps)
2019-10-03 18:27:11 +00:00
throw Exception("Query is executing too slow: " + toString(read_rows / elapsed_seconds)
2019-10-21 16:26:29 +00:00
+ " rows/sec., minimum: " + toString(min_execution_rps),
2019-10-03 18:27:11 +00:00
ErrorCodes::TOO_SLOW);
2019-10-21 16:26:29 +00:00
auto bytes_per_second = read_bytes / elapsed_seconds;
if (min_execution_bps && bytes_per_second < min_execution_bps)
2019-10-03 18:27:11 +00:00
throw Exception("Query is executing too slow: " + toString(read_bytes / elapsed_seconds)
2019-10-21 16:26:29 +00:00
+ " bytes/sec., minimum: " + toString(min_execution_bps),
2019-10-03 18:27:11 +00:00
ErrorCodes::TOO_SLOW);
/// If the predicted execution time is longer than `max_execution_time`.
2019-10-21 16:26:29 +00:00
if (max_execution_time != 0 && total_rows_to_read && read_rows)
2019-10-03 18:27:11 +00:00
{
2019-10-21 16:26:29 +00:00
double estimated_execution_time_seconds = elapsed_seconds * (static_cast<double>(total_rows_to_read) / read_rows);
2019-10-03 18:27:11 +00:00
if (estimated_execution_time_seconds > max_execution_time.totalSeconds())
throw Exception("Estimated query execution time (" + toString(estimated_execution_time_seconds) + " seconds)"
+ " is too long. Maximum: " + toString(max_execution_time.totalSeconds())
2019-10-21 16:26:29 +00:00
+ ". Estimated rows to process: " + toString(total_rows_to_read),
2019-10-03 18:27:11 +00:00
ErrorCodes::TOO_SLOW);
}
2019-10-21 16:26:29 +00:00
if (max_execution_rps && rows_per_second >= max_execution_rps)
limitProgressingSpeed(read_rows, max_execution_rps, total_elapsed_microseconds);
2019-10-03 18:27:11 +00:00
2019-10-21 16:26:29 +00:00
if (max_execution_bps && bytes_per_second >= max_execution_bps)
limitProgressingSpeed(read_bytes, max_execution_bps, total_elapsed_microseconds);
2019-10-03 18:27:11 +00:00
}
}
}
2020-01-27 16:58:25 +00:00
/// Reacts to an exceeded limit according to `mode`:
///  - THROW: throws Exception(message, code);
///  - BREAK: returns false;
///  - any other mode: throws LOGICAL_ERROR.
/// This function never returns true.
static bool handleOverflowMode(OverflowMode mode, const String & message, int code)
{
    if (mode == OverflowMode::THROW)
        throw Exception(message, code);

    if (mode == OverflowMode::BREAK)
        return false;

    throw Exception("Logical error: unknown overflow mode", ErrorCodes::LOGICAL_ERROR);
}
2020-04-22 05:39:31 +00:00
/// Checks `elapsed_ns` (nanoseconds) against `max_execution_time`.
/// Returns true if no limit is set (`max_execution_time` is zero) or the limit is not exceeded.
/// Otherwise the outcome is delegated to handleOverflowMode() with TIMEOUT_EXCEEDED:
/// it throws in THROW mode and returns false in BREAK mode.
bool ExecutionSpeedLimits::checkTimeLimit(UInt64 elapsed_ns, OverflowMode overflow_mode) const
{
    const bool limit_is_set = max_execution_time != 0;
    if (!limit_is_set)
        return true;

    /// The limit is stored with microsecond resolution; convert it to nanoseconds for the comparison.
    const UInt64 limit_ns = static_cast<UInt64>(max_execution_time.totalMicroseconds()) * 1000;
    if (elapsed_ns <= limit_ns)
        return true;

    const String message = "Timeout exceeded: elapsed " + toString(static_cast<double>(elapsed_ns) / 1000000000ULL)
        + " seconds, maximum: " + toString(max_execution_time.totalMicroseconds() / 1000000.0);

    return handleOverflowMode(overflow_mode, message, ErrorCodes::TIMEOUT_EXCEEDED);
}
2019-10-03 18:27:11 +00:00
}