mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-23 10:10:50 +00:00
Add an optimization in InterpreterSelectQuery that tries to minimize the number of stream merges.
This commit is contained in:
parent
12c844a86a
commit
24548d95b9
@ -91,7 +91,9 @@ private:
|
||||
QueryProcessingStage::Enum executeFetchColumns(BlockInputStreams & streams);
|
||||
|
||||
/// Execute a single SELECT query from the UNION ALL chain.
|
||||
BlockInputStreamPtr executeSingleQuery();
|
||||
void executeSingleQuery(BlockInputStreams & query_streams, bool is_inside_union_all = false);
|
||||
|
||||
void updateProfilingAndLimits(BlockInputStreams & query_streams);
|
||||
|
||||
void executeWhere( BlockInputStreams & streams, ExpressionActionsPtr expression);
|
||||
void executeAggregation( BlockInputStreams & streams, ExpressionActionsPtr expression,
|
||||
|
@ -29,6 +29,8 @@
|
||||
|
||||
#include <DB/Core/Field.h>
|
||||
|
||||
#include <algorithm>
|
||||
#include <iterator>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
@ -206,14 +208,17 @@ static inline BlockInputStreamPtr maybeAsynchronous(BlockInputStreamPtr in, bool
|
||||
: in;
|
||||
}
|
||||
|
||||
|
||||
BlockInputStreamPtr InterpreterSelectQuery::execute()
|
||||
{
|
||||
BlockInputStreamPtr first_query_plan = executeSingleQuery();
|
||||
|
||||
{
|
||||
if (is_union_all_head && !query.next_union_all.isNull())
|
||||
{
|
||||
BlockInputStreams streams;
|
||||
streams.push_back(first_query_plan);
|
||||
{
|
||||
BlockInputStreams query_streams;
|
||||
executeSingleQuery(query_streams, true);
|
||||
streams.reserve(streams.size() + query_streams.size());
|
||||
std::copy(query_streams.begin(), query_streams.end(), std::back_inserter(streams));
|
||||
}
|
||||
|
||||
// NOTE We can safely use static_cast instead of typeid_cast, because we know that the UNION ALL chain
|
||||
// contains only trees of type SELECT.
|
||||
@ -221,16 +226,27 @@ BlockInputStreamPtr InterpreterSelectQuery::execute()
|
||||
{
|
||||
Context select_query_context = context;
|
||||
InterpreterSelectQuery interpreter(tree, select_query_context, to_stage, subquery_depth, nullptr, false);
|
||||
streams.push_back(maybeAsynchronous(interpreter.executeSingleQuery(), settings.asynchronous));
|
||||
BlockInputStreams query_streams;
|
||||
interpreter.executeSingleQuery(query_streams, true);
|
||||
streams.reserve(streams.size() + query_streams.size());
|
||||
std::copy(query_streams.begin(), query_streams.end(), std::back_inserter(streams));
|
||||
}
|
||||
|
||||
return new UnionBlockInputStream(streams, settings.max_threads);
|
||||
|
||||
executeUnion(streams);
|
||||
updateProfilingAndLimits(streams);
|
||||
}
|
||||
else
|
||||
return first_query_plan;
|
||||
{
|
||||
BlockInputStreams query_streams;
|
||||
executeSingleQuery(query_streams, false);
|
||||
streams.push_back(query_streams[0]);
|
||||
}
|
||||
|
||||
return streams[0];
|
||||
}
|
||||
|
||||
BlockInputStreamPtr InterpreterSelectQuery::executeSingleQuery()
|
||||
|
||||
void InterpreterSelectQuery::executeSingleQuery(BlockInputStreams & query_streams, bool is_inside_union_all)
|
||||
{
|
||||
/** Потоки данных. При параллельном выполнении запроса, имеем несколько потоков данных.
|
||||
* Если нет GROUP BY, то выполним все операции до ORDER BY и LIMIT параллельно, затем
|
||||
@ -242,18 +258,24 @@ BlockInputStreamPtr InterpreterSelectQuery::executeSingleQuery()
|
||||
* затем выполним остальные операции с одним получившимся потоком.
|
||||
*/
|
||||
|
||||
bool do_execute_union = !is_inside_union_all;
|
||||
|
||||
/** Fetch the data from Storage. from_stage is the stage up to which the query was executed in Storage. */
|
||||
QueryProcessingStage::Enum from_stage = executeFetchColumns(streams);
|
||||
QueryProcessingStage::Enum from_stage = executeFetchColumns(query_streams);
|
||||
|
||||
LOG_TRACE(log, QueryProcessingStage::toString(from_stage) << " -> " << QueryProcessingStage::toString(to_stage));
|
||||
|
||||
SubqueriesForSets subqueries_for_sets = query_analyzer->getSubqueriesForSets();
|
||||
if (!subqueries_for_sets.empty())
|
||||
do_execute_union = true;
|
||||
|
||||
if (to_stage > QueryProcessingStage::FetchColumns)
|
||||
{
|
||||
bool has_where = false;
|
||||
bool need_aggregate = false;
|
||||
bool has_having = false;
|
||||
bool has_order_by = false;
|
||||
|
||||
|
||||
ExpressionActionsPtr array_join;
|
||||
ExpressionActionsPtr before_where;
|
||||
ExpressionActionsPtr before_aggregation;
|
||||
@ -327,8 +349,11 @@ BlockInputStreamPtr InterpreterSelectQuery::executeSingleQuery()
|
||||
* чтобы запрос был проанализирован, и в нём могли бы быть обнаружены ошибки (например, несоответствия типов).
|
||||
* Иначе мог бы вернуться пустой результат на некорректный запрос.
|
||||
*/
|
||||
if (streams.empty())
|
||||
return new NullBlockInputStream;
|
||||
if (query_streams.empty())
|
||||
{
|
||||
query_streams.push_back(new NullBlockInputStream);
|
||||
return;
|
||||
}
|
||||
|
||||
/// Before executing HAVING, remove unneeded columns from the block (mostly aggregation keys).
|
||||
if (has_having)
|
||||
@ -352,14 +377,14 @@ BlockInputStreamPtr InterpreterSelectQuery::executeSingleQuery()
|
||||
if (first_stage)
|
||||
{
|
||||
if (has_where)
|
||||
executeWhere(streams, before_where);
|
||||
executeWhere(query_streams, before_where);
|
||||
|
||||
if (need_aggregate)
|
||||
executeAggregation(streams, before_aggregation, aggregate_overflow_row, aggregate_final);
|
||||
executeAggregation(query_streams, before_aggregation, aggregate_overflow_row, aggregate_final);
|
||||
else
|
||||
{
|
||||
executeExpression(streams, before_order_and_select);
|
||||
executeDistinct(streams, true, selected_columns);
|
||||
executeExpression(query_streams, before_order_and_select);
|
||||
executeDistinct(query_streams, true, selected_columns);
|
||||
}
|
||||
|
||||
/** Оптимизация - при распределённой обработке запроса,
|
||||
@ -370,7 +395,7 @@ BlockInputStreamPtr InterpreterSelectQuery::executeSingleQuery()
|
||||
&& !need_aggregate && !has_having && !has_order_by
|
||||
&& query.limit_length)
|
||||
{
|
||||
executePreLimit(streams);
|
||||
executePreLimit(query_streams);
|
||||
}
|
||||
}
|
||||
|
||||
@ -382,62 +407,81 @@ BlockInputStreamPtr InterpreterSelectQuery::executeSingleQuery()
|
||||
{
|
||||
/// If aggregated results from several servers need to be merged
|
||||
if (!first_stage)
|
||||
executeMergeAggregated(streams, aggregate_overflow_row, aggregate_final);
|
||||
executeMergeAggregated(query_streams, aggregate_overflow_row, aggregate_final);
|
||||
|
||||
if (!aggregate_final)
|
||||
executeTotalsAndHaving(streams, has_having, before_having, aggregate_overflow_row);
|
||||
executeTotalsAndHaving(query_streams, has_having, before_having, aggregate_overflow_row);
|
||||
else if (has_having)
|
||||
executeHaving(streams, before_having);
|
||||
executeHaving(query_streams, before_having);
|
||||
|
||||
executeExpression(streams, before_order_and_select);
|
||||
executeDistinct(streams, true, selected_columns);
|
||||
executeExpression(query_streams, before_order_and_select);
|
||||
executeDistinct(query_streams, true, selected_columns);
|
||||
|
||||
need_second_distinct_pass = streams.size() > 1;
|
||||
need_second_distinct_pass = query_streams.size() > 1;
|
||||
}
|
||||
else if (query.group_by_with_totals && !aggregate_final)
|
||||
{
|
||||
executeTotalsAndHaving(streams, false, nullptr, aggregate_overflow_row);
|
||||
executeTotalsAndHaving(query_streams, false, nullptr, aggregate_overflow_row);
|
||||
}
|
||||
|
||||
if (has_order_by)
|
||||
executeOrder(streams);
|
||||
executeOrder(query_streams);
|
||||
|
||||
executeProjection(streams, final_projection);
|
||||
executeProjection(query_streams, final_projection);
|
||||
|
||||
/// At this stage, minimums and maximums can be calculated if needed.
|
||||
if (settings.extremes)
|
||||
for (BlockInputStreams::iterator it = streams.begin(); it != streams.end(); ++it)
|
||||
for (BlockInputStreams::iterator it = query_streams.begin(); it != query_streams.end(); ++it)
|
||||
if (IProfilingBlockInputStream * stream = dynamic_cast<IProfilingBlockInputStream *>(&**it))
|
||||
stream->enableExtremes();
|
||||
|
||||
/** Optimization: if there are several sources and there is a LIMIT, first apply a preliminary LIMIT
|
||||
* that restricts the number of rows in each source to offset + limit.
|
||||
*/
|
||||
if (query.limit_length && streams.size() > 1 && !query.distinct)
|
||||
executePreLimit(streams);
|
||||
|
||||
executeUnion(streams);
|
||||
|
||||
bool performed_pre_limit = false;
|
||||
if (query.limit_length && query_streams.size() > 1 && !query.distinct)
|
||||
{
|
||||
executePreLimit(query_streams);
|
||||
performed_pre_limit = true;
|
||||
}
|
||||
|
||||
if (need_aggregate || has_order_by || need_second_distinct_pass || performed_pre_limit)
|
||||
do_execute_union = true;
|
||||
|
||||
if (do_execute_union)
|
||||
executeUnion(query_streams);
|
||||
|
||||
/// If there was more than one source, DISTINCT must be executed once more after merging them.
|
||||
if (need_second_distinct_pass)
|
||||
executeDistinct(streams, false, Names());
|
||||
executeDistinct(query_streams, false, Names());
|
||||
|
||||
executeLimit(streams);
|
||||
executeLimit(query_streams);
|
||||
}
|
||||
}
|
||||
|
||||
/** If there is no data. */
|
||||
if (streams.empty())
|
||||
return new NullBlockInputStream;
|
||||
if (query_streams.empty())
|
||||
{
|
||||
query_streams.push_back(new NullBlockInputStream);
|
||||
return;
|
||||
}
|
||||
|
||||
executeUnion(streams);
|
||||
if (do_execute_union)
|
||||
executeUnion(query_streams);
|
||||
|
||||
SubqueriesForSets subqueries_for_sets = query_analyzer->getSubqueriesForSets();
|
||||
//SubqueriesForSets subqueries_for_sets = query_analyzer->getSubqueriesForSets();
|
||||
if (!subqueries_for_sets.empty())
|
||||
executeSubqueriesInSetsAndJoins(streams, subqueries_for_sets);
|
||||
executeSubqueriesInSetsAndJoins(query_streams, subqueries_for_sets);
|
||||
|
||||
if (do_execute_union)
|
||||
updateProfilingAndLimits(query_streams);
|
||||
}
|
||||
|
||||
|
||||
void InterpreterSelectQuery::updateProfilingAndLimits(BlockInputStreams & query_streams)
|
||||
{
|
||||
/// Limits on the result, quota on the result, and the progress callback.
|
||||
if (IProfilingBlockInputStream * stream = dynamic_cast<IProfilingBlockInputStream *>(&*streams[0]))
|
||||
if (IProfilingBlockInputStream * stream = dynamic_cast<IProfilingBlockInputStream *>(&*query_streams[0]))
|
||||
{
|
||||
stream->setProgressCallback(context.getProgressCallback());
|
||||
stream->setProcessListElement(context.getProcessListElement());
|
||||
@ -455,8 +499,6 @@ BlockInputStreamPtr InterpreterSelectQuery::executeSingleQuery()
|
||||
stream->setQuota(context.getQuota());
|
||||
}
|
||||
}
|
||||
|
||||
return streams[0];
|
||||
}
|
||||
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user