dbms: development [#CONV-2944].

This commit is contained in:
Alexey Milovidov 2012-03-05 00:09:41 +00:00
parent c90243f7fe
commit 7d9f303599
13 changed files with 81 additions and 56 deletions

View File

@ -121,6 +121,11 @@ public:
BlockInputStreamPtr clone() { return new UnionBlockInputStream(children, max_threads); }
~UnionBlockInputStream()
{
pool.wait();
}
private:
unsigned max_threads;
@ -175,6 +180,7 @@ private:
}
catch (const Poco::Exception & e)
{
std::cerr << e.message() << std::endl;
data.exception = new Exception(e.message(), ErrorCodes::POCO_EXCEPTION);
}
catch (const std::exception & e)

View File

@ -12,6 +12,7 @@
#include <DB/AggregateFunctions/AggregateFunctionFactory.h>
#include <DB/DataTypes/DataTypeFactory.h>
#include <DB/Storages/StorageFactory.h>
#include <DB/Interpreters/Settings.h>
namespace DB
@ -41,6 +42,7 @@ struct Context
DataTypeFactoryPtr data_type_factory; /// Типы данных.
StorageFactoryPtr storage_factory; /// Движки таблиц.
NamesAndTypesList columns; /// Столбцы текущей обрабатываемой таблицы.
Settings settings; /// Настройки выполнения запроса.
SharedPtr<Poco::FastMutex> mutex; /// Для доступа и модификации разделяемых объектов.

View File

@ -13,7 +13,7 @@ namespace DB
class InterpreterCreateQuery
{
public:
InterpreterCreateQuery(ASTPtr query_ptr_, Context & context_, size_t max_threads_ = DEFAULT_MAX_THREADS, size_t max_block_size_ = DEFAULT_BLOCK_SIZE);
InterpreterCreateQuery(ASTPtr query_ptr_, Context & context_);
/** В случае таблицы: добавляет созданную таблицу в контекст, а также возвращает её.
* В случае БД: добавляет созданную БД в контекст и возвращает NULL.
@ -23,8 +23,6 @@ public:
private:
ASTPtr query_ptr;
Context context;
size_t max_threads;
size_t max_block_size;
};

View File

@ -12,7 +12,7 @@ namespace DB
class InterpreterInsertQuery
{
public:
InterpreterInsertQuery(ASTPtr query_ptr_, Context & context_, size_t max_threads_ = DEFAULT_MAX_THREADS, size_t max_block_size_ = DEFAULT_BLOCK_SIZE);
InterpreterInsertQuery(ASTPtr query_ptr_, Context & context_);
/** Выполнить запрос.
* remaining_data_istr, если не NULL, может содержать нераспарсенные данные для вставки.
@ -25,8 +25,6 @@ private:
ASTPtr query_ptr;
Context context;
size_t max_threads;
size_t max_block_size;
};

View File

@ -12,7 +12,7 @@ namespace DB
class InterpreterQuery
{
public:
InterpreterQuery(ASTPtr query_ptr_, Context & context_, size_t max_threads_ = DEFAULT_MAX_THREADS, size_t max_block_size_ = DEFAULT_BLOCK_SIZE);
InterpreterQuery(ASTPtr query_ptr_, Context & context_);
/** Выполнить запрос.
*
@ -31,8 +31,6 @@ public:
private:
ASTPtr query_ptr;
Context context;
size_t max_threads;
size_t max_block_size;
};

View File

@ -13,7 +13,7 @@ namespace DB
class InterpreterSelectQuery
{
public:
InterpreterSelectQuery(ASTPtr query_ptr_, Context & context_, size_t max_threads_ = DEFAULT_MAX_THREADS, size_t max_block_size_ = DEFAULT_BLOCK_SIZE);
InterpreterSelectQuery(ASTPtr query_ptr_, Context & context_);
/// Выполнить запрос, получить поток блоков для чтения
BlockInputStreamPtr execute();
@ -51,8 +51,6 @@ private:
ASTPtr query_ptr;
Context context;
size_t max_threads;
size_t max_block_size;
};

View File

@ -0,0 +1,22 @@
#pragma once
#include <DB/Core/Defines.h>
namespace DB
{
/** Настройки выполнения запроса.
*/
struct Settings
{
size_t max_block_size; /// Максимальный размер блока для чтения
size_t max_threads; /// Максимальное количество потоков выполнения запроса
size_t max_query_size; /// Какую часть запроса можно прочитать в оперативку для парсинга (оставшиеся данные для INSERT, если есть, считываются позже)
bool asynchronous; /// Выполнять разные стадии конвейера выполнения запроса параллельно
Settings() : max_block_size(DEFAULT_BLOCK_SIZE), max_threads(DEFAULT_MAX_THREADS), max_query_size(DEFAULT_MAX_QUERY_SIZE), asynchronous(true) {}
};
}

