This commit is contained in:
kssenii 2021-08-12 14:42:51 +03:00
parent 2306fbe9be
commit a549e29bd4
20 changed files with 363 additions and 166 deletions

View File

@ -11,7 +11,7 @@
#include <Parsers/ASTExplainQuery.h>
#include <Parsers/ASTGrantQuery.h>
#include <Parsers/ASTInsertQuery.h>
#include <Parsers/ASTIntersectOrExcept.h>
#include <Parsers/ASTSelectIntersectExceptQuery.h>
#include <Parsers/ASTKillQueryQuery.h>
#include <Parsers/ASTOptimizeQuery.h>
#include <Parsers/ASTRenameQuery.h>
@ -49,7 +49,7 @@
#include <Interpreters/InterpreterFactory.h>
#include <Interpreters/InterpreterGrantQuery.h>
#include <Interpreters/InterpreterInsertQuery.h>
#include <Interpreters/InterpreterIntersectOrExcept.h>
#include <Interpreters/InterpreterSelectIntersectExceptQuery.h>
#include <Interpreters/InterpreterKillQueryQuery.h>
#include <Interpreters/InterpreterOptimizeQuery.h>
#include <Interpreters/InterpreterRenameQuery.h>
@ -111,9 +111,9 @@ std::unique_ptr<IInterpreter> InterpreterFactory::get(ASTPtr & query, ContextMut
ProfileEvents::increment(ProfileEvents::SelectQuery);
return std::make_unique<InterpreterSelectWithUnionQuery>(query, context, options);
}
else if (query->as<ASTIntersectOrExcept>())
else if (query->as<ASTSelectIntersectExceptQuery>())
{
return std::make_unique<InterpreterIntersectOrExcept>(query, context);
return std::make_unique<InterpreterSelectIntersectExceptQuery>(query, context, options);
}
else if (query->as<ASTInsertQuery>())
{

View File

@ -1,39 +0,0 @@
#pragma once
#include <Core/QueryProcessingStage.h>
#include <Interpreters/IInterpreter.h>
#include <Interpreters/InterpreterSelectWithUnionQuery.h>
#include <Parsers/ASTIntersectOrExcept.h>
namespace DB
{
class Context;
class InterpreterSelectQuery;
class QueryPlan;
/// Interpreter for a flat ASTIntersectOrExcept query: a list of selects plus the list of
/// INTERSECT / EXCEPT operators between them.
class InterpreterIntersectOrExcept : public IInterpreter
{
public:
/// Copies the context, builds one nested interpreter per child of the select list and
/// computes the common result header from their sample blocks.
InterpreterIntersectOrExcept(const ASTPtr & query_ptr_, ContextPtr context_);
BlockIO execute() override;
private:
/// Name inserted into the "different number of columns" error message.
String getName() const { return "IntersectOrExcept"; }
/// Computes the header shared by all nested selects; throws
/// INTERSECT_OR_EXCEPT_RESULT_STRUCTURES_MISMATCH when the number of columns differs.
Block getCommonHeader(const Blocks & headers) const;
/// Creates an InterpreterSelectWithUnionQuery for an ASTSelectWithUnionQuery child,
/// otherwise an InterpreterSelectQuery.
std::unique_ptr<IInterpreterUnionOrSelectQuery>
buildCurrentChildInterpreter(const ASTPtr & ast_ptr_);
/// Unites the nested plans under a single IntersectOrExceptStep.
void buildQueryPlan(QueryPlan & query_plan);
/// Private copy of the context passed to the constructor.
ContextPtr context;
/// Common header of all nested selects, computed in the constructor.
Block result_header;
/// One interpreter per select, in the order of the select list.
std::vector<std::unique_ptr<IInterpreterUnionOrSelectQuery>> nested_interpreters;
/// Operators copied from the AST (list_of_operators).
ASTIntersectOrExcept::Operators operators;
};
}

View File

@ -1,8 +1,8 @@
#include <Columns/getLeastSuperColumn.h>
#include <Interpreters/Context.h>
#include <Interpreters/InterpreterIntersectOrExcept.h>
#include <Interpreters/InterpreterSelectIntersectExceptQuery.h>
#include <Interpreters/InterpreterSelectQuery.h>
#include <Parsers/ASTIntersectOrExcept.h>
#include <Parsers/ASTSelectIntersectExceptQuery.h>
#include <Parsers/ASTSelectWithUnionQuery.h>
#include <Processors/QueryPlan/IQueryPlanStep.h>
#include <Processors/QueryPlan/IntersectOrExceptStep.h>
@ -19,27 +19,7 @@ namespace ErrorCodes
extern const int INTERSECT_OR_EXCEPT_RESULT_STRUCTURES_MISMATCH;
}
InterpreterIntersectOrExcept::InterpreterIntersectOrExcept(const ASTPtr & query_ptr, ContextPtr context_)
: context(Context::createCopy(context_))
{
ASTIntersectOrExcept * ast = query_ptr->as<ASTIntersectOrExcept>();
operators = ast->list_of_operators;
auto children = ast->list_of_selects->children;
size_t num_children = children.size();
nested_interpreters.resize(num_children);
for (size_t i = 0; i < num_children; ++i)
nested_interpreters[i] = buildCurrentChildInterpreter(children.at(i));
Blocks headers(num_children);
for (size_t query_num = 0; query_num < num_children; ++query_num)
headers[query_num] = nested_interpreters[query_num]->getSampleBlock();
result_header = getCommonHeader(headers);
}
Block InterpreterIntersectOrExcept::getCommonHeader(const Blocks & headers) const
static Block getCommonHeader(const Blocks & headers)
{
size_t num_selects = headers.size();
Block common_header = headers.front();
@ -49,8 +29,8 @@ Block InterpreterIntersectOrExcept::getCommonHeader(const Blocks & headers) cons
{
if (headers[query_num].columns() != num_columns)
throw Exception(ErrorCodes::INTERSECT_OR_EXCEPT_RESULT_STRUCTURES_MISMATCH,
"Different number of columns in {} elements:\n {} \nand\n {}",
getName(), common_header.dumpNames(), headers[query_num].dumpNames());
"Different number of columns in IntersectExceptQuery elements:\n {} \nand\n {}",
common_header.dumpNames(), headers[query_num].dumpNames());
}
std::vector<const ColumnWithTypeAndName *> columns(num_selects);
@ -66,16 +46,53 @@ Block InterpreterIntersectOrExcept::getCommonHeader(const Blocks & headers) cons
return common_header;
}
/// Builds the interpreter for a normalized INTERSECT / EXCEPT query.
///
/// The query AST is cloned before use. A normalized query carries a known `final_operator`
/// and one child list with exactly two operands; anything else is reported as LOGICAL_ERROR.
/// One nested interpreter is created per operand, and the result header is the common
/// header of their sample blocks.
InterpreterSelectIntersectExceptQuery::InterpreterSelectIntersectExceptQuery(
    const ASTPtr & query_ptr_,
    ContextPtr context_,
    const SelectQueryOptions & options_)
    : IInterpreterUnionOrSelectQuery(query_ptr_->clone(), context_, options_)
{
    const auto * ast = query_ptr->as<ASTSelectIntersectExceptQuery>();
    if (!ast)
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected query: {}", query_ptr->getID());

    final_operator = ast->final_operator;

    /// AST must have been changed by the visitor.
    /// Count the operands without indexing children[0] of a node that has no children.
    const size_t num_children = ast->children.empty() ? 0 : ast->children.front()->children.size();
    if (final_operator == Operator::UNKNOWN || num_children != 2)
        throw Exception(ErrorCodes::LOGICAL_ERROR,
            "SelectIntersectExceptQuery has not been normalized (number of children: {})",
            num_children);

    const auto & children = ast->children.front()->children;

    nested_interpreters.reserve(num_children);
    for (const auto & child : children)
        nested_interpreters.emplace_back(buildCurrentChildInterpreter(child));

    Blocks headers;
    headers.reserve(num_children);
    for (const auto & interpreter : nested_interpreters)
        headers.emplace_back(interpreter->getSampleBlock());

    result_header = getCommonHeader(headers);
}
std::unique_ptr<IInterpreterUnionOrSelectQuery>
InterpreterIntersectOrExcept::buildCurrentChildInterpreter(const ASTPtr & ast_ptr_)
InterpreterSelectIntersectExceptQuery::buildCurrentChildInterpreter(const ASTPtr & ast_ptr_)
{
if (ast_ptr_->as<ASTSelectWithUnionQuery>())
return std::make_unique<InterpreterSelectWithUnionQuery>(ast_ptr_, context, SelectQueryOptions());
else
if (ast_ptr_->as<ASTSelectQuery>())
return std::make_unique<InterpreterSelectQuery>(ast_ptr_, context, SelectQueryOptions());
if (ast_ptr_->as<ASTSelectIntersectExceptQuery>())
return std::make_unique<InterpreterSelectIntersectExceptQuery>(ast_ptr_, context, SelectQueryOptions());
// if (ast_ptr_->as<ASTSubquery>())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected query: {}", ast_ptr_->getID());
}
void InterpreterIntersectOrExcept::buildQueryPlan(QueryPlan & query_plan)
void InterpreterSelectIntersectExceptQuery::buildQueryPlan(QueryPlan & query_plan)
{
size_t num_plans = nested_interpreters.size();
std::vector<std::unique_ptr<QueryPlan>> plans(num_plans);
@ -101,11 +118,11 @@ void InterpreterIntersectOrExcept::buildQueryPlan(QueryPlan & query_plan)
}
auto max_threads = context->getSettingsRef().max_threads;
auto step = std::make_unique<IntersectOrExceptStep>(std::move(data_streams), operators, max_threads);
auto step = std::make_unique<IntersectOrExceptStep>(std::move(data_streams), final_operator, max_threads);
query_plan.unitePlans(std::move(step), std::move(plans));
}
BlockIO InterpreterIntersectOrExcept::execute()
BlockIO InterpreterSelectIntersectExceptQuery::execute()
{
BlockIO res;

View File

@ -0,0 +1,45 @@
#pragma once
#include <Core/QueryProcessingStage.h>
#include <Interpreters/IInterpreter.h>
#include <Interpreters/InterpreterSelectWithUnionQuery.h>
#include <Interpreters/IInterpreterUnionOrSelectQuery.h>
#include <Parsers/ASTSelectIntersectExceptQuery.h>
namespace DB
{
class Context;
class InterpreterSelectQuery;
class QueryPlan;
/// Interpreter for a normalized INTERSECT / EXCEPT query node: exactly two operands
/// combined by a single operator. Operands may themselves be INTERSECT / EXCEPT nodes.
class InterpreterSelectIntersectExceptQuery : public IInterpreterUnionOrSelectQuery
{
using Operator = ASTSelectIntersectExceptQuery::Operator;
public:
/// Clones the query, creates one nested interpreter per operand and computes the common
/// result header. Throws LOGICAL_ERROR if the AST has not been normalized
/// (unknown final operator or a number of operands other than two).
InterpreterSelectIntersectExceptQuery(
const ASTPtr & query_ptr_,
ContextPtr context_,
const SelectQueryOptions & options_);
BlockIO execute() override;
/// Header of the result, as computed in the constructor.
Block getSampleBlock() { return result_header; }
private:
/// NOTE(review): no caller of getName() is visible in this change — confirm it is still needed.
static String getName() { return "SelectIntersectExceptQuery"; }
/// Creates the interpreter matching the operand type (select with union, plain select or
/// nested intersect/except); throws LOGICAL_ERROR for any other AST node.
std::unique_ptr<IInterpreterUnionOrSelectQuery>
buildCurrentChildInterpreter(const ASTPtr & ast_ptr_);
/// Unites the operand plans under an IntersectOrExceptStep that applies final_operator.
void buildQueryPlan(QueryPlan & query_plan) override;
/// Intentionally a no-op for this interpreter.
void ignoreWithTotals() override {}
/// One interpreter per operand, in AST order.
std::vector<std::unique_ptr<IInterpreterUnionOrSelectQuery>> nested_interpreters;
/// Operator applied to the two operands; copied from the AST in the constructor.
Operator final_operator;
};
}

View File

@ -2,8 +2,10 @@
#include <Interpreters/Context.h>
#include <Interpreters/InterpreterSelectQuery.h>
#include <Interpreters/InterpreterSelectWithUnionQuery.h>
#include <Interpreters/InterpreterSelectIntersectExceptQuery.h>
#include <Parsers/ASTSelectQuery.h>
#include <Parsers/ASTSelectWithUnionQuery.h>
#include <Parsers/ASTSelectIntersectExceptQuery.h>
#include <Parsers/queryToString.h>
#include <Processors/QueryPlan/DistinctStep.h>
#include <Processors/QueryPlan/ExpressionStep.h>
@ -208,8 +210,10 @@ InterpreterSelectWithUnionQuery::buildCurrentChildInterpreter(const ASTPtr & ast
{
if (ast_ptr_->as<ASTSelectWithUnionQuery>())
return std::make_unique<InterpreterSelectWithUnionQuery>(ast_ptr_, context, options, current_required_result_column_names);
else
else if (ast_ptr_->as<ASTSelectQuery>())
return std::make_unique<InterpreterSelectQuery>(ast_ptr_, context, options, current_required_result_column_names);
else
return std::make_unique<InterpreterSelectIntersectExceptQuery>(ast_ptr_, context, options);
}
InterpreterSelectWithUnionQuery::~InterpreterSelectWithUnionQuery() = default;
@ -225,10 +229,14 @@ Block InterpreterSelectWithUnionQuery::getSampleBlock(const ASTPtr & query_ptr_,
}
if (is_subquery)
{
return cache[key]
= InterpreterSelectWithUnionQuery(query_ptr_, context_, SelectQueryOptions().subquery().analyze()).getSampleBlock();
}
else
{
return cache[key] = InterpreterSelectWithUnionQuery(query_ptr_, context_, SelectQueryOptions().analyze()).getSampleBlock();
}
}

View File

@ -0,0 +1,85 @@
#include <Interpreters/SelectIntersectExceptQueryVisitor.h>
#include <Parsers/ASTExpressionList.h>
#include <Common/typeid_cast.h>
namespace DB
{
/// Entry point invoked for every AST node: rewrites INTERSECT / EXCEPT queries into
/// nested binary form and leaves every other node untouched.
void SelectIntersectExceptQueryMatcher::visit(ASTPtr & ast, Data & data)
{
    auto * select_intersect_except = ast->as<ASTSelectIntersectExceptQuery>();
    if (!select_intersect_except)
        return;

    /// Nodes created by the rewrite itself carry no flat select list; Data::initialize()
    /// dereferences it, so there is nothing to do (and nothing safe to do) for them.
    if (!select_intersect_except->list_of_selects)
        return;

    /// The previous unconditional AST dumps to std::cerr were debugging leftovers and are
    /// removed: they ran on every query and relied on <iostream> being included transitively.
    data.initialize(select_intersect_except);
    visit(*select_intersect_except, data);
}
/// Folds the flat operand list stored in `data` into a left-nested tree of binary
/// INTERSECT / EXCEPT nodes and stores the outermost operation in `ast`.
///
/// `data.reversed_list_of_selects` and `data.reversed_list_of_operators` hold the operands
/// and operators in reverse order, so the leftmost ones are taken from the back. Operators
/// are applied strictly left to right. The select list is empty when the function returns
/// successfully.
///
/// If there are fewer than two operands, or not enough operators to combine them, `ast` is
/// left unchanged (its final_operator is not assigned here).
void SelectIntersectExceptQueryMatcher::visit(ASTSelectIntersectExceptQuery & ast, Data & data)
{
    /* Example: select a intersect select b intersect select c;
     *
     *   Flat form (list_of_selects)         Result (children of `ast`)
     *
     *   SelectIntersectExceptQuery          SelectIntersectExceptQuery
     *     ExpressionList                      ExpressionList
     *       SelectQuery a                       SelectIntersectExceptQuery
     *       SelectQuery b             --->        ExpressionList
     *       SelectQuery c                           SelectQuery a
     *                                               SelectQuery b
     *                                           SelectQuery c
     **/

    auto & selects = data.reversed_list_of_selects;
    auto & operators = data.reversed_list_of_operators;

    /// Combining N operands consumes N - 1 operators. Check it once up front so that
    /// back() / pop_back() below are never called on an empty container.
    if (selects.size() < 2 || operators.size() + 1 < selects.size())
        return;

    /// Each iteration merges the two leftmost operands into one node (iterative form of the
    /// former tail recursion).
    while (true)
    {
        ASTPtr left = std::move(selects.back());
        selects.pop_back();
        ASTPtr right = std::move(selects.back());
        selects.pop_back();

        const auto current_operator = operators.back();
        operators.pop_back();

        auto list_node = std::make_shared<ASTExpressionList>();
        list_node->children.reserve(2);
        list_node->children.push_back(std::move(left));
        list_node->children.push_back(std::move(right));

        if (selects.empty())
        {
            /// Last pair: this is the outermost operation, store it in the visited node.
            ast.final_operator = current_operator;
            ast.children = {std::move(list_node)};
            return;
        }

        /// More operands remain: wrap the pair into a nested query and put it back as the
        /// new leftmost operand.
        auto nested_query = std::make_shared<ASTSelectIntersectExceptQuery>();
        nested_query->final_operator = current_operator;
        nested_query->children.emplace_back(std::move(list_node));
        selects.emplace_back(std::move(nested_query));
    }
}
// void SelectIntersectExceptQueryVisitor::visit(ASTSelectWithUnionQuery & ast, Data & data)
// {
// auto & union_modes = ast.list_of_modes;
// ASTs selects;
// auto & select_list = ast.list_of_selects->children;
//
//
// // reverse children list
// std::reverse(selects.begin(), selects.end());
//
// ast.is_normalized = true;
// ast.union_mode = ASTSelectWithUnionQuery::Mode::ALL;
//
// ast.list_of_selects->children = std::move(selects);
// }
}

View File

@ -0,0 +1,47 @@
#pragma once
#include <unordered_set>
#include <Parsers/IAST.h>
#include <Interpreters/InDepthNodeVisitor.h>
#include <Parsers/ASTSelectIntersectExceptQuery.h>
#include <Parsers/ASTSelectWithUnionQuery.h>
namespace DB
{
class ASTFunction;
/// Matcher that rewrites an ASTSelectIntersectExceptQuery holding a flat list of selects
/// and operators into nested nodes, each with two operands and a single final_operator.
class SelectIntersectExceptQueryMatcher
{
public:
/// Working state for rewriting one query: the operands and operators still to be combined.
struct Data
{
Data() = default;
/// Loads the operands (as clones) and operators of the given query, both reversed so
/// that the leftmost element ends up at the back of each container.
/// Dereferences list_of_selects without a null check.
/// NOTE(review): std::reverse is used but <algorithm> is not included by this header — confirm it arrives transitively.
void initialize(const ASTSelectIntersectExceptQuery * select_intersect_except)
{
reversed_list_of_selects = select_intersect_except->list_of_selects->clone()->children;
reversed_list_of_operators = select_intersect_except->list_of_operators;
std::reverse(reversed_list_of_selects.begin(), reversed_list_of_selects.end());
std::reverse(reversed_list_of_operators.begin(), reversed_list_of_operators.end());
}
/// Operands in reverse order; the next (leftmost) operand is at the back.
ASTs reversed_list_of_selects;
/// Operators in reverse order; the next operator to apply is at the back.
ASTSelectIntersectExceptQuery::Operators reversed_list_of_operators;
};
/// Every child is visited.
static bool needChildVisit(const ASTPtr &, const ASTPtr &) { return true; }
/// Dispatches on the node type; only ASTSelectIntersectExceptQuery nodes are rewritten.
static void visit(ASTPtr & ast, Data &);
/// Performs the rewrite using the state previously loaded into Data.
static void visit(ASTSelectIntersectExceptQuery &, Data &);
// static void visit(ASTSelectWithUnionQuery &, Data &);
};
/// Visit children first.
using SelectIntersectExceptQueryVisitor
= InDepthNodeVisitor<SelectIntersectExceptQueryMatcher, false>;
}

View File

@ -46,6 +46,7 @@
#include <Interpreters/ReplaceQueryParameterVisitor.h>
#include <Interpreters/SelectQueryOptions.h>
#include <Interpreters/executeQuery.h>
#include <Interpreters/SelectIntersectExceptQueryVisitor.h>
#include <Common/ProfileEvents.h>
#include <Common/SensitiveDataMasker.h>
@ -490,9 +491,16 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
ApplyWithGlobalVisitor().visit(ast);
}
/// Normalize SelectWithUnionQuery
NormalizeSelectWithUnionQueryVisitor::Data data{context->getSettingsRef().union_default_mode};
NormalizeSelectWithUnionQueryVisitor{data}.visit(ast);
{
/// Normalize SelectWithUnionQuery
NormalizeSelectWithUnionQueryVisitor::Data data{context->getSettingsRef().union_default_mode};
NormalizeSelectWithUnionQueryVisitor{data}.visit(ast);
}
{
SelectIntersectExceptQueryVisitor::Data data;
SelectIntersectExceptQueryVisitor{data}.visit(ast);
}
/// Check the limits.
checkASTSizeLimits(*ast, settings);
@ -532,6 +540,7 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
/// reset Input callbacks if query is not INSERT SELECT
context->resetInputCallbacks();
std::cerr << "\n\nAST: " << ast->dumpTree() << std::endl;
auto interpreter = InterpreterFactory::get(ast, context, SelectQueryOptions(stage).setInternal(internal));
std::shared_ptr<const EnabledQuota> quota;

