ClickHouse/src/Interpreters/InterpreterSelectIntersectExceptQuery.cpp

169 lines
6.1 KiB
C++
Raw Normal View History

2021-04-16 20:18:39 +00:00
#include <Columns/getLeastSuperColumn.h>
#include <Interpreters/Context.h>
2021-08-12 11:42:51 +00:00
#include <Interpreters/InterpreterSelectIntersectExceptQuery.h>
2021-04-16 20:18:39 +00:00
#include <Interpreters/InterpreterSelectQuery.h>
2021-08-12 11:42:51 +00:00
#include <Parsers/ASTSelectIntersectExceptQuery.h>
2021-04-16 20:18:39 +00:00
#include <Parsers/ASTSelectWithUnionQuery.h>
#include <Processors/QueryPlan/IQueryPlanStep.h>
#include <Processors/QueryPlan/IntersectOrExceptStep.h>
#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
#include <Processors/QueryPlan/QueryPlan.h>
2021-08-08 17:12:12 +00:00
#include <Processors/QueryPlan/ExpressionStep.h>
2022-05-20 19:49:31 +00:00
#include <QueryPipeline/QueryPipelineBuilder.h>
2021-08-08 17:12:12 +00:00
2021-04-16 20:18:39 +00:00
namespace DB
{
/// Error codes referenced in this file; they are only declared here (extern).
namespace ErrorCodes
{
    extern const int INTERSECT_OR_EXCEPT_RESULT_STRUCTURES_MISMATCH;
    extern const int LOGICAL_ERROR;
}
2021-08-12 11:42:51 +00:00
/// Builds the header shared by all operands of an INTERSECT / EXCEPT query.
///
/// All headers must contain the same number of columns, otherwise an exception with code
/// INTERSECT_OR_EXCEPT_RESULT_STRUCTURES_MISMATCH is thrown. The result starts as a copy of
/// the first header; then, for every column position, the column is replaced by the value
/// that getLeastSuperColumn() returns for the columns found at that position in all headers.
///
/// Throws LOGICAL_ERROR if `headers` is empty.
static Block getCommonHeader(const Blocks & headers)
{
    /// headers.front() below is undefined for an empty container, so reject it explicitly.
    if (headers.empty())
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot build common header for IntersectExceptQuery: no headers provided");

    size_t num_selects = headers.size();
    Block common_header = headers.front();
    size_t num_columns = common_header.columns();

    /// Validate the shape of every header before touching any column.
    for (size_t query_num = 1; query_num < num_selects; ++query_num)
    {
        if (headers[query_num].columns() != num_columns)
            throw Exception(ErrorCodes::INTERSECT_OR_EXCEPT_RESULT_STRUCTURES_MISMATCH,
                            "Different number of columns in IntersectExceptQuery elements:\n {} \nand\n {}",
                            common_header.dumpNames(), headers[query_num].dumpNames());
    }

    /// Scratch buffer, reused for every column position: one pointer per operand.
    std::vector<const ColumnWithTypeAndName *> columns(num_selects);
    for (size_t column_num = 0; column_num < num_columns; ++column_num)
    {
        for (size_t i = 0; i < num_selects; ++i)
            columns[i] = &headers[i].getByPosition(column_num);

        ColumnWithTypeAndName & result_elem = common_header.getByPosition(column_num);
        result_elem = getLeastSuperColumn(columns);
    }

    return common_header;
}
2021-08-12 11:42:51 +00:00
/// Convenience constructor taking a ContextPtr: it delegates to the ContextMutablePtr overload,
/// handing it the result of Context::createCopy(context_).
InterpreterSelectIntersectExceptQuery::InterpreterSelectIntersectExceptQuery(
    const ASTPtr & query_ptr_, ContextPtr context_, const SelectQueryOptions & options_)
    : InterpreterSelectIntersectExceptQuery(query_ptr_, Context::createCopy(context_), options_)
{
}
/// Main constructor. The base class receives a clone of the query AST.
///
/// Reads the final operator and the list of selects from the AST, builds one nested
/// interpreter per select and computes `result_header` from their sample blocks.
///
/// Throws LOGICAL_ERROR if the query is not an ASTSelectIntersectExceptQuery, if its
/// operator is UNKNOWN, or if it does not have exactly two selects.
InterpreterSelectIntersectExceptQuery::InterpreterSelectIntersectExceptQuery(
    const ASTPtr & query_ptr_,
    ContextMutablePtr context_,
    const SelectQueryOptions & options_)
    : IInterpreterUnionOrSelectQuery(query_ptr_->clone(), context_, options_)
{
    ASTSelectIntersectExceptQuery * ast = query_ptr->as<ASTSelectIntersectExceptQuery>();

    /// as<>() gives a null pointer for a node of another type; fail cleanly instead of dereferencing it.
    if (!ast)
        throw Exception(ErrorCodes::LOGICAL_ERROR,
                        "Expected ASTSelectIntersectExceptQuery, got {}",
                        query_ptr->getID());

    final_operator = ast->final_operator;

    const auto & children = ast->getListOfSelects();
    size_t num_children = children.size();

    /// AST must have been changed by the visitor.
    if (final_operator == Operator::UNKNOWN || num_children != 2)
        throw Exception(ErrorCodes::LOGICAL_ERROR,
                        "SelectIntersectExceptQuery has not been normalized (number of children: {})",
                        num_children);

    nested_interpreters.resize(num_children);
    for (size_t i = 0; i < num_children; ++i)
    {
        nested_interpreters[i] = buildCurrentChildInterpreter(children.at(i));
        uses_view_source |= nested_interpreters[i]->usesViewSource();
    }

    /// Collect the sample block of every operand and derive the common header from them.
    Blocks headers(num_children);
    for (size_t query_num = 0; query_num < num_children; ++query_num)
        headers[query_num] = nested_interpreters[query_num]->getSampleBlock();

    result_header = getCommonHeader(headers);
}
2021-04-16 20:18:39 +00:00
std::unique_ptr<IInterpreterUnionOrSelectQuery>
2021-08-12 11:42:51 +00:00
InterpreterSelectIntersectExceptQuery::buildCurrentChildInterpreter(const ASTPtr & ast_ptr_)
2021-04-16 20:18:39 +00:00
{
if (ast_ptr_->as<ASTSelectWithUnionQuery>())
return std::make_unique<InterpreterSelectWithUnionQuery>(ast_ptr_, context, SelectQueryOptions());
2021-08-12 11:42:51 +00:00
if (ast_ptr_->as<ASTSelectQuery>())
2021-04-16 20:18:39 +00:00
return std::make_unique<InterpreterSelectQuery>(ast_ptr_, context, SelectQueryOptions());
2021-08-12 11:42:51 +00:00
if (ast_ptr_->as<ASTSelectIntersectExceptQuery>())
return std::make_unique<InterpreterSelectIntersectExceptQuery>(ast_ptr_, context, SelectQueryOptions());
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected query: {}", ast_ptr_->getID());
2021-04-16 20:18:39 +00:00
}
2021-08-12 11:42:51 +00:00
/// Builds the query plan: one sub-plan per nested interpreter, united by an IntersectOrExceptStep.
/// A sub-plan whose header differs in structure from `result_header` first gets a converting
/// ExpressionStep (columns are matched by position).
void InterpreterSelectIntersectExceptQuery::buildQueryPlan(QueryPlan & query_plan)
{
    /// Append the limits of this level, then hand the accumulated list to every child.
    storage_limits.emplace_back(getStorageLimits(*context, options));
    for (auto & child : nested_interpreters)
        child->addStorageLimits(storage_limits);

    const size_t num_children = nested_interpreters.size();

    std::vector<std::unique_ptr<QueryPlan>> child_plans;
    child_plans.reserve(num_children);

    DataStreams child_streams;
    child_streams.reserve(num_children);

    for (auto & child : nested_interpreters)
    {
        child_plans.push_back(std::make_unique<QueryPlan>());
        QueryPlan & plan = *child_plans.back();
        child->buildQueryPlan(plan);

        const bool needs_conversion = !blocksHaveEqualStructure(plan.getCurrentDataStream().header, result_header);
        if (needs_conversion)
        {
            auto converting_dag = ActionsDAG::makeConvertingActions(
                plan.getCurrentDataStream().header.getColumnsWithTypeAndName(),
                result_header.getColumnsWithTypeAndName(),
                ActionsDAG::MatchColumnsMode::Position);

            auto conversion = std::make_unique<ExpressionStep>(plan.getCurrentDataStream(), std::move(converting_dag));
            conversion->setStepDescription("Conversion before UNION");
            plan.addStep(std::move(conversion));
        }

        /// Taken after the optional conversion, so it is the stream the final step will read.
        child_streams.push_back(plan.getCurrentDataStream());
    }

    auto threads_limit = context->getSettingsRef().max_threads;
    auto intersect_or_except = std::make_unique<IntersectOrExceptStep>(std::move(child_streams), final_operator, threads_limit);
    query_plan.unitePlans(std::move(intersect_or_except), std::move(child_plans));

    query_plan.addInterpreterContext(context);
}
2021-08-12 11:42:51 +00:00
/// Builds the query plan, turns it into a pipeline and returns it inside a BlockIO.
/// setQuota() is called on the resulting pipeline before returning.
BlockIO InterpreterSelectIntersectExceptQuery::execute()
{
    QueryPlan plan;
    buildQueryPlan(plan);

    auto pipeline_builder = plan.buildQueryPipeline(
        QueryPlanOptimizationSettings::fromContext(context),
        BuildQueryPipelineSettings::fromContext(context));

    BlockIO result;
    result.pipeline = QueryPipelineBuilder::getPipeline(std::move(*pipeline_builder));
    setQuota(result.pipeline);

    return result;
}
2021-08-08 17:12:12 +00:00
2021-08-15 06:55:43 +00:00
/// Forwards ignoreWithTotals() to every nested interpreter.
void InterpreterSelectIntersectExceptQuery::ignoreWithTotals()
{
    const size_t num_children = nested_interpreters.size();
    for (size_t i = 0; i < num_children; ++i)
        nested_interpreters[i]->ignoreWithTotals();
}
2021-04-16 20:18:39 +00:00
}