dbms: added SHOW PROCESSLIST query and 'max_concurrent_queries' setting [#CONV-8665] [#CONV-8664].

This commit is contained in:
Alexey Milovidov 2013-09-03 20:21:28 +00:00
parent 62c12dded7
commit d055d54380
18 changed files with 422 additions and 21 deletions

View File

@ -207,6 +207,7 @@ namespace ErrorCodes
UNKNOWN_QUOTA,
QUOTA_DOESNT_ALLOW_KEYS,
QUOTA_EXPIRED,
TOO_MUCH_SIMULTANEOUS_QUERIES,
POCO_EXCEPTION = 1000,
STD_EXCEPTION,

View File

@ -2,6 +2,7 @@
#include <DB/DataStreams/IBlockInputStream.h>
#include <DB/DataStreams/IBlockOutputStream.h>
#include <DB/Interpreters/ProcessList.h>
namespace DB
@ -14,6 +15,8 @@ struct BlockIO
Block in_sample; /// Пример блока, который будет прочитан из in.
Block out_sample; /// Пример блока, которого нужно писать в out.
ProcessList::EntryPtr process_list_entry;
};
}

View File

@ -19,6 +19,7 @@
#include <DB/Interpreters/Users.h>
#include <DB/Interpreters/Quota.h>
#include <DB/Interpreters/Dictionaries.h>
#include <DB/Interpreters/ProcessList.h>
namespace DB
@ -54,6 +55,7 @@ struct ContextShared
mutable SharedPtr<Dictionaries> dictionaries; /// Словари Метрики. Инициализируются лениво.
Users users; /// Известные пользователи.
Quotas quotas; /// Известные квоты на использование ресурсов.
ProcessList process_list; /// Исполняющиеся в данный момент запросы.
Logger * log; /// Логгер.
ContextShared() : log(&Logger::get("Context")) {};
@ -164,6 +166,9 @@ public:
void setProgressCallback(ProgressCallback callback);
/// Используется в InterpreterSelectQuery, чтобы передать его в IProfilingBlockInputStream.
ProgressCallback getProgressCallback() const;
ProcessList & getProcessList() { return shared->process_list; }
const ProcessList & getProcessList() const { return shared->process_list; }
};

View File

@ -0,0 +1,57 @@
#pragma once
#include <DB/IO/ReadBufferFromString.h>
#include <DB/Storages/StorageSystemProcesses.h>
#include <DB/Interpreters/executeQuery.h>
#include <DB/Parsers/ASTQueryWithOutput.h>
#include <DB/Parsers/ASTIdentifier.h>
namespace DB
{
/** Вернуть список запросов, исполняющихся прямо сейчас.
*/
class InterpreterShowProcesslistQuery
{
public:
InterpreterShowProcesslistQuery(ASTPtr query_ptr_, Context & context_)
: query_ptr(query_ptr_), context(context_) {}
BlockIO execute()
{
return executeQuery(getRewrittenQuery(), context);
}
BlockInputStreamPtr executeAndFormat(WriteBuffer & buf)
{
String query = getRewrittenQuery();
ReadBufferFromString in(query);
BlockInputStreamPtr query_plan;
executeQuery(in, buf, context, query_plan);
return query_plan;
}
private:
ASTPtr query_ptr;
Context context;
String getRewrittenQuery()
{
const ASTQueryWithOutput & query = dynamic_cast<const ASTQueryWithOutput &>(*query_ptr);
std::stringstream rewritten_query;
rewritten_query << "SELECT * FROM system.processes";
if (query.format)
rewritten_query << " FORMAT " << dynamic_cast<const ASTIdentifier &>(*query.format).name;
return rewritten_query.str();
}
};
}

View File

