dbms: fixed limits on result data with distributed query processing [#METR-9180].

This commit is contained in:
Alexey Milovidov 2013-11-12 23:15:09 +00:00
parent e604345d25
commit 806fee097b
3 changed files with 35 additions and 23 deletions

View File

@ -80,7 +80,7 @@ class IProfilingBlockInputStream : public IBlockInputStream
public:
IProfilingBlockInputStream(StoragePtr owned_storage_ = StoragePtr())
: IBlockInputStream(owned_storage_), is_cancelled(false), process_list_elem(NULL),
enabled_extremes(false), quota(NULL), quota_mode(QUOTA_READ), prev_elapsed(0) {}
enabled_extremes(false), quota(NULL), prev_elapsed(0) {}
Block read();
@ -143,9 +143,24 @@ public:
return is_cancelled;
}
/** Какие ограничения (и квоты) проверяются.
* Если LIMITS_CURRENT - ограничения проверяются на количество данных, прочитанных только в этом stream-е.
* - используется для реализации ограничений на объём результата выполнения запроса.
* Если LIMITS_TOTAL, то ещё дополнительно делается проверка в колбэке прогресса,
* по суммарным данным по всем листовым stream-ам, в том числе, с удалённых серверов.
* - используется для реализации ограничений на общий объём прочитанных (исходных) данных.
*/
enum LimitsMode
{
LIMITS_CURRENT,
LIMITS_TOTAL,
};
/// Используется подмножество ограничений из Limits.
struct LocalLimits
{
LimitsMode mode;
size_t max_rows_to_read;
size_t max_bytes_to_read;
Limits::OverflowMode read_overflow_mode;
@ -159,7 +174,8 @@ public:
Poco::Timespan timeout_before_checking_execution_speed;
LocalLimits()
: max_rows_to_read(0), max_bytes_to_read(0), read_overflow_mode(Limits::THROW),
: mode(LIMITS_CURRENT),
max_rows_to_read(0), max_bytes_to_read(0), read_overflow_mode(Limits::THROW),
max_execution_time(0), timeout_overflow_mode(Limits::THROW),
min_execution_speed(0), timeout_before_checking_execution_speed(0)
{
@ -172,19 +188,12 @@ public:
limits = limits_;
}
/// Какая квота используется - на объём исходных данных или на объём результата.
enum QuotaMode
{
QUOTA_READ,
QUOTA_RESULT,
};
/// Установить квоту.
void setQuota(QuotaForIntervals & quota_, QuotaMode quota_mode_)
/** Установить квоту. Если устанавливается квота на объём исходных данных,
* то следует ещё установить mode = LIMITS_TOTAL в LocalLimits с помощью setLimits.
*/
void setQuota(QuotaForIntervals & quota_)
{
quota = &quota_;
quota_mode = quota_mode_;
}
/// Включить рассчёт минимумов и максимумов по столбцам результата.
@ -210,7 +219,6 @@ protected:
LocalLimits limits;
QuotaForIntervals * quota; /// Если NULL - квота не используется.
QuotaMode quota_mode;
double prev_elapsed;
/// Наследники должны реализовать эту функцию.

View File

@ -326,19 +326,19 @@ void IProfilingBlockInputStream::checkQuota(Block & block)
time_t current_time = time(0);
double total_elapsed = info.total_stopwatch.elapsedSeconds();
switch (quota_mode)
switch (limits.mode)
{
case QUOTA_READ:
case LIMITS_TOTAL:
/// Проверяется в методе progress.
break;
case QUOTA_RESULT:
case LIMITS_CURRENT:
quota->checkAndAddResultRowsBytes(current_time, block.rows(), block.bytes());
quota->checkAndAddExecutionTime(current_time, Poco::Timespan((total_elapsed - prev_elapsed) * 1000000.0));
break;
default:
throw Exception("Logical error: unknown quota mode.", ErrorCodes::LOGICAL_ERROR);
throw Exception("Logical error: unknown limits mode.", ErrorCodes::LOGICAL_ERROR);
}
prev_elapsed = total_elapsed;
@ -367,8 +367,9 @@ void IProfilingBlockInputStream::progressImpl(size_t rows, size_t bytes)
* NOTE: Может быть, имеет смысл сделать, чтобы они проверялись прямо в ProcessList?
*/
if ((limits.max_rows_to_read && total_rows > limits.max_rows_to_read)
|| (limits.max_bytes_to_read && total_bytes > limits.max_bytes_to_read))
if (limits.mode == LIMITS_TOTAL
&& ((limits.max_rows_to_read && total_rows > limits.max_rows_to_read)
|| (limits.max_bytes_to_read && total_bytes > limits.max_bytes_to_read)))
{
if (limits.read_overflow_mode == Limits::THROW)
throw Exception("Limit for rows to read exceeded: read " + toString(total_rows)
@ -389,7 +390,7 @@ void IProfilingBlockInputStream::progressImpl(size_t rows, size_t bytes)
ErrorCodes::TOO_SLOW);
}
if (quota != NULL && quota_mode == QUOTA_READ)
if (quota != NULL && limits.mode == LIMITS_TOTAL)
{
quota->checkAndAddReadRowsBytes(time(0), rows, bytes);
}

View File

@ -332,12 +332,13 @@ BlockInputStreamPtr InterpreterSelectQuery::execute()
if (to_stage == QueryProcessingStage::Complete)
{
IProfilingBlockInputStream::LocalLimits limits;
limits.mode = IProfilingBlockInputStream::LIMITS_CURRENT;
limits.max_rows_to_read = settings.limits.max_result_rows;
limits.max_bytes_to_read = settings.limits.max_result_bytes;
limits.read_overflow_mode = settings.limits.result_overflow_mode;
stream->setLimits(limits);
stream->setQuota(context.getQuota(), IProfilingBlockInputStream::QUOTA_RESULT);
stream->setQuota(context.getQuota());
}
}
@ -440,6 +441,7 @@ QueryProcessingStage::Enum InterpreterSelectQuery::executeFetchColumns(BlockInpu
if (table && to_stage == QueryProcessingStage::Complete)
{
IProfilingBlockInputStream::LocalLimits limits;
limits.mode = IProfilingBlockInputStream::LIMITS_TOTAL;
limits.max_rows_to_read = settings.limits.max_rows_to_read;
limits.max_bytes_to_read = settings.limits.max_bytes_to_read;
limits.read_overflow_mode = settings.limits.read_overflow_mode;
@ -455,7 +457,7 @@ QueryProcessingStage::Enum InterpreterSelectQuery::executeFetchColumns(BlockInpu
if (IProfilingBlockInputStream * stream = dynamic_cast<IProfilingBlockInputStream *>(&**it))
{
stream->setLimits(limits);
stream->setQuota(quota, IProfilingBlockInputStream::QUOTA_READ);
stream->setQuota(quota);
}
}
}
@ -600,6 +602,7 @@ void InterpreterSelectQuery::executeOrder(BlockInputStreams & streams)
/// Ограничения на сортировку
IProfilingBlockInputStream::LocalLimits limits;
limits.mode = IProfilingBlockInputStream::LIMITS_TOTAL;
limits.max_rows_to_read = settings.limits.max_rows_to_sort;
limits.max_bytes_to_read = settings.limits.max_bytes_to_sort;
limits.read_overflow_mode = settings.limits.sort_overflow_mode;