View File

@ -1,4 +1,4 @@
#include <Parsers/ASTIntersectOrExcept.h>
#include <Parsers/ASTSelectIntersectExceptQuery.h>
#include <Parsers/ASTSubquery.h>
#include <Parsers/ASTSelectWithUnionQuery.h>
@ -6,20 +6,25 @@
namespace DB
{
ASTPtr ASTIntersectOrExcept::clone() const
ASTPtr ASTSelectIntersectExceptQuery::clone() const
{
auto res = std::make_shared<ASTIntersectOrExcept>(*this);
res->children.clear();
auto res = std::make_shared<ASTSelectIntersectExceptQuery>(*this);
res->children.clear();
for (const auto & child : children)
res->children.push_back(child->clone());
if (res->list_of_selects)
res->list_of_selects = list_of_selects->clone();
res->list_of_selects = list_of_selects->clone();
res->children.push_back(res->list_of_selects);
res->list_of_operators = list_of_operators;
res->final_operator = final_operator;
cloneOutputOptions(*res);
return res;
}
void ASTIntersectOrExcept::formatQueryImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const
void ASTSelectIntersectExceptQuery::formatQueryImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const
{
std::string indent_str = settings.one_line ? "" : std::string(4 * frame.indent, ' ');

View File

@ -6,17 +6,20 @@
namespace DB
{
class ASTIntersectOrExcept : public ASTQueryWithOutput
class ASTSelectIntersectExceptQuery : public ASTQueryWithOutput
{
public:
String getID(char) const override { return "IntersectExceptQuery"; }
String getID(char) const override { return "SelectIntersectExceptQuery"; }
ASTPtr clone() const override;
void formatQueryImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const override;
const char * getQueryKindString() const override { return "SelectIntersectExcept"; }
enum class Operator
{
UNKNOWN,
INTERSECT,
EXCEPT
};
@ -25,6 +28,9 @@ public:
ASTPtr list_of_selects;
Operators list_of_operators;
/// Final operator after applying visitor.
Operator final_operator = Operator::UNKNOWN;
};
}

View File

@ -7,7 +7,7 @@
#include <Parsers/ParserDescribeTableQuery.h>
#include <Parsers/ParserDropQuery.h>
#include <Parsers/ParserExplainQuery.h>
#include <Parsers/ParserIntersectOrExceptQuery.h>
#include <Parsers/ParserSelectIntersectExceptQuery.h>
#include <Parsers/ParserKillQueryQuery.h>
#include <Parsers/ParserOptimizeQuery.h>
#include <Parsers/ParserQueryWithOutput.h>
@ -31,7 +31,6 @@ namespace DB
bool ParserQueryWithOutput::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
{
ParserShowTablesQuery show_tables_p;
ParserIntersectOrExceptQuery intersect_except_p;
ParserSelectWithUnionQuery select_p;
ParserTablePropertiesQuery table_p;
ParserDescribeTableQuery describe_table_p;
@ -55,7 +54,6 @@ bool ParserQueryWithOutput::parseImpl(Pos & pos, ASTPtr & node, Expected & expec
bool parsed =
explain_p.parse(pos, query, expected)
|| intersect_except_p.parse(pos, query, expected)
|| select_p.parse(pos, query, expected)
|| show_create_access_entity_p.parse(pos, query, expected) /// should be before `show_tables_p`
|| show_tables_p.parse(pos, query, expected)

View File

@ -1,9 +1,9 @@
#include <Parsers/ASTIntersectOrExcept.h>
#include <Parsers/ASTSelectIntersectExceptQuery.h>
#include <Parsers/ASTSubquery.h>
#include <Parsers/CommonParsers.h>
#include <Parsers/ExpressionElementParsers.h>
#include <Parsers/ParserIntersectOrExceptQuery.h>
#include <Parsers/ParserSelectWithUnionQuery.h>
#include <Parsers/ParserSelectIntersectExceptQuery.h>
#include <Parsers/ParserSelectQuery.h>
#include <Parsers/ExpressionListParsers.h>
#include <Parsers/ASTExpressionList.h>
@ -11,18 +11,18 @@
namespace DB
{
bool ParserIntersectOrExceptQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
bool ParserSelectIntersectExceptQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
{
ParserKeyword intersect_keyword("INTERSECT");
ParserKeyword except_keyword("EXCEPT");
ASTs elements;
ASTIntersectOrExcept::Operators operators;
ASTSelectIntersectExceptQuery::Operators operators;
auto parse_element = [&]() -> bool
{
ASTPtr element;
if (!ParserSelectWithUnionQuery().parse(pos, element, expected) && !ParserSubquery().parse(pos, element, expected))
if (!ParserSelectQuery().parse(pos, element, expected) && !ParserSubquery().parse(pos, element, expected))
return false;
elements.push_back(element);
@ -36,11 +36,11 @@ bool ParserIntersectOrExceptQuery::parseImpl(Pos & pos, ASTPtr & node, Expected
if (!except_keyword.ignore(pos))
return false;
operators.emplace_back(ASTIntersectOrExcept::Operator::EXCEPT);
operators.emplace_back(ASTSelectIntersectExceptQuery::Operator::EXCEPT);
return true;
}
operators.emplace_back(ASTIntersectOrExcept::Operator::INTERSECT);
operators.emplace_back(ASTSelectIntersectExceptQuery::Operator::INTERSECT);
return true;
};
@ -56,7 +56,7 @@ bool ParserIntersectOrExceptQuery::parseImpl(Pos & pos, ASTPtr & node, Expected
auto list_node = std::make_shared<ASTExpressionList>();
list_node->children = std::move(elements);
auto intersect_or_except_ast = std::make_shared<ASTIntersectOrExcept>();
auto intersect_or_except_ast = std::make_shared<ASTSelectIntersectExceptQuery>();
node = intersect_or_except_ast;
intersect_or_except_ast->list_of_selects = list_node;

View File

@ -4,7 +4,7 @@
namespace DB
{
class ParserIntersectOrExceptQuery : public IParserBase
class ParserSelectIntersectExceptQuery : public IParserBase
{
protected:
const char * getName() const override { return "INTERSECT or EXCEPT"; }

View File

@ -2,6 +2,7 @@
#include <Parsers/ExpressionElementParsers.h>
#include <Parsers/ParserSelectQuery.h>
#include <Parsers/ParserUnionQueryElement.h>
#include <Parsers/ParserSelectIntersectExceptQuery.h>
#include <Common/typeid_cast.h>
@ -10,7 +11,9 @@ namespace DB
bool ParserUnionQueryElement::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
{
if (!ParserSubquery().parse(pos, node, expected) && !ParserSelectQuery().parse(pos, node, expected))
if (!ParserSubquery().parse(pos, node, expected)
&& !ParserSelectIntersectExceptQuery().parse(pos, node, expected)
&& !ParserSelectQuery().parse(pos, node, expected))
return false;
if (const auto * ast_subquery = node->as<ASTSubquery>())

View File

@ -1,7 +1,8 @@
#include <Processors/QueryPlan/IntersectOrExceptStep.h>
#include <Interpreters/Context.h>
#include <Interpreters/ExpressionActions.h>
#include <Processors/QueryPipeline.h>
#include <Processors/QueryPlan/IntersectOrExceptStep.h>
#include <Processors/Sources/NullSource.h>
#include <Processors/Transforms/ExpressionTransform.h>
#include <Processors/Transforms/IntersectOrExceptTransform.h>
@ -11,22 +12,22 @@
namespace DB
{
Block IntersectOrExceptStep::checkHeaders(const DataStreams & input_streams_) const
static Block checkHeaders(const DataStreams & input_streams_)
{
if (input_streams_.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot perform intersect/except on empty set of query plan steps");
Block res = input_streams_.front().header;
for (const auto & stream : input_streams_)
assertBlocksHaveEqualStructure(stream.header, res, "IntersectExceptStep");
assertBlocksHaveEqualStructure(stream.header, res, "IntersectOrExceptStep");
return res;
}
IntersectOrExceptStep::IntersectOrExceptStep(
DataStreams input_streams_ , const Operators & operators_ , size_t max_threads_)
DataStreams input_streams_ , Operator operator_ , size_t max_threads_)
: header(checkHeaders(input_streams_))
, operators(operators_)
, current_operator(operator_)
, max_threads(max_threads_)
{
input_streams = std::move(input_streams_);
@ -67,7 +68,7 @@ QueryPipelinePtr IntersectOrExceptStep::updatePipeline(QueryPipelines pipelines,
}
*pipeline = QueryPipeline::unitePipelines(std::move(pipelines), max_threads);
pipeline->addTransform(std::make_shared<IntersectOrExceptTransform>(header, operators));
pipeline->addTransform(std::make_shared<IntersectOrExceptTransform>(header, current_operator));
processors = collector.detachProcessors();
return pipeline;

View File

@ -1,6 +1,6 @@
#pragma once
#include <Processors/QueryPlan/IQueryPlanStep.h>
#include <Parsers/ASTIntersectOrExcept.h>
#include <Parsers/ASTSelectIntersectExceptQuery.h>
namespace DB
@ -8,11 +8,11 @@ namespace DB
class IntersectOrExceptStep : public IQueryPlanStep
{
using Operators = ASTIntersectOrExcept::Operators;
using Operator = ASTSelectIntersectExceptQuery::Operator;
public:
/// max_threads is used to limit the number of threads for result pipeline.
IntersectOrExceptStep(DataStreams input_streams_, const Operators & operators_, size_t max_threads_ = 0);
IntersectOrExceptStep(DataStreams input_streams_, Operator operators_, size_t max_threads_ = 0);
String getName() const override { return "IntersectOrExcept"; }
@ -21,10 +21,8 @@ public:
void describePipeline(FormatSettings & settings) const override;
private:
Block checkHeaders(const DataStreams & input_streams_) const;
Block header;
Operators operators;
Operator current_operator;
size_t max_threads;
Processors processors;
};

View File

@ -4,15 +4,14 @@
namespace DB
{
/*
* There are always at least two inputs. Number of operators is always number of inputs minus 1.
* input1 {operator1} input2 {operator2} input3 ...
**/
IntersectOrExceptTransform::IntersectOrExceptTransform(const Block & header_, const Operators & operators_)
: IProcessor(InputPorts(operators_.size() + 1, header_), {header_})
, operators(operators_)
, first_input(inputs.begin())
, second_input(std::next(inputs.begin()))
namespace ErrorCodes
{
extern const int SET_SIZE_LIMIT_EXCEEDED;
}
IntersectOrExceptTransform::IntersectOrExceptTransform(const Block & header_, Operator operator_)
: IProcessor(InputPorts(2, header_), {header_})
, current_operator(operator_)
{
const Names & columns = header_.getNames();
size_t num_columns = columns.empty() ? header_.columns() : columns.size();
@ -46,53 +45,33 @@ IntersectOrExceptTransform::Status IntersectOrExceptTransform::prepare()
return Status::PortFull;
}
if (current_output_chunk)
{
output.push(std::move(current_output_chunk));
}
if (finished_second_input)
{
if (first_input->isFinished() || (use_accumulated_input && !current_input_chunk))
if (inputs.front().isFinished())
{
std::advance(second_input, 1);
if (second_input == inputs.end())
{
if (current_output_chunk)
{
output.push(std::move(current_output_chunk));
}
output.finish();
return Status::Finished;
}
else
{
use_accumulated_input = true;
data.reset();
finished_second_input = false;
++current_operator_pos;
}
output.finish();
return Status::Finished;
}
}
else if (second_input->isFinished())
else if (inputs.back().isFinished())
{
finished_second_input = true;
}
if (!has_input)
{
if (finished_second_input && use_accumulated_input)
{
current_input_chunk = std::move(current_output_chunk);
}
else
{
InputPort & input = finished_second_input ? *first_input : *second_input;
InputPort & input = finished_second_input ? inputs.front() : inputs.back();
input.setNeeded();
if (!input.hasData())
return Status::NeedData;
current_input_chunk = input.pull();
}
input.setNeeded();
if (!input.hasData())
return Status::NeedData;
current_input_chunk = input.pull();
has_input = true;
}
@ -136,7 +115,7 @@ size_t IntersectOrExceptTransform::buildFilter(
for (size_t i = 0; i < rows; ++i)
{
auto find_result = state.findKey(method.data, i, variants.string_pool);
filter[i] = operators[current_operator_pos] == ASTIntersectOrExcept::Operator::EXCEPT ? !find_result.isFound() : find_result.isFound();
filter[i] = current_operator == ASTSelectIntersectExceptQuery::Operator::EXCEPT ? !find_result.isFound() : find_result.isFound();
if (filter[i])
++new_rows_num;
}
@ -193,10 +172,11 @@ void IntersectOrExceptTransform::filter(Chunk & chunk)
if (data->empty())
data->init(SetVariants::chooseMethod(column_ptrs, key_sizes));
IColumn::Filter filter(num_rows);
size_t new_rows_num = 0;
IColumn::Filter filter(num_rows);
auto & data_set = *data;
switch (data->type)
{
case SetVariants::Type::EMPTY:
@ -209,6 +189,9 @@ void IntersectOrExceptTransform::filter(Chunk & chunk)
#undef M
}
if (!new_rows_num)
return;
for (auto & column : columns)
column = column->filter(filter, -1);

View File

@ -3,7 +3,7 @@
#include <Processors/IProcessor.h>
#include <Interpreters/SetVariants.h>
#include <Core/ColumnNumbers.h>
#include <Parsers/ASTIntersectOrExcept.h>
#include <Parsers/ASTSelectIntersectExceptQuery.h>
namespace DB
@ -11,10 +11,10 @@ namespace DB
class IntersectOrExceptTransform : public IProcessor
{
using Operators = ASTIntersectOrExcept::Operators;
using Operator = ASTSelectIntersectExceptQuery::Operator;
public:
IntersectOrExceptTransform(const Block & header_, const Operators & operators);
IntersectOrExceptTransform(const Block & header_, Operator operators);
String getName() const override { return "IntersectOrExcept"; }
@ -24,10 +24,7 @@ protected:
void work() override;
private:
Operators operators;
InputPorts::iterator first_input;
InputPorts::iterator second_input;
size_t current_operator_pos = 0;
Operator current_operator;
ColumnNumbers key_columns_pos;
std::optional<SetVariants> data;
@ -36,7 +33,6 @@ private:
Chunk current_input_chunk;
Chunk current_output_chunk;
bool use_accumulated_input = false;
bool finished_second_input = false;
bool has_input = false;

View File

@ -32,6 +32,16 @@ select 1 intersect select 1 except select 1;
select 1 intersect select 1 except select 2 intersect select 1 except select 3 intersect select 1;
1
select 1 intersect select 1 except select 2 intersect select 1 except select 3 intersect select 2;
select number from numbers(10) except select 5;
0
1
2
3
4
6
7
8
9
select number from numbers(100) intersect select number from numbers(20, 60) except select number from numbers(30, 20) except select number from numbers(60, 20);
20
21
@ -53,3 +63,18 @@ select number from numbers(100) intersect select number from numbers(20, 60) exc
57
58
59
with (select number from numbers(10) intersect select 5) as a select a * 10;
50
select count() from (select number from numbers(10) except select 5);
9
select count() from (select number from numbers(1000000) intersect select number from numbers(200000, 600000));
600000
select count() from (select number from numbers(100) intersect select number from numbers(20, 60) except select number from numbers(30, 20) except select number from numbers(60, 20));
20
select count() from (select number from numbers(1000000) intersect select number from numbers(200000, 600000) except select number from numbers(300000, 200000) except select number from numbers(600000, 200000));
200000
select 1 union all select 1 intersect select 1;
1
1
select 1 union all select 1 intersect select 2;
1

View File

@ -15,4 +15,14 @@ select 1 intersect select 1 except select 1;
select 1 intersect select 1 except select 2 intersect select 1 except select 3 intersect select 1;
select 1 intersect select 1 except select 2 intersect select 1 except select 3 intersect select 2;
select number from numbers(10) except select 5;
select number from numbers(100) intersect select number from numbers(20, 60) except select number from numbers(30, 20) except select number from numbers(60, 20);
with (select number from numbers(10) intersect select 5) as a select a * 10;
select count() from (select number from numbers(10) except select 5);
select count() from (select number from numbers(1000000) intersect select number from numbers(200000, 600000));
select count() from (select number from numbers(100) intersect select number from numbers(20, 60) except select number from numbers(30, 20) except select number from numbers(60, 20));
select count() from (select number from numbers(1000000) intersect select number from numbers(200000, 600000) except select number from numbers(300000, 200000) except select number from numbers(600000, 200000));
select 1 union all select 1 intersect select 1;
select 1 union all select 1 intersect select 2;