From 282e6f3a5bebfe77e8a546acc6c069d9545cf747 Mon Sep 17 00:00:00 2001 From: Alexey Arno Date: Fri, 19 Dec 2014 15:48:09 +0300 Subject: [PATCH] dbms: Server: rewrite expression lists in each SELECT query of the UNION ALL chain, if needed. [#METR-14099] --- .../DB/Interpreters/InterpreterSelectQuery.h | 204 +-- .../Interpreters/InterpreterSelectQuery.cpp | 1339 +++++++++-------- .../0_stateless/00098_7_union_all.reference | 2 + .../queries/0_stateless/00098_7_union_all.sql | 1 + .../0_stateless/00098_8_union_all.reference | 2 + .../queries/0_stateless/00098_8_union_all.sql | 1 + 6 files changed, 786 insertions(+), 763 deletions(-) create mode 100644 dbms/tests/queries/0_stateless/00098_7_union_all.reference create mode 100644 dbms/tests/queries/0_stateless/00098_7_union_all.sql create mode 100644 dbms/tests/queries/0_stateless/00098_8_union_all.reference create mode 100644 dbms/tests/queries/0_stateless/00098_8_union_all.sql diff --git a/dbms/include/DB/Interpreters/InterpreterSelectQuery.h b/dbms/include/DB/Interpreters/InterpreterSelectQuery.h index c00347b10eb..57b2d56184e 100644 --- a/dbms/include/DB/Interpreters/InterpreterSelectQuery.h +++ b/dbms/include/DB/Interpreters/InterpreterSelectQuery.h @@ -15,124 +15,130 @@ namespace DB class InterpreterSelectQuery { public: - /** to_stage - * - стадия, до которой нужно выполнить запрос. По-умолчанию - до конца. - * Можно выполнить до промежуточного состояния агрегации, которые объединяются с разных серверов при распределённой обработке запроса. - * - * subquery_depth - * - для контроля ограничений на глубину вложенности подзапросов. Для подзапросов передаётся значение, увеличенное на единицу. - * - * input - * - если задан - читать не из таблицы, указанной в запросе, а из готового источника. - * - * required_column_names - * - удалить из запроса все столбцы кроме указанных - используется для удаления ненужных столбцов из подзапросов. - * - * table_column_names - * - поместить в контекст в качестве известных столбцов только указанные столбцы, а не все столбцы таблицы. - * Используется, например, совместно с указанием input. - */ + /** to_stage + * - стадия, до которой нужно выполнить запрос. По-умолчанию - до конца. + * Можно выполнить до промежуточного состояния агрегации, которые объединяются с разных серверов при распределённой обработке запроса. + * + * subquery_depth + * - для контроля ограничений на глубину вложенности подзапросов. Для подзапросов передаётся значение, увеличенное на единицу. + * + * input + * - если задан - читать не из таблицы, указанной в запросе, а из готового источника. + * + * required_column_names + * - удалить из запроса все столбцы кроме указанных - используется для удаления ненужных столбцов из подзапросов. + * + * table_column_names + * - поместить в контекст в качестве известных столбцов только указанные столбцы, а не все столбцы таблицы. + * Используется, например, совместно с указанием input. + */ - InterpreterSelectQuery( - ASTPtr query_ptr_, - const Context & context_, - QueryProcessingStage::Enum to_stage_ = QueryProcessingStage::Complete, - size_t subquery_depth_ = 0, - BlockInputStreamPtr input = nullptr, - bool is_union_all_head_ = true); + InterpreterSelectQuery( + ASTPtr query_ptr_, + const Context & context_, + QueryProcessingStage::Enum to_stage_ = QueryProcessingStage::Complete, + size_t subquery_depth_ = 0, + BlockInputStreamPtr input = nullptr, + bool is_union_all_head_ = true); - InterpreterSelectQuery( - ASTPtr query_ptr_, - const Context & context_, - const Names & required_column_names, - QueryProcessingStage::Enum to_stage_ = QueryProcessingStage::Complete, - size_t subquery_depth_ = 0, - BlockInputStreamPtr input = nullptr); + InterpreterSelectQuery( + ASTPtr query_ptr_, + const Context & context_, + const Names & required_column_names, + QueryProcessingStage::Enum to_stage_ = QueryProcessingStage::Complete, + size_t subquery_depth_ = 0, + BlockInputStreamPtr input = nullptr); - InterpreterSelectQuery( - ASTPtr query_ptr_, - const Context & context_, - const Names & required_column_names, - const NamesAndTypesList & table_column_names, - QueryProcessingStage::Enum to_stage_ = QueryProcessingStage::Complete, - size_t subquery_depth_ = 0, - BlockInputStreamPtr input = nullptr); + InterpreterSelectQuery( + ASTPtr query_ptr_, + const Context & context_, + const Names & required_column_names, + const NamesAndTypesList & table_column_names, + QueryProcessingStage::Enum to_stage_ = QueryProcessingStage::Complete, + size_t subquery_depth_ = 0, + BlockInputStreamPtr input = nullptr); - /** Выполнить запрос, возможно являющиийся цепочкой UNION ALL. - * Получить поток блоков для чтения - */ - BlockInputStreamPtr execute(); + /** Выполнить запрос, возможно являющиийся цепочкой UNION ALL. + * Получить поток блоков для чтения + */ + BlockInputStreamPtr execute(); - /** Выполнить запрос, записать результат в нужном формате в buf. - * BlockInputStreamPtr возвращается, чтобы можно было потом получить информацию о плане выполнения запроса. - */ - BlockInputStreamPtr executeAndFormat(WriteBuffer & buf); + /** Выполнить запрос, записать результат в нужном формате в buf. + * BlockInputStreamPtr возвращается, чтобы можно было потом получить информацию о плане выполнения запроса. + */ + BlockInputStreamPtr executeAndFormat(WriteBuffer & buf); - DataTypes getReturnTypes(); - Block getSampleBlock(); + DataTypes getReturnTypes(); + Block getSampleBlock(); private: - typedef Poco::SharedPtr ExpressionAnalyzerPtr; + typedef Poco::SharedPtr ExpressionAnalyzerPtr; - void init(BlockInputStreamPtr input, const NamesAndTypesList & table_column_names = NamesAndTypesList()); + void init(BlockInputStreamPtr input, const NamesAndTypesList & table_column_names = NamesAndTypesList()); - /// Выполнить один запрос SELECT из цепочки UNION ALL. - void executeSingleQuery(bool should_perform_union_hint = true); + /// Выполнить один запрос SELECT из цепочки UNION ALL. + void executeSingleQuery(bool should_perform_union_hint = true); - /// Является ли это первым запросом цепочки UNION ALL имеющей длниу >= 2. - bool isFirstSelectInsideUnionAll() const; + /** Оставить в каждом запросе цепочки UNION ALL только нужные столбцы секции SELECT. + * Однако, если используется хоть один DISTINCT в цепочке, то все столбцы считаются нужными, + * так как иначе DISTINCT работал бы по-другому. + */ + void rewriteExpressionList(const Names & required_column_names); + + /// Является ли это первым запросом цепочки UNION ALL имеющей длниу >= 2. + bool isFirstSelectInsideUnionAll() const; - /** Из какой таблицы читать. JOIN-ы не поддерживаются. - */ - void getDatabaseAndTableNames(String & database_name, String & table_name); + /** Из какой таблицы читать. JOIN-ы не поддерживаются. + */ + void getDatabaseAndTableNames(String & database_name, String & table_name); - /** Выбрать из списка столбцов какой-нибудь, лучше - минимального размера. - */ - String getAnyColumn(); + /** Выбрать из списка столбцов какой-нибудь, лучше - минимального размера. + */ + String getAnyColumn(); - /// Разные стадии выполнения запроса. + /// Разные стадии выполнения запроса. - /// Вынимает данные из таблицы. Возвращает стадию, до которой запрос был обработан в Storage. - QueryProcessingStage::Enum executeFetchColumns(BlockInputStreams & streams); + /// Вынимает данные из таблицы. Возвращает стадию, до которой запрос был обработан в Storage. + QueryProcessingStage::Enum executeFetchColumns(BlockInputStreams & streams); - void executeWhere( BlockInputStreams & streams, ExpressionActionsPtr expression); - void executeAggregation( BlockInputStreams & streams, ExpressionActionsPtr expression, - bool overflow_row, bool final); - void executeMergeAggregated( BlockInputStreams & streams, bool overflow_row, bool final); - void executeTotalsAndHaving( BlockInputStreams & streams, bool has_having, ExpressionActionsPtr expression, - bool overflow_row); - void executeHaving( BlockInputStreams & streams, ExpressionActionsPtr expression); - void executeExpression( BlockInputStreams & streams, ExpressionActionsPtr expression); - void executeOrder( BlockInputStreams & streams); - void executePreLimit( BlockInputStreams & streams); - void executeUnion( BlockInputStreams & streams); - void executeLimit( BlockInputStreams & streams); - void executeProjection( BlockInputStreams & streams, ExpressionActionsPtr expression); - void executeDistinct( BlockInputStreams & streams, bool before_order, Names columns); - void executeSubqueriesInSetsAndJoins(BlockInputStreams & streams, SubqueriesForSets & subqueries_for_sets); + void executeWhere( BlockInputStreams & streams, ExpressionActionsPtr expression); + void executeAggregation( BlockInputStreams & streams, ExpressionActionsPtr expression, + bool overflow_row, bool final); + void executeMergeAggregated( BlockInputStreams & streams, bool overflow_row, bool final); + void executeTotalsAndHaving( BlockInputStreams & streams, bool has_having, ExpressionActionsPtr expression, + bool overflow_row); + void executeHaving( BlockInputStreams & streams, ExpressionActionsPtr expression); + void executeExpression( BlockInputStreams & streams, ExpressionActionsPtr expression); + void executeOrder( BlockInputStreams & streams); + void executePreLimit( BlockInputStreams & streams); + void executeUnion( BlockInputStreams & streams); + void executeLimit( BlockInputStreams & streams); + void executeProjection( BlockInputStreams & streams, ExpressionActionsPtr expression); + void executeDistinct( BlockInputStreams & streams, bool before_order, Names columns); + void executeSubqueriesInSetsAndJoins(BlockInputStreams & streams, SubqueriesForSets & subqueries_for_sets); - ASTPtr query_ptr; - ASTSelectQuery & query; - Context context; - Settings settings; - QueryProcessingStage::Enum to_stage; - size_t subquery_depth; - ExpressionAnalyzerPtr query_analyzer; - BlockInputStreams streams; - - /** Цепочка UNION ALL может иметь длину 1 (в таком случае имеется просто один запрос SELECT) - * или больше. Этот флаг установлен, если это первый запрос, возможно единственный, этой цепочки. - */ - bool is_union_all_head; + ASTPtr query_ptr; + ASTSelectQuery & query; + Context context; + Settings settings; + QueryProcessingStage::Enum to_stage; + size_t subquery_depth; + ExpressionAnalyzerPtr query_analyzer; + BlockInputStreams streams; + + /** Цепочка UNION ALL может иметь длину 1 (в таком случае имеется просто один запрос SELECT) + * или больше. Этот флаг установлен, если это первый запрос, возможно единственный, этой цепочки. + */ + bool is_union_all_head; - /// Следующий запрос SELECT в цепочке UNION ALL. - std::unique_ptr next_select_in_union_all; - - /// Таблица, откуда читать данные, если не подзапрос. - StoragePtr storage; - IStorage::TableStructureReadLockPtr table_lock; + /// Следующий запрос SELECT в цепочке UNION ALL. + std::unique_ptr next_select_in_union_all; + + /// Таблица, откуда читать данные, если не подзапрос. + StoragePtr storage; + IStorage::TableStructureReadLockPtr table_lock; - Logger * log; + Logger * log; }; } diff --git a/dbms/src/Interpreters/InterpreterSelectQuery.cpp b/dbms/src/Interpreters/InterpreterSelectQuery.cpp index 99b218694ad..89f77a9e6ab 100644 --- a/dbms/src/Interpreters/InterpreterSelectQuery.cpp +++ b/dbms/src/Interpreters/InterpreterSelectQuery.cpp @@ -35,893 +35,904 @@ namespace DB void InterpreterSelectQuery::init(BlockInputStreamPtr input_, const NamesAndTypesList & table_column_names) { - ProfileEvents::increment(ProfileEvents::SelectQuery); + ProfileEvents::increment(ProfileEvents::SelectQuery); - if (settings.limits.max_subquery_depth && subquery_depth > settings.limits.max_subquery_depth) - throw Exception("Too deep subqueries. Maximum: " + toString(settings.limits.max_subquery_depth), - ErrorCodes::TOO_DEEP_SUBQUERIES); + if (settings.limits.max_subquery_depth && subquery_depth > settings.limits.max_subquery_depth) + throw Exception("Too deep subqueries. Maximum: " + toString(settings.limits.max_subquery_depth), + ErrorCodes::TOO_DEEP_SUBQUERIES); - if (query.table && typeid_cast(&*query.table)) - { - if (table_column_names.empty()) - context.setColumns(InterpreterSelectQuery(query.table, context, to_stage, subquery_depth, nullptr, false).getSampleBlock().getColumnsList()); - } - else - { - if (query.table && typeid_cast(&*query.table)) - { - /// Получить табличную функцию - TableFunctionPtr table_function_ptr = context.getTableFunctionFactory().get(typeid_cast(&*query.table)->name, context); - /// Выполнить ее и запомнить результат - storage = table_function_ptr->execute(query.table, context); - } - else - { - String database_name; - String table_name; + if (query.table && typeid_cast(&*query.table)) + { + if (table_column_names.empty()) + context.setColumns(InterpreterSelectQuery(query.table, context, to_stage, subquery_depth, nullptr, false).getSampleBlock().getColumnsList()); + } + else + { + if (query.table && typeid_cast(&*query.table)) + { + /// Получить табличную функцию + TableFunctionPtr table_function_ptr = context.getTableFunctionFactory().get(typeid_cast(&*query.table)->name, context); + /// Выполнить ее и запомнить результат + storage = table_function_ptr->execute(query.table, context); + } + else + { + String database_name; + String table_name; - getDatabaseAndTableNames(database_name, table_name); + getDatabaseAndTableNames(database_name, table_name); - storage = context.getTable(database_name, table_name); - } + storage = context.getTable(database_name, table_name); + } - table_lock = storage->lockStructure(false); - if (table_column_names.empty()) - context.setColumns(storage->getColumnsListNonMaterialized()); - } + table_lock = storage->lockStructure(false); + if (table_column_names.empty()) + context.setColumns(storage->getColumnsListNonMaterialized()); + } - if (!table_column_names.empty()) - context.setColumns(table_column_names); + if (!table_column_names.empty()) + context.setColumns(table_column_names); - if (context.getColumns().empty()) - throw Exception("There are no available columns", ErrorCodes::THERE_IS_NO_COLUMN); + if (context.getColumns().empty()) + throw Exception("There are no available columns", ErrorCodes::THERE_IS_NO_COLUMN); - query_analyzer = new ExpressionAnalyzer(query_ptr, context, storage, subquery_depth, true); + query_analyzer = new ExpressionAnalyzer(query_ptr, context, storage, subquery_depth, true); - /// Сохраняем в query context новые временные таблицы - for (auto & it : query_analyzer->getExternalTables()) - if (!context.tryGetExternalTable(it.first)) - context.addExternalTable(it.first, it.second); + /// Сохраняем в query context новые временные таблицы + for (auto & it : query_analyzer->getExternalTables()) + if (!context.tryGetExternalTable(it.first)) + context.addExternalTable(it.first, it.second); - if (input_) - streams.push_back(input_); - - if (isFirstSelectInsideUnionAll()) - { - // Создаем цепочку запросов SELECT и проверяем, что результаты всех запросов SELECT cовместимые. - // NOTE Мы можем безопасно применить static_cast вместо typeid_cast, - // потому что знаем, что в цепочке UNION ALL имеются только деревья типа SELECT. - InterpreterSelectQuery * interpreter = this; - Block first = interpreter->getSampleBlock(); - for (ASTPtr tree = query.next_union_all; !tree.isNull(); tree = (static_cast(*tree)).next_union_all) - { - interpreter->next_select_in_union_all.reset(new InterpreterSelectQuery(tree, context, to_stage, subquery_depth, nullptr, false)); - interpreter = interpreter->next_select_in_union_all.get(); - Block current = interpreter->getSampleBlock(); - if (!blocksHaveEqualStructure(first, current)) - throw Exception("Result structures mismatch in the SELECT queries of the UNION ALL chain", - ErrorCodes::UNION_ALL_RESULT_STRUCTURES_MISMATCH); - } - } + if (input_) + streams.push_back(input_); + + if (isFirstSelectInsideUnionAll()) + { + /// Создаем цепочку запросов SELECT и проверяем, что результаты всех запросов SELECT cовместимые. + /// NOTE Мы можем безопасно применить static_cast вместо typeid_cast, + /// потому что знаем, что в цепочке UNION ALL имеются только деревья типа SELECT. + InterpreterSelectQuery * interpreter = this; + Block first = interpreter->getSampleBlock(); + for (ASTPtr tree = query.next_union_all; !tree.isNull(); tree = (static_cast(*tree)).next_union_all) + { + interpreter->next_select_in_union_all.reset(new InterpreterSelectQuery(tree, context, to_stage, subquery_depth, nullptr, false)); + interpreter = interpreter->next_select_in_union_all.get(); + Block current = interpreter->getSampleBlock(); + if (!blocksHaveEqualStructure(first, current)) + throw Exception("Result structures mismatch in the SELECT queries of the UNION ALL chain", + ErrorCodes::UNION_ALL_RESULT_STRUCTURES_MISMATCH); + } + } } InterpreterSelectQuery::InterpreterSelectQuery(ASTPtr query_ptr_, const Context & context_, QueryProcessingStage::Enum to_stage_, - size_t subquery_depth_, BlockInputStreamPtr input_, bool is_union_all_head_) - : query_ptr(query_ptr_), query(typeid_cast(*query_ptr)), - context(context_), settings(context.getSettings()), to_stage(to_stage_), subquery_depth(subquery_depth_), - is_union_all_head(is_union_all_head_), - log(&Logger::get("InterpreterSelectQuery")) + size_t subquery_depth_, BlockInputStreamPtr input_, bool is_union_all_head_) + : query_ptr(query_ptr_), query(typeid_cast(*query_ptr)), + context(context_), settings(context.getSettings()), to_stage(to_stage_), subquery_depth(subquery_depth_), + is_union_all_head(is_union_all_head_), + log(&Logger::get("InterpreterSelectQuery")) { - init(input_); + init(input_); } InterpreterSelectQuery::InterpreterSelectQuery(ASTPtr query_ptr_, const Context & context_, - const Names & required_column_names_, - QueryProcessingStage::Enum to_stage_, size_t subquery_depth_, BlockInputStreamPtr input_) - : query_ptr(query_ptr_), query(typeid_cast(*query_ptr)), - context(context_), settings(context.getSettings()), to_stage(to_stage_), subquery_depth(subquery_depth_), - is_union_all_head(true), - log(&Logger::get("InterpreterSelectQuery")) + const Names & required_column_names_, + QueryProcessingStage::Enum to_stage_, size_t subquery_depth_, BlockInputStreamPtr input_) + : query_ptr(query_ptr_), query(typeid_cast(*query_ptr)), + context(context_), settings(context.getSettings()), to_stage(to_stage_), subquery_depth(subquery_depth_), + is_union_all_head(true), + log(&Logger::get("InterpreterSelectQuery")) { - /** Оставляем в запросе в секции SELECT только нужные столбцы. - * Но если используется DISTINCT, то все столбцы считаются нужными, так как иначе DISTINCT работал бы по-другому. - */ - if (!query.distinct) - query.rewriteSelectExpressionList(required_column_names_); - - init(input_); + rewriteExpressionList(required_column_names_); + init(input_); } InterpreterSelectQuery::InterpreterSelectQuery(ASTPtr query_ptr_, const Context & context_, - const Names & required_column_names_, - const NamesAndTypesList & table_column_names, QueryProcessingStage::Enum to_stage_, size_t subquery_depth_, BlockInputStreamPtr input_) - : query_ptr(query_ptr_), query(typeid_cast(*query_ptr)), - context(context_), settings(context.getSettings()), to_stage(to_stage_), subquery_depth(subquery_depth_), - is_union_all_head(true), - log(&Logger::get("InterpreterSelectQuery")) + const Names & required_column_names_, + const NamesAndTypesList & table_column_names, QueryProcessingStage::Enum to_stage_, size_t subquery_depth_, BlockInputStreamPtr input_) + : query_ptr(query_ptr_), query(typeid_cast(*query_ptr)), + context(context_), settings(context.getSettings()), to_stage(to_stage_), subquery_depth(subquery_depth_), + is_union_all_head(true), + log(&Logger::get("InterpreterSelectQuery")) { - /** Оставляем в запросе в секции SELECT только нужные столбцы. - * Но если используется DISTINCT, то все столбцы считаются нужными, так как иначе DISTINCT работал бы по-другому. - */ - if (!query.distinct) - query.rewriteSelectExpressionList(required_column_names_); + rewriteExpressionList(required_column_names_); + init(input_, table_column_names); +} - init(input_, table_column_names); +void InterpreterSelectQuery::rewriteExpressionList(const Names & required_column_names) +{ + if (query.distinct) + return; + + for (IAST* tree = query.next_union_all.get(); tree != nullptr; tree = static_cast(tree)->next_union_all.get()) + { + auto & next_query = *(static_cast(tree)); + if (next_query.distinct) + return; + } + + query.rewriteSelectExpressionList(required_column_names); + + for (IAST* tree = query.next_union_all.get(); tree != nullptr; tree = static_cast(tree)->next_union_all.get()) + { + auto & next_query = *(static_cast(tree)); + next_query.rewriteSelectExpressionList(required_column_names); + } } bool InterpreterSelectQuery::isFirstSelectInsideUnionAll() const { - return is_union_all_head && !query.next_union_all.isNull(); + return is_union_all_head && !query.next_union_all.isNull(); } void InterpreterSelectQuery::getDatabaseAndTableNames(String & database_name, String & table_name) { - /** Если таблица не указана - используем таблицу system.one. - * Если база данных не указана - используем текущую базу данных. - */ - if (query.database) - database_name = typeid_cast(*query.database).name; - if (query.table) - table_name = typeid_cast(*query.table).name; + /** Если таблица не указана - используем таблицу system.one. + * Если база данных не указана - используем текущую базу данных. + */ + if (query.database) + database_name = typeid_cast(*query.database).name; + if (query.table) + table_name = typeid_cast(*query.table).name; - if (!query.table) - { - database_name = "system"; - table_name = "one"; - } - else if (!query.database) - { - if (context.tryGetTable("", table_name)) - database_name = ""; - else - database_name = context.getCurrentDatabase(); - } + if (!query.table) + { + database_name = "system"; + table_name = "one"; + } + else if (!query.database) + { + if (context.tryGetTable("", table_name)) + database_name = ""; + else + database_name = context.getCurrentDatabase(); + } } DataTypes InterpreterSelectQuery::getReturnTypes() { - DataTypes res; - NamesAndTypesList columns = query_analyzer->getSelectSampleBlock().getColumnsList(); - for (NamesAndTypesList::iterator it = columns.begin(); it != columns.end(); ++it) - { - res.push_back(it->type); - } - return res; + DataTypes res; + NamesAndTypesList columns = query_analyzer->getSelectSampleBlock().getColumnsList(); + for (NamesAndTypesList::iterator it = columns.begin(); it != columns.end(); ++it) + { + res.push_back(it->type); + } + return res; } Block InterpreterSelectQuery::getSampleBlock() { - Block block = query_analyzer->getSelectSampleBlock(); - /// создадим ненулевые колонки, чтобы SampleBlock можно было - /// писать (читать) с помощью BlockOut(In)putStream'ов - for (size_t i = 0; i < block.columns(); ++i) - { - ColumnWithNameAndType & col = block.getByPosition(i); - col.column = col.type->createColumn(); - } - return block; + Block block = query_analyzer->getSelectSampleBlock(); + /// создадим ненулевые колонки, чтобы SampleBlock можно было + /// писать (читать) с помощью BlockOut(In)putStream'ов + for (size_t i = 0; i < block.columns(); ++i) + { + ColumnWithNameAndType & col = block.getByPosition(i); + col.column = col.type->createColumn(); + } + return block; } /// Превращает источник в асинхронный, если это указано. static inline BlockInputStreamPtr maybeAsynchronous(BlockInputStreamPtr in, bool is_async) { - return is_async - ? new AsynchronousBlockInputStream(in) - : in; + return is_async + ? new AsynchronousBlockInputStream(in) + : in; } BlockInputStreamPtr InterpreterSelectQuery::execute() { - if (isFirstSelectInsideUnionAll()) - { - executeSingleQuery(false); - for (auto p = next_select_in_union_all.get(); p != nullptr; p = p->next_select_in_union_all.get()) - { - p->executeSingleQuery(false); - const auto & others = p->streams; - streams.insert(streams.end(), others.begin(), others.end()); - } - - if (streams.empty()) - return new NullBlockInputStream; - - for (auto & stream : streams) - stream = new MaterializingBlockInputStream(stream); - - executeUnion(streams); - } - else - { - executeSingleQuery(); - if (streams.empty()) - return new NullBlockInputStream; - } - - /// Ограничения на результат, квота на результат, а также колбек для прогресса. - if (IProfilingBlockInputStream * stream = dynamic_cast(&*streams[0])) - { - stream->setProgressCallback(context.getProgressCallback()); - stream->setProcessListElement(context.getProcessListElement()); + if (isFirstSelectInsideUnionAll()) + { + executeSingleQuery(false); + for (auto p = next_select_in_union_all.get(); p != nullptr; p = p->next_select_in_union_all.get()) + { + p->executeSingleQuery(false); + const auto & others = p->streams; + streams.insert(streams.end(), others.begin(), others.end()); + } + + if (streams.empty()) + return new NullBlockInputStream; + + for (auto & stream : streams) + stream = new MaterializingBlockInputStream(stream); + + executeUnion(streams); + } + else + { + executeSingleQuery(); + if (streams.empty()) + return new NullBlockInputStream; + } + + /// Ограничения на результат, квота на результат, а также колбек для прогресса. + if (IProfilingBlockInputStream * stream = dynamic_cast(&*streams[0])) + { + stream->setProgressCallback(context.getProgressCallback()); + stream->setProcessListElement(context.getProcessListElement()); - /// Ограничения действуют только на конечный результат. - if (to_stage == QueryProcessingStage::Complete) - { - IProfilingBlockInputStream::LocalLimits limits; - limits.mode = IProfilingBlockInputStream::LIMITS_CURRENT; - limits.max_rows_to_read = settings.limits.max_result_rows; - limits.max_bytes_to_read = settings.limits.max_result_bytes; - limits.read_overflow_mode = settings.limits.result_overflow_mode; + /// Ограничения действуют только на конечный результат. + if (to_stage == QueryProcessingStage::Complete) + { + IProfilingBlockInputStream::LocalLimits limits; + limits.mode = IProfilingBlockInputStream::LIMITS_CURRENT; + limits.max_rows_to_read = settings.limits.max_result_rows; + limits.max_bytes_to_read = settings.limits.max_result_bytes; + limits.read_overflow_mode = settings.limits.result_overflow_mode; - stream->setLimits(limits); - stream->setQuota(context.getQuota()); - } - } - - return streams[0]; + stream->setLimits(limits); + stream->setQuota(context.getQuota()); + } + } + + return streams[0]; } void InterpreterSelectQuery::executeSingleQuery(bool should_perform_union_hint) { - /** Потоки данных. При параллельном выполнении запроса, имеем несколько потоков данных. - * Если нет GROUP BY, то выполним все операции до ORDER BY и LIMIT параллельно, затем - * если есть ORDER BY, то склеим потоки с помощью UnionBlockInputStream, а затем MergеSortingBlockInputStream, - * если нет, то склеим с помощью UnionBlockInputStream, - * затем применим LIMIT. - * Если есть GROUP BY, то выполним все операции до GROUP BY, включительно, параллельно; - * параллельный GROUP BY склеит потоки в один, - * затем выполним остальные операции с одним получившимся потоком. - * Если запрос является членом цепочки UNION ALL и не содержит GROUP BY, ORDER BY, DISTINCT, или LIMIT, - * то объединение источников данных выполняется не на этом уровне, а на верхнем уровне. - */ + /** Потоки данных. При параллельном выполнении запроса, имеем несколько потоков данных. + * Если нет GROUP BY, то выполним все операции до ORDER BY и LIMIT параллельно, затем + * если есть ORDER BY, то склеим потоки с помощью UnionBlockInputStream, а затем MergеSortingBlockInputStream, + * если нет, то склеим с помощью UnionBlockInputStream, + * затем применим LIMIT. + * Если есть GROUP BY, то выполним все операции до GROUP BY, включительно, параллельно; + * параллельный GROUP BY склеит потоки в один, + * затем выполним остальные операции с одним получившимся потоком. + * Если запрос является членом цепочки UNION ALL и не содержит GROUP BY, ORDER BY, DISTINCT, или LIMIT, + * то объединение источников данных выполняется не на этом уровне, а на верхнем уровне. + */ - bool do_execute_union = should_perform_union_hint; - - /** Вынем данные из Storage. from_stage - до какой стадии запрос был выполнен в Storage. */ - QueryProcessingStage::Enum from_stage = executeFetchColumns(streams); + bool do_execute_union = should_perform_union_hint; + + /** Вынем данные из Storage. from_stage - до какой стадии запрос был выполнен в Storage. */ + QueryProcessingStage::Enum from_stage = executeFetchColumns(streams); - LOG_TRACE(log, QueryProcessingStage::toString(from_stage) << " -> " << QueryProcessingStage::toString(to_stage)); + LOG_TRACE(log, QueryProcessingStage::toString(from_stage) << " -> " << QueryProcessingStage::toString(to_stage)); - if (to_stage > QueryProcessingStage::FetchColumns) - { - bool has_where = false; - bool need_aggregate = false; - bool has_having = false; - bool has_order_by = false; - - ExpressionActionsPtr array_join; - ExpressionActionsPtr before_where; - ExpressionActionsPtr before_aggregation; - ExpressionActionsPtr before_having; - ExpressionActionsPtr before_order_and_select; - ExpressionActionsPtr final_projection; + if (to_stage > QueryProcessingStage::FetchColumns) + { + bool has_where = false; + bool need_aggregate = false; + bool has_having = false; + bool has_order_by = false; + + ExpressionActionsPtr array_join; + ExpressionActionsPtr before_where; + ExpressionActionsPtr before_aggregation; + ExpressionActionsPtr before_having; + ExpressionActionsPtr before_order_and_select; + ExpressionActionsPtr final_projection; - /// Столбцы из списка SELECT, до переименования в алиасы. - Names selected_columns; + /// Столбцы из списка SELECT, до переименования в алиасы. + Names selected_columns; - /// Нужно ли выполнять первую часть конвейера - выполняемую на удаленных серверах при распределенной обработке. - bool first_stage = from_stage < QueryProcessingStage::WithMergeableState - && to_stage >= QueryProcessingStage::WithMergeableState; - /// Нужно ли выполнять вторую часть конвейера - выполняемую на сервере-инициаторе при распределенной обработке. - bool second_stage = from_stage <= QueryProcessingStage::WithMergeableState - && to_stage > QueryProcessingStage::WithMergeableState; + /// Нужно ли выполнять первую часть конвейера - выполняемую на удаленных серверах при распределенной обработке. + bool first_stage = from_stage < QueryProcessingStage::WithMergeableState + && to_stage >= QueryProcessingStage::WithMergeableState; + /// Нужно ли выполнять вторую часть конвейера - выполняемую на сервере-инициаторе при распределенной обработке. + bool second_stage = from_stage <= QueryProcessingStage::WithMergeableState + && to_stage > QueryProcessingStage::WithMergeableState; - /** Сначала составим цепочку действий и запомним нужные шаги из нее. - * Независимо от from_stage и to_stage составим полную последовательность действий, чтобы выполнять оптимизации и - * выбрасывать ненужные столбцы с учетом всего запроса. В ненужных частях запроса не будем выполнять подзапросы. - */ + /** Сначала составим цепочку действий и запомним нужные шаги из нее. + * Независимо от from_stage и to_stage составим полную последовательность действий, чтобы выполнять оптимизации и + * выбрасывать ненужные столбцы с учетом всего запроса. В ненужных частях запроса не будем выполнять подзапросы. + */ - { - ExpressionActionsChain chain; + { + ExpressionActionsChain chain; - need_aggregate = query_analyzer->hasAggregation(); + need_aggregate = query_analyzer->hasAggregation(); - query_analyzer->appendArrayJoin(chain, !first_stage); - query_analyzer->appendJoin(chain, !first_stage); + query_analyzer->appendArrayJoin(chain, !first_stage); + query_analyzer->appendJoin(chain, !first_stage); - if (query_analyzer->appendWhere(chain, !first_stage)) - { - has_where = true; - before_where = chain.getLastActions(); - chain.addStep(); - } + if (query_analyzer->appendWhere(chain, !first_stage)) + { + has_where = true; + before_where = chain.getLastActions(); + chain.addStep(); + } - if (need_aggregate) - { - query_analyzer->appendGroupBy(chain, !first_stage); - query_analyzer->appendAggregateFunctionsArguments(chain, !first_stage); - before_aggregation = chain.getLastActions(); + if (need_aggregate) + { + query_analyzer->appendGroupBy(chain, !first_stage); + query_analyzer->appendAggregateFunctionsArguments(chain, !first_stage); + before_aggregation = chain.getLastActions(); - chain.finalize(); - chain.clear(); + chain.finalize(); + chain.clear(); - if (query_analyzer->appendHaving(chain, !second_stage)) - { - has_having = true; - before_having = chain.getLastActions(); - chain.addStep(); - } - } + if (query_analyzer->appendHaving(chain, !second_stage)) + { + has_having = true; + before_having = chain.getLastActions(); + chain.addStep(); + } + } - /// Если есть агрегация, выполняем выражения в SELECT и ORDER BY на инициировавшем сервере, иначе - на серверах-источниках. - query_analyzer->appendSelect(chain, need_aggregate ? !second_stage : !first_stage); - selected_columns = chain.getLastStep().required_output; - has_order_by = query_analyzer->appendOrderBy(chain, need_aggregate ? !second_stage : !first_stage); - before_order_and_select = chain.getLastActions(); - chain.addStep(); + /// Если есть агрегация, выполняем выражения в SELECT и ORDER BY на инициировавшем сервере, иначе - на серверах-источниках. + query_analyzer->appendSelect(chain, need_aggregate ? !second_stage : !first_stage); + selected_columns = chain.getLastStep().required_output; + has_order_by = query_analyzer->appendOrderBy(chain, need_aggregate ? !second_stage : !first_stage); + before_order_and_select = chain.getLastActions(); + chain.addStep(); - query_analyzer->appendProjectResult(chain, !second_stage); - final_projection = chain.getLastActions(); + query_analyzer->appendProjectResult(chain, !second_stage); + final_projection = chain.getLastActions(); - chain.finalize(); - chain.clear(); - } + chain.finalize(); + chain.clear(); + } - /** Если данных нет. - * Эта проверка специально вынесена чуть ниже, чем она могла бы быть (сразу после executeFetchColumns), - * чтобы запрос был проанализирован, и в нём могли бы быть обнаружены ошибки (например, несоответствия типов). - * Иначе мог бы вернуться пустой результат на некорректный запрос. - */ - if (streams.empty()) - return; + /** Если данных нет. + * Эта проверка специально вынесена чуть ниже, чем она могла бы быть (сразу после executeFetchColumns), + * чтобы запрос был проанализирован, и в нём могли бы быть обнаружены ошибки (например, несоответствия типов). + * Иначе мог бы вернуться пустой результат на некорректный запрос. + */ + if (streams.empty()) + return; - /// Перед выполнением HAVING уберем из блока лишние столбцы (в основном, ключи агрегации). - if (has_having) - before_having->prependProjectInput(); + /// Перед выполнением HAVING уберем из блока лишние столбцы (в основном, ключи агрегации). + if (has_having) + before_having->prependProjectInput(); - /// Теперь составим потоки блоков, выполняющие нужные действия. + /// Теперь составим потоки блоков, выполняющие нужные действия. - /// Нужно ли агрегировать в отдельную строку строки, не прошедшие max_rows_to_group_by. - bool aggregate_overflow_row = - need_aggregate && - query.group_by_with_totals && - settings.limits.max_rows_to_group_by && - settings.limits.group_by_overflow_mode == OverflowMode::ANY && - settings.totals_mode != TotalsMode::AFTER_HAVING_EXCLUSIVE; - /// Нужно ли после агрегации сразу финализировать агрегатные функции. - bool aggregate_final = - need_aggregate && - to_stage > QueryProcessingStage::WithMergeableState && - !query.group_by_with_totals; + /// Нужно ли агрегировать в отдельную строку строки, не прошедшие max_rows_to_group_by. + bool aggregate_overflow_row = + need_aggregate && + query.group_by_with_totals && + settings.limits.max_rows_to_group_by && + settings.limits.group_by_overflow_mode == OverflowMode::ANY && + settings.totals_mode != TotalsMode::AFTER_HAVING_EXCLUSIVE; + /// Нужно ли после агрегации сразу финализировать агрегатные функции. + bool aggregate_final = + need_aggregate && + to_stage > QueryProcessingStage::WithMergeableState && + !query.group_by_with_totals; - - if (need_aggregate || has_order_by) - do_execute_union = true; - - if (first_stage) - { - if (has_where) - executeWhere(streams, before_where); + + if (need_aggregate || has_order_by) + do_execute_union = true; + + if (first_stage) + { + if (has_where) + executeWhere(streams, before_where); - if (need_aggregate) - executeAggregation(streams, before_aggregation, aggregate_overflow_row, aggregate_final); - else - { - executeExpression(streams, before_order_and_select); - executeDistinct(streams, true, selected_columns); - } + if (need_aggregate) + executeAggregation(streams, before_aggregation, aggregate_overflow_row, aggregate_final); + else + { + executeExpression(streams, before_order_and_select); + executeDistinct(streams, true, selected_columns); + } - /** Оптимизация - при распределённой обработке запроса, - * если не указаны GROUP, HAVING, ORDER, но указан LIMIT, - * то выполним предварительный LIMIT на удалёном сервере. - */ - if (!second_stage - && !need_aggregate && !has_having && !has_order_by - && query.limit_length) - { - executePreLimit(streams); - do_execute_union = true; - } - } - - if (second_stage) - { - bool need_second_distinct_pass = true; + /** Оптимизация - при распределённой обработке запроса, + * если не указаны GROUP, HAVING, ORDER, но указан LIMIT, + * то выполним предварительный LIMIT на удалёном сервере. + */ + if (!second_stage + && !need_aggregate && !has_having && !has_order_by + && query.limit_length) + { + executePreLimit(streams); + do_execute_union = true; + } + } + + if (second_stage) + { + bool need_second_distinct_pass = true; - if (need_aggregate) - { - /// Если нужно объединить агрегированные результаты с нескольких серверов - if (!first_stage) - executeMergeAggregated(streams, aggregate_overflow_row, aggregate_final); + if (need_aggregate) + { + /// Если нужно объединить агрегированные результаты с нескольких серверов + if (!first_stage) + executeMergeAggregated(streams, aggregate_overflow_row, aggregate_final); - if (!aggregate_final) - executeTotalsAndHaving(streams, has_having, before_having, aggregate_overflow_row); - else if (has_having) - executeHaving(streams, before_having); + if (!aggregate_final) + executeTotalsAndHaving(streams, has_having, before_having, aggregate_overflow_row); + else if (has_having) + executeHaving(streams, before_having); - executeExpression(streams, before_order_and_select); - executeDistinct(streams, true, selected_columns); + executeExpression(streams, before_order_and_select); + executeDistinct(streams, true, selected_columns); - need_second_distinct_pass = streams.size() > 1; - } - else if (query.group_by_with_totals && !aggregate_final) - { - executeTotalsAndHaving(streams, false, nullptr, aggregate_overflow_row); - } + need_second_distinct_pass = streams.size() > 1; + } + else if (query.group_by_with_totals && !aggregate_final) + { + executeTotalsAndHaving(streams, false, nullptr, aggregate_overflow_row); + } - if (has_order_by) - executeOrder(streams); + if (has_order_by) + executeOrder(streams); - executeProjection(streams, final_projection); + executeProjection(streams, final_projection); - /// На этой стадии можно считать минимумы и максимумы, если надо. - if (settings.extremes) - for (BlockInputStreams::iterator it = streams.begin(); it != streams.end(); ++it) - if (IProfilingBlockInputStream * stream = dynamic_cast(&**it)) - stream->enableExtremes(); + /// На этой стадии можно считать минимумы и максимумы, если надо. + if (settings.extremes) + for (BlockInputStreams::iterator it = streams.begin(); it != streams.end(); ++it) + if (IProfilingBlockInputStream * stream = dynamic_cast(&**it)) + stream->enableExtremes(); - /** Оптимизация - если источников несколько и есть LIMIT, то сначала применим предварительный LIMIT, - * ограничивающий число записей в каждом до offset + limit. - */ - if (query.limit_length && streams.size() > 1 && !query.distinct) - { - executePreLimit(streams); - do_execute_union = true; - } - - if (need_second_distinct_pass) - do_execute_union = true; - - if (do_execute_union) - executeUnion(streams); - - /// Если было более одного источника - то нужно выполнить DISTINCT ещё раз после их слияния. - if (need_second_distinct_pass) - executeDistinct(streams, false, Names()); + /** Оптимизация - если источников несколько и есть LIMIT, то сначала применим предварительный LIMIT, + * ограничивающий число записей в каждом до offset + limit. + */ + if (query.limit_length && streams.size() > 1 && !query.distinct) + { + executePreLimit(streams); + do_execute_union = true; + } + + if (need_second_distinct_pass) + do_execute_union = true; + + if (do_execute_union) + executeUnion(streams); + + /// Если было более одного источника - то нужно выполнить DISTINCT ещё раз после их слияния. + if (need_second_distinct_pass) + executeDistinct(streams, false, Names()); - executeLimit(streams); - } - } + executeLimit(streams); + } + } - /** Если данных нет. */ - if (streams.empty()) - return; + /** Если данных нет. */ + if (streams.empty()) + return; - if (do_execute_union) - executeUnion(streams); + if (do_execute_union) + executeUnion(streams); - SubqueriesForSets subqueries_for_sets = query_analyzer->getSubqueriesForSets(); - if (!subqueries_for_sets.empty()) - executeSubqueriesInSetsAndJoins(streams, subqueries_for_sets); + SubqueriesForSets subqueries_for_sets = query_analyzer->getSubqueriesForSets(); + if (!subqueries_for_sets.empty()) + executeSubqueriesInSetsAndJoins(streams, subqueries_for_sets); } static void getLimitLengthAndOffset(ASTSelectQuery & query, size_t & length, size_t & offset) { - length = 0; - offset = 0; - if (query.limit_length) - { - length = safeGet(typeid_cast(*query.limit_length).value); - if (query.limit_offset) - offset = safeGet(typeid_cast(*query.limit_offset).value); - } + length = 0; + offset = 0; + if (query.limit_length) + { + length = safeGet(typeid_cast(*query.limit_length).value); + if (query.limit_offset) + offset = safeGet(typeid_cast(*query.limit_offset).value); + } } QueryProcessingStage::Enum InterpreterSelectQuery::executeFetchColumns(BlockInputStreams & streams) { - if (!streams.empty()) - return QueryProcessingStage::FetchColumns; + if (!streams.empty()) + return QueryProcessingStage::FetchColumns; - /// Интерпретатор подзапроса, если подзапрос - SharedPtr interpreter_subquery; + /// Интерпретатор подзапроса, если подзапрос + SharedPtr interpreter_subquery; - /// Список столбцов, которых нужно прочитать, чтобы выполнить запрос. - Names required_columns = query_analyzer->getRequiredColumns(); + /// Список столбцов, которых нужно прочитать, чтобы выполнить запрос. + Names required_columns = query_analyzer->getRequiredColumns(); - if (query.table && typeid_cast(&*query.table)) - { - /** Для подзапроса не действуют ограничения на максимальный размер результата. - * Так как результат поздапроса - ещё не результат всего запроса. - */ - Context subquery_context = context; - Settings subquery_settings = context.getSettings(); - subquery_settings.limits.max_result_rows = 0; - subquery_settings.limits.max_result_bytes = 0; - /// Вычисление extremes не имеет смысла и не нужно (если его делать, то в результате всего запроса могут взяться extremes подзапроса). - subquery_settings.extremes = 0; - subquery_context.setSettings(subquery_settings); + if (query.table && typeid_cast(&*query.table)) + { + /** Для подзапроса не действуют ограничения на максимальный размер результата. + * Так как результат поздапроса - ещё не результат всего запроса. + */ + Context subquery_context = context; + Settings subquery_settings = context.getSettings(); + subquery_settings.limits.max_result_rows = 0; + subquery_settings.limits.max_result_bytes = 0; + /// Вычисление extremes не имеет смысла и не нужно (если его делать, то в результате всего запроса могут взяться extremes подзапроса). + subquery_settings.extremes = 0; + subquery_context.setSettings(subquery_settings); - interpreter_subquery = new InterpreterSelectQuery( - query.table, subquery_context, required_columns, QueryProcessingStage::Complete, subquery_depth + 1); - } + interpreter_subquery = new InterpreterSelectQuery( + query.table, subquery_context, required_columns, QueryProcessingStage::Complete, subquery_depth + 1); + } - /// если в настройках установлен default_sample != 1, то все запросы выполняем с сэмплингом - /// если таблица не поддерживает сэмплинг получим исключение - /// поэтому запросы типа SHOW TABLES работать с включенном default_sample не будут - if (!query.sample_size && settings.default_sample != 1) - query.sample_size = new ASTLiteral(StringRange(), Float64(settings.default_sample)); + /// если в настройках установлен default_sample != 1, то все запросы выполняем с сэмплингом + /// если таблица не поддерживает сэмплинг получим исключение + /// поэтому запросы типа SHOW TABLES работать с включенном default_sample не будут + if (!query.sample_size && settings.default_sample != 1) + query.sample_size = new ASTLiteral(StringRange(), Float64(settings.default_sample)); - if (query.sample_size && (!storage || !storage->supportsSampling())) - throw Exception("Illegal SAMPLE: table doesn't support sampling", ErrorCodes::SAMPLING_NOT_SUPPORTED); + if (query.sample_size && (!storage || !storage->supportsSampling())) + throw Exception("Illegal SAMPLE: table doesn't support sampling", ErrorCodes::SAMPLING_NOT_SUPPORTED); - if (query.final && (!storage || !storage->supportsFinal())) - throw Exception(storage ? "Storage " + storage->getName() + " doesn't support FINAL" : "Illegal FINAL", ErrorCodes::ILLEGAL_FINAL); + if (query.final && (!storage || !storage->supportsFinal())) + throw Exception(storage ? "Storage " + storage->getName() + " doesn't support FINAL" : "Illegal FINAL", ErrorCodes::ILLEGAL_FINAL); - if (query.prewhere_expression && (!storage || !storage->supportsPrewhere())) - throw Exception(storage ? "Storage " + storage->getName() + " doesn't support PREWHERE" : "Illegal PREWHERE", ErrorCodes::ILLEGAL_PREWHERE); + if (query.prewhere_expression && (!storage || !storage->supportsPrewhere())) + throw Exception(storage ? "Storage " + storage->getName() + " doesn't support PREWHERE" : "Illegal PREWHERE", ErrorCodes::ILLEGAL_PREWHERE); - /** При распределённой обработке запроса, в потоках почти не делается вычислений, - * а делается ожидание и получение данных с удалённых серверов. - * Если у нас 20 удалённых серверов, а max_threads = 8, то было бы не очень хорошо - * соединяться и опрашивать только по 8 серверов одновременно. - * Чтобы одновременно опрашивалось больше удалённых серверов, - * вместо max_threads используется max_distributed_connections. - * - * Сохраним изначальное значение max_threads в settings_for_storage - * - эти настройки будут переданы на удалённые серверы при распределённой обработке запроса, - * и там должно быть оригинальное значение max_threads, а не увеличенное. - */ - Settings settings_for_storage = settings; - if (storage && storage->isRemote()) - settings.max_threads = settings.max_distributed_connections; + /** При распределённой обработке запроса, в потоках почти не делается вычислений, + * а делается ожидание и получение данных с удалённых серверов. + * Если у нас 20 удалённых серверов, а max_threads = 8, то было бы не очень хорошо + * соединяться и опрашивать только по 8 серверов одновременно. + * Чтобы одновременно опрашивалось больше удалённых серверов, + * вместо max_threads используется max_distributed_connections. + * + * Сохраним изначальное значение max_threads в settings_for_storage + * - эти настройки будут переданы на удалённые серверы при распределённой обработке запроса, + * и там должно быть оригинальное значение max_threads, а не увеличенное. + */ + Settings settings_for_storage = settings; + if (storage && storage->isRemote()) + settings.max_threads = settings.max_distributed_connections; - /// Ограничение на количество столбцов для чтения. - if (settings.limits.max_columns_to_read && required_columns.size() > settings.limits.max_columns_to_read) - throw Exception("Limit for number of columns to read exceeded. " - "Requested: " + toString(required_columns.size()) - + ", maximum: " + toString(settings.limits.max_columns_to_read), - ErrorCodes::TOO_MUCH_COLUMNS); + /// Ограничение на количество столбцов для чтения. + if (settings.limits.max_columns_to_read && required_columns.size() > settings.limits.max_columns_to_read) + throw Exception("Limit for number of columns to read exceeded. " + "Requested: " + toString(required_columns.size()) + + ", maximum: " + toString(settings.limits.max_columns_to_read), + ErrorCodes::TOO_MUCH_COLUMNS); - size_t limit_length = 0; - size_t limit_offset = 0; - getLimitLengthAndOffset(query, limit_length, limit_offset); + size_t limit_length = 0; + size_t limit_offset = 0; + getLimitLengthAndOffset(query, limit_length, limit_offset); - /** Оптимизация - если не указаны DISTINCT, WHERE, GROUP, HAVING, ORDER, но указан LIMIT, и limit + offset < max_block_size, - * то в качестве размера блока будем использовать limit + offset (чтобы не читать из таблицы больше, чем запрошено), - * а также установим количество потоков в 1 и отменим асинхронное выполнение конвейера запроса. - */ - if (!query.distinct && !query.prewhere_expression && !query.where_expression && !query.group_expression_list && !query.having_expression && !query.order_expression_list - && query.limit_length && !query_analyzer->hasAggregation() && limit_length + limit_offset < settings.max_block_size) - { - settings.max_block_size = limit_length + limit_offset; - settings.max_threads = 1; - settings.asynchronous = false; - } + /** Оптимизация - если не указаны DISTINCT, WHERE, GROUP, HAVING, ORDER, но указан LIMIT, и limit + offset < max_block_size, + * то в качестве размера блока будем использовать limit + offset (чтобы не читать из таблицы больше, чем запрошено), + * а также установим количество потоков в 1 и отменим асинхронное выполнение конвейера запроса. + */ + if (!query.distinct && !query.prewhere_expression && !query.where_expression && !query.group_expression_list && !query.having_expression && !query.order_expression_list + && query.limit_length && !query_analyzer->hasAggregation() && limit_length + limit_offset < settings.max_block_size) + { + settings.max_block_size = limit_length + limit_offset; + settings.max_threads = 1; + settings.asynchronous = false; + } - QueryProcessingStage::Enum from_stage = QueryProcessingStage::FetchColumns; + QueryProcessingStage::Enum from_stage = QueryProcessingStage::FetchColumns; - query_analyzer->makeSetsForIndex(); + query_analyzer->makeSetsForIndex(); - /// Инициализируем изначальные потоки данных, на которые накладываются преобразования запроса. Таблица или подзапрос? - if (!interpreter_subquery) - { - /** При распределённой обработке запроса, на все удалённые серверы отправляются временные таблицы, - * полученные из глобальных подзапросов - GLOBAL IN/JOIN. - */ - if (storage && storage->isRemote()) - storage->storeExternalTables(query_analyzer->getExternalTables()); + /// Инициализируем изначальные потоки данных, на которые накладываются преобразования запроса. Таблица или подзапрос? + if (!interpreter_subquery) + { + /** При распределённой обработке запроса, на все удалённые серверы отправляются временные таблицы, + * полученные из глобальных подзапросов - GLOBAL IN/JOIN. + */ + if (storage && storage->isRemote()) + storage->storeExternalTables(query_analyzer->getExternalTables()); streams = storage->read(required_columns, query_ptr, - context, settings_for_storage, from_stage, - settings.max_block_size, settings.max_threads); + context, settings_for_storage, from_stage, + settings.max_block_size, settings.max_threads); - for (auto & stream : streams) - stream->addTableLock(table_lock); - } - else - { - streams.push_back(maybeAsynchronous(interpreter_subquery->execute(), settings.asynchronous)); - } + for (auto & stream : streams) + stream->addTableLock(table_lock); + } + else + { + streams.push_back(maybeAsynchronous(interpreter_subquery->execute(), settings.asynchronous)); + } - /** Если истчоников слишком много, то склеим их в max_threads источников. - * (Иначе действия в каждом маленьком источнике, а затем объединение состояний, слишком неэффективно.) - */ - if (streams.size() > settings.max_threads) - streams = narrowBlockInputStreams(streams, settings.max_threads); + /** Если истчоников слишком много, то склеим их в max_threads источников. + * (Иначе действия в каждом маленьком источнике, а затем объединение состояний, слишком неэффективно.) + */ + if (streams.size() > settings.max_threads) + streams = narrowBlockInputStreams(streams, settings.max_threads); - /** Установка ограничений и квоты на чтение данных, скорость и время выполнения запроса. - * Такие ограничения проверяются на сервере-инициаторе запроса, а не на удалённых серверах. - * Потому что сервер-инициатор имеет суммарные данные о выполнении запроса на всех серверах. - */ - if (storage && to_stage == QueryProcessingStage::Complete) - { - IProfilingBlockInputStream::LocalLimits limits; - limits.mode = IProfilingBlockInputStream::LIMITS_TOTAL; - limits.max_rows_to_read = settings.limits.max_rows_to_read; - limits.max_bytes_to_read = settings.limits.max_bytes_to_read; - limits.read_overflow_mode = settings.limits.read_overflow_mode; - limits.max_execution_time = settings.limits.max_execution_time; - limits.timeout_overflow_mode = settings.limits.timeout_overflow_mode; - limits.min_execution_speed = settings.limits.min_execution_speed; - limits.timeout_before_checking_execution_speed = settings.limits.timeout_before_checking_execution_speed; + /** Установка ограничений и квоты на чтение данных, скорость и время выполнения запроса. + * Такие ограничения проверяются на сервере-инициаторе запроса, а не на удалённых серверах. + * Потому что сервер-инициатор имеет суммарные данные о выполнении запроса на всех серверах. + */ + if (storage && to_stage == QueryProcessingStage::Complete) + { + IProfilingBlockInputStream::LocalLimits limits; + limits.mode = IProfilingBlockInputStream::LIMITS_TOTAL; + limits.max_rows_to_read = settings.limits.max_rows_to_read; + limits.max_bytes_to_read = settings.limits.max_bytes_to_read; + limits.read_overflow_mode = settings.limits.read_overflow_mode; + limits.max_execution_time = settings.limits.max_execution_time; + limits.timeout_overflow_mode = settings.limits.timeout_overflow_mode; + limits.min_execution_speed = settings.limits.min_execution_speed; + limits.timeout_before_checking_execution_speed = settings.limits.timeout_before_checking_execution_speed; - QuotaForIntervals & quota = context.getQuota(); + QuotaForIntervals & quota = context.getQuota(); - for (BlockInputStreams::iterator it = streams.begin(); it != streams.end(); ++it) - { - if (IProfilingBlockInputStream * stream = dynamic_cast(&**it)) - { - stream->setLimits(limits); - stream->setQuota(quota); - } - } - } + for (BlockInputStreams::iterator it = streams.begin(); it != streams.end(); ++it) + { + if (IProfilingBlockInputStream * stream = dynamic_cast(&**it)) + { + stream->setLimits(limits); + stream->setQuota(quota); + } + } + } - return from_stage; + return from_stage; } void InterpreterSelectQuery::executeWhere(BlockInputStreams & streams, ExpressionActionsPtr expression) { - bool is_async = settings.asynchronous && streams.size() <= settings.max_threads; - for (BlockInputStreams::iterator it = streams.begin(); it != streams.end(); ++it) - { - BlockInputStreamPtr & stream = *it; - stream = maybeAsynchronous(new ExpressionBlockInputStream(stream, expression), is_async); - stream = maybeAsynchronous(new FilterBlockInputStream(stream, query.where_expression->getColumnName()), is_async); - } + bool is_async = settings.asynchronous && streams.size() <= settings.max_threads; + for (BlockInputStreams::iterator it = streams.begin(); it != streams.end(); ++it) + { + BlockInputStreamPtr & stream = *it; + stream = maybeAsynchronous(new ExpressionBlockInputStream(stream, expression), is_async); + stream = maybeAsynchronous(new FilterBlockInputStream(stream, query.where_expression->getColumnName()), is_async); + } } void InterpreterSelectQuery::executeAggregation(BlockInputStreams & streams, ExpressionActionsPtr expression, bool overflow_row, bool final) { - bool is_async = settings.asynchronous && streams.size() <= settings.max_threads; - for (BlockInputStreams::iterator it = streams.begin(); it != streams.end(); ++it) - { - BlockInputStreamPtr & stream = *it; - stream = maybeAsynchronous(new ExpressionBlockInputStream(stream, expression), is_async); - } + bool is_async = settings.asynchronous && streams.size() <= settings.max_threads; + for (BlockInputStreams::iterator it = streams.begin(); it != streams.end(); ++it) + { + BlockInputStreamPtr & stream = *it; + stream = maybeAsynchronous(new ExpressionBlockInputStream(stream, expression), is_async); + } - BlockInputStreamPtr & stream = streams[0]; + BlockInputStreamPtr & stream = streams[0]; - Names key_names; - AggregateDescriptions aggregates; - query_analyzer->getAggregateInfo(key_names, aggregates); + Names key_names; + AggregateDescriptions aggregates; + query_analyzer->getAggregateInfo(key_names, aggregates); - bool separate_totals = to_stage > QueryProcessingStage::WithMergeableState; + bool separate_totals = to_stage > QueryProcessingStage::WithMergeableState; - /// Если источников несколько, то выполняем параллельную агрегацию - if (streams.size() > 1) - { - if (!settings.use_splitting_aggregator || key_names.empty()) - { - stream = maybeAsynchronous( - new ParallelAggregatingBlockInputStream(streams, key_names, aggregates, overflow_row, final, - settings.max_threads, settings.limits.max_rows_to_group_by, settings.limits.group_by_overflow_mode), settings.asynchronous); - } - else - { - if (overflow_row) - throw Exception("Splitting aggregator cannot handle queries like this yet. " - "Please change use_splitting_aggregator, remove WITH TOTALS, " - "change group_by_overflow_mode or set totals_mode to AFTER_HAVING_EXCLUSIVE.", - ErrorCodes::NOT_IMPLEMENTED); - stream = maybeAsynchronous( - new SplittingAggregatingBlockInputStream( - new UnionBlockInputStream(streams, settings.max_threads), - key_names, aggregates, settings.max_threads, query.group_by_with_totals, separate_totals, final, - settings.limits.max_rows_to_group_by, settings.limits.group_by_overflow_mode), settings.asynchronous); - } + /// Если источников несколько, то выполняем параллельную агрегацию + if (streams.size() > 1) + { + if (!settings.use_splitting_aggregator || key_names.empty()) + { + stream = maybeAsynchronous( + new ParallelAggregatingBlockInputStream(streams, key_names, aggregates, overflow_row, final, + settings.max_threads, settings.limits.max_rows_to_group_by, settings.limits.group_by_overflow_mode), settings.asynchronous); + } + else + { + if (overflow_row) + throw Exception("Splitting aggregator cannot handle queries like this yet. " + "Please change use_splitting_aggregator, remove WITH TOTALS, " + "change group_by_overflow_mode or set totals_mode to AFTER_HAVING_EXCLUSIVE.", + ErrorCodes::NOT_IMPLEMENTED); + stream = maybeAsynchronous( + new SplittingAggregatingBlockInputStream( + new UnionBlockInputStream(streams, settings.max_threads), + key_names, aggregates, settings.max_threads, query.group_by_with_totals, separate_totals, final, + settings.limits.max_rows_to_group_by, settings.limits.group_by_overflow_mode), settings.asynchronous); + } - streams.resize(1); - } - else - stream = maybeAsynchronous(new AggregatingBlockInputStream(stream, key_names, aggregates, overflow_row, final, - settings.limits.max_rows_to_group_by, settings.limits.group_by_overflow_mode), settings.asynchronous); + streams.resize(1); + } + else + stream = maybeAsynchronous(new AggregatingBlockInputStream(stream, key_names, aggregates, overflow_row, final, + settings.limits.max_rows_to_group_by, settings.limits.group_by_overflow_mode), settings.asynchronous); } void InterpreterSelectQuery::executeMergeAggregated(BlockInputStreams & streams, bool overflow_row, bool final) { - /// Если объединять нечего - if (streams.size() == 1) - return; + /// Если объединять нечего + if (streams.size() == 1) + return; - /// Склеим несколько источников в один - streams[0] = new UnionBlockInputStream(streams, settings.max_threads); - streams.resize(1); + /// Склеим несколько источников в один + streams[0] = new UnionBlockInputStream(streams, settings.max_threads); + streams.resize(1); - /// Теперь объединим агрегированные блоки - Names key_names; - AggregateDescriptions aggregates; - query_analyzer->getAggregateInfo(key_names, aggregates); - streams[0] = maybeAsynchronous(new MergingAggregatedBlockInputStream(streams[0], key_names, aggregates, overflow_row, final), settings.asynchronous); + /// Теперь объединим агрегированные блоки + Names key_names; + AggregateDescriptions aggregates; + query_analyzer->getAggregateInfo(key_names, aggregates); + streams[0] = maybeAsynchronous(new MergingAggregatedBlockInputStream(streams[0], key_names, aggregates, overflow_row, final), settings.asynchronous); } void InterpreterSelectQuery::executeHaving(BlockInputStreams & streams, ExpressionActionsPtr expression) { - bool is_async = settings.asynchronous && streams.size() <= settings.max_threads; - for (BlockInputStreams::iterator it = streams.begin(); it != streams.end(); ++it) - { - BlockInputStreamPtr & stream = *it; - stream = maybeAsynchronous(new ExpressionBlockInputStream(stream, expression), is_async); - stream = maybeAsynchronous(new FilterBlockInputStream(stream, query.having_expression->getColumnName()), is_async); - } + bool is_async = settings.asynchronous && streams.size() <= settings.max_threads; + for (BlockInputStreams::iterator it = streams.begin(); it != streams.end(); ++it) + { + BlockInputStreamPtr & stream = *it; + stream = maybeAsynchronous(new ExpressionBlockInputStream(stream, expression), is_async); + stream = maybeAsynchronous(new FilterBlockInputStream(stream, query.having_expression->getColumnName()), is_async); + } } void InterpreterSelectQuery::executeTotalsAndHaving(BlockInputStreams & streams, bool has_having, ExpressionActionsPtr expression, bool overflow_row) { - if (streams.size() > 1) - { - streams[0] = new UnionBlockInputStream(streams, settings.max_threads); - streams.resize(1); - } + if (streams.size() > 1) + { + streams[0] = new UnionBlockInputStream(streams, settings.max_threads); + streams.resize(1); + } - Names key_names; - AggregateDescriptions aggregates; - query_analyzer->getAggregateInfo(key_names, aggregates); - streams[0] = maybeAsynchronous(new TotalsHavingBlockInputStream( - streams[0], key_names, aggregates, overflow_row, expression, - has_having ? query.having_expression->getColumnName() : "", settings.totals_mode, settings.totals_auto_threshold), - settings.asynchronous); + Names key_names; + AggregateDescriptions aggregates; + query_analyzer->getAggregateInfo(key_names, aggregates); + streams[0] = maybeAsynchronous(new TotalsHavingBlockInputStream( + streams[0], key_names, aggregates, overflow_row, expression, + has_having ? query.having_expression->getColumnName() : "", settings.totals_mode, settings.totals_auto_threshold), + settings.asynchronous); } void InterpreterSelectQuery::executeExpression(BlockInputStreams & streams, ExpressionActionsPtr expression) { - bool is_async = settings.asynchronous && streams.size() <= settings.max_threads; - for (BlockInputStreams::iterator it = streams.begin(); it != streams.end(); ++it) - { - BlockInputStreamPtr & stream = *it; - stream = maybeAsynchronous(new ExpressionBlockInputStream(stream, expression), is_async); - } + bool is_async = settings.asynchronous && streams.size() <= settings.max_threads; + for (BlockInputStreams::iterator it = streams.begin(); it != streams.end(); ++it) + { + BlockInputStreamPtr & stream = *it; + stream = maybeAsynchronous(new ExpressionBlockInputStream(stream, expression), is_async); + } } void InterpreterSelectQuery::executeOrder(BlockInputStreams & streams) { - SortDescription order_descr; - order_descr.reserve(query.order_expression_list->children.size()); - for (ASTs::iterator it = query.order_expression_list->children.begin(); - it != query.order_expression_list->children.end(); - ++it) - { - String name = (*it)->children.front()->getColumnName(); - order_descr.push_back(SortColumnDescription(name, typeid_cast(**it).direction)); - } + SortDescription order_descr; + order_descr.reserve(query.order_expression_list->children.size()); + for (ASTs::iterator it = query.order_expression_list->children.begin(); + it != query.order_expression_list->children.end(); + ++it) + { + String name = (*it)->children.front()->getColumnName(); + order_descr.push_back(SortColumnDescription(name, typeid_cast(**it).direction)); + } - /// Если есть LIMIT и нет DISTINCT - можно делать частичную сортировку. - size_t limit = 0; - if (!query.distinct) - { - size_t limit_length = 0; - size_t limit_offset = 0; - getLimitLengthAndOffset(query, limit_length, limit_offset); - limit = limit_length + limit_offset; - } + /// Если есть LIMIT и нет DISTINCT - можно делать частичную сортировку. + size_t limit = 0; + if (!query.distinct) + { + size_t limit_length = 0; + size_t limit_offset = 0; + getLimitLengthAndOffset(query, limit_length, limit_offset); + limit = limit_length + limit_offset; + } - bool is_async = settings.asynchronous && streams.size() <= settings.max_threads; - for (BlockInputStreams::iterator it = streams.begin(); it != streams.end(); ++it) - { - BlockInputStreamPtr & stream = *it; - IProfilingBlockInputStream * sorting_stream = new PartialSortingBlockInputStream(stream, order_descr, limit); + bool is_async = settings.asynchronous && streams.size() <= settings.max_threads; + for (BlockInputStreams::iterator it = streams.begin(); it != streams.end(); ++it) + { + BlockInputStreamPtr & stream = *it; + IProfilingBlockInputStream * sorting_stream = new PartialSortingBlockInputStream(stream, order_descr, limit); - /// Ограничения на сортировку - IProfilingBlockInputStream::LocalLimits limits; - limits.mode = IProfilingBlockInputStream::LIMITS_TOTAL; - limits.max_rows_to_read = settings.limits.max_rows_to_sort; - limits.max_bytes_to_read = settings.limits.max_bytes_to_sort; - limits.read_overflow_mode = settings.limits.sort_overflow_mode; - sorting_stream->setLimits(limits); + /// Ограничения на сортировку + IProfilingBlockInputStream::LocalLimits limits; + limits.mode = IProfilingBlockInputStream::LIMITS_TOTAL; + limits.max_rows_to_read = settings.limits.max_rows_to_sort; + limits.max_bytes_to_read = settings.limits.max_bytes_to_sort; + limits.read_overflow_mode = settings.limits.sort_overflow_mode; + sorting_stream->setLimits(limits); - stream = maybeAsynchronous(sorting_stream, is_async); - } + stream = maybeAsynchronous(sorting_stream, is_async); + } - BlockInputStreamPtr & stream = streams[0]; + BlockInputStreamPtr & stream = streams[0]; - /// Если потоков несколько, то объединяем их в один - if (streams.size() > 1) - { - stream = new UnionBlockInputStream(streams, settings.max_threads); - streams.resize(1); - } + /// Если потоков несколько, то объединяем их в один + if (streams.size() > 1) + { + stream = new UnionBlockInputStream(streams, settings.max_threads); + streams.resize(1); + } - /// Сливаем сортированные блоки TODO: таймаут на слияние. - stream = maybeAsynchronous(new MergeSortingBlockInputStream(stream, order_descr, limit), is_async); + /// Сливаем сортированные блоки TODO: таймаут на слияние. + stream = maybeAsynchronous(new MergeSortingBlockInputStream(stream, order_descr, limit), is_async); } void InterpreterSelectQuery::executeProjection(BlockInputStreams & streams, ExpressionActionsPtr expression) { - bool is_async = settings.asynchronous && streams.size() <= settings.max_threads; - for (BlockInputStreams::iterator it = streams.begin(); it != streams.end(); ++it) - { - BlockInputStreamPtr & stream = *it; - stream = maybeAsynchronous(new ExpressionBlockInputStream(stream, expression), is_async); - } + bool is_async = settings.asynchronous && streams.size() <= settings.max_threads; + for (BlockInputStreams::iterator it = streams.begin(); it != streams.end(); ++it) + { + BlockInputStreamPtr & stream = *it; + stream = maybeAsynchronous(new ExpressionBlockInputStream(stream, expression), is_async); + } } void InterpreterSelectQuery::executeDistinct(BlockInputStreams & streams, bool before_order, Names columns) { - if (query.distinct) - { - size_t limit_length = 0; - size_t limit_offset = 0; - getLimitLengthAndOffset(query, limit_length, limit_offset); + if (query.distinct) + { + size_t limit_length = 0; + size_t limit_offset = 0; + getLimitLengthAndOffset(query, limit_length, limit_offset); - size_t limit_for_distinct = 0; + size_t limit_for_distinct = 0; - /// Если после этой стадии DISTINCT не будет выполняться ORDER BY, то можно достать не более limit_length + limit_offset различных строк. - if (!query.order_expression_list || !before_order) - limit_for_distinct = limit_length + limit_offset; + /// Если после этой стадии DISTINCT не будет выполняться ORDER BY, то можно достать не более limit_length + limit_offset различных строк. + if (!query.order_expression_list || !before_order) + limit_for_distinct = limit_length + limit_offset; - bool is_async = settings.asynchronous && streams.size() <= settings.max_threads; - for (BlockInputStreams::iterator it = streams.begin(); it != streams.end(); ++it) - { - BlockInputStreamPtr & stream = *it; - stream = maybeAsynchronous(new DistinctBlockInputStream( - stream, settings.limits, limit_for_distinct, columns), is_async); - } - } + bool is_async = settings.asynchronous && streams.size() <= settings.max_threads; + for (BlockInputStreams::iterator it = streams.begin(); it != streams.end(); ++it) + { + BlockInputStreamPtr & stream = *it; + stream = maybeAsynchronous(new DistinctBlockInputStream( + stream, settings.limits, limit_for_distinct, columns), is_async); + } + } } void InterpreterSelectQuery::executeUnion(BlockInputStreams & streams) { - /// Если до сих пор есть несколько потоков, то объединяем их в один - if (streams.size() > 1) - { - streams[0] = new UnionBlockInputStream(streams, settings.max_threads); - streams.resize(1); - } + /// Если до сих пор есть несколько потоков, то объединяем их в один + if (streams.size() > 1) + { + streams[0] = new UnionBlockInputStream(streams, settings.max_threads); + streams.resize(1); + } } /// Предварительный LIMIT - применяется в каждом источнике, если источников несколько, до их объединения. void InterpreterSelectQuery::executePreLimit(BlockInputStreams & streams) { - size_t limit_length = 0; - size_t limit_offset = 0; - getLimitLengthAndOffset(query, limit_length, limit_offset); + size_t limit_length = 0; + size_t limit_offset = 0; + getLimitLengthAndOffset(query, limit_length, limit_offset); - /// Если есть LIMIT - if (query.limit_length) - { - for (BlockInputStreams::iterator it = streams.begin(); it != streams.end(); ++it) - { - BlockInputStreamPtr & stream = *it; - stream = new LimitBlockInputStream(stream, limit_length + limit_offset, 0); - } - } + /// Если есть LIMIT + if (query.limit_length) + { + for (BlockInputStreams::iterator it = streams.begin(); it != streams.end(); ++it) + { + BlockInputStreamPtr & stream = *it; + stream = new LimitBlockInputStream(stream, limit_length + limit_offset, 0); + } + } } void InterpreterSelectQuery::executeLimit(BlockInputStreams & streams) { - size_t limit_length = 0; - size_t limit_offset = 0; - getLimitLengthAndOffset(query, limit_length, limit_offset); + size_t limit_length = 0; + size_t limit_offset = 0; + getLimitLengthAndOffset(query, limit_length, limit_offset); - /// Если есть LIMIT - if (query.limit_length) - { - BlockInputStreamPtr & stream = streams[0]; - stream = new LimitBlockInputStream(stream, limit_length, limit_offset); - } + /// Если есть LIMIT + if (query.limit_length) + { + BlockInputStreamPtr & stream = streams[0]; + stream = new LimitBlockInputStream(stream, limit_length, limit_offset); + } } void InterpreterSelectQuery::executeSubqueriesInSetsAndJoins(BlockInputStreams & streams, SubqueriesForSets & subqueries_for_sets) { - /// Если запрос не распределённый, то удалим создание временных таблиц из подзапросов (предназначавшихся для отправки на удалённые серверы). - if (!(storage && storage->isRemote())) - for (auto & elem : subqueries_for_sets) - elem.second.table.reset(); + /// Если запрос не распределённый, то удалим создание временных таблиц из подзапросов (предназначавшихся для отправки на удалённые серверы). + if (!(storage && storage->isRemote())) + for (auto & elem : subqueries_for_sets) + elem.second.table.reset(); - streams[0] = new CreatingSetsBlockInputStream(streams[0], subqueries_for_sets, settings.limits); + streams[0] = new CreatingSetsBlockInputStream(streams[0], subqueries_for_sets, settings.limits); } BlockInputStreamPtr InterpreterSelectQuery::executeAndFormat(WriteBuffer & buf) { - Block sample = getSampleBlock(); - String format_name = query.format ? typeid_cast(*query.format).name : context.getDefaultFormat(); + Block sample = getSampleBlock(); + String format_name = query.format ? typeid_cast(*query.format).name : context.getDefaultFormat(); - BlockInputStreamPtr in = execute(); - BlockOutputStreamPtr out = context.getFormatFactory().getOutput(format_name, buf, sample); + BlockInputStreamPtr in = execute(); + BlockOutputStreamPtr out = context.getFormatFactory().getOutput(format_name, buf, sample); - copyData(*in, *out); + copyData(*in, *out); - return in; + return in; } diff --git a/dbms/tests/queries/0_stateless/00098_7_union_all.reference b/dbms/tests/queries/0_stateless/00098_7_union_all.reference new file mode 100644 index 00000000000..1191247b6d9 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00098_7_union_all.reference @@ -0,0 +1,2 @@ +1 +2 diff --git a/dbms/tests/queries/0_stateless/00098_7_union_all.sql b/dbms/tests/queries/0_stateless/00098_7_union_all.sql new file mode 100644 index 00000000000..2798b1a28e6 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00098_7_union_all.sql @@ -0,0 +1 @@ +SELECT DomainID FROM (SELECT 1 AS DomainID, 'abc' AS Domain UNION ALL SELECT 2 AS DomainID, 'def' AS Domain) ORDER BY DomainID ASC diff --git a/dbms/tests/queries/0_stateless/00098_8_union_all.reference b/dbms/tests/queries/0_stateless/00098_8_union_all.reference new file mode 100644 index 00000000000..1191247b6d9 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00098_8_union_all.reference @@ -0,0 +1,2 @@ +1 +2 diff --git a/dbms/tests/queries/0_stateless/00098_8_union_all.sql b/dbms/tests/queries/0_stateless/00098_8_union_all.sql new file mode 100644 index 00000000000..86e88516f97 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00098_8_union_all.sql @@ -0,0 +1 @@ +SELECT DomainID FROM (SELECT DISTINCT 1 AS DomainID, 'abc' AS Domain UNION ALL SELECT 2 AS DomainID, 'def' AS Domain) ORDER BY DomainID ASC