dbms: quotas: development [#CONV-8459].

This commit is contained in:
Alexey Milovidov 2013-08-12 00:36:18 +00:00
parent be67c3e3d6
commit 4001d37bb7
11 changed files with 471 additions and 10 deletions

View File

@ -204,6 +204,9 @@ namespace ErrorCodes
UNKNOWN_ADDRESS_PATTERN_TYPE,
SERVER_REVISION_IS_TOO_OLD,
DNS_ERROR,
UNKNOWN_QUOTA,
QUOTA_DOESNT_ALLOW_KEYS,
QUOTA_EXPIRED,
POCO_EXCEPTION = 1000,
STD_EXCEPTION,

View File

@ -17,6 +17,7 @@
#include <DB/Storages/StorageFactory.h>
#include <DB/Interpreters/Settings.h>
#include <DB/Interpreters/Users.h>
#include <DB/Interpreters/Quota.h>
#include <DB/Interpreters/Dictionaries.h>
@ -50,6 +51,7 @@ struct ContextShared
FormatFactory format_factory; /// Форматы.
mutable SharedPtr<Dictionaries> dictionaries; /// Словари Метрики. Инициализируются лениво.
Users users; /// Известные пользователи.
Quotas quotas; /// Известные квоты на использование ресурсов.
Logger * log; /// Логгер.
mutable Poco::Mutex mutex; /// Для доступа и модификации разделяемых объектов.
@ -71,6 +73,7 @@ private:
Shared shared;
String user; /// Текущий пользователь.
QuotaForIntervals * quota; /// Текущая квота.
String current_database; /// Текущая БД.
NamesAndTypesList columns; /// Столбцы текущей обрабатываемой таблицы.
Settings settings; /// Настройки выполнения запроса.
@ -83,13 +86,17 @@ private:
Context * global_context; /// Глобальный контекст или NULL, если его нет. (Возможно, равен this.)
public:
Context() : shared(new ContextShared), session_context(NULL), global_context(NULL) {}
Context() : shared(new ContextShared), quota(NULL), session_context(NULL), global_context(NULL) {}
String getPath() const;
void setPath(const String & path);
void initUsersFromConfig();
void setUser(const String & name, const String & password, const Poco::Net::IPAddress & address);
void setUser(const String & name, const String & password, const Poco::Net::IPAddress & address, const String & quota_key);
void initQuotasFromConfig();
void setQuota(const String & name, const String & quota_key, const Poco::Net::IPAddress & address);
QuotaForIntervals & getQuota();
/// Проверка существования таблицы/БД. database может быть пустой - в этом случае используется текущая БД.
bool isTableExist(const String & database_name, const String & table_name) const;

View File

@ -0,0 +1,151 @@
#pragma once
#include <string.h>
#include <tr1/unordered_map>
#include <Poco/Timespan.h>
#include <Poco/Util/Application.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <Poco/Net/IPAddress.h>
#include <Yandex/Common.h>
#include <DB/Core/Types.h>
#include <DB/Core/Exception.h>
#include <DB/Core/ErrorCodes.h>
#include <DB/IO/WriteHelpers.h>
#include <DB/Common/SipHash.h>
namespace DB
{
/** Квота на потребление ресурсов за заданный интервал - часть настроек.
* Используются, чтобы ограничить потребление ресурсов пользователем.
* Квота действует "мягко" - может быть немного превышена, так как проверяется, как правило, на каждый блок.
* Квота не сохраняется при перезапуске сервера.
* Квота распространяется только на один сервер.
*/
/// Используется как для максимальных значений, так и в качестве счётчика накопившихся значений.
struct QuotaValues
{
/// Нули в качестве ограничений означают "не ограничено".
size_t queries; /// Количество запросов.
size_t errors; /// Количество запросов с эксепшенами.
size_t result_rows; /// Количество строк, отданных в качестве результата.
size_t read_rows; /// Количество строк, прочитанных из таблиц.
Poco::Timespan execution_time; /// Суммарное время выполнения запросов.
QuotaValues()
{
clear();
}
void clear()
{
memset(this, 0, sizeof(*this));
}
void initFromConfig(const String & config_elem);
};
/// Время, округлённое до границы интервала, квота за интервал и накопившиеся за этот интервал значения.
struct QuotaForInterval
{
time_t rounded_time;
size_t duration;
QuotaValues max;
QuotaValues used;
QuotaForInterval() : rounded_time() {}
QuotaForInterval(time_t duration_) : duration(duration_) {}
void initFromConfig(const String & config_elem, time_t duration_);
/// Увеличить соответствующее значение.
void addQuery(time_t current_time, const String & quota_name);
void addError(time_t current_time, const String & quota_name);
/// Проверить, не превышена ли квота уже. Если превышена - кидает исключение.
void checkExceeded(time_t current_time, const String & quota_name);
/// Проверить соответствующее значение. Если превышено - кинуть исключение. Иначе - увеличить его.
void checkAndAddResultRows(time_t current_time, const String & quota_name, size_t ammount);
void checkAndAddReadRows(time_t current_time, const String & quota_name, size_t ammount);
void checkAndAddExecutionTime(time_t current_time, const String & quota_name, Poco::Timespan ammount);
private:
/// Сбросить счётчик использованных ресурсов, если соответствующий интервал, за который считается квота, прошёл.
void updateTime(time_t current_time);
void check(size_t max_ammount, size_t used_ammount, time_t current_time, const String & quota_name, const char * resource_name);
};
struct Quota;
/// Длина интервала -> квота и накопившиеся за текущий интервал такой длины значения (например, 3600 -> значения за текущий час).
class QuotaForIntervals
{
private:
/// При проверке, будем обходить интервалы в порядке, обратном величине - от самого большого до самого маленького.
typedef std::map<size_t, QuotaForInterval> Container;
Container cont;
Quota * parent;
public:
QuotaForIntervals(Quota * parent_) : parent(parent_) {}
void initFromConfig(const String & config_elem);
void addQuery(time_t current_time);
void addError(time_t current_time);
void checkExceeded(time_t current_time);
void checkAndAddResultRows(time_t current_time, size_t ammount);
void checkAndAddReadRows(time_t current_time, size_t ammount);
void checkAndAddExecutionTime(time_t current_time, Poco::Timespan ammount);
};
/// Ключ квоты -> квоты за интервалы. Если квота не допускает ключей, то накопленные значения хранятся по ключу 0.
struct Quota
{
typedef std::tr1::unordered_map<UInt64, QuotaForIntervals> Container;
String name;
/// Максимальные значения из конфига.
QuotaForIntervals max;
/// Максимальные и накопленные значения для разных ключей.
Container quota_for_keys;
Poco::FastMutex mutex;
bool is_keyed;
bool keyed_by_ip;
Quota() : max(this), is_keyed(false), keyed_by_ip(false) {}
void initFromConfig(const String & config_elem, const String & name_);
QuotaForIntervals & get(const String & quota_key, const Poco::Net::IPAddress & ip);
};
class Quotas
{
private:
/// Имя квоты -> квоты.
typedef std::tr1::unordered_map<String, SharedPtr<Quota> > Container;
Container cont;
public:
void initFromConfig();
QuotaForIntervals & get(const String & name, const String & quota_key, const Poco::Net::IPAddress & ip);
};
}

View File

@ -242,6 +242,7 @@ struct User
String password;
String profile;
String quota;
AddressPatterns addresses;
@ -252,6 +253,7 @@ struct User
password = config.getString(config_elem + ".password");
profile = config.getString(config_elem + ".profile");
quota = config.getString(config_elem + ".quota");
addresses.addFromConfig(config_elem + ".networks");
}

