Add INTERSECT and EXCEPT

This commit is contained in:
Kirill Ershov 2021-04-16 23:18:39 +03:00
parent ccb7e6e03f
commit 3f9e9a7025
No known key found for this signature in database
GPG Key ID: 7308017075B4D73F
13 changed files with 606 additions and 24 deletions

View File

@ -533,7 +533,11 @@
M(564, INTERSERVER_SCHEME_DOESNT_MATCH) \
M(565, TOO_MANY_PARTITIONS) \
M(566, CANNOT_RMDIR) \
\
M(567, DUPLICATED_PART_UUIDS) \
M(568, RAFT_ERROR) \
M(569, MULTIPLE_COLUMNS_SERIALIZED_TO_SAME_PROTOBUF_FIELD) \
M(570, DATA_TYPE_INCOMPATIBLE_WITH_PROTOBUF_FIELD) \
M(571, INTERSECT_OR_EXCEPT_RESULT_STRUCTURES_MISMATCH) \
M(999, KEEPER_EXCEPTION) \
M(1000, POCO_EXCEPTION) \
M(1001, STD_EXCEPTION) \

View File

@ -1,14 +1,17 @@
#include <Parsers/ASTAlterQuery.h>
#include <Parsers/ASTCheckQuery.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTCreateUserQuery.h>
#include <Parsers/ASTCreateRoleQuery.h>
#include <Parsers/ASTCreateQuotaQuery.h>
#include <Parsers/ASTCreateRoleQuery.h>
#include <Parsers/ASTCreateRowPolicyQuery.h>
#include <Parsers/ASTCreateSettingsProfileQuery.h>
#include <Parsers/ASTCreateUserQuery.h>
#include <Parsers/ASTDropAccessEntityQuery.h>
#include <Parsers/ASTDropQuery.h>
#include <Parsers/ASTExplainQuery.h>
#include <Parsers/ASTGrantQuery.h>
#include <Parsers/ASTInsertQuery.h>
#include <Parsers/ASTIntersectOrExcept.h>
#include <Parsers/ASTKillQueryQuery.h>
#include <Parsers/ASTOptimizeQuery.h>
#include <Parsers/ASTRenameQuery.h>
@ -24,11 +27,9 @@
#include <Parsers/ASTShowProcesslistQuery.h>
#include <Parsers/ASTShowTablesQuery.h>
#include <Parsers/ASTUseQuery.h>
#include <Parsers/ASTExplainQuery.h>
#include <Parsers/TablePropertiesQueriesASTs.h>
#include <Parsers/ASTWatchQuery.h>
#include <Parsers/ASTGrantQuery.h>
#include <Parsers/MySQL/ASTCreateQuery.h>
#include <Parsers/TablePropertiesQueriesASTs.h>
#include <Interpreters/Context.h>
#include <Interpreters/InterpreterAlterQuery.h>
@ -44,9 +45,11 @@
#include <Interpreters/InterpreterDropQuery.h>
#include <Interpreters/InterpreterExistsQuery.h>
#include <Interpreters/InterpreterExplainQuery.h>
#include <Interpreters/InterpreterExternalDDLQuery.h>
#include <Interpreters/InterpreterFactory.h>
#include <Interpreters/InterpreterGrantQuery.h>
#include <Interpreters/InterpreterInsertQuery.h>
#include <Interpreters/InterpreterIntersectOrExcept.h>
#include <Interpreters/InterpreterKillQueryQuery.h>
#include <Interpreters/InterpreterOptimizeQuery.h>
#include <Interpreters/InterpreterRenameQuery.h>
@ -65,7 +68,6 @@
#include <Interpreters/InterpreterSystemQuery.h>
#include <Interpreters/InterpreterUseQuery.h>
#include <Interpreters/InterpreterWatchQuery.h>
#include <Interpreters/InterpreterExternalDDLQuery.h>
#include <Interpreters/OpenTelemetrySpanLog.h>
#include <Parsers/ASTSystemQuery.h>
@ -109,6 +111,10 @@ std::unique_ptr<IInterpreter> InterpreterFactory::get(ASTPtr & query, Context &
ProfileEvents::increment(ProfileEvents::SelectQuery);
return std::make_unique<InterpreterSelectWithUnionQuery>(query, context, options);
}
else if (query->as<ASTIntersectOrExcept>())
{
return std::make_unique<InterpreterIntersectOrExcept>(query, context);
}
else if (query->as<ASTInsertQuery>())
{
ProfileEvents::increment(ProfileEvents::InsertQuery);

View File

@ -0,0 +1,116 @@
#include <Columns/getLeastSuperColumn.h>
#include <Interpreters/Context.h>
#include <Interpreters/InterpreterIntersectOrExcept.h>
#include <Interpreters/InterpreterSelectQuery.h>
#include <Parsers/ASTIntersectOrExcept.h>
#include <Parsers/ASTSelectWithUnionQuery.h>
#include <Processors/QueryPlan/IQueryPlanStep.h>
#include <Processors/QueryPlan/IntersectOrExceptStep.h>
#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
#include <Processors/QueryPlan/QueryPlan.h>
namespace DB
{
namespace ErrorCodes
{
extern const int INTERSECT_OR_EXCEPT_RESULT_STRUCTURES_MISMATCH;
}
/// Creates one nested interpreter per operand of the INTERSECT / EXCEPT query and
/// derives the common result header from the operands' sample blocks.
///
/// Only the first two children of the AST are treated as operands:
/// ASTIntersectOrExcept::clone() and formatQueryImpl() use children[0] and children[1]
/// as the left and right side, and IntersectOrExceptStep / IntersectOrExceptTransform
/// work with exactly two inputs. Children past the second one are presumably output
/// options (FORMAT, INTO OUTFILE) attached by ASTQueryWithOutput - TODO confirm - and
/// must not be handed to a SELECT interpreter.
InterpreterIntersectOrExcept::InterpreterIntersectOrExcept(const ASTPtr & query_ptr_, ContextPtr context_)
    : query_ptr(query_ptr_), context(Context::createCopy(context_))
{
    ASTIntersectOrExcept * ast = query_ptr->as<ASTIntersectOrExcept>();

    /// Left and right operand; never look further than that.
    constexpr size_t max_operands = 2;
    const size_t num_children = ast->children.size() < max_operands ? ast->children.size() : max_operands;

    nested_interpreters.resize(num_children);
    for (size_t i = 0; i < num_children; ++i)
        nested_interpreters[i] = buildCurrentChildInterpreter(ast->children[i]);

    Blocks headers(num_children);
    for (size_t query_num = 0; query_num < num_children; ++query_num)
        headers[query_num] = nested_interpreters[query_num]->getSampleBlock();

    result_header = getCommonHeader(headers);
}
/// Computes the header shared by all operands.
/// Every header must have the same number of columns as the first one, otherwise
/// INTERSECT_OR_EXCEPT_RESULT_STRUCTURES_MISMATCH is thrown. Each resulting column is
/// whatever getLeastSuperColumn() returns for the columns found at that position.
Block InterpreterIntersectOrExcept::getCommonHeader(const Blocks & headers)
{
    Block common_header = headers.front();
    const size_t num_columns = common_header.columns();
    const size_t num_selects = headers.size();

    /// Structural check: compare every other header against the first one.
    for (size_t query_num = 1; query_num < num_selects; ++query_num)
    {
        const Block & other_header = headers[query_num];
        if (other_header.columns() == num_columns)
            continue;

        throw Exception(
            "Different number of columns in "
                + toString(query_ptr->as<ASTIntersectOrExcept>()->is_except ? "EXCEPT" : "INTERSECT")
                + " elements:\n" + common_header.dumpNames() + "\nand\n"
                + other_header.dumpNames() + "\n",
            ErrorCodes::INTERSECT_OR_EXCEPT_RESULT_STRUCTURES_MISMATCH);
    }

    /// Scratch buffer reused for every position: one pointer per operand.
    std::vector<const ColumnWithTypeAndName *> columns_at_position(num_selects);
    for (size_t column_num = 0; column_num < num_columns; ++column_num)
    {
        for (size_t select_num = 0; select_num < num_selects; ++select_num)
            columns_at_position[select_num] = &headers[select_num].getByPosition(column_num);

        ColumnWithTypeAndName & target = common_header.getByPosition(column_num);
        target = getLeastSuperColumn(columns_at_position);
    }

    return common_header;
}
/// Picks the interpreter for a single operand: a union interpreter when the operand is an
/// ASTSelectWithUnionQuery, a plain select interpreter for anything else.
std::unique_ptr<IInterpreterUnionOrSelectQuery>
InterpreterIntersectOrExcept::buildCurrentChildInterpreter(const ASTPtr & ast_ptr_)
{
    const bool is_select_with_union = ast_ptr_->as<ASTSelectWithUnionQuery>() != nullptr;

    if (!is_select_with_union)
        return std::make_unique<InterpreterSelectQuery>(ast_ptr_, context, SelectQueryOptions());

    return std::make_unique<InterpreterSelectWithUnionQuery>(ast_ptr_, context, SelectQueryOptions());
}
/// Builds a separate plan for every nested interpreter and unites them in `query_plan`
/// under a single IntersectOrExceptStep.
void InterpreterIntersectOrExcept::buildQueryPlan(QueryPlan & query_plan)
{
    const size_t num_plans = nested_interpreters.size();

    std::vector<std::unique_ptr<QueryPlan>> plans;
    plans.reserve(num_plans);

    DataStreams data_streams;
    data_streams.reserve(num_plans);

    for (const auto & interpreter : nested_interpreters)
    {
        auto & operand_plan = plans.emplace_back(std::make_unique<QueryPlan>());
        interpreter->buildQueryPlan(*operand_plan);
        data_streams.emplace_back(operand_plan->getCurrentDataStream());
    }

    auto max_threads = context->getSettingsRef().max_threads;
    const bool is_except = query_ptr->as<ASTIntersectOrExcept>()->is_except;

    auto step = std::make_unique<IntersectOrExceptStep>(is_except, std::move(data_streams), result_header, max_threads);
    query_plan.unitePlans(std::move(step), std::move(plans));
}
/// Builds the query plan, converts it into a pipeline and returns that pipeline in BlockIO.
/// The interpreter's context is attached to the pipeline before it is returned.
BlockIO InterpreterIntersectOrExcept::execute()
{
    QueryPlan query_plan;
    buildQueryPlan(query_plan);

    auto optimization_settings = QueryPlanOptimizationSettings::fromContext(context);
    auto pipeline_settings = BuildQueryPipelineSettings::fromContext(context);
    auto pipeline = query_plan.buildQueryPipeline(optimization_settings, pipeline_settings);

    BlockIO res;
    res.pipeline = std::move(*pipeline);
    res.pipeline.addInterpreterContext(context);
    return res;
}
}

View File

@ -0,0 +1,35 @@
#pragma once
#include <Core/QueryProcessingStage.h>
#include <Interpreters/IInterpreter.h>
#include <Interpreters/InterpreterSelectWithUnionQuery.h>
namespace DB
{
class Context;
class InterpreterSelectQuery;
class QueryPlan;
/// Interpreter for an ASTIntersectOrExcept query (`<query> INTERSECT <query>` or `<query> EXCEPT <query>`).
/// It owns one nested interpreter per operand and combines their plans with an IntersectOrExceptStep.
class InterpreterIntersectOrExcept : public IInterpreter
{
public:
/// Creates the nested interpreters for the operands of `query_ptr_` and computes the common result header.
/// The passed context is copied (Context::createCopy).
InterpreterIntersectOrExcept(const ASTPtr & query_ptr_, ContextPtr context_);
/// Builds QueryPlan for current query.
virtual void buildQueryPlan(QueryPlan & query_plan);
/// Builds the plan, turns it into a pipeline and returns it inside BlockIO.
BlockIO execute() override;
private:
/// The ASTIntersectOrExcept node being interpreted.
ASTPtr query_ptr;
/// Private copy of the context given to the constructor.
ContextPtr context;
/// Header of the result, computed by getCommonHeader() in the constructor.
Block result_header;
/// One interpreter per operand, in the order of the AST children.
std::vector<std::unique_ptr<IInterpreterUnionOrSelectQuery>> nested_interpreters;
/// Checks that all headers have the same number of columns and returns the header
/// whose columns are the results of getLeastSuperColumn() for each position.
Block getCommonHeader(const Blocks & headers);
/// Creates the interpreter for one operand (a union interpreter for ASTSelectWithUnionQuery,
/// a select interpreter otherwise).
std::unique_ptr<IInterpreterUnionOrSelectQuery>
buildCurrentChildInterpreter(const ASTPtr & ast_ptr_);
};
}

View File

@ -0,0 +1,28 @@
#include <Parsers/ASTIntersectOrExcept.h>
#include <Parsers/ASTSubquery.h>
namespace DB
{
/// Deep copy of the node: both operands (children[0] and children[1]) are cloned and the
/// output options are re-attached through cloneOutputOptions().
ASTPtr ASTIntersectOrExcept::clone() const
{
    /// Copy construction already carries over is_except together with the other fields.
    auto res = std::make_shared<ASTIntersectOrExcept>(*this);

    /// The copied child pointers still refer to the original subtrees; replace them with clones.
    res->children.clear();
    for (size_t operand = 0; operand < 2; ++operand)
        res->children.push_back(children[operand]->clone());

    cloneOutputOptions(*res);
    return res;
}
/// Writes the query as `<left operand> INTERSECT|EXCEPT <right operand>`.
/// The keyword is surrounded by settings.nl_or_ws on both sides and, unless one-line
/// formatting is requested, preceded by an indent of 4 * frame.indent spaces.
void ASTIntersectOrExcept::formatQueryImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const
{
    children[0]->formatImpl(settings, state, frame);

    std::string indent_str = settings.one_line ? "" : std::string(4 * frame.indent, ' ');

    /// No trailing space inside the keyword literal: nl_or_ws is emitted right after it,
    /// so "INTERSECT " used to produce a doubled space (or trailing whitespace before a
    /// newline) while "EXCEPT" did not.
    settings.ostr << settings.nl_or_ws << indent_str << (settings.hilite ? hilite_keyword : "")
                  << (is_except ? "EXCEPT" : "INTERSECT")
                  << (settings.hilite ? hilite_none : "") << settings.nl_or_ws;

    children[1]->formatImpl(settings, state, frame);
}
}

View File

@ -0,0 +1,18 @@
#pragma once
#include <Parsers/ASTQueryWithOutput.h>
namespace DB
{
/// AST node for `<left> INTERSECT <right>` and `<left> EXCEPT <right>`.
/// children[0] is the left operand and children[1] is the right operand.
class ASTIntersectOrExcept : public ASTQueryWithOutput
{
public:
    /// "Except" or "Intersect", depending on is_except.
    String getID(char) const override { return is_except ? "Except" : "Intersect"; }

    ASTPtr clone() const override;

    void formatQueryImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const override;

    /// true for EXCEPT, false for INTERSECT.
    /// Initialised in-class so that a default-constructed node never holds an indeterminate value.
    bool is_except = false;
};
}

View File

@ -0,0 +1,50 @@
#include <Parsers/ASTIntersectOrExcept.h>
#include <Parsers/ASTSubquery.h>
#include <Parsers/CommonParsers.h>
#include <Parsers/ExpressionElementParsers.h>
#include <Parsers/ParserIntersectOrExcept.h>
#include <Parsers/ParserSelectQuery.h>
namespace DB
{
/// Parses `<operand> INTERSECT <operand>` or `<operand> EXCEPT <operand>`, where an operand
/// is either a SELECT query or a subquery. For a subquery operand its first child is
/// stored in the resulting node instead of the ASTSubquery wrapper.
bool ParserIntersectOrExcept::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
{
    ParserKeyword intersect_keyword("INTERSECT");
    ParserKeyword except_keyword("EXCEPT");

    /// Tries a plain SELECT first and falls back to a subquery.
    auto parse_operand = [&](ASTPtr & operand)
    {
        return ParserSelectQuery().parse(pos, operand, expected)
            || ParserSubquery().parse(pos, operand, expected);
    };

    /// Replaces an ASTSubquery by the query it wraps.
    auto unwrap_subquery = [](ASTPtr & operand)
    {
        if (const auto * ast_subquery = operand->as<ASTSubquery>())
            operand = ast_subquery->children.at(0);
    };

    ASTPtr left_node;
    if (!parse_operand(left_node))
        return false;

    bool is_except = false;
    if (!intersect_keyword.ignore(pos))
    {
        if (!except_keyword.ignore(pos))
            return false;
        is_except = true;
    }

    ASTPtr right_node;
    if (!parse_operand(right_node))
        return false;

    unwrap_subquery(left_node);
    unwrap_subquery(right_node);

    auto ast = std::make_shared<ASTIntersectOrExcept>();
    ast->is_except = is_except;
    ast->children.push_back(left_node);
    ast->children.push_back(right_node);

    node = ast;
    return true;
}
}

View File

@ -0,0 +1,14 @@
#pragma once
#include <Parsers/IParserBase.h>
namespace DB
{
/// Parser for `<query> INTERSECT <query>` and `<query> EXCEPT <query>`.
/// On success it produces an ASTIntersectOrExcept node.
class ParserIntersectOrExcept : public IParserBase
{
protected:
/// Human-readable name of what this parser recognises.
const char * getName() const override { return "INTERSECT or EXCEPT"; }
/// Parses both operands and the keyword between them; returns false if any part is missing.
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override;
};
}

View File

@ -1,36 +1,37 @@
#include <Parsers/ParserQueryWithOutput.h>
#include <Parsers/ParserShowTablesQuery.h>
#include <Parsers/ParserSelectWithUnionQuery.h>
#include <Parsers/ParserTablePropertiesQuery.h>
#include <Parsers/ParserDescribeTableQuery.h>
#include <Parsers/ParserShowProcesslistQuery.h>
#include <Parsers/ParserCheckQuery.h>
#include <Parsers/ParserCreateQuery.h>
#include <Parsers/ParserRenameQuery.h>
#include <Parsers/ParserAlterQuery.h>
#include <Parsers/ParserDropQuery.h>
#include <Parsers/ParserKillQueryQuery.h>
#include <Parsers/ParserOptimizeQuery.h>
#include <Parsers/ParserWatchQuery.h>
#include <Parsers/ParserSetQuery.h>
#include <Parsers/ASTExplainQuery.h>
#include <Parsers/ASTSelectWithUnionQuery.h>
#include <Parsers/ASTSetQuery.h>
#include <Parsers/ParserAlterQuery.h>
#include <Parsers/ParserCheckQuery.h>
#include <Parsers/ParserCreateQuery.h>
#include <Parsers/ParserDescribeTableQuery.h>
#include <Parsers/ParserDropQuery.h>
#include <Parsers/ParserExplainQuery.h>
#include <Parsers/ParserIntersectOrExcept.h>
#include <Parsers/ParserKillQueryQuery.h>
#include <Parsers/ParserOptimizeQuery.h>
#include <Parsers/ParserQueryWithOutput.h>
#include <Parsers/ParserRenameQuery.h>
#include <Parsers/ParserSelectWithUnionQuery.h>
#include <Parsers/ParserSetQuery.h>
#include <Parsers/ParserShowAccessEntitiesQuery.h>
#include <Parsers/ParserShowAccessQuery.h>
#include <Parsers/ParserShowCreateAccessEntityQuery.h>
#include <Parsers/ParserShowGrantsQuery.h>
#include <Parsers/ParserShowPrivilegesQuery.h>
#include <Parsers/ParserExplainQuery.h>
#include <Parsers/ParserShowProcesslistQuery.h>
#include <Parsers/ParserShowTablesQuery.h>
#include <Parsers/ParserTablePropertiesQuery.h>
#include <Parsers/ParserWatchQuery.h>
#include <Parsers/QueryWithOutputSettingsPushDownVisitor.h>
namespace DB
{
bool ParserQueryWithOutput::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
{
ParserShowTablesQuery show_tables_p;
ParserIntersectOrExcept intersect_p;
ParserSelectWithUnionQuery select_p;
ParserTablePropertiesQuery table_p;
ParserDescribeTableQuery describe_table_p;
@ -54,6 +55,7 @@ bool ParserQueryWithOutput::parseImpl(Pos & pos, ASTPtr & node, Expected & expec
bool parsed =
explain_p.parse(pos, query, expected)
|| intersect_p.parse(pos, query, expected)
|| select_p.parse(pos, query, expected)
|| show_create_access_entity_p.parse(pos, query, expected) /// should be before `show_tables_p`
|| show_tables_p.parse(pos, query, expected)

View File

@ -0,0 +1,38 @@
#include <Interpreters/Context.h>
#include <Processors/QueryPipeline.h>
#include <Processors/QueryPlan/IntersectOrExceptStep.h>
#include <Processors/Sources/NullSource.h>
#include <Processors/Transforms/IntersectOrExceptTransform.h>
#include <Processors/ResizeProcessor.h>
namespace DB
{
/// Stores the operation kind, the result header and the thread limit; the given input
/// streams become the step's inputs and the output stream is described by the result header.
IntersectOrExceptStep::IntersectOrExceptStep(bool is_except_, DataStreams input_streams_, Block result_header, size_t max_threads_)
    : is_except(is_except_)
    , header(std::move(result_header))
    , max_threads(max_threads_)
{
    /// Both are members of the base step, hence assigned here rather than in the initializer list.
    output_stream = DataStream{.header = header};
    input_streams = std::move(input_streams_);
}
/// Squashes each of the two operand pipelines into a single stream, unites them and appends
/// the IntersectOrExceptTransform. The processors created on the way are remembered for
/// describePipeline().
QueryPipelinePtr IntersectOrExceptStep::updatePipeline(QueryPipelines pipelines, const BuildQueryPipelineSettings & )
{
    auto pipeline = std::make_unique<QueryPipeline>();
    QueryPipelineProcessorsCollector collector(*pipeline, this);

    /// Left operand first, then the right one: each gets a ResizeProcessor with one output.
    for (size_t side = 0; side < 2; ++side)
    {
        auto & operand_pipeline = pipelines[side];
        operand_pipeline->addTransform(std::make_shared<ResizeProcessor>(header, operand_pipeline->getNumStreams(), 1));
    }

    *pipeline = QueryPipeline::unitePipelines(std::move(pipelines), max_threads);
    pipeline->addTransform(std::make_shared<IntersectOrExceptTransform>(is_except, header));

    processors = collector.detachProcessors();
    return pipeline;
}
/// Describes the processors collected by the last updatePipeline() call,
/// delegating to the generic IQueryPlanStep::describePipeline helper.
void IntersectOrExceptStep::describePipeline(FormatSettings & settings) const
{
IQueryPlanStep::describePipeline(processors, settings);
}
}

View File

@ -0,0 +1,26 @@
#pragma once
#include <Processors/QueryPlan/IQueryPlanStep.h>
namespace DB
{
/// Query plan step that combines two input pipelines with an IntersectOrExceptTransform.
class IntersectOrExceptStep : public IQueryPlanStep
{
public:
/// max_threads is used to limit the number of threads for result pipeline.
IntersectOrExceptStep(bool is_except_, DataStreams input_streams_, Block result_header, size_t max_threads_ = 0);
/// "Except" or "Intersect", depending on the operation.
String getName() const override { return is_except ? "Except" : "Intersect"; }
/// Resizes the first two pipelines to one stream each, unites them and adds the transform.
QueryPipelinePtr updatePipeline(QueryPipelines pipelines, const BuildQueryPipelineSettings & settings) override;
/// Describes the processors collected in updatePipeline().
void describePipeline(FormatSettings & settings) const override;
private:
/// true for EXCEPT, false for INTERSECT.
bool is_except;
/// Header of the result; also used for the ResizeProcessors and the transform.
Block header;
/// Passed to QueryPipeline::unitePipelines().
size_t max_threads;
/// Processors detached from the collector in updatePipeline().
Processors processors;
};
}

View File

@ -0,0 +1,192 @@
#include <Processors/Transforms/IntersectOrExceptTransform.h>
namespace DB
{
/// Creates a processor with two inputs and one output, all with the given header, and
/// records the positions of the header columns that take part in row comparison:
/// every column except those that are already present in the header as constants.
IntersectOrExceptTransform::IntersectOrExceptTransform(bool is_except_, const Block & header_)
    : IProcessor(InputPorts(2, header_), {header_}), is_except(is_except_), output(outputs.front())
{
    const Names column_names = header_.getNames();
    key_columns_pos.reserve(column_names.size());

    for (const auto & column_name : column_names)
    {
        /// Positions are resolved by name, exactly one lookup per header column.
        auto pos = header_.getPositionByName(column_name);
        const auto & column = header_.getByPosition(pos).column;

        const bool is_constant = column && isColumnConst(*column);
        if (!is_constant)
            key_columns_pos.emplace_back(pos);
    }
}
/// Port state machine of the transform.
/// Phase 1 (finished_second_input == false): chunks are pulled from the second input
/// (inputs.back()) and work() accumulates them into the set.
/// Phase 2 (finished_second_input == true): chunks are pulled from the first input
/// (inputs.front()), work() filters them and the result is pushed to the output on the
/// next call of prepare().
IntersectOrExceptTransform::Status IntersectOrExceptTransform::prepare()
{
/// The output is finished: nothing more can be pushed, so close every input and stop.
if (output.isFinished())
{
for (auto & in : inputs)
in.close();
return Status::Finished;
}
/// The output cannot take a chunk right now. Mark one input as not needed
/// (the second one if the first is already finished, the first one otherwise) and wait.
if (!output.canPush())
{
if (inputs.front().isFinished())
{
inputs.back().setNotNeeded();
}
else
{
inputs.front().setNotNeeded();
}
return Status::PortFull;
}
/// Push the chunk produced by the previous work() call, if there is one.
if (current_output_chunk)
{
output.push(std::move(current_output_chunk));
}
/// NOTE(review): no code visible in this file ever sets push_empty_chunk to true,
/// so this branch looks unreachable - confirm before relying on it.
if (push_empty_chunk)
{
output.push(std::move(empty_chunk));
push_empty_chunk = false;
}
/// In phase 2 the transform is done as soon as the first input is exhausted.
if (finished_second_input)
{
if (inputs.front().isFinished())
{
output.finish();
return Status::Finished;
}
}
/// The second input is exhausted: switch from phase 1 to phase 2.
else if (inputs.back().isFinished())
{
finished_second_input = true;
}
/// The input to read from in the current phase.
InputPort & input = finished_second_input ? inputs.front() : inputs.back();
/// Pull the next chunk unless one is already waiting to be processed by work().
if (!has_input)
{
input.setNeeded();
if (!input.hasData())
{
return Status::NeedData;
}
current_input_chunk = input.pull();
has_input = true;
}
return Status::Ready;
}
/// Processes the chunk pulled by prepare(): while the second input is still being read the
/// chunk is added to the set, afterwards it is filtered and becomes the next output chunk.
void IntersectOrExceptTransform::work()
{
    if (finished_second_input)
    {
        filter(current_input_chunk);
        current_output_chunk = std::move(current_input_chunk);
    }
    else
    {
        accumulate(std::move(current_input_chunk));
    }

    /// The pulled chunk has been consumed; prepare() may pull the next one.
    has_input = false;
}
/// Inserts the key of every row of `columns` (rows 0 .. rows - 1) into the set held by `method`.
template <typename Method>
void IntersectOrExceptTransform::addToSet(Method & method, const ColumnRawPtrs & columns, size_t rows, SetVariants & variants) const
{
    typename Method::State key_state(columns, key_sizes, nullptr);

    for (size_t row = 0; row != rows; ++row)
        key_state.emplaceKey(method.data, row, variants.string_pool);
}
template <typename Method>
size_t IntersectOrExceptTransform::buildFilter(
Method & method, const ColumnRawPtrs & columns, IColumn::Filter & filter, size_t rows, SetVariants & variants) const
{
typename Method::State state(columns, key_sizes, nullptr);
size_t new_rows_num = 0;
for (size_t i = 0; i < rows; ++i)
{
auto find_result = state.findKey(method.data, i, variants.string_pool);
filter[i] = is_except ? !find_result.isFound() : find_result.isFound();
if (filter[i])
++new_rows_num;
}
return new_rows_num;
}
/// Adds the key columns of all rows of `chunk` to the set. The set variant is chosen
/// from the key columns of the first chunk that arrives while the set is still empty.
void IntersectOrExceptTransform::accumulate(Chunk chunk)
{
    /// The row count has to be read before the columns are detached from the chunk.
    const auto num_rows = chunk.getNumRows();
    const auto chunk_columns = chunk.detachColumns();

    ColumnRawPtrs key_columns;
    key_columns.reserve(key_columns_pos.size());
    for (auto pos : key_columns_pos)
        key_columns.emplace_back(chunk_columns[pos].get());

    if (data.empty())
        data.init(SetVariants::chooseMethod(key_columns, key_sizes));

    switch (data.type)
    {
        case SetVariants::Type::EMPTY:
            break;
#define M(NAME) \
        case SetVariants::Type::NAME: \
            addToSet(*data.NAME, key_columns, num_rows, data); \
            break;
        APPLY_FOR_SET_VARIANTS(M)
#undef M
    }
}
/// Filters `chunk` in place against the accumulated set: buildFilter() decides per row
/// whether it stays, then every column is filtered with the resulting mask.
void IntersectOrExceptTransform::filter(Chunk & chunk)
{
    /// The row count has to be read before the columns are detached from the chunk.
    const auto num_rows = chunk.getNumRows();
    auto chunk_columns = chunk.detachColumns();

    ColumnRawPtrs key_columns;
    key_columns.reserve(key_columns_pos.size());
    for (auto pos : key_columns_pos)
        key_columns.emplace_back(chunk_columns[pos].get());

    /// Nothing was accumulated so far: the set variant still has to be chosen.
    if (data.empty())
        data.init(SetVariants::chooseMethod(key_columns, key_sizes));

    IColumn::Filter row_mask(num_rows);
    size_t new_rows_num = 0;

    switch (data.type)
    {
        case SetVariants::Type::EMPTY:
            break;
#define M(NAME) \
        case SetVariants::Type::NAME: \
            new_rows_num = buildFilter(*data.NAME, key_columns, row_mask, num_rows, data); \
            break;
        APPLY_FOR_SET_VARIANTS(M)
#undef M
    }

    for (auto & column : chunk_columns)
        column = column->filter(row_mask, -1);

    chunk.setColumns(std::move(chunk_columns), new_rows_num);
}
}

View File

@ -0,0 +1,53 @@
#pragma once
#include <Processors/IProcessor.h>
#include <Interpreters/SetVariants.h>
#include <Core/ColumnNumbers.h>
namespace DB
{
/// Processor with two inputs and one output that implements INTERSECT / EXCEPT.
/// All chunks of the second input are accumulated into a set first; after that the chunks
/// of the first input are filtered against the set and pushed to the output.
class IntersectOrExceptTransform : public IProcessor
{
public:
/// `header_` is used for both inputs and for the output.
IntersectOrExceptTransform(bool is_except_, const Block & header_);
/// Decides which port to read from or write to; see the phases described in the implementation.
Status prepare() override;
/// Accumulates or filters the chunk pulled by prepare().
void work() override;
/// "Except" or "Intersect", depending on the operation.
String getName() const override { return is_except ? "Except" : "Intersect"; }
private:
/// When set, prepare() pushes empty_chunk to the output once and resets the flag.
bool push_empty_chunk = false;
Chunk empty_chunk;
/// true for EXCEPT (keep rows absent from the set), false for INTERSECT (keep rows present in it).
bool is_except;
/// Positions of the header columns used as the key (the non-constant ones).
ColumnNumbers key_columns_pos;
/// Set of keys collected from the second input.
SetVariants data;
/// Filled by SetVariants::chooseMethod() and passed to the key state of the chosen method.
Sizes key_sizes;
/// Chunk pulled by prepare() and not yet processed by work().
Chunk current_input_chunk;
/// Filtered chunk produced by work(), pushed by the next prepare().
Chunk current_output_chunk;
/// Becomes true once the second input is exhausted; from then on the first input is read.
bool finished_second_input = false;
/// true while current_input_chunk is waiting for work().
bool has_input = false;
/// The single output port (outputs.front()).
OutputPort & output;
/// Adds all rows of the chunk to the set.
void accumulate(Chunk chunk);
/// Removes from the chunk the rows that must not appear in the result.
void filter(Chunk & chunk);
/// Inserts the keys of `rows` rows of `key_columns` into the set held by `method`.
template <typename Method>
void addToSet(
Method & method,
const ColumnRawPtrs & key_columns,
size_t rows,
SetVariants & variants) const;
/// Fills `filter` for `rows` rows and returns the number of rows that are kept.
template <typename Method>
size_t buildFilter(
Method & method,
const ColumnRawPtrs & columns,
IColumn::Filter & filter,
size_t rows,
SetVariants & variants) const;
};
}