dbms: more sane progress bar - it doesn't roll back when using ConcatBlockInputStream [#METR-2944].

This commit is contained in:
Alexey Milovidov 2015-02-15 09:08:31 +03:00
parent 886b2daf9b
commit fa84f264aa
3 changed files with 100 additions and 56 deletions

View File

@ -65,7 +65,13 @@ public:
* - проверяются ограничения и квоты, которые должны быть проверены не в рамках одного источника,
* а над общим количеством потраченных ресурсов во всех источниках сразу (информация в ProcessList-е).
*/
virtual void progress(const Progress & value) { progressImpl(value); }
virtual void progress(const Progress & value)
{
/// Данные для прогресса берутся из листовых источников.
if (children.empty())
progressImpl(value);
}
void progressImpl(const Progress & value);
@ -77,6 +83,10 @@ public:
*/
void setProcessListElement(ProcessList::Element * elem);
/** Установить информацию о приблизительном общем количестве строк, которых нужно прочитать.
*/
void setTotalRowsApprox(size_t value) { total_rows_approx = value; }
/** Попросить прервать получение данных как можно скорее.
* По-умолчанию - просто выставляет флаг is_cancelled и просит прерваться всех детей.
@ -161,6 +171,10 @@ protected:
Block totals;
/// Минимумы и максимумы. Первая строчка блока - минимумы, вторая - максимумы.
Block extremes;
/// Приблизительное общее количество строк, которых нужно прочитать. Для прогресс-бара.
size_t total_rows_approx = 0;
/// Информация о приблизительном общем количестве строк собрана в родительском источнике.
bool collected_total_rows_approx = false;
/// Ограничения и квоты.
@ -182,6 +196,14 @@ protected:
*/
bool checkLimits();
void checkQuota(Block & block);
/// Собрать информацию о приблизительном общем числе строк по всем детям.
void collectTotalRowsApprox();
/** Передать информацию о приблизительном общем числе строк в колбэк прогресса.
* Сделано так, что отправка происходит лишь в верхнем источнике.
*/
void collectAndSendTotalRowsApprox();
};
}

View File

@ -69,6 +69,7 @@ public:
}
/// Оценим общее количество строк - для прогресс-бара.
size_t total_rows = 0;
for (const auto & range : all_mark_ranges)
total_rows += range.end - range.begin;
total_rows *= storage.index_granularity;
@ -79,6 +80,8 @@ public:
? ", up to " + toString((all_mark_ranges.back().end - all_mark_ranges.front().begin) * storage.index_granularity)
: "")
<< " rows starting from " << all_mark_ranges.front().begin * storage.index_granularity);
setTotalRowsApprox(total_rows);
}
String getName() const override { return "MergeTreeBlockInputStream"; }
@ -157,10 +160,6 @@ protected:
if (!reader)
{
/// Отправим информацию о том, что собираемся читать примерно столько-то строк.
/// NOTE В конструкторе это делать не получилось бы, потому что тогда ещё не установлен progress_callback.
progressImpl(Progress(0, 0, total_rows));
injectRequiredColumns(columns);
injectRequiredColumns(pre_columns);
@ -353,7 +352,6 @@ private:
ExpressionActionsPtr prewhere_actions;
String prewhere_column;
bool remove_prewhere_column;
size_t total_rows = 0; /// Приблизительное общее количество строк - для прогресс-бара.
Logger * log;
};

View File

