From 7d9f303599578a62b41fe984027f8b25ed868c9f Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Mon, 5 Mar 2012 00:09:41 +0000 Subject: [PATCH] dbms: development [#CONV-2944]. --- .../DB/DataStreams/UnionBlockInputStream.h | 6 +++ dbms/include/DB/Interpreters/Context.h | 2 + .../DB/Interpreters/InterpreterCreateQuery.h | 4 +- .../DB/Interpreters/InterpreterInsertQuery.h | 4 +- .../DB/Interpreters/InterpreterQuery.h | 4 +- .../DB/Interpreters/InterpreterSelectQuery.h | 4 +- dbms/include/DB/Interpreters/Settings.h | 22 +++++++++ dbms/include/DB/Interpreters/executeQuery.h | 5 +- .../Interpreters/InterpreterCreateQuery.cpp | 6 +-- .../Interpreters/InterpreterInsertQuery.cpp | 8 +-- dbms/src/Interpreters/InterpreterQuery.cpp | 10 ++-- .../Interpreters/InterpreterSelectQuery.cpp | 49 +++++++++++-------- dbms/src/Interpreters/executeQuery.cpp | 13 ++--- 13 files changed, 81 insertions(+), 56 deletions(-) create mode 100644 dbms/include/DB/Interpreters/Settings.h diff --git a/dbms/include/DB/DataStreams/UnionBlockInputStream.h b/dbms/include/DB/DataStreams/UnionBlockInputStream.h index e5aa2975ee9..26449f13a4a 100644 --- a/dbms/include/DB/DataStreams/UnionBlockInputStream.h +++ b/dbms/include/DB/DataStreams/UnionBlockInputStream.h @@ -121,6 +121,11 @@ public: BlockInputStreamPtr clone() { return new UnionBlockInputStream(children, max_threads); } + ~UnionBlockInputStream() + { + pool.wait(); + } + private: unsigned max_threads; @@ -175,6 +180,7 @@ private: } catch (const Poco::Exception & e) { + std::cerr << e.message() << std::endl; data.exception = new Exception(e.message(), ErrorCodes::POCO_EXCEPTION); } catch (const std::exception & e) diff --git a/dbms/include/DB/Interpreters/Context.h b/dbms/include/DB/Interpreters/Context.h index 610689e58fe..c67f7ba77f1 100644 --- a/dbms/include/DB/Interpreters/Context.h +++ b/dbms/include/DB/Interpreters/Context.h @@ -12,6 +12,7 @@ #include #include #include +#include namespace DB @@ -41,6 +42,7 @@ struct Context DataTypeFactoryPtr data_type_factory; /// Типы данных. StorageFactoryPtr storage_factory; /// Движки таблиц. NamesAndTypesList columns; /// Столбцы текущей обрабатываемой таблицы. + Settings settings; /// Настройки выполнения запроса. SharedPtr mutex; /// Для доступа и модификации разделяемых объектов. diff --git a/dbms/include/DB/Interpreters/InterpreterCreateQuery.h b/dbms/include/DB/Interpreters/InterpreterCreateQuery.h index 8881ceedcaa..a8769ff8fa8 100644 --- a/dbms/include/DB/Interpreters/InterpreterCreateQuery.h +++ b/dbms/include/DB/Interpreters/InterpreterCreateQuery.h @@ -13,7 +13,7 @@ namespace DB class InterpreterCreateQuery { public: - InterpreterCreateQuery(ASTPtr query_ptr_, Context & context_, size_t max_threads_ = DEFAULT_MAX_THREADS, size_t max_block_size_ = DEFAULT_BLOCK_SIZE); + InterpreterCreateQuery(ASTPtr query_ptr_, Context & context_); /** В случае таблицы: добавляет созданную таблицу в контекст, а также возвращает её. * В случае БД: добавляет созданную БД в контекст и возвращает NULL. @@ -23,8 +23,6 @@ public: private: ASTPtr query_ptr; Context context; - size_t max_threads; - size_t max_block_size; }; diff --git a/dbms/include/DB/Interpreters/InterpreterInsertQuery.h b/dbms/include/DB/Interpreters/InterpreterInsertQuery.h index bc8ffc7b51b..57469038f92 100644 --- a/dbms/include/DB/Interpreters/InterpreterInsertQuery.h +++ b/dbms/include/DB/Interpreters/InterpreterInsertQuery.h @@ -12,7 +12,7 @@ namespace DB class InterpreterInsertQuery { public: - InterpreterInsertQuery(ASTPtr query_ptr_, Context & context_, size_t max_threads_ = DEFAULT_MAX_THREADS, size_t max_block_size_ = DEFAULT_BLOCK_SIZE); + InterpreterInsertQuery(ASTPtr query_ptr_, Context & context_); /** Выполнить запрос. * remaining_data_istr, если не NULL, может содержать нераспарсенные данные для вставки. @@ -25,8 +25,6 @@ private: ASTPtr query_ptr; Context context; - size_t max_threads; - size_t max_block_size; }; diff --git a/dbms/include/DB/Interpreters/InterpreterQuery.h b/dbms/include/DB/Interpreters/InterpreterQuery.h index a541d366b4b..e7d670c264e 100644 --- a/dbms/include/DB/Interpreters/InterpreterQuery.h +++ b/dbms/include/DB/Interpreters/InterpreterQuery.h @@ -12,7 +12,7 @@ namespace DB class InterpreterQuery { public: - InterpreterQuery(ASTPtr query_ptr_, Context & context_, size_t max_threads_ = DEFAULT_MAX_THREADS, size_t max_block_size_ = DEFAULT_BLOCK_SIZE); + InterpreterQuery(ASTPtr query_ptr_, Context & context_); /** Выполнить запрос. * @@ -31,8 +31,6 @@ public: private: ASTPtr query_ptr; Context context; - size_t max_threads; - size_t max_block_size; }; diff --git a/dbms/include/DB/Interpreters/InterpreterSelectQuery.h b/dbms/include/DB/Interpreters/InterpreterSelectQuery.h index e2caf4f614c..59024695b8f 100644 --- a/dbms/include/DB/Interpreters/InterpreterSelectQuery.h +++ b/dbms/include/DB/Interpreters/InterpreterSelectQuery.h @@ -13,7 +13,7 @@ namespace DB class InterpreterSelectQuery { public: - InterpreterSelectQuery(ASTPtr query_ptr_, Context & context_, size_t max_threads_ = DEFAULT_MAX_THREADS, size_t max_block_size_ = DEFAULT_BLOCK_SIZE); + InterpreterSelectQuery(ASTPtr query_ptr_, Context & context_); /// Выполнить запрос, получить поток блоков для чтения BlockInputStreamPtr execute(); @@ -51,8 +51,6 @@ private: ASTPtr query_ptr; Context context; - size_t max_threads; - size_t max_block_size; }; diff --git a/dbms/include/DB/Interpreters/Settings.h b/dbms/include/DB/Interpreters/Settings.h new file mode 100644 index 00000000000..308d6143703 --- /dev/null +++ b/dbms/include/DB/Interpreters/Settings.h @@ -0,0 +1,22 @@ +#pragma once + +#include + + +namespace DB +{ + +/** Настройки выполнения запроса. + */ +struct Settings +{ + size_t max_block_size; /// Максимальный размер блока для чтения + size_t max_threads; /// Максимальное количество потоков выполнения запроса + size_t max_query_size; /// Какую часть запроса можно прочитать в оперативку для парсинга (оставшиеся данные для INSERT, если есть, считываются позже) + bool asynchronous; /// Выполнять разные стадии конвейера выполнения запроса параллельно + + Settings() : max_block_size(DEFAULT_BLOCK_SIZE), max_threads(DEFAULT_MAX_THREADS), max_query_size(DEFAULT_MAX_QUERY_SIZE), asynchronous(true) {} +}; + + +} diff --git a/dbms/include/DB/Interpreters/executeQuery.h b/dbms/include/DB/Interpreters/executeQuery.h index a509d759770..6d2aea59384 100644 --- a/dbms/include/DB/Interpreters/executeQuery.h +++ b/dbms/include/DB/Interpreters/executeQuery.h @@ -14,9 +14,6 @@ void executeQuery( ReadBuffer & istr, /// Откуда читать запрос (а также данные для INSERT-а, если есть) WriteBuffer & ostr, /// Куда писать результат Context & context, /// БД, таблицы, типы данных, движки таблиц, функции, агрегатные функции... - BlockInputStreamPtr & query_plan, /// Сюда может быть записано описание, как выполнялся запрос - size_t max_query_size = DEFAULT_MAX_QUERY_SIZE, /// Какую часть запроса можно прочитать в оперативку для парсинга (оставшиеся данные для INSERT, если есть, считываются позже) - size_t max_threads = DEFAULT_MAX_THREADS, /// Максимальное количество потоков выполнения запроса - size_t max_block_size = DEFAULT_BLOCK_SIZE); /// Максимальный размер блока при чтении или вставке данных + BlockInputStreamPtr & query_plan); /// Сюда может быть записано описание, как выполнялся запрос } diff --git a/dbms/src/Interpreters/InterpreterCreateQuery.cpp b/dbms/src/Interpreters/InterpreterCreateQuery.cpp index 008443dfadf..90c71c0d052 100644 --- a/dbms/src/Interpreters/InterpreterCreateQuery.cpp +++ b/dbms/src/Interpreters/InterpreterCreateQuery.cpp @@ -23,8 +23,8 @@ namespace DB { -InterpreterCreateQuery::InterpreterCreateQuery(ASTPtr query_ptr_, Context & context_, size_t max_threads_, size_t max_block_size_) - : query_ptr(query_ptr_), context(context_), max_threads(max_threads_), max_block_size(max_block_size_) +InterpreterCreateQuery::InterpreterCreateQuery(ASTPtr query_ptr_, Context & context_) + : query_ptr(query_ptr_), context(context_) { } @@ -94,7 +94,7 @@ StoragePtr InterpreterCreateQuery::execute() SharedPtr interpreter_select; if (create.select) - interpreter_select = new InterpreterSelectQuery(create.select, context, max_threads, max_block_size); + interpreter_select = new InterpreterSelectQuery(create.select, context); /// Получаем список столбцов if (create.columns) diff --git a/dbms/src/Interpreters/InterpreterInsertQuery.cpp b/dbms/src/Interpreters/InterpreterInsertQuery.cpp index d5be7110495..641b04d05ac 100644 --- a/dbms/src/Interpreters/InterpreterInsertQuery.cpp +++ b/dbms/src/Interpreters/InterpreterInsertQuery.cpp @@ -16,8 +16,8 @@ namespace DB { -InterpreterInsertQuery::InterpreterInsertQuery(ASTPtr query_ptr_, Context & context_, size_t max_threads_, size_t max_block_size_) - : query_ptr(query_ptr_), context(context_), max_threads(max_threads_), max_block_size(max_block_size_) +InterpreterInsertQuery::InterpreterInsertQuery(ASTPtr query_ptr_, Context & context_) + : query_ptr(query_ptr_), context(context_) { } @@ -73,12 +73,12 @@ void InterpreterInsertQuery::execute(ReadBuffer * remaining_data_istr) ConcatReadBuffer istr(buffers); Block sample = table->getSampleBlock(); - in = format_factory.getInput(format, istr, sample, max_block_size, *context.data_type_factory); + in = format_factory.getInput(format, istr, sample, context.settings.max_block_size, *context.data_type_factory); copyData(*in, *out); } else { - InterpreterSelectQuery interpreter_select(query.select, context, max_threads, max_block_size); + InterpreterSelectQuery interpreter_select(query.select, context); in = interpreter_select.execute(); in = new MaterializingBlockInputStream(in); copyData(*in, *out); diff --git a/dbms/src/Interpreters/InterpreterQuery.cpp b/dbms/src/Interpreters/InterpreterQuery.cpp index 0c1284babe3..b63a74b4bab 100644 --- a/dbms/src/Interpreters/InterpreterQuery.cpp +++ b/dbms/src/Interpreters/InterpreterQuery.cpp @@ -14,8 +14,8 @@ namespace DB { -InterpreterQuery::InterpreterQuery(ASTPtr query_ptr_, Context & context_, size_t max_threads_, size_t max_block_size_) - : query_ptr(query_ptr_), context(context_), max_threads(max_threads_), max_block_size(max_block_size_) +InterpreterQuery::InterpreterQuery(ASTPtr query_ptr_, Context & context_) + : query_ptr(query_ptr_), context(context_) { } @@ -24,17 +24,17 @@ void InterpreterQuery::execute(WriteBuffer & ostr, ReadBuffer * remaining_data_i { if (dynamic_cast(&*query_ptr)) { - InterpreterSelectQuery interpreter(query_ptr, context, max_threads, max_block_size); + InterpreterSelectQuery interpreter(query_ptr, context); query_plan = interpreter.executeAndFormat(ostr); } else if (dynamic_cast(&*query_ptr)) { - InterpreterInsertQuery interpreter(query_ptr, context, max_threads, max_block_size); + InterpreterInsertQuery interpreter(query_ptr, context); interpreter.execute(remaining_data_istr); } else if (dynamic_cast(&*query_ptr)) { - InterpreterCreateQuery interpreter(query_ptr, context, max_threads, max_block_size); + InterpreterCreateQuery interpreter(query_ptr, context); interpreter.execute(); } else if (dynamic_cast(&*query_ptr)) diff --git a/dbms/src/Interpreters/InterpreterSelectQuery.cpp b/dbms/src/Interpreters/InterpreterSelectQuery.cpp index b323ae8f268..3df05726a25 100644 --- a/dbms/src/Interpreters/InterpreterSelectQuery.cpp +++ b/dbms/src/Interpreters/InterpreterSelectQuery.cpp @@ -26,8 +26,8 @@ namespace DB { -InterpreterSelectQuery::InterpreterSelectQuery(ASTPtr query_ptr_, Context & context_, size_t max_threads_, size_t max_block_size_) - : query_ptr(query_ptr_), context(context_), max_threads(max_threads_), max_block_size(max_block_size_) +InterpreterSelectQuery::InterpreterSelectQuery(ASTPtr query_ptr_, Context & context_) + : query_ptr(query_ptr_), context(context_) { } @@ -71,7 +71,7 @@ void InterpreterSelectQuery::setColumns() context.columns = !query.table || !dynamic_cast(&*query.table) ? getTable()->getColumnsList() - : InterpreterSelectQuery(query.table, context, max_threads, max_block_size).getSampleBlock().getColumnsList(); + : InterpreterSelectQuery(query.table, context).getSampleBlock().getColumnsList(); if (context.columns.empty()) throw Exception("There is no available columns", ErrorCodes::THERE_IS_NO_COLUMN); @@ -94,6 +94,15 @@ Block InterpreterSelectQuery::getSampleBlock() } +/// Превращает источник в асинхронный, если это указано. +static inline BlockInputStreamPtr maybeAsynchronous(BlockInputStreamPtr in, bool is_async) +{ + return is_async + ? new AsynchronousBlockInputStream(in) + : in; +} + + BlockInputStreamPtr InterpreterSelectQuery::execute() { ASTSelectQuery & query = dynamic_cast(*query_ptr); @@ -109,7 +118,7 @@ BlockInputStreamPtr InterpreterSelectQuery::execute() if (!query.table || !dynamic_cast(&*query.table)) table = getTable(); else - interpreter_subquery = new InterpreterSelectQuery(query.table, context, max_threads, max_block_size); + interpreter_subquery = new InterpreterSelectQuery(query.table, context); /// Объект, с помощью которого анализируется запрос. Poco::SharedPtr expression = new Expression(query_ptr, context); @@ -135,7 +144,7 @@ BlockInputStreamPtr InterpreterSelectQuery::execute() /** Оптимизация - если не указаны WHERE, GROUP, HAVING, ORDER, но указан LIMIT, и limit + offset < max_block_size, * то в качестве размера блока будем использовать limit + offset (чтобы не читать из таблицы больше, чем запрошено). */ - size_t block_size = max_block_size; + size_t block_size = context.settings.max_block_size; if (!query.where_expression && !query.group_expression_list && !query.having_expression && !query.order_expression_list && query.limit_length && !need_aggregate && limit_length + limit_offset < block_size) { @@ -155,9 +164,9 @@ BlockInputStreamPtr InterpreterSelectQuery::execute() /// Инициализируем изначальные потоки данных, на которые накладываются преобразования запроса. Таблица или подзапрос? if (!query.table || !dynamic_cast(&*query.table)) - streams = table->read(required_columns, query_ptr, block_size, max_threads); + streams = table->read(required_columns, query_ptr, block_size, context.settings.max_threads); else - streams.push_back(new AsynchronousBlockInputStream(interpreter_subquery->execute())); + streams.push_back(maybeAsynchronous(interpreter_subquery->execute(), context.settings.asynchronous)); if (streams.empty()) throw Exception("No streams returned from table.", ErrorCodes::NO_STREAMS_RETURNED_FROM_TABLE); @@ -170,8 +179,8 @@ BlockInputStreamPtr InterpreterSelectQuery::execute() for (BlockInputStreams::iterator it = streams.begin(); it != streams.end(); ++it) { BlockInputStreamPtr & stream = *it; - stream = new AsynchronousBlockInputStream(new ExpressionBlockInputStream(stream, expression, PART_WHERE)); - stream = new AsynchronousBlockInputStream(new FilterBlockInputStream(stream)); + stream = maybeAsynchronous(new ExpressionBlockInputStream(stream, expression, PART_WHERE), context.settings.asynchronous); + stream = maybeAsynchronous(new FilterBlockInputStream(stream), context.settings.asynchronous); } } @@ -186,7 +195,7 @@ BlockInputStreamPtr InterpreterSelectQuery::execute() for (BlockInputStreams::iterator it = streams.begin(); it != streams.end(); ++it) { BlockInputStreamPtr & stream = *it; - stream = new AsynchronousBlockInputStream(new ExpressionBlockInputStream(stream, expression, PART_GROUP | PART_BEFORE_AGGREGATING)); + stream = maybeAsynchronous(new ExpressionBlockInputStream(stream, expression, PART_GROUP | PART_BEFORE_AGGREGATING), context.settings.asynchronous); } BlockInputStreamPtr & stream = streams[0]; @@ -194,14 +203,14 @@ BlockInputStreamPtr InterpreterSelectQuery::execute() /// Если потоков несколько, то выполняем параллельную агрегацию if (streams.size() > 1) { - stream = new AsynchronousBlockInputStream(new ParallelAggregatingBlockInputStream(streams, expression, max_threads)); + stream = maybeAsynchronous(new ParallelAggregatingBlockInputStream(streams, expression, context.settings.max_threads), context.settings.asynchronous); streams.resize(1); } else - stream = new AsynchronousBlockInputStream(new AggregatingBlockInputStream(stream, expression)); + stream = maybeAsynchronous(new AggregatingBlockInputStream(stream, expression), context.settings.asynchronous); /// Финализируем агрегатные функции - заменяем их состояния вычислений на готовые значения - stream = new AsynchronousBlockInputStream(new FinalizingAggregatedBlockInputStream(stream)); + stream = maybeAsynchronous(new FinalizingAggregatedBlockInputStream(stream), context.settings.asynchronous); } /// Если есть условие HAVING - сначала выполним часть выражения, необходимую для его вычисления @@ -212,8 +221,8 @@ BlockInputStreamPtr InterpreterSelectQuery::execute() for (BlockInputStreams::iterator it = streams.begin(); it != streams.end(); ++it) { BlockInputStreamPtr & stream = *it; - stream = new AsynchronousBlockInputStream(new ExpressionBlockInputStream(stream, expression, PART_HAVING)); - stream = new AsynchronousBlockInputStream(new FilterBlockInputStream(stream)); + stream = maybeAsynchronous(new ExpressionBlockInputStream(stream, expression, PART_HAVING), context.settings.asynchronous); + stream = maybeAsynchronous(new FilterBlockInputStream(stream), context.settings.asynchronous); } } @@ -225,7 +234,7 @@ BlockInputStreamPtr InterpreterSelectQuery::execute() for (BlockInputStreams::iterator it = streams.begin(); it != streams.end(); ++it) { BlockInputStreamPtr & stream = *it; - stream = new AsynchronousBlockInputStream(new ExpressionBlockInputStream(stream, expression, PART_SELECT | PART_ORDER)); + stream = maybeAsynchronous(new ExpressionBlockInputStream(stream, expression, PART_SELECT | PART_ORDER), context.settings.asynchronous); /** Оставим только столбцы, нужные для SELECT и ORDER BY части. * Если нет ORDER BY - то это последняя проекция, и нужно брать только столбцы из SELECT части. @@ -252,7 +261,7 @@ BlockInputStreamPtr InterpreterSelectQuery::execute() for (BlockInputStreams::iterator it = streams.begin(); it != streams.end(); ++it) { BlockInputStreamPtr & stream = *it; - stream = new AsynchronousBlockInputStream(new PartialSortingBlockInputStream(stream, order_descr)); + stream = maybeAsynchronous(new PartialSortingBlockInputStream(stream, order_descr), context.settings.asynchronous); } BlockInputStreamPtr & stream = streams[0]; @@ -260,12 +269,12 @@ BlockInputStreamPtr InterpreterSelectQuery::execute() /// Если потоков несколько, то объединяем их в один if (streams.size() > 1) { - stream = new UnionBlockInputStream(streams, max_threads); + stream = new UnionBlockInputStream(streams, context.settings.max_threads); streams.resize(1); } /// Сливаем сортированные блоки - stream = new AsynchronousBlockInputStream(new MergeSortingBlockInputStream(stream, order_descr)); + stream = maybeAsynchronous(new MergeSortingBlockInputStream(stream, order_descr), context.settings.asynchronous); /// Оставим только столбцы, нужные для SELECT части stream = new ProjectionBlockInputStream(stream, expression, false, PART_SELECT, query.select_expression_list); @@ -274,7 +283,7 @@ BlockInputStreamPtr InterpreterSelectQuery::execute() /// Если до сих пор есть несколько потоков, то объединяем их в один if (streams.size() > 1) { - streams[0] = new UnionBlockInputStream(streams, max_threads); + streams[0] = new UnionBlockInputStream(streams, context.settings.max_threads); streams.resize(1); } diff --git a/dbms/src/Interpreters/executeQuery.cpp b/dbms/src/Interpreters/executeQuery.cpp index 3dc2015f3cc..bb8628b7fbf 100644 --- a/dbms/src/Interpreters/executeQuery.cpp +++ b/dbms/src/Interpreters/executeQuery.cpp @@ -11,10 +11,7 @@ void executeQuery( ReadBuffer & istr, WriteBuffer & ostr, Context & context, - BlockInputStreamPtr & query_plan, - size_t max_query_size, - size_t max_threads, - size_t max_block_size) + BlockInputStreamPtr & query_plan) { DB::ParserQuery parser; DB::ASTPtr ast; @@ -28,7 +25,7 @@ void executeQuery( if (istr.buffer().size() == 0) istr.next(); - if (istr.buffer().end() - istr.position() >= static_cast(max_query_size)) + if (istr.buffer().end() - istr.position() >= static_cast(context.settings.max_query_size)) { /// Если оставшийся размер буфера istr достаточен, чтобы распарсить запрос до max_query_size, то парсим прямо в нём begin = istr.position(); @@ -38,8 +35,8 @@ void executeQuery( else { /// Если нет - считываем достаточное количество данных в parse_buf - parse_buf.resize(max_query_size); - parse_buf.resize(istr.read(&parse_buf[0], max_query_size)); + parse_buf.resize(context.settings.max_query_size); + parse_buf.resize(istr.read(&parse_buf[0], context.settings.max_query_size)); begin = &parse_buf[0]; end = begin + parse_buf.size(); } @@ -59,7 +56,7 @@ void executeQuery( formatAST(*ast, std::cerr); std::cerr << std::endl; - InterpreterQuery interpreter(ast, context, max_threads, max_block_size); + InterpreterQuery interpreter(ast, context); interpreter.execute(ostr, &istr, query_plan); }