View File

@ -14,9 +14,6 @@ void executeQuery(
ReadBuffer & istr, /// Откуда читать запрос (а также данные для INSERT-а, если есть)
WriteBuffer & ostr, /// Куда писать результат
Context & context, /// БД, таблицы, типы данных, движки таблиц, функции, агрегатные функции...
BlockInputStreamPtr & query_plan, /// Сюда может быть записано описание, как выполнялся запрос
size_t max_query_size = DEFAULT_MAX_QUERY_SIZE, /// Какую часть запроса можно прочитать в оперативку для парсинга (оставшиеся данные для INSERT, если есть, считываются позже)
size_t max_threads = DEFAULT_MAX_THREADS, /// Максимальное количество потоков выполнения запроса
size_t max_block_size = DEFAULT_BLOCK_SIZE); /// Максимальный размер блока при чтении или вставке данных
BlockInputStreamPtr & query_plan); /// Сюда может быть записано описание, как выполнялся запрос
}

View File

@ -23,8 +23,8 @@ namespace DB
{
InterpreterCreateQuery::InterpreterCreateQuery(ASTPtr query_ptr_, Context & context_, size_t max_threads_, size_t max_block_size_)
: query_ptr(query_ptr_), context(context_), max_threads(max_threads_), max_block_size(max_block_size_)
InterpreterCreateQuery::InterpreterCreateQuery(ASTPtr query_ptr_, Context & context_)
: query_ptr(query_ptr_), context(context_)
{
}
@ -94,7 +94,7 @@ StoragePtr InterpreterCreateQuery::execute()
SharedPtr<InterpreterSelectQuery> interpreter_select;
if (create.select)
interpreter_select = new InterpreterSelectQuery(create.select, context, max_threads, max_block_size);
interpreter_select = new InterpreterSelectQuery(create.select, context);
/// Получаем список столбцов
if (create.columns)

View File

@ -16,8 +16,8 @@ namespace DB
{
InterpreterInsertQuery::InterpreterInsertQuery(ASTPtr query_ptr_, Context & context_, size_t max_threads_, size_t max_block_size_)
: query_ptr(query_ptr_), context(context_), max_threads(max_threads_), max_block_size(max_block_size_)
InterpreterInsertQuery::InterpreterInsertQuery(ASTPtr query_ptr_, Context & context_)
: query_ptr(query_ptr_), context(context_)
{
}
@ -73,12 +73,12 @@ void InterpreterInsertQuery::execute(ReadBuffer * remaining_data_istr)
ConcatReadBuffer istr(buffers);
Block sample = table->getSampleBlock();
in = format_factory.getInput(format, istr, sample, max_block_size, *context.data_type_factory);
in = format_factory.getInput(format, istr, sample, context.settings.max_block_size, *context.data_type_factory);
copyData(*in, *out);
}
else
{
InterpreterSelectQuery interpreter_select(query.select, context, max_threads, max_block_size);
InterpreterSelectQuery interpreter_select(query.select, context);
in = interpreter_select.execute();
in = new MaterializingBlockInputStream(in);
copyData(*in, *out);

View File

@ -14,8 +14,8 @@ namespace DB
{
InterpreterQuery::InterpreterQuery(ASTPtr query_ptr_, Context & context_, size_t max_threads_, size_t max_block_size_)
: query_ptr(query_ptr_), context(context_), max_threads(max_threads_), max_block_size(max_block_size_)
InterpreterQuery::InterpreterQuery(ASTPtr query_ptr_, Context & context_)
: query_ptr(query_ptr_), context(context_)
{
}
@ -24,17 +24,17 @@ void InterpreterQuery::execute(WriteBuffer & ostr, ReadBuffer * remaining_data_i
{
if (dynamic_cast<ASTSelectQuery *>(&*query_ptr))
{
InterpreterSelectQuery interpreter(query_ptr, context, max_threads, max_block_size);
InterpreterSelectQuery interpreter(query_ptr, context);
query_plan = interpreter.executeAndFormat(ostr);
}
else if (dynamic_cast<ASTInsertQuery *>(&*query_ptr))
{
InterpreterInsertQuery interpreter(query_ptr, context, max_threads, max_block_size);
InterpreterInsertQuery interpreter(query_ptr, context);
interpreter.execute(remaining_data_istr);
}
else if (dynamic_cast<ASTCreateQuery *>(&*query_ptr))
{
InterpreterCreateQuery interpreter(query_ptr, context, max_threads, max_block_size);
InterpreterCreateQuery interpreter(query_ptr, context);
interpreter.execute();
}
else if (dynamic_cast<ASTDropQuery *>(&*query_ptr))

View File

@ -26,8 +26,8 @@ namespace DB
{
InterpreterSelectQuery::InterpreterSelectQuery(ASTPtr query_ptr_, Context & context_, size_t max_threads_, size_t max_block_size_)
: query_ptr(query_ptr_), context(context_), max_threads(max_threads_), max_block_size(max_block_size_)
InterpreterSelectQuery::InterpreterSelectQuery(ASTPtr query_ptr_, Context & context_)
: query_ptr(query_ptr_), context(context_)
{
}
@ -71,7 +71,7 @@ void InterpreterSelectQuery::setColumns()
context.columns = !query.table || !dynamic_cast<ASTSelectQuery *>(&*query.table)
? getTable()->getColumnsList()
: InterpreterSelectQuery(query.table, context, max_threads, max_block_size).getSampleBlock().getColumnsList();
: InterpreterSelectQuery(query.table, context).getSampleBlock().getColumnsList();
if (context.columns.empty())
throw Exception("There is no available columns", ErrorCodes::THERE_IS_NO_COLUMN);
@ -94,6 +94,15 @@ Block InterpreterSelectQuery::getSampleBlock()
}
/// Превращает источник в асинхронный, если это указано.
static inline BlockInputStreamPtr maybeAsynchronous(BlockInputStreamPtr in, bool is_async)
{
return is_async
? new AsynchronousBlockInputStream(in)
: in;
}
BlockInputStreamPtr InterpreterSelectQuery::execute()
{
ASTSelectQuery & query = dynamic_cast<ASTSelectQuery &>(*query_ptr);
@ -109,7 +118,7 @@ BlockInputStreamPtr InterpreterSelectQuery::execute()
if (!query.table || !dynamic_cast<ASTSelectQuery *>(&*query.table))
table = getTable();
else
interpreter_subquery = new InterpreterSelectQuery(query.table, context, max_threads, max_block_size);
interpreter_subquery = new InterpreterSelectQuery(query.table, context);
/// Объект, с помощью которого анализируется запрос.
Poco::SharedPtr<Expression> expression = new Expression(query_ptr, context);
@ -135,7 +144,7 @@ BlockInputStreamPtr InterpreterSelectQuery::execute()
/** Оптимизация - если не указаны WHERE, GROUP, HAVING, ORDER, но указан LIMIT, и limit + offset < max_block_size,
* то в качестве размера блока будем использовать limit + offset (чтобы не читать из таблицы больше, чем запрошено).
*/
size_t block_size = max_block_size;
size_t block_size = context.settings.max_block_size;
if (!query.where_expression && !query.group_expression_list && !query.having_expression && !query.order_expression_list
&& query.limit_length && !need_aggregate && limit_length + limit_offset < block_size)
{
@ -155,9 +164,9 @@ BlockInputStreamPtr InterpreterSelectQuery::execute()
/// Инициализируем изначальные потоки данных, на которые накладываются преобразования запроса. Таблица или подзапрос?
if (!query.table || !dynamic_cast<ASTSelectQuery *>(&*query.table))
streams = table->read(required_columns, query_ptr, block_size, max_threads);
streams = table->read(required_columns, query_ptr, block_size, context.settings.max_threads);
else
streams.push_back(new AsynchronousBlockInputStream(interpreter_subquery->execute()));
streams.push_back(maybeAsynchronous(interpreter_subquery->execute(), context.settings.asynchronous));
if (streams.empty())
throw Exception("No streams returned from table.", ErrorCodes::NO_STREAMS_RETURNED_FROM_TABLE);
@ -170,8 +179,8 @@ BlockInputStreamPtr InterpreterSelectQuery::execute()
for (BlockInputStreams::iterator it = streams.begin(); it != streams.end(); ++it)
{
BlockInputStreamPtr & stream = *it;
stream = new AsynchronousBlockInputStream(new ExpressionBlockInputStream(stream, expression, PART_WHERE));
stream = new AsynchronousBlockInputStream(new FilterBlockInputStream(stream));
stream = maybeAsynchronous(new ExpressionBlockInputStream(stream, expression, PART_WHERE), context.settings.asynchronous);
stream = maybeAsynchronous(new FilterBlockInputStream(stream), context.settings.asynchronous);
}
}
@ -186,7 +195,7 @@ BlockInputStreamPtr InterpreterSelectQuery::execute()
for (BlockInputStreams::iterator it = streams.begin(); it != streams.end(); ++it)
{
BlockInputStreamPtr & stream = *it;
stream = new AsynchronousBlockInputStream(new ExpressionBlockInputStream(stream, expression, PART_GROUP | PART_BEFORE_AGGREGATING));
stream = maybeAsynchronous(new ExpressionBlockInputStream(stream, expression, PART_GROUP | PART_BEFORE_AGGREGATING), context.settings.asynchronous);
}
BlockInputStreamPtr & stream = streams[0];
@ -194,14 +203,14 @@ BlockInputStreamPtr InterpreterSelectQuery::execute()
/// Если потоков несколько, то выполняем параллельную агрегацию
if (streams.size() > 1)
{
stream = new AsynchronousBlockInputStream(new ParallelAggregatingBlockInputStream(streams, expression, max_threads));
stream = maybeAsynchronous(new ParallelAggregatingBlockInputStream(streams, expression, context.settings.max_threads), context.settings.asynchronous);
streams.resize(1);
}
else
stream = new AsynchronousBlockInputStream(new AggregatingBlockInputStream(stream, expression));
stream = maybeAsynchronous(new AggregatingBlockInputStream(stream, expression), context.settings.asynchronous);
/// Финализируем агрегатные функции - заменяем их состояния вычислений на готовые значения
stream = new AsynchronousBlockInputStream(new FinalizingAggregatedBlockInputStream(stream));
stream = maybeAsynchronous(new FinalizingAggregatedBlockInputStream(stream), context.settings.asynchronous);
}
/// Если есть условие HAVING - сначала выполним часть выражения, необходимую для его вычисления
@ -212,8 +221,8 @@ BlockInputStreamPtr InterpreterSelectQuery::execute()
for (BlockInputStreams::iterator it = streams.begin(); it != streams.end(); ++it)
{
BlockInputStreamPtr & stream = *it;
stream = new AsynchronousBlockInputStream(new ExpressionBlockInputStream(stream, expression, PART_HAVING));
stream = new AsynchronousBlockInputStream(new FilterBlockInputStream(stream));
stream = maybeAsynchronous(new ExpressionBlockInputStream(stream, expression, PART_HAVING), context.settings.asynchronous);
stream = maybeAsynchronous(new FilterBlockInputStream(stream), context.settings.asynchronous);
}
}
@ -225,7 +234,7 @@ BlockInputStreamPtr InterpreterSelectQuery::execute()
for (BlockInputStreams::iterator it = streams.begin(); it != streams.end(); ++it)
{
BlockInputStreamPtr & stream = *it;
stream = new AsynchronousBlockInputStream(new ExpressionBlockInputStream(stream, expression, PART_SELECT | PART_ORDER));
stream = maybeAsynchronous(new ExpressionBlockInputStream(stream, expression, PART_SELECT | PART_ORDER), context.settings.asynchronous);
/** Оставим только столбцы, нужные для SELECT и ORDER BY части.
* Если нет ORDER BY - то это последняя проекция, и нужно брать только столбцы из SELECT части.
@ -252,7 +261,7 @@ BlockInputStreamPtr InterpreterSelectQuery::execute()
for (BlockInputStreams::iterator it = streams.begin(); it != streams.end(); ++it)
{
BlockInputStreamPtr & stream = *it;
stream = new AsynchronousBlockInputStream(new PartialSortingBlockInputStream(stream, order_descr));
stream = maybeAsynchronous(new PartialSortingBlockInputStream(stream, order_descr), context.settings.asynchronous);
}
BlockInputStreamPtr & stream = streams[0];
@ -260,12 +269,12 @@ BlockInputStreamPtr InterpreterSelectQuery::execute()
/// Если потоков несколько, то объединяем их в один
if (streams.size() > 1)
{
stream = new UnionBlockInputStream(streams, max_threads);
stream = new UnionBlockInputStream(streams, context.settings.max_threads);
streams.resize(1);
}
/// Сливаем сортированные блоки
stream = new AsynchronousBlockInputStream(new MergeSortingBlockInputStream(stream, order_descr));
stream = maybeAsynchronous(new MergeSortingBlockInputStream(stream, order_descr), context.settings.asynchronous);
/// Оставим только столбцы, нужные для SELECT части
stream = new ProjectionBlockInputStream(stream, expression, false, PART_SELECT, query.select_expression_list);
@ -274,7 +283,7 @@ BlockInputStreamPtr InterpreterSelectQuery::execute()
/// Если до сих пор есть несколько потоков, то объединяем их в один
if (streams.size() > 1)
{
streams[0] = new UnionBlockInputStream(streams, max_threads);
streams[0] = new UnionBlockInputStream(streams, context.settings.max_threads);
streams.resize(1);
}

View File

@ -11,10 +11,7 @@ void executeQuery(
ReadBuffer & istr,
WriteBuffer & ostr,
Context & context,
BlockInputStreamPtr & query_plan,
size_t max_query_size,
size_t max_threads,
size_t max_block_size)
BlockInputStreamPtr & query_plan)
{
DB::ParserQuery parser;
DB::ASTPtr ast;
@ -28,7 +25,7 @@ void executeQuery(
if (istr.buffer().size() == 0)
istr.next();
if (istr.buffer().end() - istr.position() >= static_cast<ssize_t>(max_query_size))
if (istr.buffer().end() - istr.position() >= static_cast<ssize_t>(context.settings.max_query_size))
{
/// Если оставшийся размер буфера istr достаточен, чтобы распарсить запрос до max_query_size, то парсим прямо в нём
begin = istr.position();
@ -38,8 +35,8 @@ void executeQuery(
else
{
/// Если нет - считываем достаточное количество данных в parse_buf
parse_buf.resize(max_query_size);
parse_buf.resize(istr.read(&parse_buf[0], max_query_size));
parse_buf.resize(context.settings.max_query_size);
parse_buf.resize(istr.read(&parse_buf[0], context.settings.max_query_size));
begin = &parse_buf[0];
end = begin + parse_buf.size();
}
@ -59,7 +56,7 @@ void executeQuery(
formatAST(*ast, std::cerr);
std::cerr << std::endl;
InterpreterQuery interpreter(ast, context, max_threads, max_block_size);
InterpreterQuery interpreter(ast, context);
interpreter.execute(ostr, &istr, query_plan);
}