ClickHouse/dbms/src/Interpreters/executeQuery.cpp

170 lines
4.9 KiB
C++
Raw Normal View History

#include <DB/Common/ProfileEvents.h>
2011-11-06 20:47:07 +00:00
#include <DB/Parsers/formatAST.h>
2012-03-11 08:52:56 +00:00
#include <DB/DataStreams/BlockIO.h>
2015-03-02 01:10:58 +00:00
#include <DB/Parsers/ASTInsertQuery.h>
#include <DB/Parsers/ASTShowProcesslistQuery.h>
2015-04-14 02:45:30 +00:00
#include <DB/Parsers/ParserQuery.h>
#include <DB/Parsers/parseQuery.h>
2015-04-16 06:12:35 +00:00
#include <DB/Interpreters/Quota.h>
2011-10-30 11:30:52 +00:00
#include <DB/Interpreters/executeQuery.h>
namespace DB
{
static void checkLimits(const IAST & ast, const Limits & limits)
{
if (limits.max_ast_depth)
ast.checkDepth(limits.max_ast_depth);
if (limits.max_ast_elements)
ast.checkSize(limits.max_ast_elements);
}
2011-10-30 11:30:52 +00:00
void executeQuery(
ReadBuffer & istr,
WriteBuffer & ostr,
Context & context,
2012-05-22 18:32:45 +00:00
BlockInputStreamPtr & query_plan,
bool internal,
2012-05-22 18:32:45 +00:00
QueryProcessingStage::Enum stage)
2011-10-30 11:30:52 +00:00
{
ProfileEvents::increment(ProfileEvents::Query);
2012-03-26 04:17:17 +00:00
ParserQuery parser;
2011-10-30 11:30:52 +00:00
PODArray<char> parse_buf;
2011-10-30 11:30:52 +00:00
const char * begin;
const char * end;
/// Если в istr ещё ничего нет, то считываем кусок данных
if (istr.buffer().size() == 0)
istr.next();
2012-08-02 17:33:31 +00:00
size_t max_query_size = context.getSettings().max_query_size;
if (istr.buffer().end() - istr.position() >= static_cast<ssize_t>(max_query_size))
2011-10-30 11:30:52 +00:00
{
/// Если оставшийся размер буфера istr достаточен, чтобы распарсить запрос до max_query_size, то парсим прямо в нём
begin = istr.position();
end = istr.buffer().end();
istr.position() += end - begin;
}
else
{
/// Если нет - считываем достаточное количество данных в parse_buf
2012-08-02 17:33:31 +00:00
parse_buf.resize(max_query_size);
parse_buf.resize(istr.read(&parse_buf[0], max_query_size));
2011-10-30 11:30:52 +00:00
begin = &parse_buf[0];
end = begin + parse_buf.size();
}
ASTPtr ast = parseQuery(parser, begin, end, "");
2011-10-30 11:30:52 +00:00
/// Засунем запрос в строку. Она выводится в лог и в processlist. Если запрос INSERT, то не будем включать данные для вставки.
size_t query_size = ast->range.second - ast->range.first;
if (query_size > max_query_size)
throw Exception("Query is too large (" + toString(query_size) + ")."
" max_query_size = " + toString(max_query_size), ErrorCodes::QUERY_IS_TOO_LARGE);
String query(begin, query_size);
String logged_query = query;
std::replace(logged_query.begin(), logged_query.end(), '\n', ' ');
LOG_DEBUG(&Logger::get("executeQuery"), logged_query);
/// Положим запрос в список процессов. Но запрос SHOW PROCESSLIST класть не будем.
ProcessList::EntryPtr process_list_entry;
if (!internal && nullptr == typeid_cast<const ASTShowProcesslistQuery *>(&*ast))
{
process_list_entry = context.getProcessList().insert(
query, context.getUser(), context.getCurrentQueryId(), context.getIPAddress(),
context.getSettingsRef().limits.max_memory_usage,
context.getSettingsRef().queue_max_wait_ms.totalMilliseconds(),
context.getSettingsRef().replace_running_query);
context.setProcessListElement(&process_list_entry->get());
}
/// Проверка ограничений.
checkLimits(*ast, context.getSettingsRef().limits);
QuotaForIntervals & quota = context.getQuota();
time_t current_time = time(0);
quota.checkExceeded(current_time);
try
{
InterpreterQuery interpreter(ast, context, stage);
interpreter.execute(ostr, &istr, query_plan);
}
catch (...)
{
quota.addError(current_time);
throw;
}
quota.addQuery(current_time);
2011-10-30 11:30:52 +00:00
}
2012-03-11 08:52:56 +00:00
BlockIO executeQuery(
const String & query,
2012-05-22 18:32:45 +00:00
Context & context,
bool internal,
2012-05-22 18:32:45 +00:00
QueryProcessingStage::Enum stage)
2012-03-11 08:52:56 +00:00
{
ProfileEvents::increment(ProfileEvents::Query);
2012-03-26 04:17:17 +00:00
ParserQuery parser;
ASTPtr ast = parseQuery(parser, query.data(), query.data() + query.size(), "");
2012-03-11 08:52:56 +00:00
/// Проверка ограничений.
checkLimits(*ast, context.getSettingsRef().limits);
QuotaForIntervals & quota = context.getQuota();
time_t current_time = time(0);
quota.checkExceeded(current_time);
BlockIO res;
/// Положим запрос в список процессов. Но запрос SHOW PROCESSLIST класть не будем.
ProcessList::EntryPtr process_list_entry;
if (!internal && nullptr == typeid_cast<const ASTShowProcesslistQuery *>(&*ast))
{
process_list_entry = context.getProcessList().insert(
query, context.getUser(), context.getCurrentQueryId(), context.getIPAddress(),
context.getSettingsRef().limits.max_memory_usage,
context.getSettingsRef().queue_max_wait_ms.totalMilliseconds(),
context.getSettingsRef().replace_running_query);
context.setProcessListElement(&process_list_entry->get());
}
try
{
InterpreterQuery interpreter(ast, context, stage);
res = interpreter.execute();
/// Держим элемент списка процессов до конца обработки запроса.
res.process_list_entry = process_list_entry;
}
catch (...)
{
quota.addError(current_time);
throw;
}
quota.addQuery(current_time);
return res;
2012-03-11 08:52:56 +00:00
}
2011-10-30 11:30:52 +00:00
}