#include #include #include #include #include #include #include #include #include #include #include namespace DB { namespace ErrorCodes { extern const int INTERSECT_OR_EXCEPT_RESULT_STRUCTURES_MISMATCH; extern const int LOGICAL_ERROR; } static Block getCommonHeader(const Blocks & headers) { size_t num_selects = headers.size(); Block common_header = headers.front(); size_t num_columns = common_header.columns(); for (size_t query_num = 1; query_num < num_selects; ++query_num) { if (headers[query_num].columns() != num_columns) throw Exception(ErrorCodes::INTERSECT_OR_EXCEPT_RESULT_STRUCTURES_MISMATCH, "Different number of columns in IntersectExceptQuery elements:\n {} \nand\n {}", common_header.dumpNames(), headers[query_num].dumpNames()); } std::vector columns(num_selects); for (size_t column_num = 0; column_num < num_columns; ++column_num) { for (size_t i = 0; i < num_selects; ++i) columns[i] = &headers[i].getByPosition(column_num); ColumnWithTypeAndName & result_elem = common_header.getByPosition(column_num); result_elem = getLeastSuperColumn(columns); } return common_header; } InterpreterSelectIntersectExceptQuery::InterpreterSelectIntersectExceptQuery( const ASTPtr & query_ptr_, ContextPtr context_, const SelectQueryOptions & options_) : IInterpreterUnionOrSelectQuery(query_ptr_->clone(), context_, options_) { ASTSelectIntersectExceptQuery * ast = query_ptr->as(); final_operator = ast->final_operator; const auto & children = ast->children; size_t num_children = children.size(); /// AST must have been changed by the visitor. if (final_operator == Operator::UNKNOWN || num_children != 2) throw Exception(ErrorCodes::LOGICAL_ERROR, "SelectIntersectExceptyQuery has not been normalized (number of children: {})", num_children); nested_interpreters.resize(num_children); for (size_t i = 0; i < num_children; ++i) nested_interpreters[i] = buildCurrentChildInterpreter(children.at(i)); Blocks headers(num_children); for (size_t query_num = 0; query_num < num_children; ++query_num) headers[query_num] = nested_interpreters[query_num]->getSampleBlock(); result_header = getCommonHeader(headers); } std::unique_ptr InterpreterSelectIntersectExceptQuery::buildCurrentChildInterpreter(const ASTPtr & ast_ptr_) { if (ast_ptr_->as()) return std::make_unique(ast_ptr_, context, SelectQueryOptions()); if (ast_ptr_->as()) return std::make_unique(ast_ptr_, context, SelectQueryOptions()); if (ast_ptr_->as()) return std::make_unique(ast_ptr_, context, SelectQueryOptions()); throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected query: {}", ast_ptr_->getID()); } void InterpreterSelectIntersectExceptQuery::buildQueryPlan(QueryPlan & query_plan) { size_t num_plans = nested_interpreters.size(); std::vector> plans(num_plans); DataStreams data_streams(num_plans); for (size_t i = 0; i < num_plans; ++i) { plans[i] = std::make_unique(); nested_interpreters[i]->buildQueryPlan(*plans[i]); if (!blocksHaveEqualStructure(plans[i]->getCurrentDataStream().header, result_header)) { auto actions_dag = ActionsDAG::makeConvertingActions( plans[i]->getCurrentDataStream().header.getColumnsWithTypeAndName(), result_header.getColumnsWithTypeAndName(), ActionsDAG::MatchColumnsMode::Position); auto converting_step = std::make_unique(plans[i]->getCurrentDataStream(), std::move(actions_dag)); converting_step->setStepDescription("Conversion before UNION"); plans[i]->addStep(std::move(converting_step)); } data_streams[i] = plans[i]->getCurrentDataStream(); } auto max_threads = context->getSettingsRef().max_threads; auto step = std::make_unique(std::move(data_streams), final_operator, max_threads); query_plan.unitePlans(std::move(step), std::move(plans)); } BlockIO InterpreterSelectIntersectExceptQuery::execute() { BlockIO res; QueryPlan query_plan; buildQueryPlan(query_plan); auto pipeline = query_plan.buildQueryPipeline( QueryPlanOptimizationSettings::fromContext(context), BuildQueryPipelineSettings::fromContext(context)); pipeline->addInterpreterContext(context); res.pipeline = QueryPipelineBuilder::getPipeline(std::move(*query_plan.buildQueryPipeline( QueryPlanOptimizationSettings::fromContext(context), BuildQueryPipelineSettings::fromContext(context)))); return res; } void InterpreterSelectIntersectExceptQuery::ignoreWithTotals() { for (auto & interpreter : nested_interpreters) interpreter->ignoreWithTotals(); } }