Code cleanup. [#METR-14099]

This commit is contained in:
Alexey Arno 2014-12-17 14:56:01 +03:00
parent 5111c759d9
commit 839f09f9d0
3 changed files with 117 additions and 142 deletions

View File

@ -16,114 +16,139 @@ namespace DB
class InterpreterSelectQuery
{
public:
/** to_stage
* - стадия, до которой нужно выполнить запрос. По-умолчанию - до конца.
* Можно выполнить до промежуточного состояния агрегации, которые объединяются с разных серверов при распределённой обработке запроса.
*
* subquery_depth
* - для контроля ограничений на глубину вложенности подзапросов. Для подзапросов передаётся значение, увеличенное на единицу.
*
* input
* - если задан - читать не из таблицы, указанной в запросе, а из готового источника.
*
* required_column_names
* - удалить из запроса все столбцы кроме указанных - используется для удаления ненужных столбцов из подзапросов.
*
* table_column_names
* - поместить в контекст в качестве известных столбцов только указанные столбцы, а не все столбцы таблицы.
* Используется, например, совместно с указанием input.
*/
/** to_stage
* - стадия, до которой нужно выполнить запрос. По-умолчанию - до конца.
* Можно выполнить до промежуточного состояния агрегации, которые объединяются с разных серверов при распределённой обработке запроса.
*
* subquery_depth
* - для контроля ограничений на глубину вложенности подзапросов. Для подзапросов передаётся значение, увеличенное на единицу.
*
* input
* - если задан - читать не из таблицы, указанной в запросе, а из готового источника.
*
* required_column_names
* - удалить из запроса все столбцы кроме указанных - используется для удаления ненужных столбцов из подзапросов.
*
* table_column_names
* - поместить в контекст в качестве известных столбцов только указанные столбцы, а не все столбцы таблицы.
* Используется, например, совместно с указанием input.
*/
InterpreterSelectQuery(
ASTPtr query_ptr_,
const Context & context_,
QueryProcessingStage::Enum to_stage_ = QueryProcessingStage::Complete,
size_t subquery_depth_ = 0,
BlockInputStreamPtr input = nullptr,
bool is_union_all_head_ = true);
InterpreterSelectQuery(
ASTPtr query_ptr_,
const Context & context_,
QueryProcessingStage::Enum to_stage_ = QueryProcessingStage::Complete,
size_t subquery_depth_ = 0,
BlockInputStreamPtr input = nullptr,
bool is_union_all_head_ = true);
InterpreterSelectQuery(
ASTPtr query_ptr_,
const Context & context_,
const Names & required_column_names,
QueryProcessingStage::Enum to_stage_ = QueryProcessingStage::Complete,
size_t subquery_depth_ = 0,
BlockInputStreamPtr input = nullptr);
InterpreterSelectQuery(
ASTPtr query_ptr_,
const Context & context_,
const Names & required_column_names,
QueryProcessingStage::Enum to_stage_ = QueryProcessingStage::Complete,
size_t subquery_depth_ = 0,
BlockInputStreamPtr input = nullptr);
InterpreterSelectQuery(
ASTPtr query_ptr_,
const Context & context_,
const Names & required_column_names,
const NamesAndTypesList & table_column_names,
QueryProcessingStage::Enum to_stage_ = QueryProcessingStage::Complete,
size_t subquery_depth_ = 0,
BlockInputStreamPtr input = nullptr);
InterpreterSelectQuery(
ASTPtr query_ptr_,
const Context & context_,
const Names & required_column_names,
const NamesAndTypesList & table_column_names,
QueryProcessingStage::Enum to_stage_ = QueryProcessingStage::Complete,
size_t subquery_depth_ = 0,
BlockInputStreamPtr input = nullptr);
/** Выполнить запрос, возможно являющиийся цепочкой UNION ALL.
* Получить поток блоков для чтения
*/
BlockInputStreamPtr execute();
/** Выполнить запрос, возможно являющиийся цепочкой UNION ALL.
* Получить поток блоков для чтения
*/
BlockInputStreamPtr execute();
/** Выполнить запрос, записать результат в нужном формате в buf.
* BlockInputStreamPtr возвращается, чтобы можно было потом получить информацию о плане выполнения запроса.
*/
BlockInputStreamPtr executeAndFormat(WriteBuffer & buf);
/** Выполнить запрос, записать результат в нужном формате в buf.
* BlockInputStreamPtr возвращается, чтобы можно было потом получить информацию о плане выполнения запроса.
*/
BlockInputStreamPtr executeAndFormat(WriteBuffer & buf);
DataTypes getReturnTypes();
Block getSampleBlock();
DataTypes getReturnTypes();
Block getSampleBlock();
private:
typedef Poco::SharedPtr<ExpressionAnalyzer> ExpressionAnalyzerPtr;
typedef Poco::SharedPtr<ExpressionAnalyzer> ExpressionAnalyzerPtr;
void init(BlockInputStreamPtr input, const NamesAndTypesList & table_column_names = NamesAndTypesList());
void init(BlockInputStreamPtr input, const NamesAndTypesList & table_column_names = NamesAndTypesList());
/** Из какой таблицы читать. JOIN-ы не поддерживаются.
*/
void getDatabaseAndTableNames(String & database_name, String & table_name);
/// Выполнить один запрос SELECT из цепочки UNION ALL.
void executeSingleQuery(bool should_perform_union_hint = true);
/** Выбрать из списка столбцов какой-нибудь, лучше - минимального размера.
*/
String getAnyColumn();
bool isFirstSelectInsideUnionAll() const;
/// Разные стадии выполнения запроса.
/** Из какой таблицы читать. JOIN-ы не поддерживаются.
*/
void getDatabaseAndTableNames(String & database_name, String & table_name);
/// Вынимает данные из таблицы. Возвращает стадию, до которой запрос был обработан в Storage.
QueryProcessingStage::Enum executeFetchColumns(BlockInputStreams & streams);
void executeWhere( BlockInputStreams & streams, ExpressionActionsPtr expression);
void executeAggregation( BlockInputStreams & streams, ExpressionActionsPtr expression,
bool overflow_row, bool final);
void executeMergeAggregated( BlockInputStreams & streams, bool overflow_row, bool final);
void executeTotalsAndHaving( BlockInputStreams & streams, bool has_having, ExpressionActionsPtr expression,
bool overflow_row);
void executeHaving( BlockInputStreams & streams, ExpressionActionsPtr expression);
void executeExpression( BlockInputStreams & streams, ExpressionActionsPtr expression);
void executeOrder( BlockInputStreams & streams);
void executePreLimit( BlockInputStreams & streams);
void executeUnion( BlockInputStreams & streams);
void executeLimit( BlockInputStreams & streams);
void executeProjection( BlockInputStreams & streams, ExpressionActionsPtr expression);
void executeDistinct( BlockInputStreams & streams, bool before_order, Names columns);
void executeSubqueriesInSetsAndJoins(BlockInputStreams & streams, SubqueriesForSets & subqueries_for_sets);
/** Выбрать из списка столбцов какой-нибудь, лучше - минимального размера.
*/
String getAnyColumn();
ASTPtr query_ptr;
ASTSelectQuery & query;
Context context;
Settings settings;
QueryProcessingStage::Enum to_stage;
size_t subquery_depth;
ExpressionAnalyzerPtr query_analyzer;
BlockInputStreams streams;
/// Проверить, что запрос SELECT - первый элемент цепочки UNION ALL.
bool is_union_all_head;
/// Разные стадии выполнения запроса.
/// Таблица, откуда читать данные, если не подзапрос.
StoragePtr storage;
IStorage::TableStructureReadLockPtr table_lock;
/// Вынимает данные из таблицы. Возвращает стадию, до которой запрос был обработан в Storage.
QueryProcessingStage::Enum executeFetchColumns(BlockInputStreams & streams);
Logger * log;
void executeWhere( BlockInputStreams & streams, ExpressionActionsPtr expression);
void executeAggregation( BlockInputStreams & streams, ExpressionActionsPtr expression,
bool overflow_row, bool final);
void executeMergeAggregated( BlockInputStreams & streams, bool overflow_row, bool final);
void executeTotalsAndHaving( BlockInputStreams & streams, bool has_having, ExpressionActionsPtr expression,
bool overflow_row);
void executeHaving( BlockInputStreams & streams, ExpressionActionsPtr expression);
void executeExpression( BlockInputStreams & streams, ExpressionActionsPtr expression);
void executeOrder( BlockInputStreams & streams);
void executePreLimit( BlockInputStreams & streams);
void executeUnion( BlockInputStreams & streams);
void executeLimit( BlockInputStreams & streams);
void executeProjection( BlockInputStreams & streams, ExpressionActionsPtr expression);
void executeDistinct( BlockInputStreams & streams, bool before_order, Names columns);
void executeSubqueriesInSetsAndJoins(BlockInputStreams & streams, SubqueriesForSets & subqueries_for_sets);
ASTPtr query_ptr;
ASTSelectQuery & query;
Context context;
Settings settings;
QueryProcessingStage::Enum to_stage;
size_t subquery_depth;
ExpressionAnalyzerPtr query_analyzer;
BlockInputStreams streams;
/** Цепочка UNION ALL может иметь длину 1 (в таком случае имеется просто один запрос SELECT)
* или больше. Этот флаг установлен, если это первый запрос, возможно единственный, этой цепочки.
*/
bool is_union_all_head;
/// Следующий запрос SELECT в цепочке UNION ALL.
std::unique_ptr<InterpreterSelectQueryWithContext> next_select_in_union_all;
/// Таблица, откуда читать данные, если не подзапрос.
StoragePtr storage;
IStorage::TableStructureReadLockPtr table_lock;
Logger * log;
};
struct InterpreterSelectQueryWithContext
{
InterpreterSelectQueryWithContext(
ASTPtr query_ptr_,
Context & context_,
QueryProcessingStage::Enum to_stage_ = QueryProcessingStage::Complete,
size_t subquery_depth_ = 0,
BlockInputStreamPtr input = nullptr,
bool is_union_all_head_ = true) : context(context_), query(query_ptr_, context, to_stage_, subquery_depth_, input, is_union_all_head_)
{
}
Context context;
InterpreterSelectQuery query;
};
}

