Merge pull request #4430 from zhang2014/feature/limit_execution

Add max_execution_speed & max_execution_bytes_speed & min_execution_bytes_speed settings
This commit is contained in:
alexey-milovidov 2019-03-04 17:09:12 +03:00 committed by GitHub
commit b172ad4274
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 68 additions and 3 deletions

View File

@ -247,6 +247,26 @@ void IBlockInputStream::checkQuota(Block & block)
}
}
static void limitProgressingSpeed(size_t total_progress_size, size_t max_speed_in_seconds, UInt64 total_elapsed_microseconds)
{
/// How much time to wait for the average speed to become `max_speed_in_seconds`.
UInt64 desired_microseconds = total_progress_size * 1000000 / max_speed_in_seconds;
if (desired_microseconds > total_elapsed_microseconds)
{
UInt64 sleep_microseconds = desired_microseconds - total_elapsed_microseconds;
::timespec sleep_ts;
sleep_ts.tv_sec = sleep_microseconds / 1000000;
sleep_ts.tv_nsec = sleep_microseconds % 1000000 * 1000;
/// NOTE: Returns early in case of a signal. This is considered normal.
/// NOTE: It's worth noting that this behavior affects kill of queries.
::nanosleep(&sleep_ts, nullptr);
ProfileEvents::increment(ProfileEvents::ThrottlerSleepMicroseconds, sleep_microseconds);
}
}
void IBlockInputStream::progressImpl(const Progress & value)
{
@ -313,8 +333,9 @@ void IBlockInputStream::progressImpl(const Progress & value)
last_profile_events_update_time = total_elapsed_microseconds;
}
if ((limits.min_execution_speed || (total_rows && limits.timeout_before_checking_execution_speed != 0))
&& (static_cast<Int64>(total_elapsed_microseconds) > limits.timeout_before_checking_execution_speed.totalMicroseconds()))
if ((limits.min_execution_speed || limits.max_execution_speed || limits.min_execution_speed_bytes ||
limits.max_execution_speed_bytes || (total_rows && limits.timeout_before_checking_execution_speed != 0)) &&
(static_cast<Int64>(total_elapsed_microseconds) > limits.timeout_before_checking_execution_speed.totalMicroseconds()))
{
/// Do not count sleeps in throttlers
UInt64 throttler_sleep_microseconds = CurrentThread::getProfileEvents()[ProfileEvents::ThrottlerSleepMicroseconds];
@ -328,6 +349,11 @@ void IBlockInputStream::progressImpl(const Progress & value)
+ " rows/sec., minimum: " + toString(limits.min_execution_speed),
ErrorCodes::TOO_SLOW);
if (limits.min_execution_speed_bytes && progress.bytes / elapsed_seconds < limits.min_execution_speed_bytes)
throw Exception("Query is executing too slow: " + toString(progress.bytes / elapsed_seconds)
+ " bytes/sec., minimum: " + toString(limits.min_execution_speed_bytes),
ErrorCodes::TOO_SLOW);
/// If the predicted execution time is longer than `max_execution_time`.
if (limits.max_execution_time != 0 && total_rows)
{
@ -339,6 +365,12 @@ void IBlockInputStream::progressImpl(const Progress & value)
+ ". Estimated rows to process: " + toString(total_rows),
ErrorCodes::TOO_SLOW);
}
if (limits.max_execution_speed && progress.rows / elapsed_seconds >= limits.max_execution_speed)
limitProgressingSpeed(progress.rows, limits.max_execution_speed, total_elapsed_microseconds);
if (limits.max_execution_speed_bytes && progress.bytes / elapsed_seconds >= limits.max_execution_speed_bytes)
limitProgressingSpeed(progress.bytes, limits.max_execution_speed_bytes, total_elapsed_microseconds);
}
}

View File

@ -212,6 +212,9 @@ public:
/// in rows per second
size_t min_execution_speed = 0;
size_t max_execution_speed = 0;
size_t min_execution_speed_bytes = 0;
size_t max_execution_speed_bytes = 0;
/// Verify that the speed is not too low after the specified time has elapsed.
Poco::Timespan timeout_before_checking_execution_speed = 0;
};

View File

@ -1039,6 +1039,9 @@ void InterpreterSelectQuery::executeFetchColumns(
if (to_stage == QueryProcessingStage::Complete)
{
limits.min_execution_speed = settings.min_execution_speed;
limits.max_execution_speed = settings.max_execution_speed;
limits.min_execution_speed_bytes = settings.min_execution_speed_bytes;
limits.max_execution_speed_bytes = settings.max_execution_speed_bytes;
limits.timeout_before_checking_execution_speed = settings.timeout_before_checking_execution_speed;
}

View File

@ -233,7 +233,10 @@ struct Settings
M(SettingSeconds, max_execution_time, 0, "") \
M(SettingOverflowMode<false>, timeout_overflow_mode, OverflowMode::THROW, "What to do when the limit is exceeded.") \
\
M(SettingUInt64, min_execution_speed, 0, "In rows per second.") \
M(SettingUInt64, min_execution_speed, 0, "Minimum number of execution rows per second.") \
M(SettingUInt64, max_execution_speed, 0, "Maximum number of execution rows per second.") \
M(SettingUInt64, min_execution_speed_bytes, 0, "Minimum number of execution bytes per second.") \
M(SettingUInt64, max_execution_speed_bytes, 0, "Maximum number of execution bytes per second.") \
M(SettingSeconds, timeout_before_checking_execution_speed, 0, "Check that the speed is not too low after the specified time has elapsed.") \
\
M(SettingUInt64, max_columns_to_read, 0, "") \

View File

@ -111,6 +111,18 @@ What to do if the query is run longer than 'max_execution_time': 'throw' or 'bre
Minimal execution speed in rows per second. Checked on every data block when 'timeout_before_checking_execution_speed' expires. If the execution speed is lower, an exception is thrown.
## min_execution_speed_bytes
Minimum number of execution bytes per second. Checked on every data block when 'timeout_before_checking_execution_speed' expires. If the execution speed is lower, an exception is thrown.
## max_execution_speed
Maximum number of execution rows per second. Checked on every data block when 'timeout_before_checking_execution_speed' expires. If the execution speed is high, the execution speed will be reduced.
## max_execution_speed_bytes
Maximum number of execution bytes per second. Checked on every data block when 'timeout_before_checking_execution_speed' expires. If the execution speed is high, the execution speed will be reduced.
## timeout_before_checking_execution_speed
Checks that execution speed is not too slow (no less than 'min_execution_speed'), after the specified time in seconds has expired.

View File

@ -112,6 +112,18 @@
Минимальная скорость выполнения запроса в строчках в секунду. Проверяется на каждый блок данных по истечении timeout_before_checking_execution_speed. Если скорость выполнения запроса оказывается меньше, то кидается исключение.
## min_execution_speed_bytes
Минимальная скорость выполнения запроса в строках на байт. Он проверяется для каждого блока данных после timeout_before_checking_execution_speed. Если скорость выполнения запроса меньше, исключение.
## max_execution_speed
Максимальная скорость выполнения запроса в строках в секунду. Он проверяется для каждого блока данных после timeout_before_checking_execution_speed. Если скорость выполнения запроса выше, скорость будет снижена.
## max_execution_speed_bytes
Максимальная скорость выполнения запроса в байтах в секунду. Он проверяется для каждого блока данных после timeout_before_checking_execution_speed. Если скорость выполнения запроса выше, скорость будет снижена.
## timeout_before_checking_execution_speed
Проверять, что скорость выполнения запроса не слишком низкая (не меньше min_execution_speed), после прошествия указанного времени в секундах.