@ -0,0 +1,107 @@
#pragma once
#include <list>
#include <Poco/SharedPtr.h>
#include <statdaemons/Stopwatch.h>
#include <DB/Core/Exception.h>
#include <DB/Core/ErrorCodes.h>
#include <DB/IO/WriteHelpers.h>
namespace DB
{
/** Список исполняющихся в данный момент запросов.
*/
class ProcessList
{
friend class Entry;
public:
/// Запрос и таймер его выполнения.
struct Element
{
std::string query;
Stopwatch watch;
Element(const std::string & query_) : query(query_) {}
};
/// list, чтобы итераторы не инвалидировались. NOTE: можно заменить на cyclic buffer, но почти незачем.
typedef std::list<Element> Containter;
private:
Containter cont;
size_t cur_size; /// В C++03 std::list::size не O(1).
size_t max_size; /// Если 0 - не ограничено. Иначе, если пытаемся добавить больше - кидается исключение.
mutable Poco::FastMutex mutex;
/// Держит итератор на список, и удаляет элемент из списка в деструкторе.
class Entry
{
private:
ProcessList & parent;
Containter::iterator it;
public:
Entry(ProcessList & parent_, Containter::iterator it_)
: parent(parent_), it(it_) {}
~Entry()
{
Poco::ScopedLock<Poco::FastMutex> lock(parent.mutex);
parent.cont.erase(it);
--parent.cur_size;
}
};
public:
ProcessList(size_t max_size_ = 0) : cur_size(0), max_size(max_size_) {}
typedef Poco::SharedPtr<Entry> EntryPtr;
/// Зарегистрировать выполняющийся запрос. Возвращает refcounted объект, который удаляет запрос из списка при уничтожении.
EntryPtr insert(const std::string & query_)
{
EntryPtr res;
{
Poco::ScopedLock<Poco::FastMutex> lock(mutex);
if (max_size && cur_size >= max_size)
throw Exception("Too much simultaneous queries. Maximum: " + toString(max_size), ErrorCodes::TOO_MUCH_SIMULTANEOUS_QUERIES);
++cur_size;
res = new Entry(*this, cont.insert(cont.end(), Element(query_)));
}
return res;
}
/// Количество одновременно выполняющихся запросов.
size_t size() const
{
Poco::ScopedLock<Poco::FastMutex> lock(mutex);
return cur_size;
}
/// Получить текущее состояние (копию) списка запросов.
Containter get() const
{
Containter res;
{
Poco::ScopedLock<Poco::FastMutex> lock(mutex);
res = cont;
}
return res;
}
void setMaxSize(size_t max_size_)
{
Poco::ScopedLock<Poco::FastMutex> lock(mutex);
max_size = max_size_;
}
};
}

View File

@ -7,15 +7,38 @@ namespace DB
{
/** Запрос с секцией FORMAT.
*/
class ASTQueryWithOutput : public IAST
{
public:
ASTPtr format;
ASTQueryWithOutput() {}
ASTQueryWithOutput(StringRange range_) : IAST(range_) {}
};
/** Запрос с секцией FORMAT.
*/
class ASTQueryWithOutput : public IAST
{
public:
ASTPtr format;
ASTQueryWithOutput() {}
ASTQueryWithOutput(StringRange range_) : IAST(range_) {}
};
/// Объявляет класс-наследник ASTQueryWithOutput с реализованными методами getID и clone.
#define DEFINE_AST_QUERY_WITH_OUTPUT(Name, ID) \
class Name : public ASTQueryWithOutput \
{ \
public: \
Name() {} \
Name(StringRange range_) : ASTQueryWithOutput(range_) {} \
String getID() const { return ID; }; \
\
ASTPtr clone() const \
{ \
Name * res = new Name(*this); \
res->children.clear(); \
if (format) \
{ \
res->format = format->clone(); \
res->children.push_back(res->format); \
} \
return res; \
} \
};
}

View File

@ -0,0 +1,9 @@
#pragma once
#include <DB/Parsers/ASTQueryWithOutput.h>
namespace DB
{
DEFINE_AST_QUERY_WITH_OUTPUT(ASTShowProcesslistQuery, "ShowProcesslistQuery")
}

View File

