diff --git a/dbms/include/DB/DataStreams/IProfilingBlockInputStream.h b/dbms/include/DB/DataStreams/IProfilingBlockInputStream.h index 7972c48f1cf..68e47e52822 100644 --- a/dbms/include/DB/DataStreams/IProfilingBlockInputStream.h +++ b/dbms/include/DB/DataStreams/IProfilingBlockInputStream.h @@ -80,7 +80,7 @@ class IProfilingBlockInputStream : public IBlockInputStream public: IProfilingBlockInputStream(StoragePtr owned_storage_ = StoragePtr()) : IBlockInputStream(owned_storage_), is_cancelled(false), process_list_elem(NULL), - enabled_extremes(false), quota(NULL), quota_mode(QUOTA_READ), prev_elapsed(0) {} + enabled_extremes(false), quota(NULL), prev_elapsed(0) {} Block read(); @@ -143,9 +143,24 @@ public: return is_cancelled; } + /** Какие ограничения (и квоты) проверяются. + * Если LIMITS_CURRENT - ограничения проверяются на количество данных, прочитанных только в этом stream-е. + * - используется для реализации ограничений на объём результата выполнения запроса. + * Если LIMITS_TOTAL, то ещё дополнительно делается проверка в колбэке прогресса, + * по суммарным данным по всем листовым stream-ам, в том числе, с удалённых серверов. + * - используется для реализации ограничений на общий объём прочитанных (исходных) данных. + */ + enum LimitsMode + { + LIMITS_CURRENT, + LIMITS_TOTAL, + }; + /// Используется подмножество ограничений из Limits. struct LocalLimits { + LimitsMode mode; + size_t max_rows_to_read; size_t max_bytes_to_read; Limits::OverflowMode read_overflow_mode; @@ -159,7 +174,8 @@ public: Poco::Timespan timeout_before_checking_execution_speed; LocalLimits() - : max_rows_to_read(0), max_bytes_to_read(0), read_overflow_mode(Limits::THROW), + : mode(LIMITS_CURRENT), + max_rows_to_read(0), max_bytes_to_read(0), read_overflow_mode(Limits::THROW), max_execution_time(0), timeout_overflow_mode(Limits::THROW), min_execution_speed(0), timeout_before_checking_execution_speed(0) { @@ -172,19 +188,12 @@ public: limits = limits_; } - - /// Какая квота используется - на объём исходных данных или на объём результата. - enum QuotaMode - { - QUOTA_READ, - QUOTA_RESULT, - }; - - /// Установить квоту. - void setQuota(QuotaForIntervals & quota_, QuotaMode quota_mode_) + /** Установить квоту. Если устанавливается квота на объём исходных данных, + * то следует ещё установить mode = LIMITS_TOTAL в LocalLimits с помощью setLimits. + */ + void setQuota(QuotaForIntervals & quota_) { quota = "a_; - quota_mode = quota_mode_; } /// Включить рассчёт минимумов и максимумов по столбцам результата. @@ -210,7 +219,6 @@ protected: LocalLimits limits; QuotaForIntervals * quota; /// Если NULL - квота не используется. - QuotaMode quota_mode; double prev_elapsed; /// Наследники должны реализовать эту функцию. diff --git a/dbms/src/DataStreams/IProfilingBlockInputStream.cpp b/dbms/src/DataStreams/IProfilingBlockInputStream.cpp index 5526a3e3fe1..cdf93b14c7a 100644 --- a/dbms/src/DataStreams/IProfilingBlockInputStream.cpp +++ b/dbms/src/DataStreams/IProfilingBlockInputStream.cpp @@ -326,19 +326,19 @@ void IProfilingBlockInputStream::checkQuota(Block & block) time_t current_time = time(0); double total_elapsed = info.total_stopwatch.elapsedSeconds(); - switch (quota_mode) + switch (limits.mode) { - case QUOTA_READ: + case LIMITS_TOTAL: /// Проверяется в методе progress. break; - case QUOTA_RESULT: + case LIMITS_CURRENT: quota->checkAndAddResultRowsBytes(current_time, block.rows(), block.bytes()); quota->checkAndAddExecutionTime(current_time, Poco::Timespan((total_elapsed - prev_elapsed) * 1000000.0)); break; default: - throw Exception("Logical error: unknown quota mode.", ErrorCodes::LOGICAL_ERROR); + throw Exception("Logical error: unknown limits mode.", ErrorCodes::LOGICAL_ERROR); } prev_elapsed = total_elapsed; @@ -367,8 +367,9 @@ void IProfilingBlockInputStream::progressImpl(size_t rows, size_t bytes) * NOTE: Может быть, имеет смысл сделать, чтобы они проверялись прямо в ProcessList? */ - if ((limits.max_rows_to_read && total_rows > limits.max_rows_to_read) - || (limits.max_bytes_to_read && total_bytes > limits.max_bytes_to_read)) + if (limits.mode == LIMITS_TOTAL + && ((limits.max_rows_to_read && total_rows > limits.max_rows_to_read) + || (limits.max_bytes_to_read && total_bytes > limits.max_bytes_to_read))) { if (limits.read_overflow_mode == Limits::THROW) throw Exception("Limit for rows to read exceeded: read " + toString(total_rows) @@ -389,7 +390,7 @@ void IProfilingBlockInputStream::progressImpl(size_t rows, size_t bytes) ErrorCodes::TOO_SLOW); } - if (quota != NULL && quota_mode == QUOTA_READ) + if (quota != NULL && limits.mode == LIMITS_TOTAL) { quota->checkAndAddReadRowsBytes(time(0), rows, bytes); } diff --git a/dbms/src/Interpreters/InterpreterSelectQuery.cpp b/dbms/src/Interpreters/InterpreterSelectQuery.cpp index 281692a6dad..e825ae3746e 100644 --- a/dbms/src/Interpreters/InterpreterSelectQuery.cpp +++ b/dbms/src/Interpreters/InterpreterSelectQuery.cpp @@ -332,12 +332,13 @@ BlockInputStreamPtr InterpreterSelectQuery::execute() if (to_stage == QueryProcessingStage::Complete) { IProfilingBlockInputStream::LocalLimits limits; + limits.mode = IProfilingBlockInputStream::LIMITS_CURRENT; limits.max_rows_to_read = settings.limits.max_result_rows; limits.max_bytes_to_read = settings.limits.max_result_bytes; limits.read_overflow_mode = settings.limits.result_overflow_mode; stream->setLimits(limits); - stream->setQuota(context.getQuota(), IProfilingBlockInputStream::QUOTA_RESULT); + stream->setQuota(context.getQuota()); } } @@ -440,6 +441,7 @@ QueryProcessingStage::Enum InterpreterSelectQuery::executeFetchColumns(BlockInpu if (table && to_stage == QueryProcessingStage::Complete) { IProfilingBlockInputStream::LocalLimits limits; + limits.mode = IProfilingBlockInputStream::LIMITS_TOTAL; limits.max_rows_to_read = settings.limits.max_rows_to_read; limits.max_bytes_to_read = settings.limits.max_bytes_to_read; limits.read_overflow_mode = settings.limits.read_overflow_mode; @@ -455,7 +457,7 @@ QueryProcessingStage::Enum InterpreterSelectQuery::executeFetchColumns(BlockInpu if (IProfilingBlockInputStream * stream = dynamic_cast(&**it)) { stream->setLimits(limits); - stream->setQuota(quota, IProfilingBlockInputStream::QUOTA_READ); + stream->setQuota(quota); } } } @@ -600,6 +602,7 @@ void InterpreterSelectQuery::executeOrder(BlockInputStreams & streams) /// Ограничения на сортировку IProfilingBlockInputStream::LocalLimits limits; + limits.mode = IProfilingBlockInputStream::LIMITS_TOTAL; limits.max_rows_to_read = settings.limits.max_rows_to_sort; limits.max_bytes_to_read = settings.limits.max_bytes_to_sort; limits.read_overflow_mode = settings.limits.sort_overflow_mode;