#include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include namespace DB { InterpreterSelectQuery::InterpreterSelectQuery(ASTPtr query_ptr_, const Context & context_, QueryProcessingStage::Enum to_stage_, size_t subquery_depth_) : query_ptr(query_ptr_), query(dynamic_cast(*query_ptr)), context(context_), settings(context.getSettings()), to_stage(to_stage_), subquery_depth(subquery_depth_), log(&Logger::get("InterpreterSelectQuery")) { if (settings.limits.max_subquery_depth && subquery_depth > settings.limits.max_subquery_depth) throw Exception("Too deep subqueries. Maximum: " + Poco::NumberFormatter::format(settings.limits.max_subquery_depth), ErrorCodes::TOO_DEEP_SUBQUERIES); } void InterpreterSelectQuery::getDatabaseAndTableNames(String & database_name, String & table_name) { /** Если таблица не указана - используем таблицу system.one. * Если база данных не указана - используем текущую базу данных. */ if (!query.table) { database_name = "system"; table_name = "one"; } else if (!query.database) database_name = context.getCurrentDatabase(); if (query.database) database_name = dynamic_cast(*query.database).name; if (query.table) table_name = dynamic_cast(*query.table).name; } StoragePtr InterpreterSelectQuery::getTable() { String database_name; String table_name; getDatabaseAndTableNames(database_name, table_name); return context.getTable(database_name, table_name); } ASTPtr InterpreterSelectQuery::getCreateQuery() { String database_name; String table_name; getDatabaseAndTableNames(database_name, table_name); return context.getCreateQuery(database_name, table_name); } void InterpreterSelectQuery::setColumns() { context.setColumns(!query.table || !dynamic_cast(&*query.table) ? getTable()->getColumnsList() : InterpreterSelectQuery(query.table, context).getSampleBlock().getColumnsList()); if (context.getColumns().empty()) throw Exception("There is no available columns", ErrorCodes::THERE_IS_NO_COLUMN); } DataTypes InterpreterSelectQuery::getReturnTypes() { setColumns(); Expression expression(dynamic_cast(*query_ptr).select_expression_list, context); return expression.getReturnTypes(); } Block InterpreterSelectQuery::getSampleBlock() { setColumns(); Expression expression(dynamic_cast(*query_ptr).select_expression_list, context); return expression.getSampleBlock(); } /// Превращает источник в асинхронный, если это указано. static inline BlockInputStreamPtr maybeAsynchronous(BlockInputStreamPtr in, bool is_async) { return is_async ? new AsynchronousBlockInputStream(in) : in; } BlockInputStreamPtr InterpreterSelectQuery::execute() { /// Добавляем в контекст список доступных столбцов. setColumns(); /// Объект, с помощью которого анализируется запрос. ExpressionPtr expression = new Expression(query_ptr, context); /** Потоки данных. При параллельном выполнении запроса, имеем несколько потоков данных. * Если нет GROUP BY, то выполним все операции до ORDER BY и LIMIT параллельно, затем * если есть ORDER BY, то склеим потоки с помощью UnionBlockInputStream, а затем MergеSortingBlockInputStream, * если нет, то склеим с помощью UnionBlockInputStream, * затем применим LIMIT. * Если есть GROUP BY, то выполним все операции до GROUP BY, включительно, параллельно; * параллельный GROUP BY склеит потоки в один, * затем выполним остальные операции с одним получившимся потоком. */ BlockInputStreams streams; /** Вынем данные из Storage. from_stage - до какой стадии запрос был выполнен в Storage. */ QueryProcessingStage::Enum from_stage = executeFetchColumns(streams, expression); /** Если данных нет. */ if (streams.empty()) return new NullBlockInputStream; LOG_TRACE(log, QueryProcessingStage::toString(from_stage) << " -> " << QueryProcessingStage::toString(to_stage)); if (to_stage > QueryProcessingStage::FetchColumns) { /// Нужно ли агрегировать. bool need_aggregate = expression->hasAggregates() || query.group_expression_list; if (from_stage < QueryProcessingStage::WithMergeableState) { /// Вычислим подзапросы в секции IN. expression->makeSets(subquery_depth); /// А также скалярные подзапросы. expression->resolveScalarSubqueries(subquery_depth); executeArrayJoin(streams, expression); executeWhere(streams, expression); if (need_aggregate) executeAggregation(streams, expression); } else if (from_stage <= QueryProcessingStage::WithMergeableState && to_stage > QueryProcessingStage::WithMergeableState) { expression->markBeforeAggregation(0, PART_AFTER_AGGREGATING); if (query.order_expression_list) setPartID(query.order_expression_list, PART_ORDER); if (query.having_expression) setPartID(query.having_expression, PART_HAVING); /// Выполним подзапросы в тех частях запроса, которые выполняются локально. /// Вычислим подзапросы в секции IN. expression->makeSets(subquery_depth, PART_AFTER_AGGREGATING | PART_HAVING | PART_ORDER); /// А также скалярные подзапросы. expression->resolveScalarSubqueries(subquery_depth, PART_AFTER_AGGREGATING | PART_HAVING | PART_ORDER); } if (from_stage <= QueryProcessingStage::WithMergeableState && to_stage > QueryProcessingStage::WithMergeableState) { if (need_aggregate) { /// Если нужно объединить агрегированные результаты с нескольких серверов if (from_stage == QueryProcessingStage::WithMergeableState) executeMergeAggregated(streams, expression); executeFinalizeAggregates(streams, expression); } executeHaving(streams, expression); executeOuterExpression(streams, expression); executeOrder(streams, expression); /** Оптимизация - если источников несколько и есть LIMIT, то сначала применим предварительный LIMIT, * ограничивающий число записей в каждом до offset + limit. */ if (query.limit_length && streams.size() > 1) executePreLimit(streams, expression); executeUnion(streams, expression); executeLimit(streams, expression); } } executeUnion(streams, expression); /// Ограничения на результат, а также колбек для прогресса. if (IProfilingBlockInputStream * stream = dynamic_cast(&*streams[0])) { IProfilingBlockInputStream::LocalLimits limits; limits.max_rows_to_read = settings.limits.max_result_rows; limits.max_bytes_to_read = settings.limits.max_result_bytes; limits.read_overflow_mode = settings.limits.result_overflow_mode; stream->setLimits(limits); stream->setProgressCallback(context.getProgressCallback()); } return streams[0]; } static void getLimitLengthAndOffset(ASTSelectQuery & query, size_t & length, size_t & offset) { length = 0; offset = 0; if (query.limit_length) { length = safeGet(dynamic_cast(*query.limit_length).value); if (query.limit_offset) offset = safeGet(dynamic_cast(*query.limit_offset).value); } } QueryProcessingStage::Enum InterpreterSelectQuery::executeFetchColumns(BlockInputStreams & streams, ExpressionPtr & expression) { /// Таблица, откуда читать данные, если не подзапрос. StoragePtr table; /// Интерпретатор подзапроса, если подзапрос SharedPtr interpreter_subquery; /// Добавляем в контекст список доступных столбцов. setColumns(); if (!query.table || !dynamic_cast(&*query.table)) table = getTable(); else interpreter_subquery = new InterpreterSelectQuery(query.table, context, QueryProcessingStage::Complete, subquery_depth + 1); if (query.sample_size && (!table || !table->supportsSampling())) throw Exception("Illegal SAMPLE: table doesn't support sampling", ErrorCodes::SAMPLING_NOT_SUPPORTED); if (query.final && (!table || !table->supportsFinal())) throw Exception("Illegal FINAL", ErrorCodes::ILLEGAL_FINAL); /** При распределённой обработке запроса, в потоках почти не делается вычислений, * а делается ожидание и получение данных с удалённых серверов. * Если у нас 20 удалённых серверов, а max_threads = 8, то было бы не очень хорошо * соединяться и опрашивать только по 8 серверов одновременно. * Чтобы одновременно опрашивалось больше удалённых серверов, * вместо max_threads используется max_distributed_connections. * * Сохраним изначальное значение max_threads в settings_for_storage * - эти настройки будут переданы на удалённые серверы при распределённой обработке запроса, * и там должно быть оригинальное значение max_threads, а не увеличенное. */ Settings settings_for_storage = settings; if (table && table->isRemote()) settings.max_threads = settings.max_distributed_connections; /// Список столбцов, которых нужно прочитать, чтобы выполнить запрос. Names required_columns = expression->getRequiredColumns(); /// Ограничение на количество столбцов для чтения. if (settings.limits.max_columns_to_read && required_columns.size() > settings.limits.max_columns_to_read) throw Exception("Limit for number of columns to read exceeded. " "Requested: " + Poco::NumberFormatter::format(required_columns.size()) + ", maximum: " + Poco::NumberFormatter::format(settings.limits.max_columns_to_read), ErrorCodes::TOO_MUCH_COLUMNS); /// Если не указан ни один столбец из таблицы, то будем читать первый попавшийся (чтобы хотя бы знать число строк). if (required_columns.empty()) required_columns.push_back(getAnyColumn()); size_t limit_length = 0; size_t limit_offset = 0; getLimitLengthAndOffset(query, limit_length, limit_offset); /** Оптимизация - если не указаны WHERE, GROUP, HAVING, ORDER, но указан LIMIT, и limit + offset < max_block_size, * то в качестве размера блока будем использовать limit + offset (чтобы не читать из таблицы больше, чем запрошено), * а также установим количество потоков в 1 и отменим асинхронное выполнение конвейера запроса. */ if (!query.where_expression && !query.group_expression_list && !query.having_expression && !query.order_expression_list && query.limit_length && !expression->hasAggregates() && limit_length + limit_offset < settings.max_block_size) { settings.max_block_size = limit_length + limit_offset; settings.max_threads = 1; settings.asynchronous = false; } QueryProcessingStage::Enum from_stage = QueryProcessingStage::FetchColumns; /// Инициализируем изначальные потоки данных, на которые накладываются преобразования запроса. Таблица или подзапрос? if (!query.table || !dynamic_cast(&*query.table)) streams = table->read(required_columns, query_ptr, settings_for_storage, from_stage, settings.max_block_size, settings.max_threads); else streams.push_back(maybeAsynchronous(interpreter_subquery->execute(), settings.asynchronous)); /** Если истчоников слишком много, то склеим их в max_threads источников. * (Иначе действия в каждом маленьком источнике, а затем объединение состояний, слишком неэффективно.) */ if (streams.size() > settings.max_threads) streams = narrowBlockInputStreams(streams, settings.max_threads); /** Установка ограничений на чтение данных. * Они устанавливаются на самые "глубокие" чтения. * То есть, не должны устанавливаться для чтений из удалённых серверов и подзапросов. */ if (table && !table->isRemote()) { IProfilingBlockInputStream::LocalLimits limits; limits.max_rows_to_read = settings.limits.max_rows_to_read; limits.max_bytes_to_read = settings.limits.max_bytes_to_read; limits.read_overflow_mode = settings.limits.read_overflow_mode; limits.max_execution_time = settings.limits.max_execution_time; limits.timeout_overflow_mode = settings.limits.timeout_overflow_mode; limits.min_execution_speed = settings.limits.min_execution_speed; limits.timeout_before_checking_execution_speed = settings.limits.timeout_before_checking_execution_speed; for (BlockInputStreams::iterator it = streams.begin(); it != streams.end(); ++it) if (IProfilingBlockInputStream * stream = dynamic_cast(&**it)) stream->setLimits(limits); } return from_stage; } void InterpreterSelectQuery::executeWhere(BlockInputStreams & streams, ExpressionPtr & expression) { /// Если есть условие WHERE - сначала выполним часть выражения, необходимую для его вычисления if (query.where_expression) { setPartID(query.where_expression, PART_WHERE); bool is_async = settings.asynchronous && streams.size() <= settings.max_threads; for (BlockInputStreams::iterator it = streams.begin(); it != streams.end(); ++it) { BlockInputStreamPtr & stream = *it; stream = maybeAsynchronous(new ExpressionBlockInputStream(stream, expression, PART_WHERE, true), is_async); stream = maybeAsynchronous(new FilterBlockInputStream(stream, query.where_expression->getColumnName()), is_async); } } } void InterpreterSelectQuery::executeArrayJoin(BlockInputStreams & streams, ExpressionPtr & expression) { /// Если есть ARRAY JOIN - сначала выполним часть выражения, необходимую для его вычисления String array_join_column_name; if (expression->getArrayJoinInfo(array_join_column_name)) { expression->markBeforeArrayJoin(PART_BEFORE_ARRAY_JOIN); bool is_async = settings.asynchronous && streams.size() <= settings.max_threads; for (BlockInputStreams::iterator it = streams.begin(); it != streams.end(); ++it) { BlockInputStreamPtr & stream = *it; stream = maybeAsynchronous(new ExpressionBlockInputStream(stream, expression, PART_BEFORE_ARRAY_JOIN, true), is_async); stream = maybeAsynchronous(new ArrayJoiningBlockInputStream(stream, array_join_column_name), is_async); } } } void InterpreterSelectQuery::executeAggregation(BlockInputStreams & streams, ExpressionPtr & expression) { expression->markBeforeAggregation(PART_BEFORE_AGGREGATING, 0); if (query.group_expression_list) setPartID(query.group_expression_list, PART_GROUP); bool is_async = settings.asynchronous && streams.size() <= settings.max_threads; for (BlockInputStreams::iterator it = streams.begin(); it != streams.end(); ++it) { BlockInputStreamPtr & stream = *it; stream = maybeAsynchronous(new ExpressionBlockInputStream(stream, expression, PART_GROUP | PART_BEFORE_AGGREGATING, true), is_async); } BlockInputStreamPtr & stream = streams[0]; Names key_names; AggregateDescriptions aggregates; expression->getAggregateInfo(key_names, aggregates); /// Если источников несколько, то выполняем параллельную агрегацию if (streams.size() > 1) { stream = maybeAsynchronous(new ParallelAggregatingBlockInputStream(streams, key_names, aggregates, query.group_by_with_totals, settings.max_threads, settings.limits.max_rows_to_group_by, settings.limits.group_by_overflow_mode), settings.asynchronous); streams.resize(1); } else stream = maybeAsynchronous(new AggregatingBlockInputStream(stream, key_names, aggregates, query.group_by_with_totals, settings.limits.max_rows_to_group_by, settings.limits.group_by_overflow_mode), settings.asynchronous); } void InterpreterSelectQuery::executeFinalizeAggregates(BlockInputStreams & streams, ExpressionPtr & expression) { /// Финализируем агрегатные функции - заменяем их состояния вычислений на готовые значения BlockInputStreamPtr & stream = streams[0]; stream = maybeAsynchronous(new FinalizingAggregatedBlockInputStream(stream), settings.asynchronous); } void InterpreterSelectQuery::executeMergeAggregated(BlockInputStreams & streams, ExpressionPtr & expression) { /// Если объединять нечего if (streams.size() == 1) return; /// Склеим несколько источников в один streams[0] = new UnionBlockInputStream(streams, settings.max_threads); streams.resize(1); /// Теперь объединим агрегированные блоки Names key_names; AggregateDescriptions aggregates; expression->getAggregateInfo(key_names, aggregates); streams[0] = maybeAsynchronous(new MergingAggregatedBlockInputStream(streams[0], key_names, aggregates, query.group_by_with_totals), settings.asynchronous); } void InterpreterSelectQuery::executeHaving(BlockInputStreams & streams, ExpressionPtr & expression) { /// Если есть условие HAVING - сначала выполним часть выражения, необходимую для его вычисления if (query.having_expression) { setPartID(query.having_expression, PART_HAVING); bool is_async = settings.asynchronous && streams.size() <= settings.max_threads; for (BlockInputStreams::iterator it = streams.begin(); it != streams.end(); ++it) { BlockInputStreamPtr & stream = *it; stream = maybeAsynchronous(new ExpressionBlockInputStream(stream, expression, PART_HAVING, true), is_async); stream = maybeAsynchronous(new FilterBlockInputStream(stream, query.having_expression->getColumnName()), is_async); } } } void InterpreterSelectQuery::executeOuterExpression(BlockInputStreams & streams, ExpressionPtr & expression) { /// Выполним оставшуюся часть выражения setPartID(query.select_expression_list, PART_SELECT); if (query.order_expression_list) setPartID(query.order_expression_list, PART_ORDER); bool is_async = settings.asynchronous && streams.size() <= settings.max_threads; for (BlockInputStreams::iterator it = streams.begin(); it != streams.end(); ++it) { BlockInputStreamPtr & stream = *it; stream = maybeAsynchronous(new ExpressionBlockInputStream(stream, expression, PART_SELECT | PART_ORDER, true), is_async); /** Оставим только столбцы, нужные для SELECT и ORDER BY части. * Если нет ORDER BY - то это последняя проекция, и нужно брать только столбцы из SELECT части. */ stream = new ProjectionBlockInputStream(stream, expression, query.order_expression_list ? true : false, PART_SELECT | PART_ORDER, query.order_expression_list ? NULL : query.select_expression_list); } } void InterpreterSelectQuery::executeOrder(BlockInputStreams & streams, ExpressionPtr & expression) { /// Если есть ORDER BY if (query.order_expression_list) { SortDescription order_descr; order_descr.reserve(query.order_expression_list->children.size()); for (ASTs::iterator it = query.order_expression_list->children.begin(); it != query.order_expression_list->children.end(); ++it) { String name = (*it)->children.front()->getColumnName(); const ASTOrderByElement & ast_order_by = dynamic_cast(**it); order_descr.push_back(SortColumnDescription(name, ast_order_by.direction, ast_order_by.collator)); } bool is_async = settings.asynchronous && streams.size() <= settings.max_threads; for (BlockInputStreams::iterator it = streams.begin(); it != streams.end(); ++it) { BlockInputStreamPtr & stream = *it; IProfilingBlockInputStream * sorting_stream = new PartialSortingBlockInputStream(stream, order_descr); /// Ограничения на сортировку IProfilingBlockInputStream::LocalLimits limits; limits.max_rows_to_read = settings.limits.max_rows_to_sort; limits.max_bytes_to_read = settings.limits.max_bytes_to_sort; limits.read_overflow_mode = settings.limits.sort_overflow_mode; sorting_stream->setLimits(limits); stream = maybeAsynchronous(sorting_stream, is_async); } BlockInputStreamPtr & stream = streams[0]; /// Если потоков несколько, то объединяем их в один if (streams.size() > 1) { stream = new UnionBlockInputStream(streams, settings.max_threads); streams.resize(1); } /// Сливаем сортированные блоки TODO: таймаут на слияние. stream = maybeAsynchronous(new MergeSortingBlockInputStream(stream, order_descr), is_async); /// Оставим только столбцы, нужные для SELECT части stream = new ProjectionBlockInputStream(stream, expression, false, PART_SELECT, query.select_expression_list); } } void InterpreterSelectQuery::executeUnion(BlockInputStreams & streams, ExpressionPtr & expression) { /// Если до сих пор есть несколько потоков, то объединяем их в один if (streams.size() > 1) { streams[0] = new UnionBlockInputStream(streams, settings.max_threads); streams.resize(1); } } /// Предварительный LIMIT - применяется в каждом источнике, если источников несколько, до их объединения. void InterpreterSelectQuery::executePreLimit(BlockInputStreams & streams, ExpressionPtr & expression) { size_t limit_length = 0; size_t limit_offset = 0; getLimitLengthAndOffset(query, limit_length, limit_offset); /// Если есть LIMIT if (query.limit_length) { for (BlockInputStreams::iterator it = streams.begin(); it != streams.end(); ++it) { BlockInputStreamPtr & stream = *it; stream = new LimitBlockInputStream(stream, limit_length + limit_offset, 0); } } } void InterpreterSelectQuery::executeLimit(BlockInputStreams & streams, ExpressionPtr & expression) { size_t limit_length = 0; size_t limit_offset = 0; getLimitLengthAndOffset(query, limit_length, limit_offset); /// Если есть LIMIT if (query.limit_length) { BlockInputStreamPtr & stream = streams[0]; stream = new LimitBlockInputStream(stream, limit_length, limit_offset); } } BlockInputStreamPtr InterpreterSelectQuery::executeAndFormat(WriteBuffer & buf) { Block sample = getSampleBlock(); String format_name = query.format ? dynamic_cast(*query.format).name : "TabSeparated"; BlockInputStreamPtr in = execute(); BlockOutputStreamPtr out = context.getFormatFactory().getOutput(format_name, buf, sample); copyData(*in, *out); return in; } void InterpreterSelectQuery::setPartID(ASTPtr ast, unsigned part_id) { ast->part_id |= part_id; for (ASTs::iterator it = ast->children.begin(); it != ast->children.end(); ++it) setPartID(*it, part_id); } String InterpreterSelectQuery::getAnyColumn() { NamesAndTypesList::const_iterator it = context.getColumns().begin(); size_t min_size = it->second->isNumeric() ? it->second->getSizeOfField() : 100; String res = it->first; for (; it != context.getColumns().end(); ++it) { size_t current_size = it->second->isNumeric() ? it->second->getSizeOfField() : 100; if (current_size < min_size) { min_size = current_size; res = it->first; } } return res; } }