diff --git a/dbms/include/DB/Core/ErrorCodes.h b/dbms/include/DB/Core/ErrorCodes.h index 88b15b3b70f..4c6138f5ad1 100644 --- a/dbms/include/DB/Core/ErrorCodes.h +++ b/dbms/include/DB/Core/ErrorCodes.h @@ -207,6 +207,7 @@ namespace ErrorCodes UNKNOWN_QUOTA, QUOTA_DOESNT_ALLOW_KEYS, QUOTA_EXPIRED, + TOO_MUCH_SIMULTANEOUS_QUERIES, POCO_EXCEPTION = 1000, STD_EXCEPTION, diff --git a/dbms/include/DB/DataStreams/BlockIO.h b/dbms/include/DB/DataStreams/BlockIO.h index 2135bec294e..461e475e026 100644 --- a/dbms/include/DB/DataStreams/BlockIO.h +++ b/dbms/include/DB/DataStreams/BlockIO.h @@ -2,6 +2,7 @@ #include #include +#include namespace DB @@ -14,6 +15,8 @@ struct BlockIO Block in_sample; /// Пример блока, который будет прочитан из in. Block out_sample; /// Пример блока, которого нужно писать в out. + + ProcessList::EntryPtr process_list_entry; }; } diff --git a/dbms/include/DB/Interpreters/Context.h b/dbms/include/DB/Interpreters/Context.h index 2bba44cc7fb..611c2e90f79 100644 --- a/dbms/include/DB/Interpreters/Context.h +++ b/dbms/include/DB/Interpreters/Context.h @@ -19,6 +19,7 @@ #include #include #include +#include namespace DB @@ -54,6 +55,7 @@ struct ContextShared mutable SharedPtr dictionaries; /// Словари Метрики. Инициализируются лениво. Users users; /// Известные пользователи. Quotas quotas; /// Известные квоты на использование ресурсов. + ProcessList process_list; /// Исполняющиеся в данный момент запросы. Logger * log; /// Логгер. ContextShared() : log(&Logger::get("Context")) {}; @@ -164,6 +166,9 @@ public: void setProgressCallback(ProgressCallback callback); /// Используется в InterpreterSelectQuery, чтобы передать его в IProfilingBlockInputStream. ProgressCallback getProgressCallback() const; + + ProcessList & getProcessList() { return shared->process_list; } + const ProcessList & getProcessList() const { return shared->process_list; } }; diff --git a/dbms/include/DB/Interpreters/InterpreterShowProcesslistQuery.h b/dbms/include/DB/Interpreters/InterpreterShowProcesslistQuery.h new file mode 100644 index 00000000000..64916170af2 --- /dev/null +++ b/dbms/include/DB/Interpreters/InterpreterShowProcesslistQuery.h @@ -0,0 +1,57 @@ +#pragma once + +#include + +#include +#include + +#include +#include + + +namespace DB +{ + + +/** Вернуть список запросов, исполняющихся прямо сейчас. + */ +class InterpreterShowProcesslistQuery +{ +public: + InterpreterShowProcesslistQuery(ASTPtr query_ptr_, Context & context_) + : query_ptr(query_ptr_), context(context_) {} + + BlockIO execute() + { + return executeQuery(getRewrittenQuery(), context); + } + + BlockInputStreamPtr executeAndFormat(WriteBuffer & buf) + { + String query = getRewrittenQuery(); + ReadBufferFromString in(query); + BlockInputStreamPtr query_plan; + executeQuery(in, buf, context, query_plan); + return query_plan; + } + +private: + ASTPtr query_ptr; + Context context; + + String getRewrittenQuery() + { + const ASTQueryWithOutput & query = dynamic_cast(*query_ptr); + + std::stringstream rewritten_query; + rewritten_query << "SELECT * FROM system.processes"; + + if (query.format) + rewritten_query << " FORMAT " << dynamic_cast(*query.format).name; + + return rewritten_query.str(); + } +}; + + +} diff --git a/dbms/include/DB/Interpreters/ProcessList.h b/dbms/include/DB/Interpreters/ProcessList.h new file mode 100644 index 00000000000..0e7230f59ae --- /dev/null +++ b/dbms/include/DB/Interpreters/ProcessList.h @@ -0,0 +1,107 @@ +#pragma once + +#include +#include +#include +#include +#include +#include + + +namespace DB +{ + +/** Список исполняющихся в данный момент запросов. + */ + +class ProcessList +{ + friend class Entry; +public: + /// Запрос и таймер его выполнения. + struct Element + { + std::string query; + Stopwatch watch; + + Element(const std::string & query_) : query(query_) {} + }; + + /// list, чтобы итераторы не инвалидировались. NOTE: можно заменить на cyclic buffer, но почти незачем. + typedef std::list Containter; + +private: + Containter cont; + size_t cur_size; /// В C++03 std::list::size не O(1). + size_t max_size; /// Если 0 - не ограничено. Иначе, если пытаемся добавить больше - кидается исключение. + mutable Poco::FastMutex mutex; + + /// Держит итератор на список, и удаляет элемент из списка в деструкторе. + class Entry + { + private: + ProcessList & parent; + Containter::iterator it; + public: + Entry(ProcessList & parent_, Containter::iterator it_) + : parent(parent_), it(it_) {} + + ~Entry() + { + Poco::ScopedLock lock(parent.mutex); + parent.cont.erase(it); + --parent.cur_size; + } + }; + +public: + ProcessList(size_t max_size_ = 0) : cur_size(0), max_size(max_size_) {} + + typedef Poco::SharedPtr EntryPtr; + + /// Зарегистрировать выполняющийся запрос. Возвращает refcounted объект, который удаляет запрос из списка при уничтожении. + EntryPtr insert(const std::string & query_) + { + EntryPtr res; + + { + Poco::ScopedLock lock(mutex); + + if (max_size && cur_size >= max_size) + throw Exception("Too much simultaneous queries. Maximum: " + toString(max_size), ErrorCodes::TOO_MUCH_SIMULTANEOUS_QUERIES); + + ++cur_size; + res = new Entry(*this, cont.insert(cont.end(), Element(query_))); + } + + return res; + } + + /// Количество одновременно выполняющихся запросов. + size_t size() const + { + Poco::ScopedLock lock(mutex); + return cur_size; + } + + /// Получить текущее состояние (копию) списка запросов. + Containter get() const + { + Containter res; + + { + Poco::ScopedLock lock(mutex); + res = cont; + } + + return res; + } + + void setMaxSize(size_t max_size_) + { + Poco::ScopedLock lock(mutex); + max_size = max_size_; + } +}; + +} diff --git a/dbms/include/DB/Parsers/ASTQueryWithOutput.h b/dbms/include/DB/Parsers/ASTQueryWithOutput.h index 552345be7ca..686a64e2ab4 100644 --- a/dbms/include/DB/Parsers/ASTQueryWithOutput.h +++ b/dbms/include/DB/Parsers/ASTQueryWithOutput.h @@ -7,15 +7,38 @@ namespace DB { - /** Запрос с секцией FORMAT. - */ - class ASTQueryWithOutput : public IAST - { - public: - ASTPtr format; - - ASTQueryWithOutput() {} - ASTQueryWithOutput(StringRange range_) : IAST(range_) {} - }; - +/** Запрос с секцией FORMAT. + */ +class ASTQueryWithOutput : public IAST +{ +public: + ASTPtr format; + + ASTQueryWithOutput() {} + ASTQueryWithOutput(StringRange range_) : IAST(range_) {} +}; + + +/// Объявляет класс-наследник ASTQueryWithOutput с реализованными методами getID и clone. +#define DEFINE_AST_QUERY_WITH_OUTPUT(Name, ID) \ +class Name : public ASTQueryWithOutput \ +{ \ +public: \ + Name() {} \ + Name(StringRange range_) : ASTQueryWithOutput(range_) {} \ + String getID() const { return ID; }; \ + \ + ASTPtr clone() const \ + { \ + Name * res = new Name(*this); \ + res->children.clear(); \ + if (format) \ + { \ + res->format = format->clone(); \ + res->children.push_back(res->format); \ + } \ + return res; \ + } \ +}; + } diff --git a/dbms/include/DB/Parsers/ASTShowProcesslistQuery.h b/dbms/include/DB/Parsers/ASTShowProcesslistQuery.h new file mode 100644 index 00000000000..8d06950319e --- /dev/null +++ b/dbms/include/DB/Parsers/ASTShowProcesslistQuery.h @@ -0,0 +1,9 @@ +#pragma once + +#include + + +namespace DB +{ + DEFINE_AST_QUERY_WITH_OUTPUT(ASTShowProcesslistQuery, "ShowProcesslistQuery") +} diff --git a/dbms/include/DB/Parsers/ParserShowProcesslistQuery.h b/dbms/include/DB/Parsers/ParserShowProcesslistQuery.h new file mode 100644 index 00000000000..24290567313 --- /dev/null +++ b/dbms/include/DB/Parsers/ParserShowProcesslistQuery.h @@ -0,0 +1,64 @@ +#pragma once + +#include +#include +#include +#include +#include + + +namespace DB +{ + +/** Запрос SHOW PROCESSLIST + */ +class ParserShowProcesslistQuery : public IParserBase +{ +protected: + String getName() { return "SHOW PROCESSLIST query"; } + + bool parseImpl(Pos & pos, Pos end, ASTPtr & node, String & expected) + { + Pos begin = pos; + + ParserWhiteSpaceOrComments ws; + ParserString s_show("SHOW", true, true); + ParserString s_processlist("PROCESSLIST", true, true); + ParserString s_format("FORMAT", true, true); + + ASTPtr format; + + ws.ignore(pos, end); + + if (!s_show.ignore(pos, end, expected)) + return false; + + ws.ignore(pos, end); + + if (!s_processlist.ignore(pos, end, expected)) + return false; + + ws.ignore(pos, end); + + if (s_format.ignore(pos, end, expected)) + { + ws.ignore(pos, end); + + ParserIdentifier format_p; + + if (!format_p.parse(pos, end, format, expected)) + return false; + dynamic_cast(*format).kind = ASTIdentifier::Format; + + ws.ignore(pos, end); + } + + ASTShowProcesslistQuery * query = new ASTShowProcesslistQuery(StringRange(begin, pos)); + query->format = format; + node = query; + + return true; + } +}; + +} diff --git a/dbms/include/DB/Parsers/formatAST.h b/dbms/include/DB/Parsers/formatAST.h index 057b8482f40..79e453e51d4 100644 --- a/dbms/include/DB/Parsers/formatAST.h +++ b/dbms/include/DB/Parsers/formatAST.h @@ -22,6 +22,7 @@ #include #include #include +#include namespace DB @@ -54,5 +55,6 @@ void formatAST(const ASTOrderByElement & ast, std::ostream & s, size_t indent = void formatAST(const ASTSubquery & ast, std::ostream & s, size_t indent = 0, bool hilite = true, bool one_line = false); void formatAST(const ASTQueryWithTableAndOutput & ast, std::string name, std::ostream & s, size_t indent = 0, bool hilite = true, bool one_line = false); void formatAST(const ASTAlterQuery & ast, std::ostream & s, size_t indent = 0, bool hilite = true, bool one_line = false); +void formatAST(const ASTShowProcesslistQuery & ast, std::ostream & s, size_t indent = 0, bool hilite = true, bool one_line = false); } diff --git a/dbms/include/DB/Storages/StorageSystemProcesses.h b/dbms/include/DB/Storages/StorageSystemProcesses.h new file mode 100644 index 00000000000..3bd4c7023c0 --- /dev/null +++ b/dbms/include/DB/Storages/StorageSystemProcesses.h @@ -0,0 +1,44 @@ +#pragma once + +#include + +#include +#include +#include + + +namespace DB +{ + +using Poco::SharedPtr; + + +/** Реализует системную таблицу processes, которая позволяет получить информацию о запросах, исполняющихся в данный момент. + */ +class StorageSystemProcesses : public IStorage +{ +public: + static StoragePtr create(const std::string & name_, const Context & context_); + + std::string getName() const { return "SystemProcesses"; } + std::string getTableName() const { return name; } + + const NamesAndTypesList & getColumnsList() const { return columns; } + + BlockInputStreams read( + const Names & column_names, + ASTPtr query, + const Settings & settings, + QueryProcessingStage::Enum & processed_stage, + size_t max_block_size = DEFAULT_BLOCK_SIZE, + unsigned threads = 1); + +private: + const std::string name; + const Context & context; + NamesAndTypesList columns; + + StorageSystemProcesses(const std::string & name_, const Context & context_); +}; + +} diff --git a/dbms/src/Interpreters/Context.cpp b/dbms/src/Interpreters/Context.cpp index 1efdf0e5681..06e2e459937 100644 --- a/dbms/src/Interpreters/Context.cpp +++ b/dbms/src/Interpreters/Context.cpp @@ -387,4 +387,5 @@ ProgressCallback Context::getProgressCallback() const return progress_callback; } + } diff --git a/dbms/src/Interpreters/InterpreterQuery.cpp b/dbms/src/Interpreters/InterpreterQuery.cpp index 1c7384ca262..b6f044d37c8 100644 --- a/dbms/src/Interpreters/InterpreterQuery.cpp +++ b/dbms/src/Interpreters/InterpreterQuery.cpp @@ -24,6 +24,7 @@ #include #include #include +#include namespace DB @@ -104,6 +105,11 @@ void InterpreterQuery::execute(WriteBuffer & ostr, ReadBuffer * remaining_data_i InterpreterDescribeQuery interpreter(query_ptr, context); query_plan = interpreter.executeAndFormat(ostr); } + else if (dynamic_cast(&*query_ptr)) + { + InterpreterShowProcesslistQuery interpreter(query_ptr, context); + query_plan = interpreter.executeAndFormat(ostr); + } else if (dynamic_cast(&*query_ptr)) { throwIfReadOnly(); @@ -187,6 +193,11 @@ BlockIO InterpreterQuery::execute() InterpreterDescribeQuery interpreter(query_ptr, context); res = interpreter.execute(); } + else if (dynamic_cast(&*query_ptr)) + { + InterpreterShowProcesslistQuery interpreter(query_ptr, context); + res = interpreter.execute(); + } else if (dynamic_cast(&*query_ptr)) { throwIfReadOnly(); diff --git a/dbms/src/Interpreters/InterpreterShowTablesQuery.cpp b/dbms/src/Interpreters/InterpreterShowTablesQuery.cpp index a46bbe400e2..67bdf158958 100644 --- a/dbms/src/Interpreters/InterpreterShowTablesQuery.cpp +++ b/dbms/src/Interpreters/InterpreterShowTablesQuery.cpp @@ -21,11 +21,11 @@ InterpreterShowTablesQuery::InterpreterShowTablesQuery(ASTPtr query_ptr_, Contex String InterpreterShowTablesQuery::getRewrittenQuery() { - ASTShowTablesQuery query = dynamic_cast(*query_ptr); + const ASTShowTablesQuery & query = dynamic_cast(*query_ptr); String format_or_nothing; if (query.format) - format_or_nothing = " FORMAT " + dynamic_cast(*query.format).name; + format_or_nothing = " FORMAT " + dynamic_cast(*query.format).name; /// SHOW DATABASES if (query.databases) diff --git a/dbms/src/Interpreters/executeQuery.cpp b/dbms/src/Interpreters/executeQuery.cpp index b8ba058c7d7..96ce29dcd4d 100644 --- a/dbms/src/Interpreters/executeQuery.cpp +++ b/dbms/src/Interpreters/executeQuery.cpp @@ -66,7 +66,10 @@ void executeQuery( + ", expected " + (parse_res ? "end of query" : expected) + ".", ErrorCodes::SYNTAX_ERROR); - LOG_DEBUG(&Logger::get("executeQuery"), std::string(begin, pos - begin)); + String query(begin, pos - begin); + + LOG_DEBUG(&Logger::get("executeQuery"), query); + ProcessList::EntryPtr process_list_entry = context.getProcessList().insert(query); /// Проверка ограничений. checkLimits(*ast, context.getSettingsRef().limits); @@ -96,6 +99,9 @@ BlockIO executeQuery( Context & context, QueryProcessingStage::Enum stage) { + BlockIO res; + res.process_list_entry = context.getProcessList().insert(query); + ParserQuery parser; ASTPtr ast; std::string expected; @@ -114,9 +120,6 @@ BlockIO executeQuery( + ", expected " + (parse_res ? "end of query" : expected) + ".", ErrorCodes::SYNTAX_ERROR); -// formatAST(*ast, std::cerr); -// std::cerr << std::endl; - /// Проверка ограничений. checkLimits(*ast, context.getSettingsRef().limits); @@ -125,8 +128,6 @@ BlockIO executeQuery( quota.checkExceeded(current_time); - BlockIO res; - try { InterpreterQuery interpreter(ast, context, stage); diff --git a/dbms/src/Parsers/ParserQuery.cpp b/dbms/src/Parsers/ParserQuery.cpp index 16f3a3c80c4..b05d5dd577b 100644 --- a/dbms/src/Parsers/ParserQuery.cpp +++ b/dbms/src/Parsers/ParserQuery.cpp @@ -11,6 +11,7 @@ #include #include #include +#include namespace DB @@ -30,6 +31,7 @@ bool ParserQuery::parseImpl(Pos & pos, Pos end, ASTPtr & node, String & expected ParserSetQuery set_p; ParserOptimizeQuery optimize_p; ParserTablePropertiesQuery table_p; + ParserShowProcesslistQuery show_processlist_p; bool res = show_tables_p.parse(pos, end, node, expected) || select_p.parse(pos, end, node, expected) @@ -41,10 +43,11 @@ bool ParserQuery::parseImpl(Pos & pos, Pos end, ASTPtr & node, String & expected || use_p.parse(pos, end, node, expected) || set_p.parse(pos, end, node, expected) || optimize_p.parse(pos, end, node, expected) - || table_p.parse(pos, end, node, expected); + || table_p.parse(pos, end, node, expected) + || show_processlist_p.parse(pos, end, node, expected); if (!res) - expected = "One of: SHOW TABLES, SHOW DATABASES, SHOW CREATE TABLE, SELECT, INSERT, CREATE, ATTACH, RENAME, DROP, DETACH, USE, SET, OPTIMIZE, EXISTS, DESCRIBE, DESC, ALTER"; + expected = "One of: SHOW TABLES, SHOW DATABASES, SHOW CREATE TABLE, SELECT, INSERT, CREATE, ATTACH, RENAME, DROP, DETACH, USE, SET, OPTIMIZE, EXISTS, DESCRIBE, DESC, ALTER, SHOW PROCESSLIST"; return res; } diff --git a/dbms/src/Parsers/formatAST.cpp b/dbms/src/Parsers/formatAST.cpp index 7ceabde6451..d7ae39c51f0 100644 --- a/dbms/src/Parsers/formatAST.cpp +++ b/dbms/src/Parsers/formatAST.cpp @@ -66,6 +66,7 @@ void formatAST(const IAST & ast, std::ostream & s, size_t indent, bool hilite, b DISPATCH(OrderByElement) DISPATCH(Subquery) DISPATCH(AlterQuery) + DISPATCH(ShowProcesslistQuery) else throw DB::Exception("Unknown element in AST: " + ast.getID() + " '" + std::string(ast.range.first, ast.range.second - ast.range.first) + "'", ErrorCodes::UNKNOWN_ELEMENT_IN_AST); @@ -331,6 +332,12 @@ void formatAST(const ASTUseQuery & ast, std::ostream & s, size_t indent, bool return; } +void formatAST(const ASTShowProcesslistQuery & ast, std::ostream & s, size_t indent, bool hilite, bool one_line) +{ + s << (hilite ? hilite_keyword : "") << "SHOW PROCESSLIST" << (hilite ? hilite_none : ""); + return; +} + void formatAST(const ASTInsertQuery & ast, std::ostream & s, size_t indent, bool hilite, bool one_line) { s << (hilite ? hilite_keyword : "") << "INSERT INTO " << (hilite ? hilite_none : "") diff --git a/dbms/src/Server/Server.cpp b/dbms/src/Server/Server.cpp index 01eb2b79bdc..354fccfe120 100644 --- a/dbms/src/Server/Server.cpp +++ b/dbms/src/Server/Server.cpp @@ -6,6 +6,7 @@ #include #include #include +#include #include #include "Server.h" @@ -101,6 +102,9 @@ int Server::main(const std::vector & args) /// Загружаем квоты. global_context.initQuotasFromConfig(); + /// Максимальное количество одновременно выполняющихся запросов. + global_context.getProcessList().setMaxSize(config.getInt("max_concurrent_queries", 0)); + /// Загружаем настройки. Settings & settings = global_context.getSettingsRef(); settings.setProfile(config.getString("default_profile", "default")); @@ -116,6 +120,7 @@ int Server::main(const std::vector & args) global_context.addTable("system", "numbers", StorageSystemNumbers::create("numbers")); global_context.addTable("system", "tables", StorageSystemTables::create("tables", global_context)); global_context.addTable("system", "databases", StorageSystemDatabases::create("databases", global_context)); + global_context.addTable("system", "processes", StorageSystemProcesses::create("processes", global_context)); global_context.setCurrentDatabase(config.getString("default_database", "default")); diff --git a/dbms/src/Storages/StorageSystemProcesses.cpp b/dbms/src/Storages/StorageSystemProcesses.cpp new file mode 100644 index 00000000000..dbe4e52f54e --- /dev/null +++ b/dbms/src/Storages/StorageSystemProcesses.cpp @@ -0,0 +1,58 @@ +#include +#include +#include +#include +#include + + +namespace DB +{ + + +StorageSystemProcesses::StorageSystemProcesses(const std::string & name_, const Context & context_) + : name(name_), context(context_) +{ + columns.push_back(NameAndTypePair("elapsed", new DataTypeFloat64)); + columns.push_back(NameAndTypePair("query", new DataTypeString)); +} + +StoragePtr StorageSystemProcesses::create(const std::string & name_, const Context & context_) +{ + return (new StorageSystemProcesses(name_, context_))->thisPtr(); +} + + +BlockInputStreams StorageSystemProcesses::read( + const Names & column_names, ASTPtr query, const Settings & settings, + QueryProcessingStage::Enum & processed_stage, size_t max_block_size, unsigned threads) +{ + check(column_names); + processed_stage = QueryProcessingStage::FetchColumns; + + Block block; + + ColumnWithNameAndType col_elapsed; + col_elapsed.name = "elapsed"; + col_elapsed.type = new DataTypeFloat64; + col_elapsed.column = new ColumnFloat64; + block.insert(col_elapsed); + + ColumnWithNameAndType col_query; + col_query.name = "query"; + col_query.type = new DataTypeString; + col_query.column = new ColumnString; + block.insert(col_query); + + ProcessList::Containter list = context.getProcessList().get(); + + for (ProcessList::Containter::const_iterator it = list.begin(); it != list.end(); ++it) + { + col_elapsed.column->insert(it->watch.elapsedSeconds()); + col_query.column->insert(it->query); + } + + return BlockInputStreams(1, new OneBlockInputStream(block)); +} + + +}