View File

@ -215,7 +215,6 @@ static inline BlockInputStreamPtr maybeAsynchronous(BlockInputStreamPtr in, bool
BlockInputStreamPtr InterpreterSelectQuery::execute()
<<<<<<< HEAD
{
if (isFirstSelectInsideUnionAll())
{
@ -263,55 +262,6 @@ BlockInputStreamPtr InterpreterSelectQuery::execute()
}
return streams[0];
=======
{
if (is_union_all_head && !query.next_union_all.isNull())
{
executeSingleQuery(true);
for (auto p = next_select_in_union_all.get(); p != nullptr; p = p->query.next_select_in_union_all.get())
{
p->query.executeSingleQuery(true);
const auto & others = p->query.streams;
streams.insert(streams.end(), others.begin(), others.end());
}
if (streams.empty())
return new NullBlockInputStream;
for (auto & stream : streams)
stream = new MaterializingBlockInputStream(stream);
executeUnion(streams);
}
else
{
executeSingleQuery();
if (streams.empty())
return new NullBlockInputStream;
}
/// Ограничения на результат, квота на результат, а также колбек для прогресса.
if (IProfilingBlockInputStream * stream = dynamic_cast<IProfilingBlockInputStream *>(&*streams[0]))
{
stream->setProgressCallback(context.getProgressCallback());
stream->setProcessListElement(context.getProcessListElement());
/// Ограничения действуют только на конечный результат.
if (to_stage == QueryProcessingStage::Complete)
{
IProfilingBlockInputStream::LocalLimits limits;
limits.mode = IProfilingBlockInputStream::LIMITS_CURRENT;
limits.max_rows_to_read = settings.limits.max_result_rows;
limits.max_bytes_to_read = settings.limits.max_result_bytes;
limits.read_overflow_mode = settings.limits.result_overflow_mode;
stream->setLimits(limits);
stream->setQuota(context.getQuota());
}
}
return streams[0];
>>>>>>> Update functional tests. Add materialization in UNION ALL queries. [#METR-14099]
}

View File

@ -1 +1 @@
SELECT nn,vv FROM (SELECT name AS nn, value AS vv FROM data2013 UNION ALL SELECT name AS nn, value AS vv FROM data2014) ORDER BY nn ASC;
SELECT nn,vv FROM (SELECT name AS nn, value AS vv FROM data2013 UNION ALL SELECT name AS nn, value AS vv FROM data2014) ORDER BY nn,vv ASC;