From b630f30a25ddc10505cc1f7eab5d32c031ca43cc Mon Sep 17 00:00:00 2001 From: Alexey Arno Date: Fri, 12 Dec 2014 16:26:17 +0300 Subject: [PATCH] Add initial support for SELECT requests chained by UNION ALL. --- dbms/include/DB/Core/Block.h | 2 + dbms/include/DB/Core/ErrorCodes.h | 1 + .../include/DB/DataTypes/typesAreCompatible.h | 10 +++ .../DB/Interpreters/InterpreterSelectQuery.h | 8 ++- dbms/src/Core/Block.cpp | 20 +++++- dbms/src/DataTypes/typesAreCompatible.cpp | 65 +++++++++++++++++++ .../Interpreters/InterpreterSelectQuery.cpp | 41 +++++++++++- dbms/src/Parsers/formatAST.cpp | 7 ++ 8 files changed, 150 insertions(+), 4 deletions(-) create mode 100644 dbms/include/DB/DataTypes/typesAreCompatible.h create mode 100644 dbms/src/DataTypes/typesAreCompatible.cpp diff --git a/dbms/include/DB/Core/Block.h b/dbms/include/DB/Core/Block.h index 66e5b4ec75f..a463aec38e5 100644 --- a/dbms/include/DB/Core/Block.h +++ b/dbms/include/DB/Core/Block.h @@ -125,6 +125,8 @@ typedef std::list BlocksList; /// Сравнить типы столбцов у блоков. Порядок столбцов имеет значение. Имена не имеют значения. bool blocksHaveEqualStructure(const Block & lhs, const Block & rhs); +bool blocksHaveCompatibleStructure(const Block& lhs, const Block& rhs); + } namespace std diff --git a/dbms/include/DB/Core/ErrorCodes.h b/dbms/include/DB/Core/ErrorCodes.h index 7d3a86bd64a..2fee1c7352e 100644 --- a/dbms/include/DB/Core/ErrorCodes.h +++ b/dbms/include/DB/Core/ErrorCodes.h @@ -265,6 +265,7 @@ namespace ErrorCodes TOO_MUCH_RETRIES_TO_FETCH_PARTS, PARTITION_ALREADY_EXISTS, PARTITION_DOESNT_EXIST, + UNION_ALL_INCOMPATIBLE_RESULTS, POCO_EXCEPTION = 1000, STD_EXCEPTION, diff --git a/dbms/include/DB/DataTypes/typesAreCompatible.h b/dbms/include/DB/DataTypes/typesAreCompatible.h new file mode 100644 index 00000000000..0f6d42de60f --- /dev/null +++ b/dbms/include/DB/DataTypes/typesAreCompatible.h @@ -0,0 +1,10 @@ +#pragma once + +#include + +namespace DB +{ + +bool typesAreCompatible(const IDataType& lhs, const IDataType& rhs); + +} diff --git a/dbms/include/DB/Interpreters/InterpreterSelectQuery.h b/dbms/include/DB/Interpreters/InterpreterSelectQuery.h index 19aed2d53c8..7b8b06054cb 100644 --- a/dbms/include/DB/Interpreters/InterpreterSelectQuery.h +++ b/dbms/include/DB/Interpreters/InterpreterSelectQuery.h @@ -39,7 +39,8 @@ public: const Context & context_, QueryProcessingStage::Enum to_stage_ = QueryProcessingStage::Complete, size_t subquery_depth_ = 0, - BlockInputStreamPtr input = nullptr); + BlockInputStreamPtr input = nullptr, + bool is_union_all_head_ = true); InterpreterSelectQuery( ASTPtr query_ptr_, @@ -87,6 +88,8 @@ private: /// Вынимает данные из таблицы. Возвращает стадию, до которой запрос был обработан в Storage. QueryProcessingStage::Enum executeFetchColumns(BlockInputStreams & streams); + BlockInputStreamPtr executeSingleQuery(); + void executeWhere( BlockInputStreams & streams, ExpressionActionsPtr expression); void executeAggregation( BlockInputStreams & streams, ExpressionActionsPtr expression, bool overflow_row, bool final); @@ -111,6 +114,9 @@ private: size_t subquery_depth; ExpressionAnalyzerPtr query_analyzer; BlockInputStreams streams; + + /// true, если это голова цепочки запросов SELECT объединённых ключевыми словами UNION ALL. + bool is_union_all_head; /// Таблица, откуда читать данные, если не подзапрос. StoragePtr storage; diff --git a/dbms/src/Core/Block.cpp b/dbms/src/Core/Block.cpp index 0c1f49b21f4..4ca952711ae 100644 --- a/dbms/src/Core/Block.cpp +++ b/dbms/src/Core/Block.cpp @@ -9,6 +9,7 @@ #include #include +#include #include #include @@ -363,7 +364,7 @@ bool blocksHaveEqualStructure(const Block & lhs, const Block & rhs) { const IDataType & lhs_type = *lhs.getByPosition(i).type; const IDataType & rhs_type = *rhs.getByPosition(i).type; - + if (lhs_type.getName() != rhs_type.getName()) return false; } @@ -371,6 +372,23 @@ bool blocksHaveEqualStructure(const Block & lhs, const Block & rhs) return true; } +bool blocksHaveCompatibleStructure(const Block & lhs, const Block & rhs) +{ + size_t columns = lhs.columns(); + if (rhs.columns() != columns) + return false; + + for (size_t i = 0; i < columns; ++i) + { + const auto & lhs_type = *lhs.getByPosition(i).type; + const auto & rhs_type = *rhs.getByPosition(i).type; + + if (!typesAreCompatible(lhs_type, rhs_type)) + return false; + } + + return true; +} void Block::clear() { diff --git a/dbms/src/DataTypes/typesAreCompatible.cpp b/dbms/src/DataTypes/typesAreCompatible.cpp new file mode 100644 index 00000000000..222bfdf1ab1 --- /dev/null +++ b/dbms/src/DataTypes/typesAreCompatible.cpp @@ -0,0 +1,65 @@ +#include +#include +#include +#include + +#include +#include +#include +#include + +namespace +{ + +#define REGISTER_COMPATIBLE_TYPES(T, U) \ + { T.getName(), U.getName() }, \ + { U.getName(), T.getName() } \ + +std::vector > init() +{ + std::vector > types_desc = + { + REGISTER_COMPATIBLE_TYPES(DB::DataTypeString(), DB::DataTypeFixedString(1)), + + REGISTER_COMPATIBLE_TYPES(DB::DataTypeFloat32(), DB::DataTypeFloat64()), + + REGISTER_COMPATIBLE_TYPES(DB::DataTypeInt8(), DB::DataTypeUInt8()), + REGISTER_COMPATIBLE_TYPES(DB::DataTypeInt8(), DB::DataTypeInt16()), + REGISTER_COMPATIBLE_TYPES(DB::DataTypeInt8(), DB::DataTypeUInt16()), + REGISTER_COMPATIBLE_TYPES(DB::DataTypeInt8(), DB::DataTypeInt32()), + REGISTER_COMPATIBLE_TYPES(DB::DataTypeInt8(), DB::DataTypeUInt32()), + REGISTER_COMPATIBLE_TYPES(DB::DataTypeInt8(), DB::DataTypeInt64()), + REGISTER_COMPATIBLE_TYPES(DB::DataTypeInt8(), DB::DataTypeUInt64()), + + REGISTER_COMPATIBLE_TYPES(DB::DataTypeInt16(), DB::DataTypeUInt16()), + REGISTER_COMPATIBLE_TYPES(DB::DataTypeInt16(), DB::DataTypeInt32()), + REGISTER_COMPATIBLE_TYPES(DB::DataTypeInt16(), DB::DataTypeUInt32()), + REGISTER_COMPATIBLE_TYPES(DB::DataTypeInt16(), DB::DataTypeInt64()), + REGISTER_COMPATIBLE_TYPES(DB::DataTypeInt16(), DB::DataTypeUInt64()), + + REGISTER_COMPATIBLE_TYPES(DB::DataTypeInt32(), DB::DataTypeUInt32()), + REGISTER_COMPATIBLE_TYPES(DB::DataTypeInt32(), DB::DataTypeInt64()), + REGISTER_COMPATIBLE_TYPES(DB::DataTypeInt32(), DB::DataTypeUInt64()), + + REGISTER_COMPATIBLE_TYPES(DB::DataTypeInt64(), DB::DataTypeUInt64()) + }; + std::sort(types_desc.begin(), types_desc.end()); + return types_desc; +} + +} + +namespace DB +{ + +bool typesAreCompatible(const IDataType & lhs, const IDataType & rhs) +{ + static const auto types_desc = init(); + + if (lhs.getName() == rhs.getName()) + return true; + + return std::binary_search(types_desc.begin(), types_desc.end(), std::make_pair(lhs.getName(), rhs.getName())); +} + +} diff --git a/dbms/src/Interpreters/InterpreterSelectQuery.cpp b/dbms/src/Interpreters/InterpreterSelectQuery.cpp index 72df1cf1582..f29753fd1bd 100644 --- a/dbms/src/Interpreters/InterpreterSelectQuery.cpp +++ b/dbms/src/Interpreters/InterpreterSelectQuery.cpp @@ -85,12 +85,26 @@ void InterpreterSelectQuery::init(BlockInputStreamPtr input_, const NamesAndType if (input_) streams.push_back(input_); + + if (is_union_all_head && (!query.next_union_all.isNull())) + { + // Проверить, что результаты всех запросов SELECT cовместимые. + Block previous = getSampleBlock(); + for (ASTPtr tree = query.next_union_all; !tree.isNull(); tree = (static_cast(&*tree))->next_union_all) + { + Block current = InterpreterSelectQuery(tree, context, to_stage, subquery_depth, nullptr, false).getSampleBlock(); + if (!blocksHaveCompatibleStructure(previous, current)) + throw Exception("Incompatible results in the SELECT queries of the UNION ALL chain", ErrorCodes::UNION_ALL_INCOMPATIBLE_RESULTS); + previous = std::move(current); + } + } } InterpreterSelectQuery::InterpreterSelectQuery(ASTPtr query_ptr_, const Context & context_, QueryProcessingStage::Enum to_stage_, - size_t subquery_depth_, BlockInputStreamPtr input_) + size_t subquery_depth_, BlockInputStreamPtr input_, bool is_union_all_head_) : query_ptr(query_ptr_), query(typeid_cast(*query_ptr)), context(context_), settings(context.getSettings()), to_stage(to_stage_), subquery_depth(subquery_depth_), + is_union_all_head(is_union_all_head_), log(&Logger::get("InterpreterSelectQuery")) { init(input_); @@ -101,6 +115,7 @@ InterpreterSelectQuery::InterpreterSelectQuery(ASTPtr query_ptr_, const Context QueryProcessingStage::Enum to_stage_, size_t subquery_depth_, BlockInputStreamPtr input_) : query_ptr(query_ptr_), query(typeid_cast(*query_ptr)), context(context_), settings(context.getSettings()), to_stage(to_stage_), subquery_depth(subquery_depth_), + is_union_all_head(true), log(&Logger::get("InterpreterSelectQuery")) { /** Оставляем в запросе в секции SELECT только нужные столбцы. @@ -117,6 +132,7 @@ InterpreterSelectQuery::InterpreterSelectQuery(ASTPtr query_ptr_, const Context const NamesAndTypesList & table_column_names, QueryProcessingStage::Enum to_stage_, size_t subquery_depth_, BlockInputStreamPtr input_) : query_ptr(query_ptr_), query(typeid_cast(*query_ptr)), context(context_), settings(context.getSettings()), to_stage(to_stage_), subquery_depth(subquery_depth_), + is_union_all_head(true), log(&Logger::get("InterpreterSelectQuery")) { /** Оставляем в запросе в секции SELECT только нужные столбцы. @@ -187,8 +203,29 @@ static inline BlockInputStreamPtr maybeAsynchronous(BlockInputStreamPtr in, bool : in; } - BlockInputStreamPtr InterpreterSelectQuery::execute() +{ + BlockInputStreamPtr first_query_plan = executeSingleQuery(); + + if (is_union_all_head && !query.next_union_all.isNull()) + { + BlockInputStreams streams; + streams.push_back(first_query_plan); + + for (ASTPtr tree = query.next_union_all; !tree.isNull(); tree = (static_cast(&*tree))->next_union_all) + { + Context select_query_context = context; + InterpreterSelectQuery interpreter(tree, select_query_context, to_stage, subquery_depth, nullptr, false); + streams.push_back(maybeAsynchronous(interpreter.executeSingleQuery(), settings.asynchronous)); + } + + return new UnionBlockInputStream(streams, settings.max_threads); + } + else + return first_query_plan; +} + +BlockInputStreamPtr InterpreterSelectQuery::executeSingleQuery() { /** Потоки данных. При параллельном выполнении запроса, имеем несколько потоков данных. * Если нет GROUP BY, то выполним все операции до ORDER BY и LIMIT параллельно, затем diff --git a/dbms/src/Parsers/formatAST.cpp b/dbms/src/Parsers/formatAST.cpp index 14ea3347c3d..d98c3b38012 100644 --- a/dbms/src/Parsers/formatAST.cpp +++ b/dbms/src/Parsers/formatAST.cpp @@ -238,6 +238,13 @@ void formatAST(const ASTSelectQuery & ast, std::ostream & s, size_t indent, bo s << (hilite ? hilite_keyword : "") << nl_or_ws << indent_str << "FORMAT " << (hilite ? hilite_none : ""); formatAST(*ast.format, s, indent, hilite, one_line); } + + if (!ast.next_union_all.isNull()) + { + s << (hilite ? hilite_keyword : "") << nl_or_ws << indent_str << "UNION ALL " << (hilite ? hilite_none : ""); + const ASTSelectQuery * next_ast = static_cast(&*ast.next_union_all); + formatAST(*next_ast, s, false, hilite, one_line, need_parens); + } } void formatAST(const ASTSubquery & ast, std::ostream & s, size_t indent, bool hilite, bool one_line, bool need_parens)