mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-22 23:52:03 +00:00
Avoid extra current limits checks in non-root streams. [#CLICKHOUSE-2745]
This commit is contained in:
parent
6500a84dd0
commit
1b7b2b8373
@ -112,12 +112,12 @@ public:
|
|||||||
return is_cancelled.load(std::memory_order_seq_cst);
|
return is_cancelled.load(std::memory_order_seq_cst);
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Какие ограничения (и квоты) проверяются.
|
/** What limitations and quotas should be checked.
|
||||||
* Если LIMITS_CURRENT - ограничения проверяются на количество данных, прочитанных только в этом stream-е.
|
* LIMITS_CURRENT - checks amount of data read by current stream only (BlockStreamProfileInfo is used for check).
|
||||||
* - используется для реализации ограничений на объём результата выполнения запроса.
|
* Currently it is used in root streams to check max_result_{rows,bytes} limits.
|
||||||
* Если LIMITS_TOTAL, то ещё дополнительно делается проверка в колбэке прогресса,
|
* LIMITS_TOTAL - checks total amount of read data from leaf streams (i.e. data read from disk and remote servers).
|
||||||
* по суммарным данным по всем листовым stream-ам, в том числе, с удалённых серверов.
|
* It is checks max_{rows,bytes}_to_read in progress handler and use info from ProcessListElement::progress_in for this.
|
||||||
* - используется для реализации ограничений на общий объём прочитанных (исходных) данных.
|
* Currently this check is performed only in leaf streams.
|
||||||
*/
|
*/
|
||||||
enum LimitsMode
|
enum LimitsMode
|
||||||
{
|
{
|
||||||
@ -125,11 +125,12 @@ public:
|
|||||||
LIMITS_TOTAL,
|
LIMITS_TOTAL,
|
||||||
};
|
};
|
||||||
|
|
||||||
/// Используется подмножество ограничений из Limits.
|
/// It is a subset of limitations from Limits.
|
||||||
struct LocalLimits
|
struct LocalLimits
|
||||||
{
|
{
|
||||||
LimitsMode mode = LIMITS_CURRENT;
|
LimitsMode mode = LIMITS_CURRENT;
|
||||||
|
|
||||||
|
/// If it is zero, corresponding limit check isn't performed.
|
||||||
size_t max_rows_to_read = 0;
|
size_t max_rows_to_read = 0;
|
||||||
size_t max_bytes_to_read = 0;
|
size_t max_bytes_to_read = 0;
|
||||||
OverflowMode read_overflow_mode = OverflowMode::THROW;
|
OverflowMode read_overflow_mode = OverflowMode::THROW;
|
||||||
@ -137,13 +138,13 @@ public:
|
|||||||
Poco::Timespan max_execution_time = 0;
|
Poco::Timespan max_execution_time = 0;
|
||||||
OverflowMode timeout_overflow_mode = OverflowMode::THROW;
|
OverflowMode timeout_overflow_mode = OverflowMode::THROW;
|
||||||
|
|
||||||
/// В строчках в секунду.
|
/// in rows per second
|
||||||
size_t min_execution_speed = 0;
|
size_t min_execution_speed = 0;
|
||||||
/// Проверять, что скорость не слишком низкая, после прошествия указанного времени.
|
/// Проверять, что скорость не слишком низкая, после прошествия указанного времени.
|
||||||
Poco::Timespan timeout_before_checking_execution_speed = 0;
|
Poco::Timespan timeout_before_checking_execution_speed = 0;
|
||||||
};
|
};
|
||||||
|
|
||||||
/** Установить ограничения для проверки на каждый блок. */
|
/** Set limitations that checked on each block. */
|
||||||
void setLimits(const LocalLimits & limits_)
|
void setLimits(const LocalLimits & limits_)
|
||||||
{
|
{
|
||||||
limits = limits_;
|
limits = limits_;
|
||||||
|
@ -145,12 +145,14 @@ void IProfilingBlockInputStream::updateExtremes(Block & block)
|
|||||||
|
|
||||||
bool IProfilingBlockInputStream::checkLimits()
|
bool IProfilingBlockInputStream::checkLimits()
|
||||||
{
|
{
|
||||||
/// Проверка ограничений.
|
if (limits.mode == LIMITS_CURRENT)
|
||||||
|
{
|
||||||
|
/// Check current stream limitations (i.e. max_result_{rows,bytes})
|
||||||
|
|
||||||
if (limits.max_rows_to_read && info.rows > limits.max_rows_to_read)
|
if (limits.max_rows_to_read && info.rows > limits.max_rows_to_read)
|
||||||
{
|
{
|
||||||
if (limits.read_overflow_mode == OverflowMode::THROW)
|
if (limits.read_overflow_mode == OverflowMode::THROW)
|
||||||
throw Exception(std::string("Limit for ")
|
throw Exception(std::string("Limit for result rows ")
|
||||||
+ (limits.mode == LIMITS_CURRENT ? "result rows" : "rows to read")
|
|
||||||
+ " exceeded: read " + toString(info.rows)
|
+ " exceeded: read " + toString(info.rows)
|
||||||
+ " rows, maximum: " + toString(limits.max_rows_to_read),
|
+ " rows, maximum: " + toString(limits.max_rows_to_read),
|
||||||
ErrorCodes::TOO_MUCH_ROWS);
|
ErrorCodes::TOO_MUCH_ROWS);
|
||||||
@ -164,8 +166,7 @@ bool IProfilingBlockInputStream::checkLimits()
|
|||||||
if (limits.max_bytes_to_read && info.bytes > limits.max_bytes_to_read)
|
if (limits.max_bytes_to_read && info.bytes > limits.max_bytes_to_read)
|
||||||
{
|
{
|
||||||
if (limits.read_overflow_mode == OverflowMode::THROW)
|
if (limits.read_overflow_mode == OverflowMode::THROW)
|
||||||
throw Exception(std::string("Limit for ")
|
throw Exception(std::string("Limit for result bytes (uncompressed)")
|
||||||
+ (limits.mode == LIMITS_CURRENT ? "result bytes (uncompressed)" : "(uncompressed) bytes to read")
|
|
||||||
+ " exceeded: read " + toString(info.bytes)
|
+ " exceeded: read " + toString(info.bytes)
|
||||||
+ " bytes, maximum: " + toString(limits.max_bytes_to_read),
|
+ " bytes, maximum: " + toString(limits.max_bytes_to_read),
|
||||||
ErrorCodes::TOO_MUCH_BYTES);
|
ErrorCodes::TOO_MUCH_BYTES);
|
||||||
@ -175,6 +176,7 @@ bool IProfilingBlockInputStream::checkLimits()
|
|||||||
|
|
||||||
throw Exception("Logical error: unknown overflow mode", ErrorCodes::LOGICAL_ERROR);
|
throw Exception("Logical error: unknown overflow mode", ErrorCodes::LOGICAL_ERROR);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (limits.max_execution_time != 0
|
if (limits.max_execution_time != 0
|
||||||
&& info.total_stopwatch.elapsed() > static_cast<UInt64>(limits.max_execution_time.totalMicroseconds()) * 1000)
|
&& info.total_stopwatch.elapsed() > static_cast<UInt64>(limits.max_execution_time.totalMicroseconds()) * 1000)
|
||||||
|
Loading…
Reference in New Issue
Block a user