dbms: Quota: development [#CONV-8459].

This commit is contained in:
Alexey Milovidov 2013-08-28 20:47:22 +00:00
parent 181651991d
commit c10d3a4bb9
7 changed files with 102 additions and 48 deletions

View File

@ -7,6 +7,7 @@
#include <DB/Core/Names.h>
#include <DB/Interpreters/Limits.h>
#include <DB/Interpreters/Quota.h>
#include <DB/DataStreams/IBlockInputStream.h>
@ -68,7 +69,8 @@ private:
class IProfilingBlockInputStream : public IBlockInputStream
{
public:
IProfilingBlockInputStream(StoragePtr owned_storage_ = StoragePtr()) : IBlockInputStream(owned_storage_), is_cancelled(false) {}
IProfilingBlockInputStream(StoragePtr owned_storage_ = StoragePtr())
: IBlockInputStream(owned_storage_), is_cancelled(false), quota(NULL), quota_mode(QUOTA_READ) {}
Block read();
@ -130,6 +132,21 @@ public:
limits = limits_;
}
/// Какая квота используется - на объём исходных данных или на объём результата.
enum QuotaMode
{
QUOTA_READ,
QUOTA_RESULT,
};
/// Установить квоту.
void setQuota(QuotaForIntervals & quota_, QuotaMode quota_mode_)
{
quota = &quota_;
quota_mode = quota_mode_;
}
protected:
BlockStreamProfileInfo info;
volatile bool is_cancelled;
@ -137,6 +154,9 @@ protected:
LocalLimits limits;
QuotaForIntervals * quota; /// Если NULL - квота не используется.
QuotaMode quota_mode;
/// Наследники должны реализовать эту функцию.
virtual Block readImpl() = 0;
};

View File

@ -75,12 +75,13 @@ struct QuotaForInterval
void checkExceeded(time_t current_time, const String & quota_name);
/// Проверить соответствующее значение. Если превышено - кинуть исключение. Иначе - увеличить его.
void checkAndAddResultRows(time_t current_time, const String & quota_name, size_t amount);
void checkAndAddResultBytes(time_t current_time, const String & quota_name, size_t amount);
void checkAndAddReadRows(time_t current_time, const String & quota_name, size_t amount);
void checkAndAddReadBytes(time_t current_time, const String & quota_name, size_t amount);
void checkAndAddResultRowsBytes(time_t current_time, const String & quota_name, size_t rows, size_t bytes);
void checkAndAddReadRowsBytes(time_t current_time, const String & quota_name, size_t rows, size_t bytes);
void checkAndAddExecutionTime(time_t current_time, const String & quota_name, Poco::Timespan amount);
/// Получить текст, описывающий, какая часть квоты израсходована.
String toString() const;
private:
/// Сбросить счётчик использованных ресурсов, если соответствующий интервал, за который считается квота, прошёл.
void updateTime(time_t current_time);
@ -102,6 +103,12 @@ private:
public:
QuotaForIntervals(Quota * parent_) : parent(parent_) {}
/// Есть ли хотя бы один интервал, за который считается квота?
bool empty() const
{
return cont.empty();
}
void initFromConfig(const String & config_elem);
@ -110,11 +117,12 @@ public:
void checkExceeded(time_t current_time);
void checkAndAddResultRows(time_t current_time, size_t amount);
void checkAndAddResultBytes(time_t current_time, size_t amount);
void checkAndAddReadRows(time_t current_time, size_t amount);
void checkAndAddReadBytes(time_t current_time, size_t amount);
void checkAndAddResultRowsBytes(time_t current_time, size_t rows, size_t bytes);
void checkAndAddReadRowsBytes(time_t current_time, size_t rows, size_t bytes);
void checkAndAddExecutionTime(time_t current_time, Poco::Timespan amount);
/// Получить текст, описывающий, какая часть квоты израсходована.
String toString() const;
};

View File

