diff --git a/dbms/include/DB/Interpreters/InterpreterSelectQuery.h b/dbms/include/DB/Interpreters/InterpreterSelectQuery.h index 2c6727620b9..c550d62154f 100644 --- a/dbms/include/DB/Interpreters/InterpreterSelectQuery.h +++ b/dbms/include/DB/Interpreters/InterpreterSelectQuery.h @@ -91,7 +91,9 @@ private: QueryProcessingStage::Enum executeFetchColumns(BlockInputStreams & streams); /// Выполнить один запрос SELECT из цепочки UNION ALL. - BlockInputStreamPtr executeSingleQuery(); + void executeSingleQuery(BlockInputStreams & query_streams, bool is_inside_union_all = false); + + void updateProfilingAndLimits(BlockInputStreams & query_streams); void executeWhere( BlockInputStreams & streams, ExpressionActionsPtr expression); void executeAggregation( BlockInputStreams & streams, ExpressionActionsPtr expression, diff --git a/dbms/src/Interpreters/InterpreterSelectQuery.cpp b/dbms/src/Interpreters/InterpreterSelectQuery.cpp index 58406bd62d6..3c3aea8727b 100644 --- a/dbms/src/Interpreters/InterpreterSelectQuery.cpp +++ b/dbms/src/Interpreters/InterpreterSelectQuery.cpp @@ -29,6 +29,8 @@ #include +#include +#include namespace DB { @@ -206,14 +208,17 @@ static inline BlockInputStreamPtr maybeAsynchronous(BlockInputStreamPtr in, bool : in; } + BlockInputStreamPtr InterpreterSelectQuery::execute() -{ - BlockInputStreamPtr first_query_plan = executeSingleQuery(); - +{ if (is_union_all_head && !query.next_union_all.isNull()) { - BlockInputStreams streams; - streams.push_back(first_query_plan); + { + BlockInputStreams query_streams; + executeSingleQuery(query_streams, true); + streams.reserve(streams.size() + query_streams.size()); + std::copy(query_streams.begin(), query_streams.end(), std::back_inserter(streams)); + } // NOTE Мы можем безопасно применить static_cast вместо typeid_cast, потому что знаем, что в цепочке UNION ALL // имеются только деревья типа SELECT. @@ -221,16 +226,27 @@ BlockInputStreamPtr InterpreterSelectQuery::execute() { Context select_query_context = context; InterpreterSelectQuery interpreter(tree, select_query_context, to_stage, subquery_depth, nullptr, false); - streams.push_back(maybeAsynchronous(interpreter.executeSingleQuery(), settings.asynchronous)); + BlockInputStreams query_streams; + interpreter.executeSingleQuery(query_streams, true); + streams.reserve(streams.size() + query_streams.size()); + std::copy(query_streams.begin(), query_streams.end(), std::back_inserter(streams)); } - - return new UnionBlockInputStream(streams, settings.max_threads); + + executeUnion(streams); + updateProfilingAndLimits(streams); } else - return first_query_plan; + { + BlockInputStreams query_streams; + executeSingleQuery(query_streams, false); + streams.push_back(query_streams[0]); + } + + return streams[0]; } -BlockInputStreamPtr InterpreterSelectQuery::executeSingleQuery() + +void InterpreterSelectQuery::executeSingleQuery(BlockInputStreams & query_streams, bool is_inside_union_all) { /** Потоки данных. При параллельном выполнении запроса, имеем несколько потоков данных. * Если нет GROUP BY, то выполним все операции до ORDER BY и LIMIT параллельно, затем @@ -242,18 +258,24 @@ BlockInputStreamPtr InterpreterSelectQuery::executeSingleQuery() * затем выполним остальные операции с одним получившимся потоком. */ + bool do_execute_union = !is_inside_union_all; + /** Вынем данные из Storage. from_stage - до какой стадии запрос был выполнен в Storage. */ - QueryProcessingStage::Enum from_stage = executeFetchColumns(streams); + QueryProcessingStage::Enum from_stage = executeFetchColumns(query_streams); LOG_TRACE(log, QueryProcessingStage::toString(from_stage) << " -> " << QueryProcessingStage::toString(to_stage)); + SubqueriesForSets subqueries_for_sets = query_analyzer->getSubqueriesForSets(); + if (!subqueries_for_sets.empty()) + do_execute_union = true; + if (to_stage > QueryProcessingStage::FetchColumns) { bool has_where = false; bool need_aggregate = false; bool has_having = false; bool has_order_by = false; - + ExpressionActionsPtr array_join; ExpressionActionsPtr before_where; ExpressionActionsPtr before_aggregation; @@ -327,8 +349,11 @@ BlockInputStreamPtr InterpreterSelectQuery::executeSingleQuery() * чтобы запрос был проанализирован, и в нём могли бы быть обнаружены ошибки (например, несоответствия типов). * Иначе мог бы вернуться пустой результат на некорректный запрос. */ - if (streams.empty()) - return new NullBlockInputStream; + if (query_streams.empty()) + { + query_streams.push_back(new NullBlockInputStream); + return; + } /// Перед выполнением HAVING уберем из блока лишние столбцы (в основном, ключи агрегации). if (has_having) @@ -352,14 +377,14 @@ BlockInputStreamPtr InterpreterSelectQuery::executeSingleQuery() if (first_stage) { if (has_where) - executeWhere(streams, before_where); + executeWhere(query_streams, before_where); if (need_aggregate) - executeAggregation(streams, before_aggregation, aggregate_overflow_row, aggregate_final); + executeAggregation(query_streams, before_aggregation, aggregate_overflow_row, aggregate_final); else { - executeExpression(streams, before_order_and_select); - executeDistinct(streams, true, selected_columns); + executeExpression(query_streams, before_order_and_select); + executeDistinct(query_streams, true, selected_columns); } /** Оптимизация - при распределённой обработке запроса, @@ -370,7 +395,7 @@ BlockInputStreamPtr InterpreterSelectQuery::executeSingleQuery() && !need_aggregate && !has_having && !has_order_by && query.limit_length) { - executePreLimit(streams); + executePreLimit(query_streams); } } @@ -382,62 +407,81 @@ BlockInputStreamPtr InterpreterSelectQuery::executeSingleQuery() { /// Если нужно объединить агрегированные результаты с нескольких серверов if (!first_stage) - executeMergeAggregated(streams, aggregate_overflow_row, aggregate_final); + executeMergeAggregated(query_streams, aggregate_overflow_row, aggregate_final); if (!aggregate_final) - executeTotalsAndHaving(streams, has_having, before_having, aggregate_overflow_row); + executeTotalsAndHaving(query_streams, has_having, before_having, aggregate_overflow_row); else if (has_having) - executeHaving(streams, before_having); + executeHaving(query_streams, before_having); - executeExpression(streams, before_order_and_select); - executeDistinct(streams, true, selected_columns); + executeExpression(query_streams, before_order_and_select); + executeDistinct(query_streams, true, selected_columns); - need_second_distinct_pass = streams.size() > 1; + need_second_distinct_pass = query_streams.size() > 1; } else if (query.group_by_with_totals && !aggregate_final) { - executeTotalsAndHaving(streams, false, nullptr, aggregate_overflow_row); + executeTotalsAndHaving(query_streams, false, nullptr, aggregate_overflow_row); } if (has_order_by) - executeOrder(streams); + executeOrder(query_streams); - executeProjection(streams, final_projection); + executeProjection(query_streams, final_projection); /// На этой стадии можно считать минимумы и максимумы, если надо. if (settings.extremes) - for (BlockInputStreams::iterator it = streams.begin(); it != streams.end(); ++it) + for (BlockInputStreams::iterator it = query_streams.begin(); it != query_streams.end(); ++it) if (IProfilingBlockInputStream * stream = dynamic_cast(&**it)) stream->enableExtremes(); /** Оптимизация - если источников несколько и есть LIMIT, то сначала применим предварительный LIMIT, * ограничивающий число записей в каждом до offset + limit. */ - if (query.limit_length && streams.size() > 1 && !query.distinct) - executePreLimit(streams); - - executeUnion(streams); - + bool performed_pre_limit = false; + if (query.limit_length && query_streams.size() > 1 && !query.distinct) + { + executePreLimit(query_streams); + performed_pre_limit = true; + } + + if (need_aggregate || has_order_by || need_second_distinct_pass || performed_pre_limit) + do_execute_union = true; + + if (do_execute_union) + executeUnion(query_streams); + /// Если было более одного источника - то нужно выполнить DISTINCT ещё раз после их слияния. if (need_second_distinct_pass) - executeDistinct(streams, false, Names()); + executeDistinct(query_streams, false, Names()); - executeLimit(streams); + executeLimit(query_streams); } } /** Если данных нет. */ - if (streams.empty()) - return new NullBlockInputStream; + if (query_streams.empty()) + { + query_streams.push_back(new NullBlockInputStream); + return; + } - executeUnion(streams); + if (do_execute_union) + executeUnion(query_streams); - SubqueriesForSets subqueries_for_sets = query_analyzer->getSubqueriesForSets(); + //SubqueriesForSets subqueries_for_sets = query_analyzer->getSubqueriesForSets(); if (!subqueries_for_sets.empty()) - executeSubqueriesInSetsAndJoins(streams, subqueries_for_sets); + executeSubqueriesInSetsAndJoins(query_streams, subqueries_for_sets); + + if (do_execute_union) + updateProfilingAndLimits(query_streams); +} + +void InterpreterSelectQuery::updateProfilingAndLimits(BlockInputStreams & query_streams) +{ /// Ограничения на результат, квота на результат, а также колбек для прогресса. - if (IProfilingBlockInputStream * stream = dynamic_cast(&*streams[0])) + if (IProfilingBlockInputStream * stream = dynamic_cast(&*query_streams[0])) { stream->setProgressCallback(context.getProgressCallback()); stream->setProcessListElement(context.getProcessListElement()); @@ -455,8 +499,6 @@ BlockInputStreamPtr InterpreterSelectQuery::executeSingleQuery() stream->setQuota(context.getQuota()); } } - - return streams[0]; }