@ -0,0 +1,64 @@
#pragma once
#include <DB/Parsers/IParserBase.h>
#include <DB/Parsers/CommonParsers.h>
#include <DB/Parsers/ExpressionElementParsers.h>
#include <DB/Parsers/ASTIdentifier.h>
#include <DB/Parsers/ASTShowProcesslistQuery.h>
namespace DB
{
/** Запрос SHOW PROCESSLIST
*/
class ParserShowProcesslistQuery : public IParserBase
{
protected:
String getName() { return "SHOW PROCESSLIST query"; }
bool parseImpl(Pos & pos, Pos end, ASTPtr & node, String & expected)
{
Pos begin = pos;
ParserWhiteSpaceOrComments ws;
ParserString s_show("SHOW", true, true);
ParserString s_processlist("PROCESSLIST", true, true);
ParserString s_format("FORMAT", true, true);
ASTPtr format;
ws.ignore(pos, end);
if (!s_show.ignore(pos, end, expected))
return false;
ws.ignore(pos, end);
if (!s_processlist.ignore(pos, end, expected))
return false;
ws.ignore(pos, end);
if (s_format.ignore(pos, end, expected))
{
ws.ignore(pos, end);
ParserIdentifier format_p;
if (!format_p.parse(pos, end, format, expected))
return false;
dynamic_cast<ASTIdentifier &>(*format).kind = ASTIdentifier::Format;
ws.ignore(pos, end);
}
ASTShowProcesslistQuery * query = new ASTShowProcesslistQuery(StringRange(begin, pos));
query->format = format;
node = query;
return true;
}
};
}

View File

@ -22,6 +22,7 @@
#include <DB/Parsers/ASTOrderByElement.h>
#include <DB/Parsers/ASTSubquery.h>
#include <DB/Parsers/ASTAlterQuery.h>
#include <DB/Parsers/ASTShowProcesslistQuery.h>
namespace DB
@ -54,5 +55,6 @@ void formatAST(const ASTOrderByElement & ast, std::ostream & s, size_t indent =
void formatAST(const ASTSubquery & ast, std::ostream & s, size_t indent = 0, bool hilite = true, bool one_line = false);
void formatAST(const ASTQueryWithTableAndOutput & ast, std::string name, std::ostream & s, size_t indent = 0, bool hilite = true, bool one_line = false);
void formatAST(const ASTAlterQuery & ast, std::ostream & s, size_t indent = 0, bool hilite = true, bool one_line = false);
void formatAST(const ASTShowProcesslistQuery & ast, std::ostream & s, size_t indent = 0, bool hilite = true, bool one_line = false);
}

View File

@ -0,0 +1,44 @@
#pragma once
#include <Poco/SharedPtr.h>
#include <DB/Storages/IStorage.h>
#include <DB/DataStreams/IProfilingBlockInputStream.h>
#include <DB/Interpreters/Context.h>
namespace DB
{
using Poco::SharedPtr;
/** Реализует системную таблицу processes, которая позволяет получить информацию о запросах, исполняющихся в данный момент.
*/
class StorageSystemProcesses : public IStorage
{
public:
static StoragePtr create(const std::string & name_, const Context & context_);
std::string getName() const { return "SystemProcesses"; }
std::string getTableName() const { return name; }
const NamesAndTypesList & getColumnsList() const { return columns; }
BlockInputStreams read(
const Names & column_names,
ASTPtr query,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size = DEFAULT_BLOCK_SIZE,
unsigned threads = 1);
private:
const std::string name;
const Context & context;
NamesAndTypesList columns;
StorageSystemProcesses(const std::string & name_, const Context & context_);
};
}

View File

@ -387,4 +387,5 @@ ProgressCallback Context::getProgressCallback() const
return progress_callback;
}
}

View File

