From 3f9e9a7025bec607bb1e646556c415e3336c796d Mon Sep 17 00:00:00 2001 From: Kirill Ershov Date: Fri, 16 Apr 2021 23:18:39 +0300 Subject: [PATCH] Add INTERSECT and EXCEPT --- src/Common/ErrorCodes.cpp | 6 +- src/Interpreters/InterpreterFactory.cpp | 18 +- .../InterpreterIntersectOrExcept.cpp | 116 +++++++++++ .../InterpreterIntersectOrExcept.h | 35 ++++ src/Parsers/ASTIntersectOrExcept.cpp | 28 +++ src/Parsers/ASTIntersectOrExcept.h | 18 ++ src/Parsers/ParserIntersectOrExcept.cpp | 50 +++++ src/Parsers/ParserIntersectOrExcept.h | 14 ++ src/Parsers/ParserQueryWithOutput.cpp | 36 ++-- .../QueryPlan/IntersectOrExceptStep.cpp | 38 ++++ .../QueryPlan/IntersectOrExceptStep.h | 26 +++ .../Transforms/IntersectOrExceptTransform.cpp | 192 ++++++++++++++++++ .../Transforms/IntersectOrExceptTransform.h | 53 +++++ 13 files changed, 606 insertions(+), 24 deletions(-) create mode 100644 src/Interpreters/InterpreterIntersectOrExcept.cpp create mode 100644 src/Interpreters/InterpreterIntersectOrExcept.h create mode 100644 src/Parsers/ASTIntersectOrExcept.cpp create mode 100644 src/Parsers/ASTIntersectOrExcept.h create mode 100644 src/Parsers/ParserIntersectOrExcept.cpp create mode 100644 src/Parsers/ParserIntersectOrExcept.h create mode 100644 src/Processors/QueryPlan/IntersectOrExceptStep.cpp create mode 100644 src/Processors/QueryPlan/IntersectOrExceptStep.h create mode 100644 src/Processors/Transforms/IntersectOrExceptTransform.cpp create mode 100644 src/Processors/Transforms/IntersectOrExceptTransform.h diff --git a/src/Common/ErrorCodes.cpp b/src/Common/ErrorCodes.cpp index a2cd65137c0..0d1fd5bd7d8 100644 --- a/src/Common/ErrorCodes.cpp +++ b/src/Common/ErrorCodes.cpp @@ -533,7 +533,11 @@ M(564, INTERSERVER_SCHEME_DOESNT_MATCH) \ M(565, TOO_MANY_PARTITIONS) \ M(566, CANNOT_RMDIR) \ - \ + M(567, DUPLICATED_PART_UUIDS) \ + M(568, RAFT_ERROR) \ + M(569, MULTIPLE_COLUMNS_SERIALIZED_TO_SAME_PROTOBUF_FIELD) \ + M(570, DATA_TYPE_INCOMPATIBLE_WITH_PROTOBUF_FIELD) \ + M(571, INTERSECT_OR_EXCEPT_RESULT_STRUCTURES_MISMATCH) \ M(999, KEEPER_EXCEPTION) \ M(1000, POCO_EXCEPTION) \ M(1001, STD_EXCEPTION) \ diff --git a/src/Interpreters/InterpreterFactory.cpp b/src/Interpreters/InterpreterFactory.cpp index 15e4c52f040..e0f6479cc0e 100644 --- a/src/Interpreters/InterpreterFactory.cpp +++ b/src/Interpreters/InterpreterFactory.cpp @@ -1,14 +1,17 @@ #include #include #include -#include -#include #include +#include #include #include +#include #include #include +#include +#include #include +#include #include #include #include @@ -24,11 +27,9 @@ #include #include #include -#include -#include #include -#include #include +#include #include #include @@ -44,9 +45,11 @@ #include #include #include +#include #include #include #include +#include #include #include #include @@ -65,7 +68,6 @@ #include #include #include -#include #include #include @@ -109,6 +111,10 @@ std::unique_ptr InterpreterFactory::get(ASTPtr & query, Context & ProfileEvents::increment(ProfileEvents::SelectQuery); return std::make_unique(query, context, options); } + else if (query->as()) + { + return std::make_unique(query, context); + } else if (query->as()) { ProfileEvents::increment(ProfileEvents::InsertQuery); diff --git a/src/Interpreters/InterpreterIntersectOrExcept.cpp b/src/Interpreters/InterpreterIntersectOrExcept.cpp new file mode 100644 index 00000000000..c85bd29e16f --- /dev/null +++ b/src/Interpreters/InterpreterIntersectOrExcept.cpp @@ -0,0 +1,116 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int INTERSECT_OR_EXCEPT_RESULT_STRUCTURES_MISMATCH; +} + +InterpreterIntersectOrExcept::InterpreterIntersectOrExcept(const ASTPtr & query_ptr_, ContextPtr context_) + : query_ptr(query_ptr_), context(Context::createCopy(context_)) +{ + ASTIntersectOrExcept * ast = query_ptr->as(); + size_t num_children = ast->children.size(); + nested_interpreters.resize(num_children); + for (size_t i = 0; i < num_children; ++i) + { + nested_interpreters[i] = buildCurrentChildInterpreter(ast->children[i]); + } + + Blocks headers(num_children); + for (size_t query_num = 0; query_num < num_children; ++query_num) + headers[query_num] = nested_interpreters[query_num]->getSampleBlock(); + + result_header = getCommonHeader(headers); +} + + +Block InterpreterIntersectOrExcept::getCommonHeader(const Blocks & headers) +{ + size_t num_selects = headers.size(); + Block common_header = headers.front(); + size_t num_columns = common_header.columns(); + + for (size_t query_num = 1; query_num < num_selects; ++query_num) + { + if (headers[query_num].columns() != num_columns) + throw Exception( + "Different number of columns in " + + toString(query_ptr->as()->is_except ? "EXCEPT" : "INTERSECT") + + " elements:\n" + common_header.dumpNames() + "\nand\n" + + headers[query_num].dumpNames() + "\n", + ErrorCodes::INTERSECT_OR_EXCEPT_RESULT_STRUCTURES_MISMATCH); + } + + std::vector columns(num_selects); + + for (size_t column_num = 0; column_num < num_columns; ++column_num) + { + for (size_t i = 0; i < num_selects; ++i) + columns[i] = &headers[i].getByPosition(column_num); + + ColumnWithTypeAndName & result_elem = common_header.getByPosition(column_num); + result_elem = getLeastSuperColumn(columns); + } + + return common_header; +} + + +std::unique_ptr +InterpreterIntersectOrExcept::buildCurrentChildInterpreter(const ASTPtr & ast_ptr_) +{ + if (ast_ptr_->as()) + return std::make_unique(ast_ptr_, context, SelectQueryOptions()); + else + return std::make_unique(ast_ptr_, context, SelectQueryOptions()); +} + +void InterpreterIntersectOrExcept::buildQueryPlan(QueryPlan & query_plan) +{ + size_t num_plans = nested_interpreters.size(); + + std::vector> plans(num_plans); + DataStreams data_streams(num_plans); + + for (size_t i = 0; i < num_plans; ++i) + { + plans[i] = std::make_unique(); + nested_interpreters[i]->buildQueryPlan(*plans[i]); + data_streams[i] = plans[i]->getCurrentDataStream(); + } + + auto max_threads = context->getSettingsRef().max_threads; + auto step = std::make_unique( + query_ptr->as()->is_except, std::move(data_streams), result_header, max_threads); + query_plan.unitePlans(std::move(step), std::move(plans)); +} + +BlockIO InterpreterIntersectOrExcept::execute() +{ + BlockIO res; + + QueryPlan query_plan; + buildQueryPlan(query_plan); + + auto pipeline = query_plan.buildQueryPipeline( + QueryPlanOptimizationSettings::fromContext(context), + BuildQueryPipelineSettings::fromContext(context)); + + res.pipeline = std::move(*pipeline); + res.pipeline.addInterpreterContext(context); + + return res; +} +} diff --git a/src/Interpreters/InterpreterIntersectOrExcept.h b/src/Interpreters/InterpreterIntersectOrExcept.h new file mode 100644 index 00000000000..0069dc02f1d --- /dev/null +++ b/src/Interpreters/InterpreterIntersectOrExcept.h @@ -0,0 +1,35 @@ +#pragma once + +#include +#include +#include + +namespace DB +{ + +class Context; +class InterpreterSelectQuery; +class QueryPlan; + +class InterpreterIntersectOrExcept : public IInterpreter +{ +public: + InterpreterIntersectOrExcept(const ASTPtr & query_ptr_, ContextPtr context_); + + /// Builds QueryPlan for current query. + virtual void buildQueryPlan(QueryPlan & query_plan); + + BlockIO execute() override; + +private: + ASTPtr query_ptr; + ContextPtr context; + Block result_header; + std::vector> nested_interpreters; + Block getCommonHeader(const Blocks & headers); + + std::unique_ptr + buildCurrentChildInterpreter(const ASTPtr & ast_ptr_); +}; + +} diff --git a/src/Parsers/ASTIntersectOrExcept.cpp b/src/Parsers/ASTIntersectOrExcept.cpp new file mode 100644 index 00000000000..073d63963a9 --- /dev/null +++ b/src/Parsers/ASTIntersectOrExcept.cpp @@ -0,0 +1,28 @@ +#include +#include + +namespace DB +{ + +ASTPtr ASTIntersectOrExcept::clone() const +{ + auto res = std::make_shared(*this); + res->children.clear(); + res->children.push_back(children[0]->clone()); + res->children.push_back(children[1]->clone()); + res->is_except = is_except; + cloneOutputOptions(*res); + return res; +} + +void ASTIntersectOrExcept::formatQueryImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const +{ + children[0]->formatImpl(settings, state, frame); + std::string indent_str = settings.one_line ? "" : std::string(4 * frame.indent, ' '); + settings.ostr << settings.nl_or_ws << indent_str << (settings.hilite ? hilite_keyword : "") + << (is_except ? "EXCEPT" : "INTERSECT ") + << (settings.hilite ? hilite_none : "") << settings.nl_or_ws; + children[1]->formatImpl(settings, state, frame); +} + +} diff --git a/src/Parsers/ASTIntersectOrExcept.h b/src/Parsers/ASTIntersectOrExcept.h new file mode 100644 index 00000000000..a02cb9f7d77 --- /dev/null +++ b/src/Parsers/ASTIntersectOrExcept.h @@ -0,0 +1,18 @@ +#pragma once + +#include + + +namespace DB +{ + +class ASTIntersectOrExcept : public ASTQueryWithOutput +{ +public: + String getID(char) const override { return is_except ? "Except" : "Intersect"; } + ASTPtr clone() const override; + void formatQueryImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const override; + bool is_except; +}; + +} diff --git a/src/Parsers/ParserIntersectOrExcept.cpp b/src/Parsers/ParserIntersectOrExcept.cpp new file mode 100644 index 00000000000..a82b8c2b06b --- /dev/null +++ b/src/Parsers/ParserIntersectOrExcept.cpp @@ -0,0 +1,50 @@ +#include +#include +#include +#include +#include +#include + +namespace DB +{ +bool ParserIntersectOrExcept::parseImpl(Pos & pos, ASTPtr & node, Expected & expected) +{ + ParserKeyword intersect_keyword("INTERSECT"); + ParserKeyword except_keyword("EXCEPT"); + ASTPtr left_node; + ASTPtr right_node; + + auto ast = std::make_shared(); + ast->is_except = false; + + if (!ParserSelectQuery().parse(pos, left_node, expected) && !ParserSubquery().parse(pos, left_node, expected)) + return false; + + if (!intersect_keyword.ignore(pos)) + { + if (!except_keyword.ignore(pos)) + { + return false; + } + else + { + ast->is_except = true; + } + } + + if (!ParserSelectQuery().parse(pos, right_node, expected) && !ParserSubquery().parse(pos, right_node, expected)) + return false; + + if (const auto * ast_subquery = left_node->as()) + left_node = ast_subquery->children.at(0); + if (const auto * ast_subquery = right_node->as()) + right_node = ast_subquery->children.at(0); + + ast->children.push_back(left_node); + ast->children.push_back(right_node); + + node = ast; + return true; +} + +} diff --git a/src/Parsers/ParserIntersectOrExcept.h b/src/Parsers/ParserIntersectOrExcept.h new file mode 100644 index 00000000000..61cc74cf0a9 --- /dev/null +++ b/src/Parsers/ParserIntersectOrExcept.h @@ -0,0 +1,14 @@ +#pragma once +#include + + +namespace DB +{ +class ParserIntersectOrExcept : public IParserBase +{ +protected: + const char * getName() const override { return "INTERSECT or EXCEPT"; } + bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override; +}; + +} diff --git a/src/Parsers/ParserQueryWithOutput.cpp b/src/Parsers/ParserQueryWithOutput.cpp index d5aa1e47533..35355b29ebf 100644 --- a/src/Parsers/ParserQueryWithOutput.cpp +++ b/src/Parsers/ParserQueryWithOutput.cpp @@ -1,36 +1,37 @@ -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include #include #include #include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include #include #include #include #include #include -#include +#include +#include +#include +#include #include - namespace DB { bool ParserQueryWithOutput::parseImpl(Pos & pos, ASTPtr & node, Expected & expected) { ParserShowTablesQuery show_tables_p; + ParserIntersectOrExcept intersect_p; ParserSelectWithUnionQuery select_p; ParserTablePropertiesQuery table_p; ParserDescribeTableQuery describe_table_p; @@ -54,6 +55,7 @@ bool ParserQueryWithOutput::parseImpl(Pos & pos, ASTPtr & node, Expected & expec bool parsed = explain_p.parse(pos, query, expected) + || intersect_p.parse(pos, query, expected) || select_p.parse(pos, query, expected) || show_create_access_entity_p.parse(pos, query, expected) /// should be before `show_tables_p` || show_tables_p.parse(pos, query, expected) diff --git a/src/Processors/QueryPlan/IntersectOrExceptStep.cpp b/src/Processors/QueryPlan/IntersectOrExceptStep.cpp new file mode 100644 index 00000000000..d0a820339d7 --- /dev/null +++ b/src/Processors/QueryPlan/IntersectOrExceptStep.cpp @@ -0,0 +1,38 @@ +#include +#include +#include +#include +#include +#include + +namespace DB +{ + +IntersectOrExceptStep::IntersectOrExceptStep(bool is_except_, DataStreams input_streams_, Block result_header, size_t max_threads_) + : is_except(is_except_), header(std::move(result_header)), max_threads(max_threads_) +{ + input_streams = std::move(input_streams_); + output_stream = DataStream{.header = header}; +} + +QueryPipelinePtr IntersectOrExceptStep::updatePipeline(QueryPipelines pipelines, const BuildQueryPipelineSettings & ) +{ + auto pipeline = std::make_unique(); + QueryPipelineProcessorsCollector collector(*pipeline, this); + + pipelines[0]->addTransform(std::make_shared(header, pipelines[0]->getNumStreams(), 1)); + pipelines[1]->addTransform(std::make_shared(header, pipelines[1]->getNumStreams(), 1)); + + *pipeline = QueryPipeline::unitePipelines(std::move(pipelines), max_threads); + pipeline->addTransform(std::make_shared(is_except, header)); + + processors = collector.detachProcessors(); + return pipeline; +} + +void IntersectOrExceptStep::describePipeline(FormatSettings & settings) const +{ + IQueryPlanStep::describePipeline(processors, settings); +} + +} diff --git a/src/Processors/QueryPlan/IntersectOrExceptStep.h b/src/Processors/QueryPlan/IntersectOrExceptStep.h new file mode 100644 index 00000000000..d2b515bb1c4 --- /dev/null +++ b/src/Processors/QueryPlan/IntersectOrExceptStep.h @@ -0,0 +1,26 @@ +#pragma once +#include + +namespace DB +{ + +class IntersectOrExceptStep : public IQueryPlanStep +{ +public: + /// max_threads is used to limit the number of threads for result pipeline. + IntersectOrExceptStep(bool is_except_, DataStreams input_streams_, Block result_header, size_t max_threads_ = 0); + + String getName() const override { return is_except ? "Except" : "Intersect"; } + + QueryPipelinePtr updatePipeline(QueryPipelines pipelines, const BuildQueryPipelineSettings & settings) override; + + void describePipeline(FormatSettings & settings) const override; +private: + bool is_except; + Block header; + size_t max_threads; + Processors processors; +}; + +} + diff --git a/src/Processors/Transforms/IntersectOrExceptTransform.cpp b/src/Processors/Transforms/IntersectOrExceptTransform.cpp new file mode 100644 index 00000000000..199498bf762 --- /dev/null +++ b/src/Processors/Transforms/IntersectOrExceptTransform.cpp @@ -0,0 +1,192 @@ +#include + +namespace DB +{ +IntersectOrExceptTransform::IntersectOrExceptTransform(bool is_except_, const Block & header_) + : IProcessor(InputPorts(2, header_), {header_}), is_except(is_except_), output(outputs.front()) +{ + const Names & columns = header_.getNames(); + size_t num_columns = columns.empty() ? header_.columns() : columns.size(); + + key_columns_pos.reserve(columns.size()); + for (size_t i = 0; i < num_columns; ++i) + { + auto pos = columns.empty() ? i : header_.getPositionByName(columns[i]); + + const auto & col = header_.getByPosition(pos).column; + + if (!(col && isColumnConst(*col))) + key_columns_pos.emplace_back(pos); + } +} + +IntersectOrExceptTransform::Status IntersectOrExceptTransform::prepare() +{ + /// Check can output. + if (output.isFinished()) + { + for (auto & in : inputs) + in.close(); + return Status::Finished; + } + + if (!output.canPush()) + { + if (inputs.front().isFinished()) + { + inputs.back().setNotNeeded(); + } + else + { + inputs.front().setNotNeeded(); + } + return Status::PortFull; + } + + /// Output if has data. + if (current_output_chunk) + { + output.push(std::move(current_output_chunk)); + } + + if (push_empty_chunk) + { + output.push(std::move(empty_chunk)); + push_empty_chunk = false; + } + + if (finished_second_input) + { + if (inputs.front().isFinished()) + { + output.finish(); + return Status::Finished; + } + } + else if (inputs.back().isFinished()) + { + finished_second_input = true; + } + + InputPort & input = finished_second_input ? inputs.front() : inputs.back(); + + /// Check can input. + if (!has_input) + { + input.setNeeded(); + if (!input.hasData()) + { + return Status::NeedData; + } + + current_input_chunk = input.pull(); + has_input = true; + } + + return Status::Ready; +} + +void IntersectOrExceptTransform::work() +{ + if (!finished_second_input) + { + accumulate(std::move(current_input_chunk)); + } + else + { + filter(current_input_chunk); + current_output_chunk = std::move(current_input_chunk); + } + + has_input = false; +} + +template +void IntersectOrExceptTransform::addToSet(Method & method, const ColumnRawPtrs & columns, size_t rows, SetVariants & variants) const +{ + typename Method::State state(columns, key_sizes, nullptr); + + for (size_t i = 0; i < rows; ++i) + { + state.emplaceKey(method.data, i, variants.string_pool); + } +} + +template +size_t IntersectOrExceptTransform::buildFilter( + Method & method, const ColumnRawPtrs & columns, IColumn::Filter & filter, size_t rows, SetVariants & variants) const +{ + typename Method::State state(columns, key_sizes, nullptr); + size_t new_rows_num = 0; + + for (size_t i = 0; i < rows; ++i) + { + auto find_result = state.findKey(method.data, i, variants.string_pool); + filter[i] = is_except ? !find_result.isFound() : find_result.isFound(); + if (filter[i]) + ++new_rows_num; + } + return new_rows_num; +} + +void IntersectOrExceptTransform::accumulate(Chunk chunk) +{ + auto num_rows = chunk.getNumRows(); + auto columns = chunk.detachColumns(); + + ColumnRawPtrs column_ptrs; + column_ptrs.reserve(key_columns_pos.size()); + for (auto pos : key_columns_pos) + column_ptrs.emplace_back(columns[pos].get()); + + if (data.empty()) + data.init(SetVariants::chooseMethod(column_ptrs, key_sizes)); + + switch (data.type) + { + case SetVariants::Type::EMPTY: + break; +#define M(NAME) \ + case SetVariants::Type::NAME: \ + addToSet(*data.NAME, column_ptrs, num_rows, data); \ + break; + APPLY_FOR_SET_VARIANTS(M) +#undef M + } +} + +void IntersectOrExceptTransform::filter(Chunk & chunk) +{ + auto num_rows = chunk.getNumRows(); + auto columns = chunk.detachColumns(); + + ColumnRawPtrs column_ptrs; + column_ptrs.reserve(key_columns_pos.size()); + for (auto pos : key_columns_pos) + column_ptrs.emplace_back(columns[pos].get()); + + if (data.empty()) + data.init(SetVariants::chooseMethod(column_ptrs, key_sizes)); + + IColumn::Filter filter(num_rows); + + size_t new_rows_num = 0; + switch (data.type) + { + case SetVariants::Type::EMPTY: + break; +#define M(NAME) \ + case SetVariants::Type::NAME: \ + new_rows_num = buildFilter(*data.NAME, column_ptrs, filter, num_rows, data); \ + break; + APPLY_FOR_SET_VARIANTS(M) +#undef M + } + + for (auto & column : columns) + column = column->filter(filter, -1); + + chunk.setColumns(std::move(columns), new_rows_num); +} + +} diff --git a/src/Processors/Transforms/IntersectOrExceptTransform.h b/src/Processors/Transforms/IntersectOrExceptTransform.h new file mode 100644 index 00000000000..ebe73fdeb26 --- /dev/null +++ b/src/Processors/Transforms/IntersectOrExceptTransform.h @@ -0,0 +1,53 @@ +#pragma once + +#include +#include +#include + +namespace DB +{ + +class IntersectOrExceptTransform : public IProcessor +{ +public: + IntersectOrExceptTransform(bool is_except_, const Block & header_); + + Status prepare() override; + void work() override; + + String getName() const override { return is_except ? "Except" : "Intersect"; } + +private: + + bool push_empty_chunk = false; + Chunk empty_chunk; + + bool is_except; + ColumnNumbers key_columns_pos; + SetVariants data; + Sizes key_sizes; + Chunk current_input_chunk; + Chunk current_output_chunk; + bool finished_second_input = false; + bool has_input = false; + OutputPort & output; + + void accumulate(Chunk chunk); + void filter(Chunk & chunk); + template + void addToSet( + Method & method, + const ColumnRawPtrs & key_columns, + size_t rows, + SetVariants & variants) const; + + template + size_t buildFilter( + Method & method, + const ColumnRawPtrs & columns, + IColumn::Filter & filter, + size_t rows, + SetVariants & variants) const; +}; + +}