From c10d3a4bb98a37e459e8f63ebf909a0ff3a5f60a Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Wed, 28 Aug 2013 20:47:22 +0000 Subject: [PATCH] dbms: Quota: development [#CONV-8459]. --- .../DataStreams/IProfilingBlockInputStream.h | 22 ++++- dbms/include/DB/Interpreters/Quota.h | 24 ++++-- .../Interpreters/InterpreterSelectQuery.cpp | 12 ++- dbms/src/Interpreters/Quota.cpp | 80 ++++++++++--------- dbms/src/Server/HTTPHandler.cpp | 4 + dbms/src/Server/OLAPHTTPHandler.cpp | 4 + dbms/src/Server/TCPHandler.cpp | 4 + 7 files changed, 102 insertions(+), 48 deletions(-) diff --git a/dbms/include/DB/DataStreams/IProfilingBlockInputStream.h b/dbms/include/DB/DataStreams/IProfilingBlockInputStream.h index 4031bec0d19..24e85776806 100644 --- a/dbms/include/DB/DataStreams/IProfilingBlockInputStream.h +++ b/dbms/include/DB/DataStreams/IProfilingBlockInputStream.h @@ -7,6 +7,7 @@ #include #include +#include #include @@ -68,7 +69,8 @@ private: class IProfilingBlockInputStream : public IBlockInputStream { public: - IProfilingBlockInputStream(StoragePtr owned_storage_ = StoragePtr()) : IBlockInputStream(owned_storage_), is_cancelled(false) {} + IProfilingBlockInputStream(StoragePtr owned_storage_ = StoragePtr()) + : IBlockInputStream(owned_storage_), is_cancelled(false), quota(NULL), quota_mode(QUOTA_READ) {} Block read(); @@ -130,6 +132,21 @@ public: limits = limits_; } + + /// Какая квота используется - на объём исходных данных или на объём результата. + enum QuotaMode + { + QUOTA_READ, + QUOTA_RESULT, + }; + + /// Установить квоту. + void setQuota(QuotaForIntervals & quota_, QuotaMode quota_mode_) + { + quota = "a_; + quota_mode = quota_mode_; + } + protected: BlockStreamProfileInfo info; volatile bool is_cancelled; @@ -137,6 +154,9 @@ protected: LocalLimits limits; + QuotaForIntervals * quota; /// Если NULL - квота не используется. + QuotaMode quota_mode; + /// Наследники должны реализовать эту функцию. virtual Block readImpl() = 0; }; diff --git a/dbms/include/DB/Interpreters/Quota.h b/dbms/include/DB/Interpreters/Quota.h index f9f70d10f74..202675c5408 100644 --- a/dbms/include/DB/Interpreters/Quota.h +++ b/dbms/include/DB/Interpreters/Quota.h @@ -75,12 +75,13 @@ struct QuotaForInterval void checkExceeded(time_t current_time, const String & quota_name); /// Проверить соответствующее значение. Если превышено - кинуть исключение. Иначе - увеличить его. - void checkAndAddResultRows(time_t current_time, const String & quota_name, size_t amount); - void checkAndAddResultBytes(time_t current_time, const String & quota_name, size_t amount); - void checkAndAddReadRows(time_t current_time, const String & quota_name, size_t amount); - void checkAndAddReadBytes(time_t current_time, const String & quota_name, size_t amount); + void checkAndAddResultRowsBytes(time_t current_time, const String & quota_name, size_t rows, size_t bytes); + void checkAndAddReadRowsBytes(time_t current_time, const String & quota_name, size_t rows, size_t bytes); void checkAndAddExecutionTime(time_t current_time, const String & quota_name, Poco::Timespan amount); + /// Получить текст, описывающий, какая часть квоты израсходована. + String toString() const; + private: /// Сбросить счётчик использованных ресурсов, если соответствующий интервал, за который считается квота, прошёл. void updateTime(time_t current_time); @@ -102,6 +103,12 @@ private: public: QuotaForIntervals(Quota * parent_) : parent(parent_) {} + + /// Есть ли хотя бы один интервал, за который считается квота? + bool empty() const + { + return cont.empty(); + } void initFromConfig(const String & config_elem); @@ -110,11 +117,12 @@ public: void checkExceeded(time_t current_time); - void checkAndAddResultRows(time_t current_time, size_t amount); - void checkAndAddResultBytes(time_t current_time, size_t amount); - void checkAndAddReadRows(time_t current_time, size_t amount); - void checkAndAddReadBytes(time_t current_time, size_t amount); + void checkAndAddResultRowsBytes(time_t current_time, size_t rows, size_t bytes); + void checkAndAddReadRowsBytes(time_t current_time, size_t rows, size_t bytes); void checkAndAddExecutionTime(time_t current_time, Poco::Timespan amount); + + /// Получить текст, описывающий, какая часть квоты израсходована. + String toString() const; }; diff --git a/dbms/src/Interpreters/InterpreterSelectQuery.cpp b/dbms/src/Interpreters/InterpreterSelectQuery.cpp index 69c24ce2c75..651e7afba10 100644 --- a/dbms/src/Interpreters/InterpreterSelectQuery.cpp +++ b/dbms/src/Interpreters/InterpreterSelectQuery.cpp @@ -302,7 +302,7 @@ BlockInputStreamPtr InterpreterSelectQuery::execute() executeUnion(streams); - /// Ограничения на результат, а также колбек для прогресса. + /// Ограничения на результат, квота на результат, а также колбек для прогресса. if (IProfilingBlockInputStream * stream = dynamic_cast(&*streams[0])) { IProfilingBlockInputStream::LocalLimits limits; @@ -311,6 +311,7 @@ BlockInputStreamPtr InterpreterSelectQuery::execute() limits.read_overflow_mode = settings.limits.result_overflow_mode; stream->setLimits(limits); + stream->setQuota(context.getQuota(), IProfilingBlockInputStream::QUOTA_RESULT); stream->setProgressCallback(context.getProgressCallback()); } @@ -405,7 +406,7 @@ QueryProcessingStage::Enum InterpreterSelectQuery::executeFetchColumns(BlockInpu if (streams.size() > settings.max_threads) streams = narrowBlockInputStreams(streams, settings.max_threads); - /** Установка ограничений на чтение данных. + /** Установка ограничений и квоты на чтение данных. * Они устанавливаются на самые "глубокие" чтения. * То есть, не должны устанавливаться для чтений из удалённых серверов и подзапросов. */ @@ -419,10 +420,17 @@ QueryProcessingStage::Enum InterpreterSelectQuery::executeFetchColumns(BlockInpu limits.timeout_overflow_mode = settings.limits.timeout_overflow_mode; limits.min_execution_speed = settings.limits.min_execution_speed; limits.timeout_before_checking_execution_speed = settings.limits.timeout_before_checking_execution_speed; + + QuotaForIntervals & quota = context.getQuota(); for (BlockInputStreams::iterator it = streams.begin(); it != streams.end(); ++it) + { if (IProfilingBlockInputStream * stream = dynamic_cast(&**it)) + { stream->setLimits(limits); + stream->setQuota(quota, IProfilingBlockInputStream::QUOTA_READ); + } + } } return from_stage; diff --git a/dbms/src/Interpreters/Quota.cpp b/dbms/src/Interpreters/Quota.cpp index f48ceb0731f..a5e4078addf 100644 --- a/dbms/src/Interpreters/Quota.cpp +++ b/dbms/src/Interpreters/Quota.cpp @@ -1,3 +1,7 @@ +#include + +#include + #include #include @@ -36,10 +40,23 @@ void QuotaForInterval::checkExceeded(time_t current_time, const String & quota_n check(max.read_rows, used.read_rows, current_time, quota_name, "Total rows read"); check(max.read_bytes, used.read_bytes, current_time, quota_name, "Total bytes read"); check(max.execution_time.totalSeconds(), used.execution_time.totalSeconds(), current_time, quota_name, "Total execution time"); +} - std::cerr << "Current values for interval " << mysqlxx::DateTime(rounded_time) << " - " << mysqlxx::DateTime(rounded_time + duration) << ":\n" - << "queries: " << used.queries << "\n" - << "errors: " << used.errors << "\n"; +String QuotaForInterval::toString() const +{ + std::stringstream res; + + res << std::fixed << std::setprecision(3) + << "Interval: " << mysqlxx::DateTime(rounded_time) << " - " << mysqlxx::DateTime(rounded_time + duration) << "." + << "Queries: " << used.queries << "." + << "Errors: " << used.errors << "." + << "Result rows: " << used.result_rows << "." + << "Result bytes: " << used.result_bytes << "." + << "Read rows: " << used.read_rows << "." + << "Read bytes: " << used.read_bytes << "." + << "Execution time: " << used.execution_time.totalMilliseconds() / 1000.0 << " seconds."; + + return res.str(); } void QuotaForInterval::addQuery(time_t current_time, const String & quota_name) @@ -52,28 +69,18 @@ void QuotaForInterval::addError(time_t current_time, const String & quota_name) ++used.errors; } -void QuotaForInterval::checkAndAddResultRows(time_t current_time, const String & quota_name, size_t amount) +void QuotaForInterval::checkAndAddResultRowsBytes(time_t current_time, const String & quota_name, size_t rows, size_t bytes) { checkExceeded(current_time, quota_name); - used.result_rows += amount; + used.result_rows += rows; + used.result_bytes += bytes; } -void QuotaForInterval::checkAndAddResultBytes(time_t current_time, const String & quota_name, size_t amount) +void QuotaForInterval::checkAndAddReadRowsBytes(time_t current_time, const String & quota_name, size_t rows, size_t bytes) { checkExceeded(current_time, quota_name); - used.result_bytes += amount; -} - -void QuotaForInterval::checkAndAddReadRows(time_t current_time, const String & quota_name, size_t amount) -{ - checkExceeded(current_time, quota_name); - used.read_rows += amount; -} - -void QuotaForInterval::checkAndAddReadBytes(time_t current_time, const String & quota_name, size_t amount) -{ - checkExceeded(current_time, quota_name); - used.read_bytes += amount; + used.read_rows += rows; + used.read_bytes += bytes; } void QuotaForInterval::checkAndAddExecutionTime(time_t current_time, const String & quota_name, Poco::Timespan amount) @@ -158,32 +165,18 @@ void QuotaForIntervals::addError(time_t current_time) it->second.addError(current_time, parent->name); } -void QuotaForIntervals::checkAndAddResultRows(time_t current_time, size_t amount) +void QuotaForIntervals::checkAndAddResultRowsBytes(time_t current_time, size_t rows, size_t bytes) { Poco::ScopedLock lock(parent->mutex); for (Container::reverse_iterator it = cont.rbegin(); it != cont.rend(); ++it) - it->second.checkAndAddResultRows(current_time, parent->name, amount); + it->second.checkAndAddResultRowsBytes(current_time, parent->name, rows, bytes); } -void QuotaForIntervals::checkAndAddResultBytes(time_t current_time, size_t amount) +void QuotaForIntervals::checkAndAddReadRowsBytes(time_t current_time, size_t rows, size_t bytes) { Poco::ScopedLock lock(parent->mutex); for (Container::reverse_iterator it = cont.rbegin(); it != cont.rend(); ++it) - it->second.checkAndAddResultBytes(current_time, parent->name, amount); -} - -void QuotaForIntervals::checkAndAddReadRows(time_t current_time, size_t amount) -{ - Poco::ScopedLock lock(parent->mutex); - for (Container::reverse_iterator it = cont.rbegin(); it != cont.rend(); ++it) - it->second.checkAndAddReadRows(current_time, parent->name, amount); -} - -void QuotaForIntervals::checkAndAddReadBytes(time_t current_time, size_t amount) -{ - Poco::ScopedLock lock(parent->mutex); - for (Container::reverse_iterator it = cont.rbegin(); it != cont.rend(); ++it) - it->second.checkAndAddReadBytes(current_time, parent->name, amount); + it->second.checkAndAddReadRowsBytes(current_time, parent->name, rows, bytes); } void QuotaForIntervals::checkAndAddExecutionTime(time_t current_time, Poco::Timespan amount) @@ -193,6 +186,19 @@ void QuotaForIntervals::checkAndAddExecutionTime(time_t current_time, Poco::Time it->second.checkAndAddExecutionTime(current_time, parent->name, amount); } +String QuotaForIntervals::toString() const +{ + std::stringstream res; + + { + Poco::ScopedLock lock(parent->mutex); + for (Container::reverse_iterator it = cont.rbegin(); it != cont.rend(); ++it) + res << std::endl << it->second.toString() << std::endl; + } + + return res.str(); +} + void Quota::initFromConfig(const String & config_elem, const String & name_) { diff --git a/dbms/src/Server/HTTPHandler.cpp b/dbms/src/Server/HTTPHandler.cpp index f649758a374..ada8fb283ba 100644 --- a/dbms/src/Server/HTTPHandler.cpp +++ b/dbms/src/Server/HTTPHandler.cpp @@ -138,6 +138,10 @@ void HTTPHandler::processQuery(Poco::Net::HTTPServerRequest & request, Poco::Net << static_cast(rows / watch.elapsedSeconds()) << " rows/sec., " << bytes / 1048576.0 / watch.elapsedSeconds() << " MiB/sec."); } } + + QuotaForIntervals & quota = context.getQuota(); + if (!quota.empty()) + LOG_INFO(log, "Quota:\n" << quota.toString()); } diff --git a/dbms/src/Server/OLAPHTTPHandler.cpp b/dbms/src/Server/OLAPHTTPHandler.cpp index 44f8e7e6ef8..325086e76f8 100644 --- a/dbms/src/Server/OLAPHTTPHandler.cpp +++ b/dbms/src/Server/OLAPHTTPHandler.cpp @@ -85,6 +85,10 @@ namespace DB << static_cast(rows / watch.elapsedSeconds()) << " rows/sec., " << bytes / 1048576.0 / watch.elapsedSeconds() << " MiB/sec."); } } + + QuotaForIntervals & quota = context.getQuota(); + if (!quota.empty()) + LOG_INFO(log, "Quota:\n" << quota.toString()); } diff --git a/dbms/src/Server/TCPHandler.cpp b/dbms/src/Server/TCPHandler.cpp index 0038744b008..a1e96bc0aac 100644 --- a/dbms/src/Server/TCPHandler.cpp +++ b/dbms/src/Server/TCPHandler.cpp @@ -284,6 +284,10 @@ void TCPHandler::logProfileInfo(Stopwatch & watch, IBlockInputStream & in) << "Read " << rows << " rows, " << bytes / 1048576.0 << " MiB in " << watch.elapsedSeconds() << " sec., " << static_cast(rows / watch.elapsedSeconds()) << " rows/sec., " << bytes / 1048576.0 / watch.elapsedSeconds() << " MiB/sec."); } + + QuotaForIntervals & quota = query_context.getQuota(); + if (!quota.empty()) + LOG_INFO(log, "Quota:\n" << quota.toString()); }