dbms: development [#CONV-2944].

This commit is contained in:
Alexey Milovidov 2011-10-30 11:30:52 +00:00
parent bdf0aee3c6
commit 69ce664c9c
10 changed files with 411 additions and 0 deletions

View File

@ -0,0 +1,32 @@
#pragma once
#include <DB/Interpreters/Context.h>
namespace DB
{
/** Интерпретирует запрос INSERT.
*/
class InterpreterInsertQuery
{
public:
InterpreterInsertQuery(ASTPtr query_ptr_, Context & context_, size_t max_block_size_ = DEFAULT_BLOCK_SIZE);
/** Выполнить запрос.
* remaining_data_istr, если не NULL, может содержать нераспарсенные данные для вставки.
* (заранее может быть считан в оперативку для парсинга лишь небольшой кусок запроса, который содержит не все данные)
*/
void execute(SharedPtr<ReadBuffer> remaining_data_istr);
private:
StoragePtr getTable();
ASTPtr query_ptr;
Context context;
size_t max_block_size;
};
}

View File

@ -0,0 +1,38 @@
#pragma once
#include <DB/Interpreters/Context.h>
namespace DB
{
/** Интерпретирует произвольный запрос.
*/
class InterpreterQuery
{
public:
InterpreterQuery(ASTPtr query_ptr_, Context & context_, size_t max_block_size_ = DEFAULT_BLOCK_SIZE);
/** Выполнить запрос.
*
* ostr - куда писать результат выполнения запроса, если он есть.
*
* remaining_data_istr, если не NULL, может содержать нераспарсенный остаток запроса с данными.
* (заранее может быть считан в оперативку для парсинга лишь небольшой кусок запроса, который содержит не все данные)
*
* В query_plan,
* после выполнения запроса, может быть записан BlockInputStreamPtr,
* использовавшийся при выполнении запроса,
* чтобы можно было получить информацию о том, как выполнялся запрос.
*/
void execute(WriteBuffer & ostr, SharedPtr<ReadBuffer> remaining_data_istr, BlockInputStreamPtr & query_plan);
private:
ASTPtr query_ptr;
Context context;
size_t max_block_size;
};
}

View File

@ -0,0 +1,21 @@
#pragma once
#include <DB/Parsers/ParserQuery.h>
#include <DB/Interpreters/InterpreterQuery.h>
namespace DB
{
/** Парсит и исполняет запрос.
*/
void executeQuery(
ReadBuffer & istr, /// Откуда читать запрос (а также данные для INSERT-а, если есть)
WriteBuffer & ostr, /// Куда писать результат
Context & context, /// БД, таблицы, типы данных, движки таблиц, функции, агрегатные функции...
BlockInputStreamPtr & query_plan, /// Сюда может быть записано описание, как выполнялся запрос
size_t max_query_size = DEFAULT_MAX_QUERY_SIZE, /// Какую часть запроса можно прочитать в оперативку для парсинга (оставшиеся данные для INSERT, если есть, считываются позже)
size_t max_block_size = DEFAULT_BLOCK_SIZE); /// Максимальный размер блока при чтении или вставке данных
}

View File

@ -0,0 +1,15 @@
#pragma once
#include <DB/Interpreters/Context.h>
namespace DB
{
/** Загружает определения таблиц и добавляет их в контекст.
*/
void loadMetadata(Context & context);
}

View File

@ -0,0 +1,17 @@
#pragma once
#include <DB/Parsers/IParserBase.h>
namespace DB
{
class ParserQuery : public IParserBase
{
protected:
String getName() { return "Query"; }
bool parseImpl(Pos & pos, Pos end, ASTPtr & node, String & expected);
};
}

View File

@ -0,0 +1,87 @@
#include <DB/IO/ConcatReadBuffer.h>
#include <DB/DataStreams/FormatFactory.h>
#include <DB/DataStreams/copyData.h>
#include <DB/Parsers/ASTInsertQuery.h>
#include <DB/Parsers/ASTSelectQuery.h>
#include <DB/Parsers/ASTIdentifier.h>
#include <DB/Interpreters/InterpreterSelectQuery.h>
#include <DB/Interpreters/InterpreterInsertQuery.h>
namespace DB
{
InterpreterInsertQuery::InterpreterInsertQuery(ASTPtr query_ptr_, Context & context_, size_t max_block_size_)
: query_ptr(query_ptr_), context(context_), max_block_size(max_block_size_)
{
}
StoragePtr InterpreterInsertQuery::getTable()
{
ASTInsertQuery & query = dynamic_cast<ASTInsertQuery &>(*query_ptr);
/// В какую таблицу писать.
String database_name = query.database;
String table_name = query.table;
/** Если база данных не указана - используем текущую базу данных.
*/
if (database_name.empty())
database_name = context.current_database;
if (context.databases->end() == context.databases->find(database_name)
|| (*context.databases)[database_name].end() == (*context.databases)[database_name].find(table_name))
throw Exception("Unknown table '" + table_name + "' in database '" + database_name + "'", ErrorCodes::UNKNOWN_TABLE);
return (*context.databases)[database_name][table_name];
}
void InterpreterInsertQuery::execute(SharedPtr<ReadBuffer> remaining_data_istr)
{
ASTInsertQuery & query = dynamic_cast<ASTInsertQuery &>(*query_ptr);
StoragePtr table = getTable();
/// TODO - если указаны не все столбцы, то дополнить поток недостающими столбцами со значениями по-умолчанию.
BlockInputStreamPtr in;
BlockOutputStreamPtr out = table->write(query_ptr);
/// Какой тип запроса: INSERT VALUES | INSERT FORMAT | INSERT SELECT?
if (!query.select)
{
FormatFactory format_factory;
String format = query.format;
if (format.empty())
format = "Values";
/// Данные могут содержаться в распарсенной и ещё не распарсенной части запроса.
ConcatReadBuffer::ReadBuffers buffers;
ReadBuffer buf1(const_cast<char *>(query.data), query.end - query.data, 0);
buffers.push_back(&buf1);
if (remaining_data_istr)
buffers.push_back(&*remaining_data_istr);
ConcatReadBuffer istr(buffers);
Block sample = table->getSampleBlock();
in = format_factory.getInput(format, istr, sample, max_block_size, *context.data_type_factory);
copyData(*in, *out);
}
else
{
InterpreterSelectQuery interpreter_select(query.select, context, max_block_size);
in = interpreter_select.execute();
copyData(*in, *out);
}
}
}

View File

@ -0,0 +1,43 @@
#include <DB/Parsers/ASTInsertQuery.h>
#include <DB/Parsers/ASTSelectQuery.h>
#include <DB/Parsers/ASTCreateQuery.h>
#include <DB/Interpreters/InterpreterSelectQuery.h>
#include <DB/Interpreters/InterpreterInsertQuery.h>
#include <DB/Interpreters/InterpreterCreateQuery.h>
#include <DB/Interpreters/InterpreterQuery.h>
namespace DB
{
InterpreterQuery::InterpreterQuery(ASTPtr query_ptr_, Context & context_, size_t max_block_size_)
: query_ptr(query_ptr_), context(context_), max_block_size(max_block_size_)
{
}
void InterpreterQuery::execute(WriteBuffer & ostr, SharedPtr<ReadBuffer> remaining_data_istr, BlockInputStreamPtr & query_plan)
{
if (dynamic_cast<ASTSelectQuery *>(&*query_ptr))
{
InterpreterSelectQuery interpreter(query_ptr, context, max_block_size);
query_plan = interpreter.executeAndFormat(ostr);
}
else if (dynamic_cast<ASTInsertQuery *>(&*query_ptr))
{
InterpreterInsertQuery interpreter(query_ptr, context, max_block_size);
interpreter.execute(remaining_data_istr);
}
else if (dynamic_cast<ASTCreateQuery *>(&*query_ptr))
{
InterpreterCreateQuery interpreter;
interpreter.execute(query_ptr, context);
}
else
throw Exception("Unknown type of query: " + query_ptr->getID(), ErrorCodes::UNKNOWN_TYPE_OF_QUERY);
}
}

View File

@ -0,0 +1,61 @@
#include <DB/Interpreters/executeQuery.h>
namespace DB
{
void executeQuery(
ReadBuffer & istr,
WriteBuffer & ostr,
Context & context,
BlockInputStreamPtr & query_plan,
size_t max_query_size,
size_t max_block_size)
{
DB::ParserQuery parser;
DB::ASTPtr ast;
std::string expected;
std::vector<char> parse_buf;
const char * begin;
const char * end;
/// Если в istr ещё ничего нет, то считываем кусок данных
if (istr.buffer().size() == 0)
istr.next();
if (istr.buffer().end() - istr.position() >= static_cast<ssize_t>(max_query_size))
{
/// Если оставшийся размер буфера istr достаточен, чтобы распарсить запрос до max_query_size, то парсим прямо в нём
begin = istr.position();
end = istr.buffer().end();
istr.position() += end - begin;
}
else
{
/// Если нет - считываем достаточное количество данных в parse_buf
parse_buf.resize(max_query_size);
parse_buf.resize(istr.read(&parse_buf[0], max_query_size));
begin = &parse_buf[0];
end = begin + parse_buf.size();
}
const char * pos = begin;
bool parse_res = parser.parse(pos, end, ast, expected);
/// Распарсенный запрос должен заканчиваться на конец входных данных или на точку с запятой.
if (!parse_res || (pos != end && *pos != ';'))
throw DB::Exception("Syntax error: failed at position "
+ Poco::NumberFormatter::format(pos - begin) + ": "
+ std::string(pos, std::min(SHOW_CHARS_ON_SYNTAX_ERROR, end - pos))
+ ", expected " + (parse_res ? "end of query" : expected) + ".",
DB::ErrorCodes::SYNTAX_ERROR);
InterpreterQuery interpreter(ast, context, max_block_size);
interpreter.execute(ostr, new ReadBuffer(istr), query_plan);
}
}

View File

@ -0,0 +1,75 @@
#include <Poco/DirectoryIterator.h>
#include <Poco/FileStream.h>
#include <DB/Parsers/ParserCreateQuery.h>
#include <DB/Parsers/ASTCreateQuery.h>
#include <DB/Interpreters/InterpreterCreateQuery.h>
#include <DB/Interpreters/loadMetadata.h>
namespace DB
{
static void executeCreateQuery(const String & query, Context & context, const String & database, const String & file_name)
{
const char * begin = query.data();
const char * end = begin + query.size();
const char * pos = begin;
ParserCreateQuery parser;
ASTPtr ast;
String expected;
bool parse_res = parser.parse(pos, end, ast, expected);
/// Распарсенный запрос должен заканчиваться на конец входных данных или на точку с запятой.
if (!parse_res || (pos != end && *pos != ';'))
throw DB::Exception("Syntax error while executing query from file " + file_name + ": failed at position "
+ Poco::NumberFormatter::format(pos - begin) + ": "
+ std::string(pos, std::min(SHOW_CHARS_ON_SYNTAX_ERROR, end - pos))
+ ", expected " + (parse_res ? "end of query" : expected) + ".",
DB::ErrorCodes::SYNTAX_ERROR);
ASTCreateQuery & ast_create_query = dynamic_cast<ASTCreateQuery &>(*ast);
ast_create_query.attach = true;
ast_create_query.database = database;
InterpreterCreateQuery interpreter;
interpreter.execute(ast, context);
}
void loadMetadata(Context & context)
{
/// Здесь хранятся определения таблиц
String path = context.path + "metadata";
/// Цикл по базам данных
Poco::DirectoryIterator dir_end;
for (Poco::DirectoryIterator it(path); it != dir_end; ++it)
{
if (!it->isDirectory())
continue;
/// Цикл по таблицам
for (Poco::DirectoryIterator jt(it->path()); jt != dir_end; ++jt)
{
/// Файлы имеют имена вида table_name.sql
if (jt.name().compare(jt.name().size() - 4, 4, ".sql"))
throw Exception("Incorrect file extension: " + jt.name() + " in metadata directory " + it->path(), ErrorCodes::INCORRECT_FILE_NAME);
Poco::FileInputStream istr(jt->path());
std::stringstream s;
s << istr.rdbuf();
if (!istr.good())
throw Exception("Cannot read from file " + jt->path(), ErrorCodes::CANNOT_READ_FROM_ISTREAM);
executeCreateQuery(s.str(), context, it.name(), jt->path());
}
}
}
}

View File

@ -0,0 +1,22 @@
#include <DB/Parsers/ParserSelectQuery.h>
#include <DB/Parsers/ParserCreateQuery.h>
#include <DB/Parsers/ParserInsertQuery.h>
#include <DB/Parsers/ParserQuery.h>
namespace DB
{
bool ParserQuery::parseImpl(Pos & pos, Pos end, ASTPtr & node, String & expected)
{
ParserSelectQuery select_p;
ParserInsertQuery insert_p;
ParserCreateQuery create_p;
return select_p.parse(pos, end, node, expected)
|| insert_p.parse(pos, end, node, expected)
|| create_p.parse(pos, end, node, expected);
}
}