Add fix: temporarily turn off UNION ALL support for SELECT queries because it breaks functional tests. [#METR-14099]

This commit is contained in:
Alexey Arno 2014-12-16 15:06:51 +03:00
parent 458044959e
commit 8cdc779c9c
2 changed files with 50 additions and 104 deletions

View File

@ -89,9 +89,6 @@ private:
/// Вынимает данные из таблицы. Возвращает стадию, до которой запрос был обработан в Storage.
QueryProcessingStage::Enum executeFetchColumns(BlockInputStreams & streams);
/// Выполнить один запрос SELECT из цепочки UNION ALL.
void executeSingleQuery(BlockInputStreams & results, bool is_inside_union_all = false);
void executeWhere( BlockInputStreams & streams, ExpressionActionsPtr expression);
void executeAggregation( BlockInputStreams & streams, ExpressionActionsPtr expression,

View File

@ -210,50 +210,6 @@ static inline BlockInputStreamPtr maybeAsynchronous(BlockInputStreamPtr in, bool
BlockInputStreamPtr InterpreterSelectQuery::execute()
{
if (is_union_all_head && !query.next_union_all.isNull())
{
executeSingleQuery(streams, true);
// NOTE Мы можем безопасно применить static_cast вместо typeid_cast, потому что знаем, что в цепочке UNION ALL
// имеются только деревья типа SELECT.
for (ASTPtr tree = query.next_union_all; !tree.isNull(); tree = (static_cast<ASTSelectQuery &>(*tree)).next_union_all)
{
Context select_query_context = context;
InterpreterSelectQuery interpreter(tree, select_query_context, to_stage, subquery_depth, nullptr, false);
interpreter.executeSingleQuery(streams, true);
}
executeUnion(streams);
}
else
executeSingleQuery(streams, false);
/// Ограничения на результат, квота на результат, а также колбек для прогресса.
if (IProfilingBlockInputStream * stream = dynamic_cast<IProfilingBlockInputStream *>(&*streams[0]))
{
stream->setProgressCallback(context.getProgressCallback());
stream->setProcessListElement(context.getProcessListElement());
/// Ограничения действуют только на конечный результат.
if (to_stage == QueryProcessingStage::Complete)
{
IProfilingBlockInputStream::LocalLimits limits;
limits.mode = IProfilingBlockInputStream::LIMITS_CURRENT;
limits.max_rows_to_read = settings.limits.max_result_rows;
limits.max_bytes_to_read = settings.limits.max_result_bytes;
limits.read_overflow_mode = settings.limits.result_overflow_mode;
stream->setLimits(limits);
stream->setQuota(context.getQuota());
}
}
return streams[0];
}
void InterpreterSelectQuery::executeSingleQuery(BlockInputStreams & results, bool is_inside_union_all)
{
/** Потоки данных. При параллельном выполнении запроса, имеем несколько потоков данных.
* Если нет GROUP BY, то выполним все операции до ORDER BY и LIMIT параллельно, затем
@ -266,19 +222,12 @@ void InterpreterSelectQuery::executeSingleQuery(BlockInputStreams & results, boo
* Если запрос является членом цепочки UNION ALL и не содержит GROUP BY, ORDER BY, DISTINCT, или LIMIT,
* то объединение источников данных выполняется не на этом уровне, а на верхнем уровне.
*/
BlockInputStreams query_streams;
bool do_execute_union = !is_inside_union_all;
/** Вынем данные из Storage. from_stage - до какой стадии запрос был выполнен в Storage. */
QueryProcessingStage::Enum from_stage = executeFetchColumns(query_streams);
QueryProcessingStage::Enum from_stage = executeFetchColumns(streams);
LOG_TRACE(log, QueryProcessingStage::toString(from_stage) << " -> " << QueryProcessingStage::toString(to_stage));
SubqueriesForSets subqueries_for_sets = query_analyzer->getSubqueriesForSets();
if (!subqueries_for_sets.empty())
do_execute_union = true;
if (to_stage > QueryProcessingStage::FetchColumns)
{
bool has_where = false;
@ -359,11 +308,8 @@ void InterpreterSelectQuery::executeSingleQuery(BlockInputStreams & results, boo
* чтобы запрос был проанализирован, и в нём могли бы быть обнаружены ошибки (например, несоответствия типов).
* Иначе мог бы вернуться пустой результат на некорректный запрос.
*/
if (query_streams.empty())
{
results.push_back(new NullBlockInputStream);
return;
}
if (streams.empty())
return new NullBlockInputStream;
/// Перед выполнением HAVING уберем из блока лишние столбцы (в основном, ключи агрегации).
if (has_having)
@ -387,14 +333,14 @@ void InterpreterSelectQuery::executeSingleQuery(BlockInputStreams & results, boo
if (first_stage)
{
if (has_where)
executeWhere(query_streams, before_where);
executeWhere(streams, before_where);
if (need_aggregate)
executeAggregation(query_streams, before_aggregation, aggregate_overflow_row, aggregate_final);
executeAggregation(streams, before_aggregation, aggregate_overflow_row, aggregate_final);
else
{
executeExpression(query_streams, before_order_and_select);
executeDistinct(query_streams, true, selected_columns);
executeExpression(streams, before_order_and_select);
executeDistinct(streams, true, selected_columns);
}
/** Оптимизация - при распределённой обработке запроса,
@ -405,7 +351,7 @@ void InterpreterSelectQuery::executeSingleQuery(BlockInputStreams & results, boo
&& !need_aggregate && !has_having && !has_order_by
&& query.limit_length)
{
executePreLimit(query_streams);
executePreLimit(streams);
}
}
@ -417,78 +363,81 @@ void InterpreterSelectQuery::executeSingleQuery(BlockInputStreams & results, boo
{
/// Если нужно объединить агрегированные результаты с нескольких серверов
if (!first_stage)
executeMergeAggregated(query_streams, aggregate_overflow_row, aggregate_final);
executeMergeAggregated(streams, aggregate_overflow_row, aggregate_final);
if (!aggregate_final)
executeTotalsAndHaving(query_streams, has_having, before_having, aggregate_overflow_row);
executeTotalsAndHaving(streams, has_having, before_having, aggregate_overflow_row);
else if (has_having)
executeHaving(query_streams, before_having);
executeHaving(streams, before_having);
executeExpression(query_streams, before_order_and_select);
executeDistinct(query_streams, true, selected_columns);
executeExpression(streams, before_order_and_select);
executeDistinct(streams, true, selected_columns);
need_second_distinct_pass = query_streams.size() > 1;
need_second_distinct_pass = streams.size() > 1;
}
else if (query.group_by_with_totals && !aggregate_final)
{
executeTotalsAndHaving(query_streams, false, nullptr, aggregate_overflow_row);
executeTotalsAndHaving(streams, false, nullptr, aggregate_overflow_row);
}
if (has_order_by)
executeOrder(query_streams);
executeOrder(streams);
executeProjection(query_streams, final_projection);
executeProjection(streams, final_projection);
/// На этой стадии можно считать минимумы и максимумы, если надо.
if (settings.extremes)
for (BlockInputStreams::iterator it = query_streams.begin(); it != query_streams.end(); ++it)
for (BlockInputStreams::iterator it = streams.begin(); it != streams.end(); ++it)
if (IProfilingBlockInputStream * stream = dynamic_cast<IProfilingBlockInputStream *>(&**it))
stream->enableExtremes();
/** Оптимизация - если источников несколько и есть LIMIT, то сначала применим предварительный LIMIT,
* ограничивающий число записей в каждом до offset + limit.
*/
bool performed_pre_limit = false;
if (query.limit_length && query_streams.size() > 1 && !query.distinct)
{
executePreLimit(query_streams);
performed_pre_limit = true;
}
if (need_aggregate || has_order_by || need_second_distinct_pass || performed_pre_limit)
do_execute_union = true;
if (query.limit_length && streams.size() > 1 && !query.distinct)
executePreLimit(streams);
if (do_execute_union)
executeUnion(query_streams);
executeUnion(streams);
/// Если было более одного источника - то нужно выполнить DISTINCT ещё раз после их слияния.
if (need_second_distinct_pass)
executeDistinct(query_streams, false, Names());
executeDistinct(streams, false, Names());
executeLimit(query_streams);
executeLimit(streams);
}
}
/** Если данных нет. */
if (query_streams.empty())
{
results.push_back(new NullBlockInputStream);
return;
}
if (do_execute_union)
executeUnion(query_streams);
if (!subqueries_for_sets.empty())
executeSubqueriesInSetsAndJoins(query_streams, subqueries_for_sets);
if (streams.empty())
return new NullBlockInputStream;
if (query_streams.size() > 1)
executeUnion(streams);
SubqueriesForSets subqueries_for_sets = query_analyzer->getSubqueriesForSets();
if (!subqueries_for_sets.empty())
executeSubqueriesInSetsAndJoins(streams, subqueries_for_sets);
/// Ограничения на результат, квота на результат, а также колбек для прогресса.
if (IProfilingBlockInputStream * stream = dynamic_cast<IProfilingBlockInputStream *>(&*streams[0]))
{
results.reserve(results.size() + query_streams.size());
std::copy(query_streams.begin(), query_streams.end(), std::back_inserter(results));
stream->setProgressCallback(context.getProgressCallback());
stream->setProcessListElement(context.getProcessListElement());
/// Ограничения действуют только на конечный результат.
if (to_stage == QueryProcessingStage::Complete)
{
IProfilingBlockInputStream::LocalLimits limits;
limits.mode = IProfilingBlockInputStream::LIMITS_CURRENT;
limits.max_rows_to_read = settings.limits.max_result_rows;
limits.max_bytes_to_read = settings.limits.max_result_bytes;
limits.read_overflow_mode = settings.limits.result_overflow_mode;
stream->setLimits(limits);
stream->setQuota(context.getQuota());
}
}
else
results.push_back(query_streams[0]);
return streams[0];
}