diff --git a/dbms/src/Interpreters/InterpreterSelectQuery.cpp b/dbms/src/Interpreters/InterpreterSelectQuery.cpp index dace01ab37d..264aae5325b 100644 --- a/dbms/src/Interpreters/InterpreterSelectQuery.cpp +++ b/dbms/src/Interpreters/InterpreterSelectQuery.cpp @@ -24,6 +24,7 @@ #include #include +#include #include #include #include @@ -94,7 +95,7 @@ void InterpreterSelectQuery::init() auto table_expression = query.table(); /// Read from subquery. - if (table_expression && typeid_cast(table_expression.get())) + if (table_expression && typeid_cast(table_expression.get())) { source_header = InterpreterSelectQuery::getSampleBlock(table_expression, context); } @@ -564,7 +565,7 @@ QueryProcessingStage::Enum InterpreterSelectQuery::executeFetchColumns(Pipeline } auto query_table = query.table(); - if (query_table && typeid_cast(query_table.get())) + if (query_table && typeid_cast(query_table.get())) { /** There are no limits on the maximum size of the result for the subquery. * Since the result of the query is not the result of the entire query. @@ -1072,6 +1073,31 @@ void InterpreterSelectQuery::executeLimitBy(Pipeline & pipeline) } +bool hasWithTotalsInAnySubqueryInFromClause(const ASTSelectQuery & query) +{ + if (query.group_by_with_totals) + return true; + + /** NOTE You can also check that the table in the subquery is distributed, and that it only looks at one shard. + * In other cases, totals will be computed on the initiating server of the query, and it is not necessary to read the data to the end. + */ + + auto query_table = query.table(); + if (query_table) + { + auto ast_union = typeid_cast(query_table.get()); + if (ast_union) + { + for (const auto & elem : ast_union->list_of_selects->children) + if (hasWithTotalsInAnySubqueryInFromClause(typeid_cast(*elem))) + return true; + } + } + + return false; +} + + void InterpreterSelectQuery::executeLimit(Pipeline & pipeline) { size_t limit_length = 0; @@ -1093,34 +1119,10 @@ void InterpreterSelectQuery::executeLimit(Pipeline & pipeline) bool always_read_till_end = false; if (query.group_by_with_totals && !query.order_expression_list) - { always_read_till_end = true; - } - auto query_table = query.table(); - if (!query.group_by_with_totals && query_table && typeid_cast(query_table.get())) - { - const ASTSelectQuery * subquery = static_cast(query_table.get()); - - while (subquery->table()) - { - if (subquery->group_by_with_totals) - { - /** NOTE You can also check that the table in the subquery is distributed, and that it only looks at one shard. - * In other cases, totals will be computed on the initiating server of the query, and it is not necessary to read the data to the end. - */ - - always_read_till_end = true; - break; - } - - auto subquery_table = subquery->table(); - if (typeid_cast(subquery_table.get())) - subquery = static_cast(subquery_table.get()); - else - break; - } - } + if (!query.group_by_with_totals && hasWithTotalsInAnySubqueryInFromClause(query)) + always_read_till_end = true; pipeline.transform([&](auto & stream) { diff --git a/dbms/src/Interpreters/InterpreterSelectWithUnionQuery.cpp b/dbms/src/Interpreters/InterpreterSelectWithUnionQuery.cpp index 27397459276..b5c7afeca51 100644 --- a/dbms/src/Interpreters/InterpreterSelectWithUnionQuery.cpp +++ b/dbms/src/Interpreters/InterpreterSelectWithUnionQuery.cpp @@ -3,6 +3,9 @@ #include #include #include +#include +#include +#include #include @@ -12,6 +15,7 @@ namespace DB namespace ErrorCodes { extern const int LOGICAL_ERROR; + extern const int UNION_ALL_RESULT_STRUCTURES_MISMATCH; } @@ -35,6 +39,8 @@ InterpreterSelectWithUnionQuery::InterpreterSelectWithUnionQuery( for (const auto & select : ast.list_of_selects->children) nested_interpreters.emplace_back(std::make_unique(select, context, to_stage, subquery_depth)); + + init(); } @@ -59,28 +65,88 @@ InterpreterSelectWithUnionQuery::InterpreterSelectWithUnionQuery( for (const auto & select : ast.list_of_selects->children) nested_interpreters.emplace_back(std::make_unique(select, context, required_column_names, to_stage, subquery_depth)); + + init(); } InterpreterSelectWithUnionQuery::~InterpreterSelectWithUnionQuery() = default; +void InterpreterSelectWithUnionQuery::init() +{ + size_t num_selects = nested_interpreters.size(); + + if (!num_selects) + throw Exception("Logical error: no children in ASTSelectWithUnionQuery", ErrorCodes::LOGICAL_ERROR); + + if (num_selects == 1) + { + result_header = nested_interpreters.front()->getSampleBlock(); + } + else + { + Blocks headers(num_selects); + for (size_t query_num = 0; query_num < num_selects; ++query_num) + headers[query_num] = nested_interpreters[query_num]->getSampleBlock(); + + result_header = headers.front(); + size_t num_columns = result_header.columns(); + + for (size_t query_num = 1; query_num < num_selects; ++query_num) + if (headers[query_num].columns() != num_columns) + throw Exception("Different number of columns in UNION ALL elements", ErrorCodes::UNION_ALL_RESULT_STRUCTURES_MISMATCH); + + for (size_t column_num = 0; column_num < num_columns; ++column_num) + { + ColumnWithTypeAndName & result_elem = result_header.getByPosition(column_num); + + /// Determine common type. + + DataTypes types(num_selects); + for (size_t query_num = 0; query_num < num_selects; ++query_num) + types[query_num] = headers[query_num].getByPosition(column_num).type; + + result_elem.type = getLeastSupertype(types); + + /// If there are different constness or different values of constants, the result must be non-constant. + + if (result_elem.column->isColumnConst()) + { + bool need_materialize = false; + for (size_t query_num = 1; query_num < num_selects; ++query_num) + { + const ColumnWithTypeAndName & source_elem = headers[query_num].getByPosition(column_num); + + if (!source_elem.column->isColumnConst() + || (static_cast(*result_elem.column).getField() + != static_cast(*source_elem.column).getField())) + { + need_materialize = true; + break; + } + } + + if (need_materialize) + result_elem.column = result_elem.type->createColumn(); + } + + /// BTW, result column names are from first SELECT. + } + } +} + + Block InterpreterSelectWithUnionQuery::getSampleBlock() { - return nested_interpreters.front()->getSampleBlock(); + return result_header; } Block InterpreterSelectWithUnionQuery::getSampleBlock( const ASTPtr & query_ptr, const Context & context) { - const ASTSelectWithUnionQuery & ast = typeid_cast(*query_ptr); - - size_t num_selects = ast.list_of_selects->children.size(); - if (!num_selects) - throw Exception("Logical error: no children in ASTSelectWithUnionQuery", ErrorCodes::LOGICAL_ERROR); - - return InterpreterSelectQuery::getSampleBlock(ast.list_of_selects->children.front(), context); + return InterpreterSelectWithUnionQuery(query_ptr, context).getSampleBlock(); } @@ -114,8 +180,12 @@ BlockIO InterpreterSelectWithUnionQuery::execute() } else { - const Settings & settings = context.getSettingsRef(); - result_stream = std::make_shared>(nested_streams, nullptr, settings.max_threads); + /// Unify data structure. + if (nested_interpreters.size() > 1) + for (auto & stream : nested_streams) + stream = std::make_shared(context, stream, result_header, ConvertingBlockInputStream::MatchColumnsMode::Position); + + result_stream = std::make_shared>(nested_streams, nullptr, context.getSettingsRef().max_threads); nested_streams.clear(); } diff --git a/dbms/src/Interpreters/InterpreterSelectWithUnionQuery.h b/dbms/src/Interpreters/InterpreterSelectWithUnionQuery.h index 71a606e1bd4..c0346065804 100644 --- a/dbms/src/Interpreters/InterpreterSelectWithUnionQuery.h +++ b/dbms/src/Interpreters/InterpreterSelectWithUnionQuery.h @@ -51,6 +51,10 @@ private: size_t subquery_depth; std::vector> nested_interpreters; + + Block result_header; + + void init(); }; }