@ -24,6 +24,7 @@
#include <DB/Interpreters/InterpreterShowCreateQuery.h>
#include <DB/Interpreters/InterpreterQuery.h>
#include <DB/Interpreters/InterpreterAlterQuery.h>
#include <DB/Interpreters/InterpreterShowProcesslistQuery.h>
namespace DB
@ -104,6 +105,11 @@ void InterpreterQuery::execute(WriteBuffer & ostr, ReadBuffer * remaining_data_i
InterpreterDescribeQuery interpreter(query_ptr, context);
query_plan = interpreter.executeAndFormat(ostr);
}
else if (dynamic_cast<ASTShowProcesslistQuery *>(&*query_ptr))
{
InterpreterShowProcesslistQuery interpreter(query_ptr, context);
query_plan = interpreter.executeAndFormat(ostr);
}
else if (dynamic_cast<ASTAlterQuery *>(&*query_ptr))
{
throwIfReadOnly();
@ -187,6 +193,11 @@ BlockIO InterpreterQuery::execute()
InterpreterDescribeQuery interpreter(query_ptr, context);
res = interpreter.execute();
}
else if (dynamic_cast<ASTShowProcesslistQuery *>(&*query_ptr))
{
InterpreterShowProcesslistQuery interpreter(query_ptr, context);
res = interpreter.execute();
}
else if (dynamic_cast<ASTAlterQuery *>(&*query_ptr))
{
throwIfReadOnly();

View File

@ -21,11 +21,11 @@ InterpreterShowTablesQuery::InterpreterShowTablesQuery(ASTPtr query_ptr_, Contex
String InterpreterShowTablesQuery::getRewrittenQuery()
{
ASTShowTablesQuery query = dynamic_cast<const ASTShowTablesQuery &>(*query_ptr);
const ASTShowTablesQuery & query = dynamic_cast<const ASTShowTablesQuery &>(*query_ptr);
String format_or_nothing;
if (query.format)
format_or_nothing = " FORMAT " + dynamic_cast<ASTIdentifier &>(*query.format).name;
format_or_nothing = " FORMAT " + dynamic_cast<const ASTIdentifier &>(*query.format).name;
/// SHOW DATABASES
if (query.databases)

View File

@ -66,7 +66,10 @@ void executeQuery(
+ ", expected " + (parse_res ? "end of query" : expected) + ".",
ErrorCodes::SYNTAX_ERROR);
LOG_DEBUG(&Logger::get("executeQuery"), std::string(begin, pos - begin));
String query(begin, pos - begin);
LOG_DEBUG(&Logger::get("executeQuery"), query);
ProcessList::EntryPtr process_list_entry = context.getProcessList().insert(query);
/// Проверка ограничений.
checkLimits(*ast, context.getSettingsRef().limits);
@ -96,6 +99,9 @@ BlockIO executeQuery(
Context & context,
QueryProcessingStage::Enum stage)
{
BlockIO res;
res.process_list_entry = context.getProcessList().insert(query);
ParserQuery parser;
ASTPtr ast;
std::string expected;
@ -114,9 +120,6 @@ BlockIO executeQuery(
+ ", expected " + (parse_res ? "end of query" : expected) + ".",
ErrorCodes::SYNTAX_ERROR);
// formatAST(*ast, std::cerr);
// std::cerr << std::endl;
/// Проверка ограничений.
checkLimits(*ast, context.getSettingsRef().limits);
@ -125,8 +128,6 @@ BlockIO executeQuery(
quota.checkExceeded(current_time);
BlockIO res;
try
{
InterpreterQuery interpreter(ast, context, stage);

View File

@ -11,6 +11,7 @@
#include <DB/Parsers/ParserQuery.h>
#include <DB/Parsers/ParserTablePropertiesQuery.h>
#include <DB/Parsers/ParserAlterQuery.h>
#include <DB/Parsers/ParserShowProcesslistQuery.h>
namespace DB
@ -30,6 +31,7 @@ bool ParserQuery::parseImpl(Pos & pos, Pos end, ASTPtr & node, String & expected
ParserSetQuery set_p;
ParserOptimizeQuery optimize_p;
ParserTablePropertiesQuery table_p;
ParserShowProcesslistQuery show_processlist_p;
bool res = show_tables_p.parse(pos, end, node, expected)
|| select_p.parse(pos, end, node, expected)
@ -41,10 +43,11 @@ bool ParserQuery::parseImpl(Pos & pos, Pos end, ASTPtr & node, String & expected
|| use_p.parse(pos, end, node, expected)
|| set_p.parse(pos, end, node, expected)
|| optimize_p.parse(pos, end, node, expected)
|| table_p.parse(pos, end, node, expected);
|| table_p.parse(pos, end, node, expected)
|| show_processlist_p.parse(pos, end, node, expected);
if (!res)
expected = "One of: SHOW TABLES, SHOW DATABASES, SHOW CREATE TABLE, SELECT, INSERT, CREATE, ATTACH, RENAME, DROP, DETACH, USE, SET, OPTIMIZE, EXISTS, DESCRIBE, DESC, ALTER";
expected = "One of: SHOW TABLES, SHOW DATABASES, SHOW CREATE TABLE, SELECT, INSERT, CREATE, ATTACH, RENAME, DROP, DETACH, USE, SET, OPTIMIZE, EXISTS, DESCRIBE, DESC, ALTER, SHOW PROCESSLIST";
return res;
}

View File

@ -66,6 +66,7 @@ void formatAST(const IAST & ast, std::ostream & s, size_t indent, bool hilite, b
DISPATCH(OrderByElement)
DISPATCH(Subquery)
DISPATCH(AlterQuery)
DISPATCH(ShowProcesslistQuery)
else
throw DB::Exception("Unknown element in AST: " + ast.getID() + " '" + std::string(ast.range.first, ast.range.second - ast.range.first) + "'",
ErrorCodes::UNKNOWN_ELEMENT_IN_AST);
@ -331,6 +332,12 @@ void formatAST(const ASTUseQuery & ast, std::ostream & s, size_t indent, bool
return;
}
void formatAST(const ASTShowProcesslistQuery & ast, std::ostream & s, size_t indent, bool hilite, bool one_line)
{
s << (hilite ? hilite_keyword : "") << "SHOW PROCESSLIST" << (hilite ? hilite_none : "");
return;
}
void formatAST(const ASTInsertQuery & ast, std::ostream & s, size_t indent, bool hilite, bool one_line)
{
s << (hilite ? hilite_keyword : "") << "INSERT INTO " << (hilite ? hilite_none : "")

View File

@ -6,6 +6,7 @@
#include <DB/Storages/StorageSystemNumbers.h>
#include <DB/Storages/StorageSystemTables.h>
#include <DB/Storages/StorageSystemDatabases.h>
#include <DB/Storages/StorageSystemProcesses.h>
#include <DB/Storages/StorageSystemOne.h>
#include "Server.h"
@ -101,6 +102,9 @@ int Server::main(const std::vector<std::string> & args)
/// Загружаем квоты.
global_context.initQuotasFromConfig();
/// Максимальное количество одновременно выполняющихся запросов.
global_context.getProcessList().setMaxSize(config.getInt("max_concurrent_queries", 0));
/// Загружаем настройки.
Settings & settings = global_context.getSettingsRef();
settings.setProfile(config.getString("default_profile", "default"));
@ -116,6 +120,7 @@ int Server::main(const std::vector<std::string> & args)
global_context.addTable("system", "numbers", StorageSystemNumbers::create("numbers"));
global_context.addTable("system", "tables", StorageSystemTables::create("tables", global_context));
global_context.addTable("system", "databases", StorageSystemDatabases::create("databases", global_context));
global_context.addTable("system", "processes", StorageSystemProcesses::create("processes", global_context));
global_context.setCurrentDatabase(config.getString("default_database", "default"));

View File

@ -0,0 +1,58 @@
#include <DB/Columns/ColumnString.h>
#include <DB/DataTypes/DataTypeString.h>
#include <DB/DataTypes/DataTypesNumberFixed.h>
#include <DB/DataStreams/OneBlockInputStream.h>
#include <DB/Storages/StorageSystemProcesses.h>
namespace DB
{
StorageSystemProcesses::StorageSystemProcesses(const std::string & name_, const Context & context_)
: name(name_), context(context_)
{
columns.push_back(NameAndTypePair("elapsed", new DataTypeFloat64));
columns.push_back(NameAndTypePair("query", new DataTypeString));
}
StoragePtr StorageSystemProcesses::create(const std::string & name_, const Context & context_)
{
return (new StorageSystemProcesses(name_, context_))->thisPtr();
}
BlockInputStreams StorageSystemProcesses::read(
const Names & column_names, ASTPtr query, const Settings & settings,
QueryProcessingStage::Enum & processed_stage, size_t max_block_size, unsigned threads)
{
check(column_names);
processed_stage = QueryProcessingStage::FetchColumns;
Block block;
ColumnWithNameAndType col_elapsed;
col_elapsed.name = "elapsed";
col_elapsed.type = new DataTypeFloat64;
col_elapsed.column = new ColumnFloat64;
block.insert(col_elapsed);
ColumnWithNameAndType col_query;
col_query.name = "query";
col_query.type = new DataTypeString;
col_query.column = new ColumnString;
block.insert(col_query);
ProcessList::Containter list = context.getProcessList().get();
for (ProcessList::Containter::const_iterator it = list.begin(); it != list.end(); ++it)
{
col_elapsed.column->insert(it->watch.elapsedSeconds());
col_query.column->insert(it->query);
}
return BlockInputStreams(1, new OneBlockInputStream(block));
}
}