Code cleanups [#METR-14099].

This commit is contained in:
Alexey Arno 2014-12-15 17:57:42 +03:00
parent 9dd48cbf36
commit 819b3bd6ac
4 changed files with 32 additions and 36 deletions

View File

@ -265,7 +265,7 @@ namespace ErrorCodes
TOO_MUCH_RETRIES_TO_FETCH_PARTS,
PARTITION_ALREADY_EXISTS,
PARTITION_DOESNT_EXIST,
UNION_ALL_INCOMPATIBLE_RESULTS,
UNION_ALL_RESULT_STRUCTURES_MISMATCH,
POCO_EXCEPTION = 1000,
STD_EXCEPTION,

View File

@ -93,8 +93,6 @@ private:
/// Выполнить один запрос SELECT из цепочки UNION ALL.
void executeSingleQuery(BlockInputStreams & results, bool is_inside_union_all = false);
void updateProfilingAndLimits(BlockInputStreams & query_streams);
void executeWhere( BlockInputStreams & streams, ExpressionActionsPtr expression);
void executeAggregation( BlockInputStreams & streams, ExpressionActionsPtr expression,
bool overflow_row, bool final);

View File

@ -98,8 +98,8 @@ void InterpreterSelectQuery::init(BlockInputStreamPtr input_, const NamesAndType
{
Block current = InterpreterSelectQuery(tree, context, to_stage, subquery_depth, nullptr, false).getSampleBlock();
if (!blocksHaveCompatibleStructure(previous, current))
throw Exception("Incompatible results in the SELECT queries of the UNION ALL chain",
ErrorCodes::UNION_ALL_INCOMPATIBLE_RESULTS);
throw Exception("Result structures mismatch in the SELECT queries of the UNION ALL chain",
ErrorCodes::UNION_ALL_RESULT_STRUCTURES_MISMATCH);
previous = std::move(current);
}
}
@ -225,11 +225,30 @@ BlockInputStreamPtr InterpreterSelectQuery::execute()
}
executeUnion(streams);
updateProfilingAndLimits(streams);
}
else
executeSingleQuery(streams, false);
/// Ограничения на результат, квота на результат, а также колбек для прогресса.
if (IProfilingBlockInputStream * stream = dynamic_cast<IProfilingBlockInputStream *>(&*streams[0]))
{
stream->setProgressCallback(context.getProgressCallback());
stream->setProcessListElement(context.getProcessListElement());
/// Ограничения действуют только на конечный результат.
if (to_stage == QueryProcessingStage::Complete)
{
IProfilingBlockInputStream::LocalLimits limits;
limits.mode = IProfilingBlockInputStream::LIMITS_CURRENT;
limits.max_rows_to_read = settings.limits.max_result_rows;
limits.max_bytes_to_read = settings.limits.max_result_bytes;
limits.read_overflow_mode = settings.limits.result_overflow_mode;
stream->setLimits(limits);
stream->setQuota(context.getQuota());
}
}
return streams[0];
}
@ -244,6 +263,8 @@ void InterpreterSelectQuery::executeSingleQuery(BlockInputStreams & results, boo
* Если есть GROUP BY, то выполним все операции до GROUP BY, включительно, параллельно;
* параллельный GROUP BY склеит потоки в один,
* затем выполним остальные операции с одним получившимся потоком.
* Если запрос является членом цепочки UNION ALL и не содержит GROUP BY, ORDER BY, DISTINCT, или LIMIT,
* то объединение источников данных выполняется не на этом уровне, а не верхнем уровне.
*/
BlockInputStreams query_streams;
@ -458,39 +479,16 @@ void InterpreterSelectQuery::executeSingleQuery(BlockInputStreams & results, boo
if (do_execute_union)
executeUnion(query_streams);
//SubqueriesForSets subqueries_for_sets = query_analyzer->getSubqueriesForSets();
if (!subqueries_for_sets.empty())
executeSubqueriesInSetsAndJoins(query_streams, subqueries_for_sets);
if (do_execute_union)
updateProfilingAndLimits(query_streams);
results.reserve(results.size() + query_streams.size());
std::copy(query_streams.begin(), query_streams.end(), std::back_inserter(results));
}
void InterpreterSelectQuery::updateProfilingAndLimits(BlockInputStreams & query_streams)
{
/// Ограничения на результат, квота на результат, а также колбек для прогресса.
if (IProfilingBlockInputStream * stream = dynamic_cast<IProfilingBlockInputStream *>(&*query_streams[0]))
if (query_streams.size() > 1)
{
stream->setProgressCallback(context.getProgressCallback());
stream->setProcessListElement(context.getProcessListElement());
/// Ограничения действуют только на конечный результат.
if (to_stage == QueryProcessingStage::Complete)
{
IProfilingBlockInputStream::LocalLimits limits;
limits.mode = IProfilingBlockInputStream::LIMITS_CURRENT;
limits.max_rows_to_read = settings.limits.max_result_rows;
limits.max_bytes_to_read = settings.limits.max_result_bytes;
limits.read_overflow_mode = settings.limits.result_overflow_mode;
stream->setLimits(limits);
stream->setQuota(context.getQuota());
}
results.reserve(results.size() + query_streams.size());
std::copy(query_streams.begin(), query_streams.end(), std::back_inserter(results));
}
else
results.push_back(query_streams[0]);
}

View File

@ -242,11 +242,11 @@ void formatAST(const ASTSelectQuery & ast, std::ostream & s, size_t indent, bo
if (!ast.next_union_all.isNull())
{
s << (hilite ? hilite_keyword : "") << nl_or_ws << indent_str << "UNION ALL " << nl_or_ws << (hilite ? hilite_none : "");
// NOTE Мы можем безопасно применить static_cast вместо typeid_cast, потому что знаем, что в цепочке UNION ALL
// имеются только деревья типа SELECT.
const ASTSelectQuery & next_ast = static_cast<const ASTSelectQuery &>(*ast.next_union_all);
formatAST(next_ast, s, indent, hilite, one_line, need_parens);
}
}