@ -13,6 +13,8 @@ namespace DB
Block IProfilingBlockInputStream::read()
{
collectAndSendTotalRowsApprox();
if (!info.started)
{
info.total_stopwatch.start();
@ -211,66 +213,62 @@ void IProfilingBlockInputStream::checkQuota(Block & block)
void IProfilingBlockInputStream::progressImpl(const Progress & value)
{
/// Данные для прогресса берутся из листовых источников.
if (children.empty())
if (progress_callback)
progress_callback(value);
if (process_list_elem)
{
if (progress_callback)
progress_callback(value);
if (!process_list_elem->update(value))
cancel();
if (process_list_elem)
/// Общее количество данных, обработанных или предполагаемых к обработке во всех листовых источниках, возможно, на удалённых серверах.
size_t rows_processed = process_list_elem->progress.rows;
size_t bytes_processed = process_list_elem->progress.bytes;
size_t total_rows_estimate = std::max(process_list_elem->progress.rows, process_list_elem->progress.total_rows);
/** Проверяем ограничения на объём данных для чтения, скорость выполнения запроса, квоту на объём данных для чтения.
* NOTE: Может быть, имеет смысл сделать, чтобы они проверялись прямо в ProcessList?
*/
if (limits.mode == LIMITS_TOTAL
&& ((limits.max_rows_to_read && total_rows_estimate > limits.max_rows_to_read)
|| (limits.max_bytes_to_read && bytes_processed > limits.max_bytes_to_read)))
{
if (!process_list_elem->update(value))
cancel();
/// Общее количество данных, обработанных или предполагаемых к обработке во всех листовых источниках, возможно, на удалённых серверах.
size_t rows_processed = process_list_elem->progress.rows;
size_t bytes_processed = process_list_elem->progress.bytes;
size_t total_rows_estimate = std::max(process_list_elem->progress.rows, process_list_elem->progress.total_rows);
/** Проверяем ограничения на объём данных для чтения, скорость выполнения запроса, квоту на объём данных для чтения.
* NOTE: Может быть, имеет смысл сделать, чтобы они проверялись прямо в ProcessList?
*/
if (limits.mode == LIMITS_TOTAL
&& ((limits.max_rows_to_read && total_rows_estimate > limits.max_rows_to_read)
|| (limits.max_bytes_to_read && bytes_processed > limits.max_bytes_to_read)))
if (limits.read_overflow_mode == OverflowMode::THROW)
{
if (limits.read_overflow_mode == OverflowMode::THROW)
{
if (limits.max_rows_to_read && total_rows_estimate > limits.max_rows_to_read)
throw Exception("Limit for rows to read exceeded: " + toString(total_rows_estimate)
+ " rows read (or to read), maximum: " + toString(limits.max_rows_to_read),
ErrorCodes::TOO_MUCH_ROWS);
else
throw Exception("Limit for (uncompressed) bytes to read exceeded: " + toString(bytes_processed)
+ " bytes read, maximum: " + toString(limits.max_bytes_to_read),
ErrorCodes::TOO_MUCH_ROWS);
}
else if (limits.read_overflow_mode == OverflowMode::BREAK)
cancel();
if (limits.max_rows_to_read && total_rows_estimate > limits.max_rows_to_read)
throw Exception("Limit for rows to read exceeded: " + toString(total_rows_estimate)
+ " rows read (or to read), maximum: " + toString(limits.max_rows_to_read),
ErrorCodes::TOO_MUCH_ROWS);
else
throw Exception("Logical error: unknown overflow mode", ErrorCodes::LOGICAL_ERROR);
throw Exception("Limit for (uncompressed) bytes to read exceeded: " + toString(bytes_processed)
+ " bytes read, maximum: " + toString(limits.max_bytes_to_read),
ErrorCodes::TOO_MUCH_ROWS);
}
else if (limits.read_overflow_mode == OverflowMode::BREAK)
cancel();
else
throw Exception("Logical error: unknown overflow mode", ErrorCodes::LOGICAL_ERROR);
}
if (limits.min_execution_speed)
if (limits.min_execution_speed)
{
double total_elapsed = info.total_stopwatch.elapsedSeconds();
if (total_elapsed > limits.timeout_before_checking_execution_speed.totalMicroseconds() / 1000000.0
&& rows_processed / total_elapsed < limits.min_execution_speed)
{
double total_elapsed = info.total_stopwatch.elapsedSeconds();
if (total_elapsed > limits.timeout_before_checking_execution_speed.totalMicroseconds() / 1000000.0
&& rows_processed / total_elapsed < limits.min_execution_speed)
{
throw Exception("Query is executing too slow: " + toString(rows_processed / total_elapsed)
+ " rows/sec., minimum: " + toString(limits.min_execution_speed),
ErrorCodes::TOO_SLOW);
}
throw Exception("Query is executing too slow: " + toString(rows_processed / total_elapsed)
+ " rows/sec., minimum: " + toString(limits.min_execution_speed),
ErrorCodes::TOO_SLOW);
}
}
if (quota != nullptr && limits.mode == LIMITS_TOTAL)
{
quota->checkAndAddReadRowsBytes(time(0), value.rows, value.bytes);
}
if (quota != nullptr && limits.mode == LIMITS_TOTAL)
{
quota->checkAndAddReadRowsBytes(time(0), value.rows, value.bytes);
}
}
}
@ -343,5 +341,31 @@ const Block & IProfilingBlockInputStream::getExtremes() const
return extremes;
}
void IProfilingBlockInputStream::collectTotalRowsApprox()
{
if (collected_total_rows_approx)
return;
collected_total_rows_approx = true;
for (auto & child : children)
{
if (IProfilingBlockInputStream * p_child = dynamic_cast<IProfilingBlockInputStream *>(&*child))
{
p_child->collectTotalRowsApprox();
total_rows_approx += p_child->total_rows_approx;
}
}
}
void IProfilingBlockInputStream::collectAndSendTotalRowsApprox()
{
if (collected_total_rows_approx)
return;
collectTotalRowsApprox();
progressImpl(Progress(0, 0, total_rows_approx));
}
}