#include #include #include #include #include #include #include #include #include #include #include namespace DB { namespace ErrorCodes { extern const int LOGICAL_ERROR; extern const int UNION_ALL_RESULT_STRUCTURES_MISMATCH; } InterpreterSelectWithUnionQuery::InterpreterSelectWithUnionQuery( const ASTPtr & query_ptr_, const Context & context_, const SelectQueryOptions & options_, const Names & required_result_column_names) : options(options_), query_ptr(query_ptr_), context(std::make_shared(context_)), max_streams(context->getSettingsRef().max_threads) { const auto & ast = query_ptr->as(); size_t num_selects = ast.list_of_selects->children.size(); if (!num_selects) throw Exception("Logical error: no children in ASTSelectWithUnionQuery", ErrorCodes::LOGICAL_ERROR); /// Initialize interpreters for each SELECT query. /// Note that we pass 'required_result_column_names' to first SELECT. /// And for the rest, we pass names at the corresponding positions of 'required_result_column_names' in the result of first SELECT, /// because names could be different. nested_interpreters.reserve(num_selects); std::vector required_result_column_names_for_other_selects(num_selects); if (!required_result_column_names.empty() && num_selects > 1) { /// Result header if there are no filtering by 'required_result_column_names'. /// We use it to determine positions of 'required_result_column_names' in SELECT clause. Block full_result_header = InterpreterSelectQuery( ast.list_of_selects->children.at(0), *context, options.copy().analyze().noModify()).getSampleBlock(); std::vector positions_of_required_result_columns(required_result_column_names.size()); for (size_t required_result_num = 0, size = required_result_column_names.size(); required_result_num < size; ++required_result_num) positions_of_required_result_columns[required_result_num] = full_result_header.getPositionByName(required_result_column_names[required_result_num]); for (size_t query_num = 1; query_num < num_selects; ++query_num) { Block full_result_header_for_current_select = InterpreterSelectQuery( ast.list_of_selects->children.at(query_num), *context, options.copy().analyze().noModify()).getSampleBlock(); if (full_result_header_for_current_select.columns() != full_result_header.columns()) throw Exception("Different number of columns in UNION ALL elements:\n" + full_result_header.dumpNames() + "\nand\n" + full_result_header_for_current_select.dumpNames() + "\n", ErrorCodes::UNION_ALL_RESULT_STRUCTURES_MISMATCH); required_result_column_names_for_other_selects[query_num].reserve(required_result_column_names.size()); for (const auto & pos : positions_of_required_result_columns) required_result_column_names_for_other_selects[query_num].push_back(full_result_header_for_current_select.getByPosition(pos).name); } } for (size_t query_num = 0; query_num < num_selects; ++query_num) { const Names & current_required_result_column_names = query_num == 0 ? required_result_column_names : required_result_column_names_for_other_selects[query_num]; nested_interpreters.emplace_back(std::make_unique( ast.list_of_selects->children.at(query_num), *context, options, current_required_result_column_names)); } /// Determine structure of the result. if (num_selects == 1) { result_header = nested_interpreters.front()->getSampleBlock(); } else { Blocks headers(num_selects); for (size_t query_num = 0; query_num < num_selects; ++query_num) headers[query_num] = nested_interpreters[query_num]->getSampleBlock(); result_header = getCommonHeaderForUnion(headers); } /// InterpreterSelectWithUnionQuery ignores limits if all nested interpreters ignore limits. bool all_nested_ignore_limits = true; bool all_nested_ignore_quota = true; for (auto & interpreter : nested_interpreters) { if (!interpreter->ignoreLimits()) all_nested_ignore_limits = false; if (!interpreter->ignoreQuota()) all_nested_ignore_quota = false; } options.ignore_limits |= all_nested_ignore_limits; options.ignore_quota |= all_nested_ignore_quota; } Block InterpreterSelectWithUnionQuery::getCommonHeaderForUnion(const Blocks & headers) { size_t num_selects = headers.size(); Block common_header = headers.front(); size_t num_columns = common_header.columns(); for (size_t query_num = 1; query_num < num_selects; ++query_num) { if (headers[query_num].columns() != num_columns) throw Exception("Different number of columns in UNION ALL elements:\n" + common_header.dumpNames() + "\nand\n" + headers[query_num].dumpNames() + "\n", ErrorCodes::UNION_ALL_RESULT_STRUCTURES_MISMATCH); } std::vector columns(num_selects); for (size_t column_num = 0; column_num < num_columns; ++column_num) { for (size_t i = 0; i < num_selects; ++i) columns[i] = &headers[i].getByPosition(column_num); ColumnWithTypeAndName & result_elem = common_header.getByPosition(column_num); result_elem = getLeastSuperColumn(columns); } return common_header; } InterpreterSelectWithUnionQuery::~InterpreterSelectWithUnionQuery() = default; Block InterpreterSelectWithUnionQuery::getSampleBlock() { return result_header; } Block InterpreterSelectWithUnionQuery::getSampleBlock( const ASTPtr & query_ptr, const Context & context) { auto & cache = context.getSampleBlockCache(); /// Using query string because query_ptr changes for every internal SELECT auto key = queryToString(query_ptr); if (cache.find(key) != cache.end()) { return cache[key]; } return cache[key] = InterpreterSelectWithUnionQuery(query_ptr, context, SelectQueryOptions().analyze()).getSampleBlock(); } BlockIO InterpreterSelectWithUnionQuery::execute() { BlockIO res; QueryPipeline & main_pipeline = res.pipeline; std::vector pipelines; bool has_main_pipeline = false; Blocks headers; headers.reserve(nested_interpreters.size()); for (auto & interpreter : nested_interpreters) { if (!has_main_pipeline) { has_main_pipeline = true; main_pipeline = interpreter->execute().pipeline; headers.emplace_back(main_pipeline.getHeader()); } else { pipelines.emplace_back(interpreter->execute().pipeline); headers.emplace_back(pipelines.back().getHeader()); } } if (!has_main_pipeline) main_pipeline.init(Pipe(std::make_shared(getSampleBlock()))); if (!pipelines.empty()) { auto common_header = getCommonHeaderForUnion(headers); main_pipeline.unitePipelines(std::move(pipelines), common_header); // nested queries can force 1 thread (due to simplicity) // but in case of union this cannot be done. UInt64 max_threads = context->getSettingsRef().max_threads; main_pipeline.setMaxThreads(std::min(nested_interpreters.size(), max_threads)); } main_pipeline.addInterpreterContext(context); return res; } void InterpreterSelectWithUnionQuery::ignoreWithTotals() { for (auto & interpreter : nested_interpreters) interpreter->ignoreWithTotals(); } }