From 4001d37bb76bbd201f6294aee4625e556f75300e Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Mon, 12 Aug 2013 00:36:18 +0000 Subject: [PATCH] dbms: quotas: development [#CONV-8459]. --- dbms/include/DB/Core/ErrorCodes.h | 3 + dbms/include/DB/Interpreters/Context.h | 11 +- dbms/include/DB/Interpreters/Quota.h | 151 ++++++++++++++++ dbms/include/DB/Interpreters/Users.h | 2 + dbms/src/Interpreters/Context.cpp | 24 ++- dbms/src/Interpreters/Quota.cpp | 237 +++++++++++++++++++++++++ dbms/src/Interpreters/executeQuery.cpp | 41 ++++- dbms/src/Server/HTTPHandler.cpp | 4 +- dbms/src/Server/OLAPHTTPHandler.cpp | 3 +- dbms/src/Server/Server.cpp | 3 + dbms/src/Server/TCPHandler.cpp | 2 +- 11 files changed, 471 insertions(+), 10 deletions(-) create mode 100644 dbms/include/DB/Interpreters/Quota.h create mode 100644 dbms/src/Interpreters/Quota.cpp diff --git a/dbms/include/DB/Core/ErrorCodes.h b/dbms/include/DB/Core/ErrorCodes.h index 584e2e5d332..88b15b3b70f 100644 --- a/dbms/include/DB/Core/ErrorCodes.h +++ b/dbms/include/DB/Core/ErrorCodes.h @@ -204,6 +204,9 @@ namespace ErrorCodes UNKNOWN_ADDRESS_PATTERN_TYPE, SERVER_REVISION_IS_TOO_OLD, DNS_ERROR, + UNKNOWN_QUOTA, + QUOTA_DOESNT_ALLOW_KEYS, + QUOTA_EXPIRED, POCO_EXCEPTION = 1000, STD_EXCEPTION, diff --git a/dbms/include/DB/Interpreters/Context.h b/dbms/include/DB/Interpreters/Context.h index 15e076c3a01..89b253a0c43 100644 --- a/dbms/include/DB/Interpreters/Context.h +++ b/dbms/include/DB/Interpreters/Context.h @@ -17,6 +17,7 @@ #include #include #include +#include #include @@ -50,6 +51,7 @@ struct ContextShared FormatFactory format_factory; /// Форматы. mutable SharedPtr dictionaries; /// Словари Метрики. Инициализируются лениво. Users users; /// Известные пользователи. + Quotas quotas; /// Известные квоты на использование ресурсов. Logger * log; /// Логгер. mutable Poco::Mutex mutex; /// Для доступа и модификации разделяемых объектов. @@ -71,6 +73,7 @@ private: Shared shared; String user; /// Текущий пользователь. + QuotaForIntervals * quota; /// Текущая квота. String current_database; /// Текущая БД. NamesAndTypesList columns; /// Столбцы текущей обрабатываемой таблицы. Settings settings; /// Настройки выполнения запроса. @@ -83,13 +86,17 @@ private: Context * global_context; /// Глобальный контекст или NULL, если его нет. (Возможно, равен this.) public: - Context() : shared(new ContextShared), session_context(NULL), global_context(NULL) {} + Context() : shared(new ContextShared), quota(NULL), session_context(NULL), global_context(NULL) {} String getPath() const; void setPath(const String & path); void initUsersFromConfig(); - void setUser(const String & name, const String & password, const Poco::Net::IPAddress & address); + void setUser(const String & name, const String & password, const Poco::Net::IPAddress & address, const String & quota_key); + + void initQuotasFromConfig(); + void setQuota(const String & name, const String & quota_key, const Poco::Net::IPAddress & address); + QuotaForIntervals & getQuota(); /// Проверка существования таблицы/БД. database может быть пустой - в этом случае используется текущая БД. bool isTableExist(const String & database_name, const String & table_name) const; diff --git a/dbms/include/DB/Interpreters/Quota.h b/dbms/include/DB/Interpreters/Quota.h new file mode 100644 index 00000000000..dad10596dd0 --- /dev/null +++ b/dbms/include/DB/Interpreters/Quota.h @@ -0,0 +1,151 @@ +#pragma once + +#include +#include + +#include + +#include +#include + +#include + +#include + +#include +#include +#include +#include +#include + + +namespace DB +{ + +/** Квота на потребление ресурсов за заданный интервал - часть настроек. + * Используются, чтобы ограничить потребление ресурсов пользователем. + * Квота действует "мягко" - может быть немного превышена, так как проверяется, как правило, на каждый блок. + * Квота не сохраняется при перезапуске сервера. + * Квота распространяется только на один сервер. + */ + +/// Используется как для максимальных значений, так и в качестве счётчика накопившихся значений. +struct QuotaValues +{ + /// Нули в качестве ограничений означают "не ограничено". + size_t queries; /// Количество запросов. + size_t errors; /// Количество запросов с эксепшенами. + size_t result_rows; /// Количество строк, отданных в качестве результата. + size_t read_rows; /// Количество строк, прочитанных из таблиц. + Poco::Timespan execution_time; /// Суммарное время выполнения запросов. + + QuotaValues() + { + clear(); + } + + void clear() + { + memset(this, 0, sizeof(*this)); + } + + void initFromConfig(const String & config_elem); +}; + + +/// Время, округлённое до границы интервала, квота за интервал и накопившиеся за этот интервал значения. +struct QuotaForInterval +{ + time_t rounded_time; + size_t duration; + QuotaValues max; + QuotaValues used; + + QuotaForInterval() : rounded_time() {} + QuotaForInterval(time_t duration_) : duration(duration_) {} + + void initFromConfig(const String & config_elem, time_t duration_); + + /// Увеличить соответствующее значение. + void addQuery(time_t current_time, const String & quota_name); + void addError(time_t current_time, const String & quota_name); + + /// Проверить, не превышена ли квота уже. Если превышена - кидает исключение. + void checkExceeded(time_t current_time, const String & quota_name); + + /// Проверить соответствующее значение. Если превышено - кинуть исключение. Иначе - увеличить его. + void checkAndAddResultRows(time_t current_time, const String & quota_name, size_t ammount); + void checkAndAddReadRows(time_t current_time, const String & quota_name, size_t ammount); + void checkAndAddExecutionTime(time_t current_time, const String & quota_name, Poco::Timespan ammount); + +private: + /// Сбросить счётчик использованных ресурсов, если соответствующий интервал, за который считается квота, прошёл. + void updateTime(time_t current_time); + void check(size_t max_ammount, size_t used_ammount, time_t current_time, const String & quota_name, const char * resource_name); +}; + + +struct Quota; + +/// Длина интервала -> квота и накопившиеся за текущий интервал такой длины значения (например, 3600 -> значения за текущий час). +class QuotaForIntervals +{ +private: + /// При проверке, будем обходить интервалы в порядке, обратном величине - от самого большого до самого маленького. + typedef std::map Container; + Container cont; + + Quota * parent; + +public: + QuotaForIntervals(Quota * parent_) : parent(parent_) {} + + void initFromConfig(const String & config_elem); + + void addQuery(time_t current_time); + void addError(time_t current_time); + + void checkExceeded(time_t current_time); + + void checkAndAddResultRows(time_t current_time, size_t ammount); + void checkAndAddReadRows(time_t current_time, size_t ammount); + void checkAndAddExecutionTime(time_t current_time, Poco::Timespan ammount); +}; + + +/// Ключ квоты -> квоты за интервалы. Если квота не допускает ключей, то накопленные значения хранятся по ключу 0. +struct Quota +{ + typedef std::tr1::unordered_map Container; + + String name; + + /// Максимальные значения из конфига. + QuotaForIntervals max; + /// Максимальные и накопленные значения для разных ключей. + Container quota_for_keys; + Poco::FastMutex mutex; + + bool is_keyed; + bool keyed_by_ip; + + Quota() : max(this), is_keyed(false), keyed_by_ip(false) {} + + void initFromConfig(const String & config_elem, const String & name_); + QuotaForIntervals & get(const String & quota_key, const Poco::Net::IPAddress & ip); +}; + + +class Quotas +{ +private: + /// Имя квоты -> квоты. + typedef std::tr1::unordered_map > Container; + Container cont; + +public: + void initFromConfig(); + QuotaForIntervals & get(const String & name, const String & quota_key, const Poco::Net::IPAddress & ip); +}; + +} diff --git a/dbms/include/DB/Interpreters/Users.h b/dbms/include/DB/Interpreters/Users.h index 5eabdc11baf..d2a936b80dc 100644 --- a/dbms/include/DB/Interpreters/Users.h +++ b/dbms/include/DB/Interpreters/Users.h @@ -242,6 +242,7 @@ struct User String password; String profile; + String quota; AddressPatterns addresses; @@ -252,6 +253,7 @@ struct User password = config.getString(config_elem + ".password"); profile = config.getString(config_elem + ".profile"); + quota = config.getString(config_elem + ".quota"); addresses.addFromConfig(config_elem + ".networks"); } diff --git a/dbms/src/Interpreters/Context.cpp b/dbms/src/Interpreters/Context.cpp index fa9b78d75c6..1efdf0e5681 100644 --- a/dbms/src/Interpreters/Context.cpp +++ b/dbms/src/Interpreters/Context.cpp @@ -33,17 +33,39 @@ void Context::initUsersFromConfig() } -void Context::setUser(const String & name, const String & password, const Poco::Net::IPAddress & address) +void Context::setUser(const String & name, const String & password, const Poco::Net::IPAddress & address, const String & quota_key) { Poco::ScopedLock lock(shared->mutex); const User & user_props = shared->users.get(name, password, address); setSetting("profile", user_props.profile); + setQuota(name, quota_key, address); user = name; } +void Context::initQuotasFromConfig() +{ + Poco::ScopedLock lock(shared->mutex); + shared->quotas.initFromConfig(); +} + + +void Context::setQuota(const String & name, const String & quota_key, const Poco::Net::IPAddress & address) +{ + Poco::ScopedLock lock(shared->mutex); + quota = &shared->quotas.get(name, quota_key, address); +} + + +QuotaForIntervals & Context::getQuota() +{ + Poco::ScopedLock lock(shared->mutex); + return *quota; +} + + bool Context::isTableExist(const String & database_name, const String & table_name) const { Poco::ScopedLock lock(shared->mutex); diff --git a/dbms/src/Interpreters/Quota.cpp b/dbms/src/Interpreters/Quota.cpp new file mode 100644 index 00000000000..8128bbadd8d --- /dev/null +++ b/dbms/src/Interpreters/Quota.cpp @@ -0,0 +1,237 @@ +#include + + +namespace DB +{ + +void QuotaValues::initFromConfig(const String & config_elem) +{ + Poco::Util::AbstractConfiguration & config = Poco::Util::Application::instance().config(); + + queries = config.getInt(config_elem + ".queries", 0); + errors = config.getInt(config_elem + ".errors", 0); + result_rows = config.getInt(config_elem + ".result_rows", 0); + read_rows = config.getInt(config_elem + ".read_rows", 0); + execution_time = Poco::Timespan(config.getInt(config_elem + ".execution_time", 0), 0); +} + + +void QuotaForInterval::initFromConfig(const String & config_elem, time_t duration_) +{ + rounded_time = 0; + duration = duration_; + max.initFromConfig(config_elem); +} + +void QuotaForInterval::checkExceeded(time_t current_time, const String & quota_name) +{ + updateTime(current_time); + check(max.queries, used.queries, current_time, quota_name, "Queries"); + check(max.errors, used.errors, current_time, quota_name, "Errors"); + check(max.result_rows, used.result_rows, current_time, quota_name, "Total result rows"); + check(max.read_rows, used.read_rows, current_time, quota_name, "Total rows read"); + check(max.execution_time.totalSeconds(), used.execution_time.totalSeconds(), current_time, quota_name, "Total execution time"); + + std::cerr << "Current values for interval " << mysqlxx::DateTime(rounded_time) << " - " << mysqlxx::DateTime(rounded_time + duration) << ":\n" + << "queries: " << used.queries << "\n" + << "errors: " << used.errors << "\n"; +} + +void QuotaForInterval::addQuery(time_t current_time, const String & quota_name) +{ + ++used.queries; +} + +void QuotaForInterval::addError(time_t current_time, const String & quota_name) +{ + ++used.errors; +} + +void QuotaForInterval::checkAndAddResultRows(time_t current_time, const String & quota_name, size_t ammount) +{ + checkExceeded(current_time, quota_name); + used.result_rows += ammount; +} + +void QuotaForInterval::checkAndAddReadRows(time_t current_time, const String & quota_name, size_t ammount) +{ + checkExceeded(current_time, quota_name); + used.read_rows += ammount; +} + +void QuotaForInterval::checkAndAddExecutionTime(time_t current_time, const String & quota_name, Poco::Timespan ammount) +{ + checkExceeded(current_time, quota_name); + used.execution_time += ammount; +} + +void QuotaForInterval::updateTime(time_t current_time) +{ + if (current_time >= rounded_time + static_cast(duration)) + { + rounded_time = current_time / duration * duration; + used.clear(); + } +} + +void QuotaForInterval::check(size_t max_ammount, size_t used_ammount, time_t current_time, const String & quota_name, const char * resource_name) +{ + if (max_ammount && used_ammount >= max_ammount) + { + std::stringstream message; + message << "Quota '" << quota_name << "' for "; + + if (duration == 3600) + message << "1 hour"; + else if (duration == 60) + message << "1 minute"; + else if (duration % 3600 == 0) + message << (duration / 3600) << " hours"; + else if (duration % 60 == 0) + message << (duration / 60) << " minutes"; + else + message << duration << " seconds"; + + message << " has been expired. " + << resource_name << ": " << used_ammount << ", max: " << max_ammount << ". " + << "Interval will end at " << mysqlxx::DateTime(rounded_time + duration) << "."; + + throw Exception(message.str(), ErrorCodes::QUOTA_EXPIRED); + } +} + + +void QuotaForIntervals::initFromConfig(const String & config_elem) +{ + Poco::Util::AbstractConfiguration & config = Poco::Util::Application::instance().config(); + + Poco::Util::AbstractConfiguration::Keys config_keys; + config.keys(config_elem, config_keys); + + for (Poco::Util::AbstractConfiguration::Keys::const_iterator it = config_keys.begin(); it != config_keys.end(); ++it) + { + if (0 != it->compare(0, strlen("interval"), "interval")) + continue; + + String interval_config_elem = config_elem + "." + *it; + time_t duration = config.getInt(interval_config_elem + ".duration"); + + cont[duration].initFromConfig(interval_config_elem, duration); + } +} + +void QuotaForIntervals::checkExceeded(time_t current_time) +{ + Poco::ScopedLock lock(parent->mutex); + for (Container::reverse_iterator it = cont.rbegin(); it != cont.rend(); ++it) + it->second.checkExceeded(current_time, parent->name); +} + +void QuotaForIntervals::addQuery(time_t current_time) +{ + Poco::ScopedLock lock(parent->mutex); + for (Container::reverse_iterator it = cont.rbegin(); it != cont.rend(); ++it) + it->second.addQuery(current_time, parent->name); +} + +void QuotaForIntervals::addError(time_t current_time) +{ + Poco::ScopedLock lock(parent->mutex); + for (Container::reverse_iterator it = cont.rbegin(); it != cont.rend(); ++it) + it->second.addError(current_time, parent->name); +} + +void QuotaForIntervals::checkAndAddResultRows(time_t current_time, size_t ammount) +{ + Poco::ScopedLock lock(parent->mutex); + for (Container::reverse_iterator it = cont.rbegin(); it != cont.rend(); ++it) + it->second.checkAndAddResultRows(current_time, parent->name, ammount); +} + +void QuotaForIntervals::checkAndAddReadRows(time_t current_time, size_t ammount) +{ + Poco::ScopedLock lock(parent->mutex); + for (Container::reverse_iterator it = cont.rbegin(); it != cont.rend(); ++it) + it->second.checkAndAddReadRows(current_time, parent->name, ammount); +} + +void QuotaForIntervals::checkAndAddExecutionTime(time_t current_time, Poco::Timespan ammount) +{ + Poco::ScopedLock lock(parent->mutex); + for (Container::reverse_iterator it = cont.rbegin(); it != cont.rend(); ++it) + it->second.checkAndAddExecutionTime(current_time, parent->name, ammount); +} + + +void Quota::initFromConfig(const String & config_elem, const String & name_) +{ + name = name_; + + Poco::Util::AbstractConfiguration & config = Poco::Util::Application::instance().config(); + + keyed_by_ip = config.has(config_elem + ".keyed_by_ip"); + is_keyed = keyed_by_ip || config.has(config_elem + ".keyed"); + + max.initFromConfig(config_elem); +} + +QuotaForIntervals & Quota::get(const String & quota_key, const Poco::Net::IPAddress & ip) +{ + if (!quota_key.empty() && (!is_keyed || keyed_by_ip)) + throw Exception("Quota " + name + " doesn't allow client supplied keys.", ErrorCodes::QUOTA_DOESNT_ALLOW_KEYS); + + String quota_key_or_ip = keyed_by_ip ? ip.toString() : quota_key; + UInt64 quota_key_hashed = 0; + + if (!quota_key_or_ip.empty()) + { + union + { + char bytes[16]; + UInt64 u64[2]; + }; + + SipHash hash; + hash.update(quota_key_or_ip.data(), quota_key_or_ip.size()); + hash.final(bytes); + + quota_key_hashed = u64[0] ^ u64[1]; + } + + Poco::ScopedLock lock(mutex); + + Container::iterator it = quota_for_keys.find(quota_key_hashed); + if (quota_for_keys.end() == it) + { + it = quota_for_keys.insert(std::make_pair(quota_key_hashed, QuotaForIntervals(this))).first; + it->second = max; + } + + return it->second; +} + + +void Quotas::initFromConfig() +{ + Poco::Util::AbstractConfiguration & config = Poco::Util::Application::instance().config(); + + Poco::Util::AbstractConfiguration::Keys config_keys; + config.keys("quotas", config_keys); + + for (Poco::Util::AbstractConfiguration::Keys::const_iterator it = config_keys.begin(); it != config_keys.end(); ++it) + { + cont[*it] = new Quota(); + cont[*it]->initFromConfig("quotas." + *it, *it); + } +} + +QuotaForIntervals & Quotas::get(const String & name, const String & quota_key, const Poco::Net::IPAddress & ip) +{ + Container::iterator it = cont.find(name); + if (cont.end() == it) + throw Exception("Unknown quota " + name, ErrorCodes::UNKNOWN_QUOTA); + + return it->second->get(quota_key, ip); +} + +} diff --git a/dbms/src/Interpreters/executeQuery.cpp b/dbms/src/Interpreters/executeQuery.cpp index 9ea946aa469..b8ba058c7d7 100644 --- a/dbms/src/Interpreters/executeQuery.cpp +++ b/dbms/src/Interpreters/executeQuery.cpp @@ -71,8 +71,23 @@ void executeQuery( /// Проверка ограничений. checkLimits(*ast, context.getSettingsRef().limits); - InterpreterQuery interpreter(ast, context, stage); - interpreter.execute(ostr, &istr, query_plan); + QuotaForIntervals & quota = context.getQuota(); + time_t current_time = time(0); + + quota.checkExceeded(current_time); + + try + { + InterpreterQuery interpreter(ast, context, stage); + interpreter.execute(ostr, &istr, query_plan); + } + catch (...) + { + quota.addError(current_time); + throw; + } + + quota.addQuery(current_time); } @@ -105,8 +120,26 @@ BlockIO executeQuery( /// Проверка ограничений. checkLimits(*ast, context.getSettingsRef().limits); - InterpreterQuery interpreter(ast, context, stage); - return interpreter.execute(); + QuotaForIntervals & quota = context.getQuota(); + time_t current_time = time(0); + + quota.checkExceeded(current_time); + + BlockIO res; + + try + { + InterpreterQuery interpreter(ast, context, stage); + res = interpreter.execute(); + } + catch (...) + { + quota.addError(current_time); + throw; + } + + quota.addQuery(current_time); + return res; } diff --git a/dbms/src/Server/HTTPHandler.cpp b/dbms/src/Server/HTTPHandler.cpp index e6c46554fa3..fd98d9e1ab4 100644 --- a/dbms/src/Server/HTTPHandler.cpp +++ b/dbms/src/Server/HTTPHandler.cpp @@ -77,11 +77,13 @@ void HTTPHandler::processQuery(Poco::Net::HTTPServerRequest & request, Poco::Net user = credentials.getUsername(); password = credentials.getPassword(); } + + std::string quota_key = params.get("quota_key", ""); Context context = server.global_context; context.setGlobalContext(server.global_context); - context.setUser(user, password, request.clientAddress().host()); + context.setUser(user, password, request.clientAddress().host(), quota_key); /// Настройки могут быть переопределены в запросе. for (Poco::Net::NameValueCollection::ConstIterator it = params.begin(); it != params.end(); ++it) diff --git a/dbms/src/Server/OLAPHTTPHandler.cpp b/dbms/src/Server/OLAPHTTPHandler.cpp index 9d77d9b46cb..f9cf1169a29 100644 --- a/dbms/src/Server/OLAPHTTPHandler.cpp +++ b/dbms/src/Server/OLAPHTTPHandler.cpp @@ -36,6 +36,7 @@ namespace DB /// Имя пользователя и пароль могут быть заданы как в параметрах URL, так и с помощью HTTP Basic authentification (и то, и другое не секъюрно). std::string user = params.get("user", "default"); std::string password = params.get("password", ""); + std::string quota_key = params.get("quota_key", ""); if (request.hasCredentials()) { @@ -48,7 +49,7 @@ namespace DB Context context = server.global_context; context.setGlobalContext(server.global_context); - context.setUser(user, password, request.clientAddress().host()); + context.setUser(user, password, request.clientAddress().host(), quota_key); OLAP::QueryParseResult olap_query = server.olap_parser->parse(request_istream); diff --git a/dbms/src/Server/Server.cpp b/dbms/src/Server/Server.cpp index 19e715862a8..5d69db0161c 100644 --- a/dbms/src/Server/Server.cpp +++ b/dbms/src/Server/Server.cpp @@ -98,6 +98,9 @@ int Server::main(const std::vector & args) /// Загружаем пользователей. global_context.initUsersFromConfig(); + /// Загружаем квоты. + global_context.initQuotasFromConfig(); + /// Загружаем настройки. Settings & settings = global_context.getSettingsRef(); settings.setProfile(config.getString("default_profile", "default")); diff --git a/dbms/src/Server/TCPHandler.cpp b/dbms/src/Server/TCPHandler.cpp index 59ed53996dd..0038744b008 100644 --- a/dbms/src/Server/TCPHandler.cpp +++ b/dbms/src/Server/TCPHandler.cpp @@ -321,7 +321,7 @@ void TCPHandler::receiveHello() << (!user.empty() ? ", user: " + user : "") << "."); - connection_context.setUser(user, password, socket().peerAddress().host()); + connection_context.setUser(user, password, socket().peerAddress().host(), ""); }