Code cleanup. [#METR-14099]

This commit is contained in:
Alexey Arno 2014-12-17 14:56:01 +03:00
parent 10b33bdc2e
commit bbdbc9c9d3
3 changed files with 781 additions and 777 deletions

View File

@ -78,6 +78,11 @@ private:
void init(BlockInputStreamPtr input, const NamesAndTypesList & table_column_names = NamesAndTypesList()); void init(BlockInputStreamPtr input, const NamesAndTypesList & table_column_names = NamesAndTypesList());
/// Выполнить один запрос SELECT из цепочки UNION ALL.
void executeSingleQuery(bool should_perform_union_hint = true);
bool isFirstSelectInsideUnionAll() const;
/** Из какой таблицы читать. JOIN-ы не поддерживаются. /** Из какой таблицы читать. JOIN-ы не поддерживаются.
*/ */
void getDatabaseAndTableNames(String & database_name, String & table_name); void getDatabaseAndTableNames(String & database_name, String & table_name);
@ -91,9 +96,6 @@ private:
/// Вынимает данные из таблицы. Возвращает стадию, до которой запрос был обработан в Storage. /// Вынимает данные из таблицы. Возвращает стадию, до которой запрос был обработан в Storage.
QueryProcessingStage::Enum executeFetchColumns(BlockInputStreams & streams); QueryProcessingStage::Enum executeFetchColumns(BlockInputStreams & streams);
/// Выполнить один запрос SELECT из цепочки UNION ALL.
void executeSingleQuery(bool is_inside_union_all = false);
void executeWhere( BlockInputStreams & streams, ExpressionActionsPtr expression); void executeWhere( BlockInputStreams & streams, ExpressionActionsPtr expression);
void executeAggregation( BlockInputStreams & streams, ExpressionActionsPtr expression, void executeAggregation( BlockInputStreams & streams, ExpressionActionsPtr expression,
bool overflow_row, bool final); bool overflow_row, bool final);

View File

@ -30,9 +30,6 @@
#include <DB/Core/Field.h> #include <DB/Core/Field.h>
#include <algorithm>
#include <iterator>
namespace DB namespace DB
{ {
@ -89,11 +86,11 @@ void InterpreterSelectQuery::init(BlockInputStreamPtr input_, const NamesAndType
if (input_) if (input_)
streams.push_back(input_); streams.push_back(input_);
if (is_union_all_head && (!query.next_union_all.isNull())) if (isFirstSelectInsideUnionAll())
{ {
// Проверить, что результаты всех запросов SELECT cовместимые. // Создаем цепочку запросов SELECT и проверяем, что результаты всех запросов SELECT cовместимые.
// NOTE Мы можем безопасно применить static_cast вместо typeid_cast, потому что знаем, что в цепочке UNION ALL // NOTE Мы можем безопасно применить static_cast вместо typeid_cast,
// имеются только деревья типа SELECT. // потому что знаем, что в цепочке UNION ALL имеются только деревья типа SELECT.
InterpreterSelectQuery * interpreter = this; InterpreterSelectQuery * interpreter = this;
Block first = interpreter->getSampleBlock(); Block first = interpreter->getSampleBlock();
for (ASTPtr tree = query.next_union_all; !tree.isNull(); tree = (static_cast<ASTSelectQuery &>(*tree)).next_union_all) for (ASTPtr tree = query.next_union_all; !tree.isNull(); tree = (static_cast<ASTSelectQuery &>(*tree)).next_union_all)
@ -152,6 +149,11 @@ InterpreterSelectQuery::InterpreterSelectQuery(ASTPtr query_ptr_, const Context
init(input_, table_column_names); init(input_, table_column_names);
} }
bool InterpreterSelectQuery::isFirstSelectInsideUnionAll() const
{
return is_union_all_head && !query.next_union_all.isNull();
}
void InterpreterSelectQuery::getDatabaseAndTableNames(String & database_name, String & table_name) void InterpreterSelectQuery::getDatabaseAndTableNames(String & database_name, String & table_name)
{ {
/** Если таблица не указана - используем таблицу system.one. /** Если таблица не указана - используем таблицу system.one.
@ -214,12 +216,12 @@ static inline BlockInputStreamPtr maybeAsynchronous(BlockInputStreamPtr in, bool
BlockInputStreamPtr InterpreterSelectQuery::execute() BlockInputStreamPtr InterpreterSelectQuery::execute()
{ {
if (is_union_all_head && !query.next_union_all.isNull()) if (isFirstSelectInsideUnionAll())
{ {
executeSingleQuery(true); executeSingleQuery(false);
for (auto p = next_select_in_union_all.get(); p != nullptr; p = p->query.next_select_in_union_all.get()) for (auto p = next_select_in_union_all.get(); p != nullptr; p = p->query.next_select_in_union_all.get())
{ {
p->query.executeSingleQuery(true); p->query.executeSingleQuery(false);
const auto & others = p->query.streams; const auto & others = p->query.streams;
streams.insert(streams.end(), others.begin(), others.end()); streams.insert(streams.end(), others.begin(), others.end());
} }
@ -263,7 +265,7 @@ BlockInputStreamPtr InterpreterSelectQuery::execute()
} }
void InterpreterSelectQuery::executeSingleQuery(bool is_inside_union_all) void InterpreterSelectQuery::executeSingleQuery(bool should_perform_union_hint)
{ {
/** Потоки данных. При параллельном выполнении запроса, имеем несколько потоков данных. /** Потоки данных. При параллельном выполнении запроса, имеем несколько потоков данных.
* Если нет GROUP BY, то выполним все операции до ORDER BY и LIMIT параллельно, затем * Если нет GROUP BY, то выполним все операции до ORDER BY и LIMIT параллельно, затем
@ -277,7 +279,7 @@ void InterpreterSelectQuery::executeSingleQuery(bool is_inside_union_all)
* то объединение источников данных выполняется не на этом уровне, а на верхнем уровне. * то объединение источников данных выполняется не на этом уровне, а на верхнем уровне.
*/ */
bool do_execute_union = !is_inside_union_all; bool do_execute_union = should_perform_union_hint;
/** Вынем данные из Storage. from_stage - до какой стадии запрос был выполнен в Storage. */ /** Вынем данные из Storage. from_stage - до какой стадии запрос был выполнен в Storage. */
QueryProcessingStage::Enum from_stage = executeFetchColumns(streams); QueryProcessingStage::Enum from_stage = executeFetchColumns(streams);

View File

@ -1 +1 @@
SELECT nn,vv FROM (SELECT name AS nn, value AS vv FROM data2013 UNION ALL SELECT name AS nn, value AS vv FROM data2014) ORDER BY nn ASC; SELECT nn,vv FROM (SELECT name AS nn, value AS vv FROM data2013 UNION ALL SELECT name AS nn, value AS vv FROM data2014) ORDER BY nn,vv ASC;