Add optimization in InterpreterSelectQuery that tries to minimize quantity of stream mergings.

This commit is contained in:
Alexey Arno 2014-12-15 15:00:52 +03:00
parent 72ca080f64
commit 53318492a0
2 changed files with 89 additions and 45 deletions

View File

@ -91,7 +91,9 @@ private:
QueryProcessingStage::Enum executeFetchColumns(BlockInputStreams & streams);
/// Выполнить один запрос SELECT из цепочки UNION ALL.
BlockInputStreamPtr executeSingleQuery();
void executeSingleQuery(BlockInputStreams & query_streams, bool is_inside_union_all = false);
void updateProfilingAndLimits(BlockInputStreams & query_streams);
void executeWhere( BlockInputStreams & streams, ExpressionActionsPtr expression);
void executeAggregation( BlockInputStreams & streams, ExpressionActionsPtr expression,

View File

@ -29,6 +29,8 @@
#include <DB/Core/Field.h>
#include <algorithm>
#include <iterator>
namespace DB
{
@ -206,14 +208,17 @@ static inline BlockInputStreamPtr maybeAsynchronous(BlockInputStreamPtr in, bool
: in;
}
BlockInputStreamPtr InterpreterSelectQuery::execute()
{
BlockInputStreamPtr first_query_plan = executeSingleQuery();
if (is_union_all_head && !query.next_union_all.isNull())
{
BlockInputStreams streams;
streams.push_back(first_query_plan);
{
BlockInputStreams query_streams;
executeSingleQuery(query_streams, true);
streams.reserve(streams.size() + query_streams.size());
std::copy(query_streams.begin(), query_streams.end(), std::back_inserter(streams));
}
// NOTE Мы можем безопасно применить static_cast вместо typeid_cast, потому что знаем, что в цепочке UNION ALL
// имеются только деревья типа SELECT.
@ -221,16 +226,27 @@ BlockInputStreamPtr InterpreterSelectQuery::execute()
{
Context select_query_context = context;
InterpreterSelectQuery interpreter(tree, select_query_context, to_stage, subquery_depth, nullptr, false);
streams.push_back(maybeAsynchronous(interpreter.executeSingleQuery(), settings.asynchronous));
BlockInputStreams query_streams;
interpreter.executeSingleQuery(query_streams, true);
streams.reserve(streams.size() + query_streams.size());
std::copy(query_streams.begin(), query_streams.end(), std::back_inserter(streams));
}
return new UnionBlockInputStream(streams, settings.max_threads);
executeUnion(streams);
updateProfilingAndLimits(streams);
}
else
return first_query_plan;
{
BlockInputStreams query_streams;
executeSingleQuery(query_streams, false);
streams.push_back(query_streams[0]);
}
return streams[0];
}
BlockInputStreamPtr InterpreterSelectQuery::executeSingleQuery()
void InterpreterSelectQuery::executeSingleQuery(BlockInputStreams & query_streams, bool is_inside_union_all)
{
/** Потоки данных. При параллельном выполнении запроса, имеем несколько потоков данных.
* Если нет GROUP BY, то выполним все операции до ORDER BY и LIMIT параллельно, затем
@ -242,11 +258,17 @@ BlockInputStreamPtr InterpreterSelectQuery::executeSingleQuery()
* затем выполним остальные операции с одним получившимся потоком.
*/
bool do_execute_union = !is_inside_union_all;
/** Вынем данные из Storage. from_stage - до какой стадии запрос был выполнен в Storage. */
QueryProcessingStage::Enum from_stage = executeFetchColumns(streams);
QueryProcessingStage::Enum from_stage = executeFetchColumns(query_streams);
LOG_TRACE(log, QueryProcessingStage::toString(from_stage) << " -> " << QueryProcessingStage::toString(to_stage));
SubqueriesForSets subqueries_for_sets = query_analyzer->getSubqueriesForSets();
if (!subqueries_for_sets.empty())
do_execute_union = true;
if (to_stage > QueryProcessingStage::FetchColumns)
{
bool has_where = false;
@ -327,8 +349,11 @@ BlockInputStreamPtr InterpreterSelectQuery::executeSingleQuery()
* чтобы запрос был проанализирован, и в нём могли бы быть обнаружены ошибки (например, несоответствия типов).
* Иначе мог бы вернуться пустой результат на некорректный запрос.
*/
if (streams.empty())
return new NullBlockInputStream;
if (query_streams.empty())
{
query_streams.push_back(new NullBlockInputStream);
return;
}
/// Перед выполнением HAVING уберем из блока лишние столбцы (в основном, ключи агрегации).
if (has_having)
@ -352,14 +377,14 @@ BlockInputStreamPtr InterpreterSelectQuery::executeSingleQuery()
if (first_stage)
{
if (has_where)
executeWhere(streams, before_where);
executeWhere(query_streams, before_where);
if (need_aggregate)
executeAggregation(streams, before_aggregation, aggregate_overflow_row, aggregate_final);
executeAggregation(query_streams, before_aggregation, aggregate_overflow_row, aggregate_final);
else
{
executeExpression(streams, before_order_and_select);
executeDistinct(streams, true, selected_columns);
executeExpression(query_streams, before_order_and_select);
executeDistinct(query_streams, true, selected_columns);
}
/** Оптимизация - при распределённой обработке запроса,
@ -370,7 +395,7 @@ BlockInputStreamPtr InterpreterSelectQuery::executeSingleQuery()
&& !need_aggregate && !has_having && !has_order_by
&& query.limit_length)
{
executePreLimit(streams);
executePreLimit(query_streams);
}
}
@ -382,62 +407,81 @@ BlockInputStreamPtr InterpreterSelectQuery::executeSingleQuery()
{
/// Если нужно объединить агрегированные результаты с нескольких серверов
if (!first_stage)
executeMergeAggregated(streams, aggregate_overflow_row, aggregate_final);
executeMergeAggregated(query_streams, aggregate_overflow_row, aggregate_final);
if (!aggregate_final)
executeTotalsAndHaving(streams, has_having, before_having, aggregate_overflow_row);
executeTotalsAndHaving(query_streams, has_having, before_having, aggregate_overflow_row);
else if (has_having)
executeHaving(streams, before_having);
executeHaving(query_streams, before_having);
executeExpression(streams, before_order_and_select);
executeDistinct(streams, true, selected_columns);
executeExpression(query_streams, before_order_and_select);
executeDistinct(query_streams, true, selected_columns);
need_second_distinct_pass = streams.size() > 1;
need_second_distinct_pass = query_streams.size() > 1;
}
else if (query.group_by_with_totals && !aggregate_final)
{
executeTotalsAndHaving(streams, false, nullptr, aggregate_overflow_row);
executeTotalsAndHaving(query_streams, false, nullptr, aggregate_overflow_row);
}
if (has_order_by)
executeOrder(streams);
executeOrder(query_streams);
executeProjection(streams, final_projection);
executeProjection(query_streams, final_projection);
/// На этой стадии можно считать минимумы и максимумы, если надо.
if (settings.extremes)
for (BlockInputStreams::iterator it = streams.begin(); it != streams.end(); ++it)
for (BlockInputStreams::iterator it = query_streams.begin(); it != query_streams.end(); ++it)
if (IProfilingBlockInputStream * stream = dynamic_cast<IProfilingBlockInputStream *>(&**it))
stream->enableExtremes();
/** Оптимизация - если источников несколько и есть LIMIT, то сначала применим предварительный LIMIT,
* ограничивающий число записей в каждом до offset + limit.
*/
if (query.limit_length && streams.size() > 1 && !query.distinct)
executePreLimit(streams);
bool performed_pre_limit = false;
if (query.limit_length && query_streams.size() > 1 && !query.distinct)
{
executePreLimit(query_streams);
performed_pre_limit = true;
}
executeUnion(streams);
if (need_aggregate || has_order_by || need_second_distinct_pass || performed_pre_limit)
do_execute_union = true;
if (do_execute_union)
executeUnion(query_streams);
/// Если было более одного источника - то нужно выполнить DISTINCT ещё раз после их слияния.
if (need_second_distinct_pass)
executeDistinct(streams, false, Names());
executeDistinct(query_streams, false, Names());
executeLimit(streams);
executeLimit(query_streams);
}
}
/** Если данных нет. */
if (streams.empty())
return new NullBlockInputStream;
if (query_streams.empty())
{
query_streams.push_back(new NullBlockInputStream);
return;
}
executeUnion(streams);
if (do_execute_union)
executeUnion(query_streams);
SubqueriesForSets subqueries_for_sets = query_analyzer->getSubqueriesForSets();
//SubqueriesForSets subqueries_for_sets = query_analyzer->getSubqueriesForSets();
if (!subqueries_for_sets.empty())
executeSubqueriesInSetsAndJoins(streams, subqueries_for_sets);
executeSubqueriesInSetsAndJoins(query_streams, subqueries_for_sets);
if (do_execute_union)
updateProfilingAndLimits(query_streams);
}
void InterpreterSelectQuery::updateProfilingAndLimits(BlockInputStreams & query_streams)
{
/// Ограничения на результат, квота на результат, а также колбек для прогресса.
if (IProfilingBlockInputStream * stream = dynamic_cast<IProfilingBlockInputStream *>(&*streams[0]))
if (IProfilingBlockInputStream * stream = dynamic_cast<IProfilingBlockInputStream *>(&*query_streams[0]))
{
stream->setProgressCallback(context.getProgressCallback());
stream->setProcessListElement(context.getProcessListElement());
@ -455,8 +499,6 @@ BlockInputStreamPtr InterpreterSelectQuery::executeSingleQuery()
stream->setQuota(context.getQuota());
}
}
return streams[0];
}