View File

@ -33,17 +33,39 @@ void Context::initUsersFromConfig()
}
void Context::setUser(const String & name, const String & password, const Poco::Net::IPAddress & address)
void Context::setUser(const String & name, const String & password, const Poco::Net::IPAddress & address, const String & quota_key)
{
Poco::ScopedLock<Poco::Mutex> lock(shared->mutex);
const User & user_props = shared->users.get(name, password, address);
setSetting("profile", user_props.profile);
setQuota(name, quota_key, address);
user = name;
}
void Context::initQuotasFromConfig()
{
Poco::ScopedLock<Poco::Mutex> lock(shared->mutex);
shared->quotas.initFromConfig();
}
void Context::setQuota(const String & name, const String & quota_key, const Poco::Net::IPAddress & address)
{
Poco::ScopedLock<Poco::Mutex> lock(shared->mutex);
quota = &shared->quotas.get(name, quota_key, address);
}
QuotaForIntervals & Context::getQuota()
{
Poco::ScopedLock<Poco::Mutex> lock(shared->mutex);
return *quota;
}
bool Context::isTableExist(const String & database_name, const String & table_name) const
{
Poco::ScopedLock<Poco::Mutex> lock(shared->mutex);

View File

@ -0,0 +1,237 @@
#include <DB/Interpreters/Quota.h>
namespace DB
{
void QuotaValues::initFromConfig(const String & config_elem)
{
Poco::Util::AbstractConfiguration & config = Poco::Util::Application::instance().config();
queries = config.getInt(config_elem + ".queries", 0);
errors = config.getInt(config_elem + ".errors", 0);
result_rows = config.getInt(config_elem + ".result_rows", 0);
read_rows = config.getInt(config_elem + ".read_rows", 0);
execution_time = Poco::Timespan(config.getInt(config_elem + ".execution_time", 0), 0);
}
void QuotaForInterval::initFromConfig(const String & config_elem, time_t duration_)
{
rounded_time = 0;
duration = duration_;
max.initFromConfig(config_elem);
}
void QuotaForInterval::checkExceeded(time_t current_time, const String & quota_name)
{
updateTime(current_time);
check(max.queries, used.queries, current_time, quota_name, "Queries");
check(max.errors, used.errors, current_time, quota_name, "Errors");
check(max.result_rows, used.result_rows, current_time, quota_name, "Total result rows");
check(max.read_rows, used.read_rows, current_time, quota_name, "Total rows read");
check(max.execution_time.totalSeconds(), used.execution_time.totalSeconds(), current_time, quota_name, "Total execution time");
std::cerr << "Current values for interval " << mysqlxx::DateTime(rounded_time) << " - " << mysqlxx::DateTime(rounded_time + duration) << ":\n"
<< "queries: " << used.queries << "\n"
<< "errors: " << used.errors << "\n";
}
void QuotaForInterval::addQuery(time_t current_time, const String & quota_name)
{
++used.queries;
}
void QuotaForInterval::addError(time_t current_time, const String & quota_name)
{
++used.errors;
}
void QuotaForInterval::checkAndAddResultRows(time_t current_time, const String & quota_name, size_t ammount)
{
checkExceeded(current_time, quota_name);
used.result_rows += ammount;
}
void QuotaForInterval::checkAndAddReadRows(time_t current_time, const String & quota_name, size_t ammount)
{
checkExceeded(current_time, quota_name);
used.read_rows += ammount;
}
void QuotaForInterval::checkAndAddExecutionTime(time_t current_time, const String & quota_name, Poco::Timespan ammount)
{
checkExceeded(current_time, quota_name);
used.execution_time += ammount;
}
void QuotaForInterval::updateTime(time_t current_time)
{
if (current_time >= rounded_time + static_cast<int>(duration))
{
rounded_time = current_time / duration * duration;
used.clear();
}
}
void QuotaForInterval::check(size_t max_ammount, size_t used_ammount, time_t current_time, const String & quota_name, const char * resource_name)
{
if (max_ammount && used_ammount >= max_ammount)
{
std::stringstream message;
message << "Quota '" << quota_name << "' for ";
if (duration == 3600)
message << "1 hour";
else if (duration == 60)
message << "1 minute";
else if (duration % 3600 == 0)
message << (duration / 3600) << " hours";
else if (duration % 60 == 0)
message << (duration / 60) << " minutes";
else
message << duration << " seconds";
message << " has been expired. "
<< resource_name << ": " << used_ammount << ", max: " << max_ammount << ". "
<< "Interval will end at " << mysqlxx::DateTime(rounded_time + duration) << ".";
throw Exception(message.str(), ErrorCodes::QUOTA_EXPIRED);
}
}
void QuotaForIntervals::initFromConfig(const String & config_elem)
{
Poco::Util::AbstractConfiguration & config = Poco::Util::Application::instance().config();
Poco::Util::AbstractConfiguration::Keys config_keys;
config.keys(config_elem, config_keys);
for (Poco::Util::AbstractConfiguration::Keys::const_iterator it = config_keys.begin(); it != config_keys.end(); ++it)
{
if (0 != it->compare(0, strlen("interval"), "interval"))
continue;
String interval_config_elem = config_elem + "." + *it;
time_t duration = config.getInt(interval_config_elem + ".duration");
cont[duration].initFromConfig(interval_config_elem, duration);
}
}
void QuotaForIntervals::checkExceeded(time_t current_time)
{
Poco::ScopedLock<Poco::FastMutex> lock(parent->mutex);
for (Container::reverse_iterator it = cont.rbegin(); it != cont.rend(); ++it)
it->second.checkExceeded(current_time, parent->name);
}
void QuotaForIntervals::addQuery(time_t current_time)
{
Poco::ScopedLock<Poco::FastMutex> lock(parent->mutex);
for (Container::reverse_iterator it = cont.rbegin(); it != cont.rend(); ++it)
it->second.addQuery(current_time, parent->name);
}
void QuotaForIntervals::addError(time_t current_time)
{
Poco::ScopedLock<Poco::FastMutex> lock(parent->mutex);
for (Container::reverse_iterator it = cont.rbegin(); it != cont.rend(); ++it)
it->second.addError(current_time, parent->name);
}
void QuotaForIntervals::checkAndAddResultRows(time_t current_time, size_t ammount)
{
Poco::ScopedLock<Poco::FastMutex> lock(parent->mutex);
for (Container::reverse_iterator it = cont.rbegin(); it != cont.rend(); ++it)
it->second.checkAndAddResultRows(current_time, parent->name, ammount);
}
void QuotaForIntervals::checkAndAddReadRows(time_t current_time, size_t ammount)
{
Poco::ScopedLock<Poco::FastMutex> lock(parent->mutex);
for (Container::reverse_iterator it = cont.rbegin(); it != cont.rend(); ++it)
it->second.checkAndAddReadRows(current_time, parent->name, ammount);
}
void QuotaForIntervals::checkAndAddExecutionTime(time_t current_time, Poco::Timespan ammount)
{
Poco::ScopedLock<Poco::FastMutex> lock(parent->mutex);
for (Container::reverse_iterator it = cont.rbegin(); it != cont.rend(); ++it)
it->second.checkAndAddExecutionTime(current_time, parent->name, ammount);
}
void Quota::initFromConfig(const String & config_elem, const String & name_)
{
name = name_;
Poco::Util::AbstractConfiguration & config = Poco::Util::Application::instance().config();
keyed_by_ip = config.has(config_elem + ".keyed_by_ip");
is_keyed = keyed_by_ip || config.has(config_elem + ".keyed");
max.initFromConfig(config_elem);
}
QuotaForIntervals & Quota::get(const String & quota_key, const Poco::Net::IPAddress & ip)
{
if (!quota_key.empty() && (!is_keyed || keyed_by_ip))
throw Exception("Quota " + name + " doesn't allow client supplied keys.", ErrorCodes::QUOTA_DOESNT_ALLOW_KEYS);
String quota_key_or_ip = keyed_by_ip ? ip.toString() : quota_key;
UInt64 quota_key_hashed = 0;
if (!quota_key_or_ip.empty())
{
union
{
char bytes[16];
UInt64 u64[2];
};
SipHash hash;
hash.update(quota_key_or_ip.data(), quota_key_or_ip.size());
hash.final(bytes);
quota_key_hashed = u64[0] ^ u64[1];
}
Poco::ScopedLock<Poco::FastMutex> lock(mutex);
Container::iterator it = quota_for_keys.find(quota_key_hashed);
if (quota_for_keys.end() == it)
{
it = quota_for_keys.insert(std::make_pair(quota_key_hashed, QuotaForIntervals(this))).first;
it->second = max;
}
return it->second;
}
void Quotas::initFromConfig()
{
Poco::Util::AbstractConfiguration & config = Poco::Util::Application::instance().config();
Poco::Util::AbstractConfiguration::Keys config_keys;
config.keys("quotas", config_keys);
for (Poco::Util::AbstractConfiguration::Keys::const_iterator it = config_keys.begin(); it != config_keys.end(); ++it)
{
cont[*it] = new Quota();
cont[*it]->initFromConfig("quotas." + *it, *it);
}
}
QuotaForIntervals & Quotas::get(const String & name, const String & quota_key, const Poco::Net::IPAddress & ip)
{
Container::iterator it = cont.find(name);
if (cont.end() == it)
throw Exception("Unknown quota " + name, ErrorCodes::UNKNOWN_QUOTA);
return it->second->get(quota_key, ip);
}
}

View File

@ -71,8 +71,23 @@ void executeQuery(
/// Проверка ограничений.
checkLimits(*ast, context.getSettingsRef().limits);
InterpreterQuery interpreter(ast, context, stage);
interpreter.execute(ostr, &istr, query_plan);
QuotaForIntervals & quota = context.getQuota();
time_t current_time = time(0);
quota.checkExceeded(current_time);
try
{
InterpreterQuery interpreter(ast, context, stage);
interpreter.execute(ostr, &istr, query_plan);
}
catch (...)
{
quota.addError(current_time);
throw;
}
quota.addQuery(current_time);
}
@ -105,8 +120,26 @@ BlockIO executeQuery(
/// Проверка ограничений.
checkLimits(*ast, context.getSettingsRef().limits);
InterpreterQuery interpreter(ast, context, stage);
return interpreter.execute();
QuotaForIntervals & quota = context.getQuota();
time_t current_time = time(0);
quota.checkExceeded(current_time);
BlockIO res;
try
{
InterpreterQuery interpreter(ast, context, stage);
res = interpreter.execute();
}
catch (...)
{
quota.addError(current_time);
throw;
}
quota.addQuery(current_time);
return res;
}

View File

@ -77,11 +77,13 @@ void HTTPHandler::processQuery(Poco::Net::HTTPServerRequest & request, Poco::Net
user = credentials.getUsername();
password = credentials.getPassword();
}
std::string quota_key = params.get("quota_key", "");
Context context = server.global_context;
context.setGlobalContext(server.global_context);
context.setUser(user, password, request.clientAddress().host());
context.setUser(user, password, request.clientAddress().host(), quota_key);
/// Настройки могут быть переопределены в запросе.
for (Poco::Net::NameValueCollection::ConstIterator it = params.begin(); it != params.end(); ++it)

View File

@ -36,6 +36,7 @@ namespace DB
/// Имя пользователя и пароль могут быть заданы как в параметрах URL, так и с помощью HTTP Basic authentification (и то, и другое не секъюрно).
std::string user = params.get("user", "default");
std::string password = params.get("password", "");
std::string quota_key = params.get("quota_key", "");
if (request.hasCredentials())
{
@ -48,7 +49,7 @@ namespace DB
Context context = server.global_context;
context.setGlobalContext(server.global_context);
context.setUser(user, password, request.clientAddress().host());
context.setUser(user, password, request.clientAddress().host(), quota_key);
OLAP::QueryParseResult olap_query = server.olap_parser->parse(request_istream);

View File

@ -98,6 +98,9 @@ int Server::main(const std::vector<std::string> & args)
/// Загружаем пользователей.
global_context.initUsersFromConfig();
/// Загружаем квоты.
global_context.initQuotasFromConfig();
/// Загружаем настройки.
Settings & settings = global_context.getSettingsRef();
settings.setProfile(config.getString("default_profile", "default"));

View File

@ -321,7 +321,7 @@ void TCPHandler::receiveHello()
<< (!user.empty() ? ", user: " + user : "")
<< ".");
connection_context.setUser(user, password, socket().peerAddress().host());
connection_context.setUser(user, password, socket().peerAddress().host(), "");
}