mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-27 10:02:01 +00:00
Add INTERSECT and EXCEPT
This commit is contained in:
commit
42f6314c9a
@ -553,6 +553,7 @@
|
||||
M(583, ILLEGAL_PROJECTION) \
|
||||
M(584, PROJECTION_NOT_USED) \
|
||||
M(585, CANNOT_PARSE_YAML) \
|
||||
M(586, INTERSECT_OR_EXCEPT_RESULT_STRUCTURES_MISMATCH) \
|
||||
\
|
||||
M(998, POSTGRESQL_CONNECTION_FAILURE) \
|
||||
M(999, KEEPER_EXCEPTION) \
|
||||
|
@ -1,14 +1,17 @@
|
||||
#include <Parsers/ASTAlterQuery.h>
|
||||
#include <Parsers/ASTCheckQuery.h>
|
||||
#include <Parsers/ASTCreateQuery.h>
|
||||
#include <Parsers/ASTCreateUserQuery.h>
|
||||
#include <Parsers/ASTCreateRoleQuery.h>
|
||||
#include <Parsers/ASTCreateQuotaQuery.h>
|
||||
#include <Parsers/ASTCreateRoleQuery.h>
|
||||
#include <Parsers/ASTCreateRowPolicyQuery.h>
|
||||
#include <Parsers/ASTCreateSettingsProfileQuery.h>
|
||||
#include <Parsers/ASTCreateUserQuery.h>
|
||||
#include <Parsers/ASTDropAccessEntityQuery.h>
|
||||
#include <Parsers/ASTDropQuery.h>
|
||||
#include <Parsers/ASTExplainQuery.h>
|
||||
#include <Parsers/ASTGrantQuery.h>
|
||||
#include <Parsers/ASTInsertQuery.h>
|
||||
#include <Parsers/ASTIntersectOrExcept.h>
|
||||
#include <Parsers/ASTKillQueryQuery.h>
|
||||
#include <Parsers/ASTOptimizeQuery.h>
|
||||
#include <Parsers/ASTRenameQuery.h>
|
||||
@ -24,11 +27,9 @@
|
||||
#include <Parsers/ASTShowProcesslistQuery.h>
|
||||
#include <Parsers/ASTShowTablesQuery.h>
|
||||
#include <Parsers/ASTUseQuery.h>
|
||||
#include <Parsers/ASTExplainQuery.h>
|
||||
#include <Parsers/TablePropertiesQueriesASTs.h>
|
||||
#include <Parsers/ASTWatchQuery.h>
|
||||
#include <Parsers/ASTGrantQuery.h>
|
||||
#include <Parsers/MySQL/ASTCreateQuery.h>
|
||||
#include <Parsers/TablePropertiesQueriesASTs.h>
|
||||
|
||||
#include <Interpreters/Context.h>
|
||||
#include <Interpreters/InterpreterAlterQuery.h>
|
||||
@ -44,9 +45,11 @@
|
||||
#include <Interpreters/InterpreterDropQuery.h>
|
||||
#include <Interpreters/InterpreterExistsQuery.h>
|
||||
#include <Interpreters/InterpreterExplainQuery.h>
|
||||
#include <Interpreters/InterpreterExternalDDLQuery.h>
|
||||
#include <Interpreters/InterpreterFactory.h>
|
||||
#include <Interpreters/InterpreterGrantQuery.h>
|
||||
#include <Interpreters/InterpreterInsertQuery.h>
|
||||
#include <Interpreters/InterpreterIntersectOrExcept.h>
|
||||
#include <Interpreters/InterpreterKillQueryQuery.h>
|
||||
#include <Interpreters/InterpreterOptimizeQuery.h>
|
||||
#include <Interpreters/InterpreterRenameQuery.h>
|
||||
@ -65,7 +68,6 @@
|
||||
#include <Interpreters/InterpreterSystemQuery.h>
|
||||
#include <Interpreters/InterpreterUseQuery.h>
|
||||
#include <Interpreters/InterpreterWatchQuery.h>
|
||||
#include <Interpreters/InterpreterExternalDDLQuery.h>
|
||||
#include <Interpreters/OpenTelemetrySpanLog.h>
|
||||
|
||||
#include <Parsers/ASTSystemQuery.h>
|
||||
@ -109,6 +111,10 @@ std::unique_ptr<IInterpreter> InterpreterFactory::get(ASTPtr & query, ContextPtr
|
||||
ProfileEvents::increment(ProfileEvents::SelectQuery);
|
||||
return std::make_unique<InterpreterSelectWithUnionQuery>(query, context, options);
|
||||
}
|
||||
else if (query->as<ASTIntersectOrExcept>())
|
||||
{
|
||||
return std::make_unique<InterpreterIntersectOrExcept>(query, context);
|
||||
}
|
||||
else if (query->as<ASTInsertQuery>())
|
||||
{
|
||||
ProfileEvents::increment(ProfileEvents::InsertQuery);
|
||||
|
116
src/Interpreters/InterpreterIntersectOrExcept.cpp
Normal file
116
src/Interpreters/InterpreterIntersectOrExcept.cpp
Normal file
@ -0,0 +1,116 @@
|
||||
#include <Columns/getLeastSuperColumn.h>
|
||||
#include <Interpreters/Context.h>
|
||||
#include <Interpreters/InterpreterIntersectOrExcept.h>
|
||||
#include <Interpreters/InterpreterSelectQuery.h>
|
||||
#include <Parsers/ASTIntersectOrExcept.h>
|
||||
#include <Parsers/ASTSelectWithUnionQuery.h>
|
||||
#include <Processors/QueryPlan/IQueryPlanStep.h>
|
||||
#include <Processors/QueryPlan/IntersectOrExceptStep.h>
|
||||
#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
|
||||
#include <Processors/QueryPlan/QueryPlan.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int INTERSECT_OR_EXCEPT_RESULT_STRUCTURES_MISMATCH;
|
||||
}
|
||||
|
||||
InterpreterIntersectOrExcept::InterpreterIntersectOrExcept(const ASTPtr & query_ptr_, ContextPtr context_)
|
||||
: query_ptr(query_ptr_), context(Context::createCopy(context_))
|
||||
{
|
||||
ASTIntersectOrExcept * ast = query_ptr->as<ASTIntersectOrExcept>();
|
||||
size_t num_children = ast->children.size();
|
||||
nested_interpreters.resize(num_children);
|
||||
for (size_t i = 0; i < num_children; ++i)
|
||||
{
|
||||
nested_interpreters[i] = buildCurrentChildInterpreter(ast->children[i]);
|
||||
}
|
||||
|
||||
Blocks headers(num_children);
|
||||
for (size_t query_num = 0; query_num < num_children; ++query_num)
|
||||
headers[query_num] = nested_interpreters[query_num]->getSampleBlock();
|
||||
|
||||
result_header = getCommonHeader(headers);
|
||||
}
|
||||
|
||||
|
||||
Block InterpreterIntersectOrExcept::getCommonHeader(const Blocks & headers)
|
||||
{
|
||||
size_t num_selects = headers.size();
|
||||
Block common_header = headers.front();
|
||||
size_t num_columns = common_header.columns();
|
||||
|
||||
for (size_t query_num = 1; query_num < num_selects; ++query_num)
|
||||
{
|
||||
if (headers[query_num].columns() != num_columns)
|
||||
throw Exception(
|
||||
"Different number of columns in "
|
||||
+ toString(query_ptr->as<ASTIntersectOrExcept>()->is_except ? "EXCEPT" : "INTERSECT")
|
||||
+ " elements:\n" + common_header.dumpNames() + "\nand\n"
|
||||
+ headers[query_num].dumpNames() + "\n",
|
||||
ErrorCodes::INTERSECT_OR_EXCEPT_RESULT_STRUCTURES_MISMATCH);
|
||||
}
|
||||
|
||||
std::vector<const ColumnWithTypeAndName *> columns(num_selects);
|
||||
|
||||
for (size_t column_num = 0; column_num < num_columns; ++column_num)
|
||||
{
|
||||
for (size_t i = 0; i < num_selects; ++i)
|
||||
columns[i] = &headers[i].getByPosition(column_num);
|
||||
|
||||
ColumnWithTypeAndName & result_elem = common_header.getByPosition(column_num);
|
||||
result_elem = getLeastSuperColumn(columns);
|
||||
}
|
||||
|
||||
return common_header;
|
||||
}
|
||||
|
||||
|
||||
std::unique_ptr<IInterpreterUnionOrSelectQuery>
|
||||
InterpreterIntersectOrExcept::buildCurrentChildInterpreter(const ASTPtr & ast_ptr_)
|
||||
{
|
||||
if (ast_ptr_->as<ASTSelectWithUnionQuery>())
|
||||
return std::make_unique<InterpreterSelectWithUnionQuery>(ast_ptr_, context, SelectQueryOptions());
|
||||
else
|
||||
return std::make_unique<InterpreterSelectQuery>(ast_ptr_, context, SelectQueryOptions());
|
||||
}
|
||||
|
||||
void InterpreterIntersectOrExcept::buildQueryPlan(QueryPlan & query_plan)
|
||||
{
|
||||
size_t num_plans = nested_interpreters.size();
|
||||
|
||||
std::vector<std::unique_ptr<QueryPlan>> plans(num_plans);
|
||||
DataStreams data_streams(num_plans);
|
||||
|
||||
for (size_t i = 0; i < num_plans; ++i)
|
||||
{
|
||||
plans[i] = std::make_unique<QueryPlan>();
|
||||
nested_interpreters[i]->buildQueryPlan(*plans[i]);
|
||||
data_streams[i] = plans[i]->getCurrentDataStream();
|
||||
}
|
||||
|
||||
auto max_threads = context->getSettingsRef().max_threads;
|
||||
auto step = std::make_unique<IntersectOrExceptStep>(
|
||||
query_ptr->as<ASTIntersectOrExcept>()->is_except, std::move(data_streams), result_header, max_threads);
|
||||
query_plan.unitePlans(std::move(step), std::move(plans));
|
||||
}
|
||||
|
||||
BlockIO InterpreterIntersectOrExcept::execute()
|
||||
{
|
||||
BlockIO res;
|
||||
|
||||
QueryPlan query_plan;
|
||||
buildQueryPlan(query_plan);
|
||||
|
||||
auto pipeline = query_plan.buildQueryPipeline(
|
||||
QueryPlanOptimizationSettings::fromContext(context),
|
||||
BuildQueryPipelineSettings::fromContext(context));
|
||||
|
||||
res.pipeline = std::move(*pipeline);
|
||||
res.pipeline.addInterpreterContext(context);
|
||||
|
||||
return res;
|
||||
}
|
||||
}
|
35
src/Interpreters/InterpreterIntersectOrExcept.h
Normal file
35
src/Interpreters/InterpreterIntersectOrExcept.h
Normal file
@ -0,0 +1,35 @@
|
||||
#pragma once
|
||||
|
||||
#include <Core/QueryProcessingStage.h>
|
||||
#include <Interpreters/IInterpreter.h>
|
||||
#include <Interpreters/InterpreterSelectWithUnionQuery.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
class Context;
|
||||
class InterpreterSelectQuery;
|
||||
class QueryPlan;
|
||||
|
||||
class InterpreterIntersectOrExcept : public IInterpreter
|
||||
{
|
||||
public:
|
||||
InterpreterIntersectOrExcept(const ASTPtr & query_ptr_, ContextPtr context_);
|
||||
|
||||
/// Builds QueryPlan for current query.
|
||||
virtual void buildQueryPlan(QueryPlan & query_plan);
|
||||
|
||||
BlockIO execute() override;
|
||||
|
||||
private:
|
||||
ASTPtr query_ptr;
|
||||
ContextPtr context;
|
||||
Block result_header;
|
||||
std::vector<std::unique_ptr<IInterpreterUnionOrSelectQuery>> nested_interpreters;
|
||||
Block getCommonHeader(const Blocks & headers);
|
||||
|
||||
std::unique_ptr<IInterpreterUnionOrSelectQuery>
|
||||
buildCurrentChildInterpreter(const ASTPtr & ast_ptr_);
|
||||
};
|
||||
|
||||
}
|
28
src/Parsers/ASTIntersectOrExcept.cpp
Normal file
28
src/Parsers/ASTIntersectOrExcept.cpp
Normal file
@ -0,0 +1,28 @@
|
||||
#include <Parsers/ASTIntersectOrExcept.h>
|
||||
#include <Parsers/ASTSubquery.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
ASTPtr ASTIntersectOrExcept::clone() const
|
||||
{
|
||||
auto res = std::make_shared<ASTIntersectOrExcept>(*this);
|
||||
res->children.clear();
|
||||
res->children.push_back(children[0]->clone());
|
||||
res->children.push_back(children[1]->clone());
|
||||
res->is_except = is_except;
|
||||
cloneOutputOptions(*res);
|
||||
return res;
|
||||
}
|
||||
|
||||
void ASTIntersectOrExcept::formatQueryImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const
|
||||
{
|
||||
children[0]->formatImpl(settings, state, frame);
|
||||
std::string indent_str = settings.one_line ? "" : std::string(4 * frame.indent, ' ');
|
||||
settings.ostr << settings.nl_or_ws << indent_str << (settings.hilite ? hilite_keyword : "")
|
||||
<< (is_except ? "EXCEPT" : "INTERSECT ")
|
||||
<< (settings.hilite ? hilite_none : "") << settings.nl_or_ws;
|
||||
children[1]->formatImpl(settings, state, frame);
|
||||
}
|
||||
|
||||
}
|
18
src/Parsers/ASTIntersectOrExcept.h
Normal file
18
src/Parsers/ASTIntersectOrExcept.h
Normal file
@ -0,0 +1,18 @@
|
||||
#pragma once
|
||||
|
||||
#include <Parsers/ASTQueryWithOutput.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
class ASTIntersectOrExcept : public ASTQueryWithOutput
|
||||
{
|
||||
public:
|
||||
String getID(char) const override { return is_except ? "Except" : "Intersect"; }
|
||||
ASTPtr clone() const override;
|
||||
void formatQueryImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const override;
|
||||
bool is_except;
|
||||
};
|
||||
|
||||
}
|
50
src/Parsers/ParserIntersectOrExcept.cpp
Normal file
50
src/Parsers/ParserIntersectOrExcept.cpp
Normal file
@ -0,0 +1,50 @@
|
||||
#include <Parsers/ASTIntersectOrExcept.h>
|
||||
#include <Parsers/ASTSubquery.h>
|
||||
#include <Parsers/CommonParsers.h>
|
||||
#include <Parsers/ExpressionElementParsers.h>
|
||||
#include <Parsers/ParserIntersectOrExcept.h>
|
||||
#include <Parsers/ParserSelectQuery.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
bool ParserIntersectOrExcept::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
|
||||
{
|
||||
ParserKeyword intersect_keyword("INTERSECT");
|
||||
ParserKeyword except_keyword("EXCEPT");
|
||||
ASTPtr left_node;
|
||||
ASTPtr right_node;
|
||||
|
||||
auto ast = std::make_shared<ASTIntersectOrExcept>();
|
||||
ast->is_except = false;
|
||||
|
||||
if (!ParserSelectQuery().parse(pos, left_node, expected) && !ParserSubquery().parse(pos, left_node, expected))
|
||||
return false;
|
||||
|
||||
if (!intersect_keyword.ignore(pos))
|
||||
{
|
||||
if (!except_keyword.ignore(pos))
|
||||
{
|
||||
return false;
|
||||
}
|
||||
else
|
||||
{
|
||||
ast->is_except = true;
|
||||
}
|
||||
}
|
||||
|
||||
if (!ParserSelectQuery().parse(pos, right_node, expected) && !ParserSubquery().parse(pos, right_node, expected))
|
||||
return false;
|
||||
|
||||
if (const auto * ast_subquery = left_node->as<ASTSubquery>())
|
||||
left_node = ast_subquery->children.at(0);
|
||||
if (const auto * ast_subquery = right_node->as<ASTSubquery>())
|
||||
right_node = ast_subquery->children.at(0);
|
||||
|
||||
ast->children.push_back(left_node);
|
||||
ast->children.push_back(right_node);
|
||||
|
||||
node = ast;
|
||||
return true;
|
||||
}
|
||||
|
||||
}
|
14
src/Parsers/ParserIntersectOrExcept.h
Normal file
14
src/Parsers/ParserIntersectOrExcept.h
Normal file
@ -0,0 +1,14 @@
|
||||
#pragma once
|
||||
#include <Parsers/IParserBase.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
class ParserIntersectOrExcept : public IParserBase
|
||||
{
|
||||
protected:
|
||||
const char * getName() const override { return "INTERSECT or EXCEPT"; }
|
||||
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override;
|
||||
};
|
||||
|
||||
}
|
@ -1,36 +1,37 @@
|
||||
#include <Parsers/ParserQueryWithOutput.h>
|
||||
#include <Parsers/ParserShowTablesQuery.h>
|
||||
#include <Parsers/ParserSelectWithUnionQuery.h>
|
||||
#include <Parsers/ParserTablePropertiesQuery.h>
|
||||
#include <Parsers/ParserDescribeTableQuery.h>
|
||||
#include <Parsers/ParserShowProcesslistQuery.h>
|
||||
#include <Parsers/ParserCheckQuery.h>
|
||||
#include <Parsers/ParserCreateQuery.h>
|
||||
#include <Parsers/ParserRenameQuery.h>
|
||||
#include <Parsers/ParserAlterQuery.h>
|
||||
#include <Parsers/ParserDropQuery.h>
|
||||
#include <Parsers/ParserKillQueryQuery.h>
|
||||
#include <Parsers/ParserOptimizeQuery.h>
|
||||
#include <Parsers/ParserWatchQuery.h>
|
||||
#include <Parsers/ParserSetQuery.h>
|
||||
#include <Parsers/ASTExplainQuery.h>
|
||||
#include <Parsers/ASTSelectWithUnionQuery.h>
|
||||
#include <Parsers/ASTSetQuery.h>
|
||||
#include <Parsers/ParserAlterQuery.h>
|
||||
#include <Parsers/ParserCheckQuery.h>
|
||||
#include <Parsers/ParserCreateQuery.h>
|
||||
#include <Parsers/ParserDescribeTableQuery.h>
|
||||
#include <Parsers/ParserDropQuery.h>
|
||||
#include <Parsers/ParserExplainQuery.h>
|
||||
#include <Parsers/ParserIntersectOrExcept.h>
|
||||
#include <Parsers/ParserKillQueryQuery.h>
|
||||
#include <Parsers/ParserOptimizeQuery.h>
|
||||
#include <Parsers/ParserQueryWithOutput.h>
|
||||
#include <Parsers/ParserRenameQuery.h>
|
||||
#include <Parsers/ParserSelectWithUnionQuery.h>
|
||||
#include <Parsers/ParserSetQuery.h>
|
||||
#include <Parsers/ParserShowAccessEntitiesQuery.h>
|
||||
#include <Parsers/ParserShowAccessQuery.h>
|
||||
#include <Parsers/ParserShowCreateAccessEntityQuery.h>
|
||||
#include <Parsers/ParserShowGrantsQuery.h>
|
||||
#include <Parsers/ParserShowPrivilegesQuery.h>
|
||||
#include <Parsers/ParserExplainQuery.h>
|
||||
#include <Parsers/ParserShowProcesslistQuery.h>
|
||||
#include <Parsers/ParserShowTablesQuery.h>
|
||||
#include <Parsers/ParserTablePropertiesQuery.h>
|
||||
#include <Parsers/ParserWatchQuery.h>
|
||||
#include <Parsers/QueryWithOutputSettingsPushDownVisitor.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
bool ParserQueryWithOutput::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
|
||||
{
|
||||
ParserShowTablesQuery show_tables_p;
|
||||
ParserIntersectOrExcept intersect_p;
|
||||
ParserSelectWithUnionQuery select_p;
|
||||
ParserTablePropertiesQuery table_p;
|
||||
ParserDescribeTableQuery describe_table_p;
|
||||
@ -54,6 +55,7 @@ bool ParserQueryWithOutput::parseImpl(Pos & pos, ASTPtr & node, Expected & expec
|
||||
|
||||
bool parsed =
|
||||
explain_p.parse(pos, query, expected)
|
||||
|| intersect_p.parse(pos, query, expected)
|
||||
|| select_p.parse(pos, query, expected)
|
||||
|| show_create_access_entity_p.parse(pos, query, expected) /// should be before `show_tables_p`
|
||||
|| show_tables_p.parse(pos, query, expected)
|
||||
|
38
src/Processors/QueryPlan/IntersectOrExceptStep.cpp
Normal file
38
src/Processors/QueryPlan/IntersectOrExceptStep.cpp
Normal file
@ -0,0 +1,38 @@
|
||||
#include <Interpreters/Context.h>
|
||||
#include <Processors/QueryPipeline.h>
|
||||
#include <Processors/QueryPlan/IntersectOrExceptStep.h>
|
||||
#include <Processors/Sources/NullSource.h>
|
||||
#include <Processors/Transforms/IntersectOrExceptTransform.h>
|
||||
#include <Processors/ResizeProcessor.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
IntersectOrExceptStep::IntersectOrExceptStep(bool is_except_, DataStreams input_streams_, Block result_header, size_t max_threads_)
|
||||
: is_except(is_except_), header(std::move(result_header)), max_threads(max_threads_)
|
||||
{
|
||||
input_streams = std::move(input_streams_);
|
||||
output_stream = DataStream{.header = header};
|
||||
}
|
||||
|
||||
QueryPipelinePtr IntersectOrExceptStep::updatePipeline(QueryPipelines pipelines, const BuildQueryPipelineSettings & )
|
||||
{
|
||||
auto pipeline = std::make_unique<QueryPipeline>();
|
||||
QueryPipelineProcessorsCollector collector(*pipeline, this);
|
||||
|
||||
pipelines[0]->addTransform(std::make_shared<ResizeProcessor>(header, pipelines[0]->getNumStreams(), 1));
|
||||
pipelines[1]->addTransform(std::make_shared<ResizeProcessor>(header, pipelines[1]->getNumStreams(), 1));
|
||||
|
||||
*pipeline = QueryPipeline::unitePipelines(std::move(pipelines), max_threads);
|
||||
pipeline->addTransform(std::make_shared<IntersectOrExceptTransform>(is_except, header));
|
||||
|
||||
processors = collector.detachProcessors();
|
||||
return pipeline;
|
||||
}
|
||||
|
||||
void IntersectOrExceptStep::describePipeline(FormatSettings & settings) const
|
||||
{
|
||||
IQueryPlanStep::describePipeline(processors, settings);
|
||||
}
|
||||
|
||||
}
|
26
src/Processors/QueryPlan/IntersectOrExceptStep.h
Normal file
26
src/Processors/QueryPlan/IntersectOrExceptStep.h
Normal file
@ -0,0 +1,26 @@
|
||||
#pragma once
|
||||
#include <Processors/QueryPlan/IQueryPlanStep.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
class IntersectOrExceptStep : public IQueryPlanStep
|
||||
{
|
||||
public:
|
||||
/// max_threads is used to limit the number of threads for result pipeline.
|
||||
IntersectOrExceptStep(bool is_except_, DataStreams input_streams_, Block result_header, size_t max_threads_ = 0);
|
||||
|
||||
String getName() const override { return is_except ? "Except" : "Intersect"; }
|
||||
|
||||
QueryPipelinePtr updatePipeline(QueryPipelines pipelines, const BuildQueryPipelineSettings & settings) override;
|
||||
|
||||
void describePipeline(FormatSettings & settings) const override;
|
||||
private:
|
||||
bool is_except;
|
||||
Block header;
|
||||
size_t max_threads;
|
||||
Processors processors;
|
||||
};
|
||||
|
||||
}
|
||||
|
192
src/Processors/Transforms/IntersectOrExceptTransform.cpp
Normal file
192
src/Processors/Transforms/IntersectOrExceptTransform.cpp
Normal file
@ -0,0 +1,192 @@
|
||||
#include <Processors/Transforms/IntersectOrExceptTransform.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
IntersectOrExceptTransform::IntersectOrExceptTransform(bool is_except_, const Block & header_)
|
||||
: IProcessor(InputPorts(2, header_), {header_}), is_except(is_except_), output(outputs.front())
|
||||
{
|
||||
const Names & columns = header_.getNames();
|
||||
size_t num_columns = columns.empty() ? header_.columns() : columns.size();
|
||||
|
||||
key_columns_pos.reserve(columns.size());
|
||||
for (size_t i = 0; i < num_columns; ++i)
|
||||
{
|
||||
auto pos = columns.empty() ? i : header_.getPositionByName(columns[i]);
|
||||
|
||||
const auto & col = header_.getByPosition(pos).column;
|
||||
|
||||
if (!(col && isColumnConst(*col)))
|
||||
key_columns_pos.emplace_back(pos);
|
||||
}
|
||||
}
|
||||
|
||||
IntersectOrExceptTransform::Status IntersectOrExceptTransform::prepare()
|
||||
{
|
||||
/// Check can output.
|
||||
if (output.isFinished())
|
||||
{
|
||||
for (auto & in : inputs)
|
||||
in.close();
|
||||
return Status::Finished;
|
||||
}
|
||||
|
||||
if (!output.canPush())
|
||||
{
|
||||
if (inputs.front().isFinished())
|
||||
{
|
||||
inputs.back().setNotNeeded();
|
||||
}
|
||||
else
|
||||
{
|
||||
inputs.front().setNotNeeded();
|
||||
}
|
||||
return Status::PortFull;
|
||||
}
|
||||
|
||||
/// Output if has data.
|
||||
if (current_output_chunk)
|
||||
{
|
||||
output.push(std::move(current_output_chunk));
|
||||
}
|
||||
|
||||
if (push_empty_chunk)
|
||||
{
|
||||
output.push(std::move(empty_chunk));
|
||||
push_empty_chunk = false;
|
||||
}
|
||||
|
||||
if (finished_second_input)
|
||||
{
|
||||
if (inputs.front().isFinished())
|
||||
{
|
||||
output.finish();
|
||||
return Status::Finished;
|
||||
}
|
||||
}
|
||||
else if (inputs.back().isFinished())
|
||||
{
|
||||
finished_second_input = true;
|
||||
}
|
||||
|
||||
InputPort & input = finished_second_input ? inputs.front() : inputs.back();
|
||||
|
||||
/// Check can input.
|
||||
if (!has_input)
|
||||
{
|
||||
input.setNeeded();
|
||||
if (!input.hasData())
|
||||
{
|
||||
return Status::NeedData;
|
||||
}
|
||||
|
||||
current_input_chunk = input.pull();
|
||||
has_input = true;
|
||||
}
|
||||
|
||||
return Status::Ready;
|
||||
}
|
||||
|
||||
void IntersectOrExceptTransform::work()
|
||||
{
|
||||
if (!finished_second_input)
|
||||
{
|
||||
accumulate(std::move(current_input_chunk));
|
||||
}
|
||||
else
|
||||
{
|
||||
filter(current_input_chunk);
|
||||
current_output_chunk = std::move(current_input_chunk);
|
||||
}
|
||||
|
||||
has_input = false;
|
||||
}
|
||||
|
||||
template <typename Method>
|
||||
void IntersectOrExceptTransform::addToSet(Method & method, const ColumnRawPtrs & columns, size_t rows, SetVariants & variants) const
|
||||
{
|
||||
typename Method::State state(columns, key_sizes, nullptr);
|
||||
|
||||
for (size_t i = 0; i < rows; ++i)
|
||||
{
|
||||
state.emplaceKey(method.data, i, variants.string_pool);
|
||||
}
|
||||
}
|
||||
|
||||
template <typename Method>
|
||||
size_t IntersectOrExceptTransform::buildFilter(
|
||||
Method & method, const ColumnRawPtrs & columns, IColumn::Filter & filter, size_t rows, SetVariants & variants) const
|
||||
{
|
||||
typename Method::State state(columns, key_sizes, nullptr);
|
||||
size_t new_rows_num = 0;
|
||||
|
||||
for (size_t i = 0; i < rows; ++i)
|
||||
{
|
||||
auto find_result = state.findKey(method.data, i, variants.string_pool);
|
||||
filter[i] = is_except ? !find_result.isFound() : find_result.isFound();
|
||||
if (filter[i])
|
||||
++new_rows_num;
|
||||
}
|
||||
return new_rows_num;
|
||||
}
|
||||
|
||||
void IntersectOrExceptTransform::accumulate(Chunk chunk)
|
||||
{
|
||||
auto num_rows = chunk.getNumRows();
|
||||
auto columns = chunk.detachColumns();
|
||||
|
||||
ColumnRawPtrs column_ptrs;
|
||||
column_ptrs.reserve(key_columns_pos.size());
|
||||
for (auto pos : key_columns_pos)
|
||||
column_ptrs.emplace_back(columns[pos].get());
|
||||
|
||||
if (data.empty())
|
||||
data.init(SetVariants::chooseMethod(column_ptrs, key_sizes));
|
||||
|
||||
switch (data.type)
|
||||
{
|
||||
case SetVariants::Type::EMPTY:
|
||||
break;
|
||||
#define M(NAME) \
|
||||
case SetVariants::Type::NAME: \
|
||||
addToSet(*data.NAME, column_ptrs, num_rows, data); \
|
||||
break;
|
||||
APPLY_FOR_SET_VARIANTS(M)
|
||||
#undef M
|
||||
}
|
||||
}
|
||||
|
||||
void IntersectOrExceptTransform::filter(Chunk & chunk)
|
||||
{
|
||||
auto num_rows = chunk.getNumRows();
|
||||
auto columns = chunk.detachColumns();
|
||||
|
||||
ColumnRawPtrs column_ptrs;
|
||||
column_ptrs.reserve(key_columns_pos.size());
|
||||
for (auto pos : key_columns_pos)
|
||||
column_ptrs.emplace_back(columns[pos].get());
|
||||
|
||||
if (data.empty())
|
||||
data.init(SetVariants::chooseMethod(column_ptrs, key_sizes));
|
||||
|
||||
IColumn::Filter filter(num_rows);
|
||||
|
||||
size_t new_rows_num = 0;
|
||||
switch (data.type)
|
||||
{
|
||||
case SetVariants::Type::EMPTY:
|
||||
break;
|
||||
#define M(NAME) \
|
||||
case SetVariants::Type::NAME: \
|
||||
new_rows_num = buildFilter(*data.NAME, column_ptrs, filter, num_rows, data); \
|
||||
break;
|
||||
APPLY_FOR_SET_VARIANTS(M)
|
||||
#undef M
|
||||
}
|
||||
|
||||
for (auto & column : columns)
|
||||
column = column->filter(filter, -1);
|
||||
|
||||
chunk.setColumns(std::move(columns), new_rows_num);
|
||||
}
|
||||
|
||||
}
|
53
src/Processors/Transforms/IntersectOrExceptTransform.h
Normal file
53
src/Processors/Transforms/IntersectOrExceptTransform.h
Normal file
@ -0,0 +1,53 @@
|
||||
#pragma once
|
||||
|
||||
#include <Processors/IProcessor.h>
|
||||
#include <Interpreters/SetVariants.h>
|
||||
#include <Core/ColumnNumbers.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
class IntersectOrExceptTransform : public IProcessor
|
||||
{
|
||||
public:
|
||||
IntersectOrExceptTransform(bool is_except_, const Block & header_);
|
||||
|
||||
Status prepare() override;
|
||||
void work() override;
|
||||
|
||||
String getName() const override { return is_except ? "Except" : "Intersect"; }
|
||||
|
||||
private:
|
||||
|
||||
bool push_empty_chunk = false;
|
||||
Chunk empty_chunk;
|
||||
|
||||
bool is_except;
|
||||
ColumnNumbers key_columns_pos;
|
||||
SetVariants data;
|
||||
Sizes key_sizes;
|
||||
Chunk current_input_chunk;
|
||||
Chunk current_output_chunk;
|
||||
bool finished_second_input = false;
|
||||
bool has_input = false;
|
||||
OutputPort & output;
|
||||
|
||||
void accumulate(Chunk chunk);
|
||||
void filter(Chunk & chunk);
|
||||
template <typename Method>
|
||||
void addToSet(
|
||||
Method & method,
|
||||
const ColumnRawPtrs & key_columns,
|
||||
size_t rows,
|
||||
SetVariants & variants) const;
|
||||
|
||||
template <typename Method>
|
||||
size_t buildFilter(
|
||||
Method & method,
|
||||
const ColumnRawPtrs & columns,
|
||||
IColumn::Filter & filter,
|
||||
size_t rows,
|
||||
SetVariants & variants) const;
|
||||
};
|
||||
|
||||
}
|
Loading…
Reference in New Issue
Block a user