@ -302,7 +302,7 @@ BlockInputStreamPtr InterpreterSelectQuery::execute()
executeUnion(streams);
/// Ограничения на результат, а также колбек для прогресса.
/// Ограничения на результат, квота на результат, а также колбек для прогресса.
if (IProfilingBlockInputStream * stream = dynamic_cast<IProfilingBlockInputStream *>(&*streams[0]))
{
IProfilingBlockInputStream::LocalLimits limits;
@ -311,6 +311,7 @@ BlockInputStreamPtr InterpreterSelectQuery::execute()
limits.read_overflow_mode = settings.limits.result_overflow_mode;
stream->setLimits(limits);
stream->setQuota(context.getQuota(), IProfilingBlockInputStream::QUOTA_RESULT);
stream->setProgressCallback(context.getProgressCallback());
}
@ -405,7 +406,7 @@ QueryProcessingStage::Enum InterpreterSelectQuery::executeFetchColumns(BlockInpu
if (streams.size() > settings.max_threads)
streams = narrowBlockInputStreams(streams, settings.max_threads);
/** Установка ограничений на чтение данных.
/** Установка ограничений и квоты на чтение данных.
* Они устанавливаются на самые "глубокие" чтения.
* То есть, не должны устанавливаться для чтений из удалённых серверов и подзапросов.
*/
@ -419,10 +420,17 @@ QueryProcessingStage::Enum InterpreterSelectQuery::executeFetchColumns(BlockInpu
limits.timeout_overflow_mode = settings.limits.timeout_overflow_mode;
limits.min_execution_speed = settings.limits.min_execution_speed;
limits.timeout_before_checking_execution_speed = settings.limits.timeout_before_checking_execution_speed;
QuotaForIntervals & quota = context.getQuota();
for (BlockInputStreams::iterator it = streams.begin(); it != streams.end(); ++it)
{
if (IProfilingBlockInputStream * stream = dynamic_cast<IProfilingBlockInputStream *>(&**it))
{
stream->setLimits(limits);
stream->setQuota(quota, IProfilingBlockInputStream::QUOTA_READ);
}
}
}
return from_stage;

View File

@ -1,3 +1,7 @@
#include <iomanip>
#include <Yandex/logger_useful.h>
#include <DB/Common/SipHash.h>
#include <DB/Interpreters/Quota.h>
@ -36,10 +40,23 @@ void QuotaForInterval::checkExceeded(time_t current_time, const String & quota_n
check(max.read_rows, used.read_rows, current_time, quota_name, "Total rows read");
check(max.read_bytes, used.read_bytes, current_time, quota_name, "Total bytes read");
check(max.execution_time.totalSeconds(), used.execution_time.totalSeconds(), current_time, quota_name, "Total execution time");
}
std::cerr << "Current values for interval " << mysqlxx::DateTime(rounded_time) << " - " << mysqlxx::DateTime(rounded_time + duration) << ":\n"
<< "queries: " << used.queries << "\n"
<< "errors: " << used.errors << "\n";
String QuotaForInterval::toString() const
{
std::stringstream res;
res << std::fixed << std::setprecision(3)
<< "Interval: " << mysqlxx::DateTime(rounded_time) << " - " << mysqlxx::DateTime(rounded_time + duration) << "."
<< "Queries: " << used.queries << "."
<< "Errors: " << used.errors << "."
<< "Result rows: " << used.result_rows << "."
<< "Result bytes: " << used.result_bytes << "."
<< "Read rows: " << used.read_rows << "."
<< "Read bytes: " << used.read_bytes << "."
<< "Execution time: " << used.execution_time.totalMilliseconds() / 1000.0 << " seconds.";
return res.str();
}
void QuotaForInterval::addQuery(time_t current_time, const String & quota_name)
@ -52,28 +69,18 @@ void QuotaForInterval::addError(time_t current_time, const String & quota_name)
++used.errors;
}
void QuotaForInterval::checkAndAddResultRows(time_t current_time, const String & quota_name, size_t amount)
void QuotaForInterval::checkAndAddResultRowsBytes(time_t current_time, const String & quota_name, size_t rows, size_t bytes)
{
checkExceeded(current_time, quota_name);
used.result_rows += amount;
used.result_rows += rows;
used.result_bytes += bytes;
}
void QuotaForInterval::checkAndAddResultBytes(time_t current_time, const String & quota_name, size_t amount)
void QuotaForInterval::checkAndAddReadRowsBytes(time_t current_time, const String & quota_name, size_t rows, size_t bytes)
{
checkExceeded(current_time, quota_name);
used.result_bytes += amount;
}
void QuotaForInterval::checkAndAddReadRows(time_t current_time, const String & quota_name, size_t amount)
{
checkExceeded(current_time, quota_name);
used.read_rows += amount;
}
void QuotaForInterval::checkAndAddReadBytes(time_t current_time, const String & quota_name, size_t amount)
{
checkExceeded(current_time, quota_name);
used.read_bytes += amount;
used.read_rows += rows;
used.read_bytes += bytes;
}
void QuotaForInterval::checkAndAddExecutionTime(time_t current_time, const String & quota_name, Poco::Timespan amount)
@ -158,32 +165,18 @@ void QuotaForIntervals::addError(time_t current_time)
it->second.addError(current_time, parent->name);
}
void QuotaForIntervals::checkAndAddResultRows(time_t current_time, size_t amount)
void QuotaForIntervals::checkAndAddResultRowsBytes(time_t current_time, size_t rows, size_t bytes)
{
Poco::ScopedLock<Poco::FastMutex> lock(parent->mutex);
for (Container::reverse_iterator it = cont.rbegin(); it != cont.rend(); ++it)
it->second.checkAndAddResultRows(current_time, parent->name, amount);
it->second.checkAndAddResultRowsBytes(current_time, parent->name, rows, bytes);
}
void QuotaForIntervals::checkAndAddResultBytes(time_t current_time, size_t amount)
void QuotaForIntervals::checkAndAddReadRowsBytes(time_t current_time, size_t rows, size_t bytes)
{
Poco::ScopedLock<Poco::FastMutex> lock(parent->mutex);
for (Container::reverse_iterator it = cont.rbegin(); it != cont.rend(); ++it)
it->second.checkAndAddResultBytes(current_time, parent->name, amount);
}
void QuotaForIntervals::checkAndAddReadRows(time_t current_time, size_t amount)
{
Poco::ScopedLock<Poco::FastMutex> lock(parent->mutex);
for (Container::reverse_iterator it = cont.rbegin(); it != cont.rend(); ++it)
it->second.checkAndAddReadRows(current_time, parent->name, amount);
}
void QuotaForIntervals::checkAndAddReadBytes(time_t current_time, size_t amount)
{
Poco::ScopedLock<Poco::FastMutex> lock(parent->mutex);
for (Container::reverse_iterator it = cont.rbegin(); it != cont.rend(); ++it)
it->second.checkAndAddReadBytes(current_time, parent->name, amount);
it->second.checkAndAddReadRowsBytes(current_time, parent->name, rows, bytes);
}
void QuotaForIntervals::checkAndAddExecutionTime(time_t current_time, Poco::Timespan amount)
@ -193,6 +186,19 @@ void QuotaForIntervals::checkAndAddExecutionTime(time_t current_time, Poco::Time
it->second.checkAndAddExecutionTime(current_time, parent->name, amount);
}
String QuotaForIntervals::toString() const
{
std::stringstream res;
{
Poco::ScopedLock<Poco::FastMutex> lock(parent->mutex);
for (Container::reverse_iterator it = cont.rbegin(); it != cont.rend(); ++it)
res << std::endl << it->second.toString() << std::endl;
}
return res.str();
}
void Quota::initFromConfig(const String & config_elem, const String & name_)
{

View File

@ -138,6 +138,10 @@ void HTTPHandler::processQuery(Poco::Net::HTTPServerRequest & request, Poco::Net
<< static_cast<size_t>(rows / watch.elapsedSeconds()) << " rows/sec., " << bytes / 1048576.0 / watch.elapsedSeconds() << " MiB/sec.");
}
}
QuotaForIntervals & quota = context.getQuota();
if (!quota.empty())
LOG_INFO(log, "Quota:\n" << quota.toString());
}

View File

@ -85,6 +85,10 @@ namespace DB
<< static_cast<size_t>(rows / watch.elapsedSeconds()) << " rows/sec., " << bytes / 1048576.0 / watch.elapsedSeconds() << " MiB/sec.");
}
}
QuotaForIntervals & quota = context.getQuota();
if (!quota.empty())
LOG_INFO(log, "Quota:\n" << quota.toString());
}

View File

@ -284,6 +284,10 @@ void TCPHandler::logProfileInfo(Stopwatch & watch, IBlockInputStream & in)
<< "Read " << rows << " rows, " << bytes / 1048576.0 << " MiB in " << watch.elapsedSeconds() << " sec., "
<< static_cast<size_t>(rows / watch.elapsedSeconds()) << " rows/sec., " << bytes / 1048576.0 / watch.elapsedSeconds() << " MiB/sec.");
}
QuotaForIntervals & quota = query_context.getQuota();
if (!quota.empty())
LOG_INFO(log, "Quota:\n" << quota.toString());
}