From 068d635b818d37d374f203be927a8d1995b4970c Mon Sep 17 00:00:00 2001
From: Nikolai Kochetov
Date: Wed, 16 Oct 2024 16:53:57 +0000
Subject: [PATCH] Allow update plan headers for all the steps.

IQueryPlanStep::updateOutputHeader() becomes pure virtual and
canUpdateInputHeader() is removed, so updateInputHeaders() and
updateInputHeader() can be called on any step. updateInputHeader() now
throws LOGICAL_ERROR for an out-of-range index instead of relying on
chassert, and its index argument defaults to 0.

ISourceStep gets a no-op updateOutputHeader(). AggregatingProjectionStep,
IntersectOrExceptStep and both steps declared in CreatingSetsStep.h get
their own overrides. UnionStep::updateOutputHeader() now always recomputes
the output header. filterPushDown no longer throws for steps that are
neither transforming steps nor joins, and the updateDataStreams() pass at
the end of QueryPlan::optimize() is removed.
---
 src/Processors/QueryPlan/AggregatingStep.cpp     |  5 ++-
 src/Processors/QueryPlan/AggregatingStep.h       |  2 ++
 src/Processors/QueryPlan/CreatingSetsStep.h      |  5 +++
 src/Processors/QueryPlan/IQueryPlanStep.cpp      | 17 +++++++++
 src/Processors/QueryPlan/IQueryPlanStep.h        | 23 +++---------
 src/Processors/QueryPlan/ISourceStep.h           |  3 ++
 src/Processors/QueryPlan/ITransformingStep.h     |  2 --
 .../QueryPlan/IntersectOrExceptStep.cpp          |  6 +++-
 src/Processors/QueryPlan/IntersectOrExceptStep.h |  2 ++
 src/Processors/QueryPlan/JoinStep.h              |  2 --
 .../Optimizations/filterPushDown.cpp             | 16 ++-------
 src/Processors/QueryPlan/QueryPlan.cpp           | 35 ------------------
 src/Processors/QueryPlan/UnionStep.cpp           |  3 +--
 src/Processors/QueryPlan/UnionStep.h             |  2 --
 14 files changed, 46 insertions(+), 77 deletions(-)

diff --git a/src/Processors/QueryPlan/AggregatingStep.cpp b/src/Processors/QueryPlan/AggregatingStep.cpp
index defe7d0489a..efe14edaf35 100644
--- a/src/Processors/QueryPlan/AggregatingStep.cpp
+++ b/src/Processors/QueryPlan/AggregatingStep.cpp
@@ -589,8 +589,11 @@ AggregatingProjectionStep::AggregatingProjectionStep(
     , merge_threads(merge_threads_)
     , temporary_data_merge_threads(temporary_data_merge_threads_)
 {
-    input_headers = std::move(input_headers_);
+    updateInputHeaders(std::move(input_headers_));
+}
 
+void AggregatingProjectionStep::updateOutputHeader()
+{
     if (input_headers.size() != 2)
         throw Exception(
             ErrorCodes::LOGICAL_ERROR,
diff --git a/src/Processors/QueryPlan/AggregatingStep.h b/src/Processors/QueryPlan/AggregatingStep.h
index b1f28f17ef9..d76764f05ba 100644
--- a/src/Processors/QueryPlan/AggregatingStep.h
+++ b/src/Processors/QueryPlan/AggregatingStep.h
@@ -123,6 +123,8 @@ public:
     QueryPipelineBuilderPtr updatePipeline(QueryPipelineBuilders pipelines, const BuildQueryPipelineSettings &) override;
 
 private:
+    void updateOutputHeader() override;
+
     Aggregator::Params params;
     bool final;
     size_t merge_threads;
diff --git a/src/Processors/QueryPlan/CreatingSetsStep.h b/src/Processors/QueryPlan/CreatingSetsStep.h
index 54548a53131..0495ca2e638 100644
--- a/src/Processors/QueryPlan/CreatingSetsStep.h
+++ b/src/Processors/QueryPlan/CreatingSetsStep.h
@@ -45,6 +45,9 @@ public:
     QueryPipelineBuilderPtr updatePipeline(QueryPipelineBuilders pipelines, const BuildQueryPipelineSettings &) override;
 
     void describePipeline(FormatSettings & settings) const override;
+
+private:
+    void updateOutputHeader() override { output_header = getInputHeaders().front(); }
 };
 
 /// This is a temporary step which is converted to CreatingSetStep after plan optimization.
@@ -64,6 +67,8 @@ public:
     PreparedSets::Subqueries detachSets() { return std::move(subqueries); }
 
 private:
+    void updateOutputHeader() override { output_header = getInputHeaders().front(); }
+
     PreparedSets::Subqueries subqueries;
     ContextPtr context;
 };
diff --git a/src/Processors/QueryPlan/IQueryPlanStep.cpp b/src/Processors/QueryPlan/IQueryPlanStep.cpp
index bb1451287d9..aeb94e8826d 100644
--- a/src/Processors/QueryPlan/IQueryPlanStep.cpp
+++ b/src/Processors/QueryPlan/IQueryPlanStep.cpp
@@ -10,6 +10,23 @@ namespace ErrorCodes
     extern const int LOGICAL_ERROR;
 }
 
+void IQueryPlanStep::updateInputHeaders(Headers input_headers_)
+{
+    input_headers = std::move(input_headers_);
+    updateOutputHeader();
+}
+
+void IQueryPlanStep::updateInputHeader(Header input_header, size_t idx)
+{
+    if (idx >= input_headers.size())
+        throw Exception(ErrorCodes::LOGICAL_ERROR,
+            "Cannot update input header {} for step {} because it has only {} headers",
+            idx, getName(), input_headers.size());
+
+    input_headers[idx] = std::move(input_header);
+    updateOutputHeader();
+}
+
 const Header & IQueryPlanStep::getOutputHeader() const
 {
     if (!hasOutputHeader())
diff --git a/src/Processors/QueryPlan/IQueryPlanStep.h b/src/Processors/QueryPlan/IQueryPlanStep.h
index c3eeb8ebf48..5e345399f61 100644
--- a/src/Processors/QueryPlan/IQueryPlanStep.h
+++ b/src/Processors/QueryPlan/IQueryPlanStep.h
@@ -82,27 +82,12 @@ public:
 
     /// Updates the input streams of the given step. Used during query plan optimizations.
     /// It won't do any validation of new streams, so it is your responsibility to ensure that this update doesn't break anything
-    /// (e.g. you update data stream traits or correctly remove / add columns).
-    void updateInputHeaders(Headers input_headers_)
-    {
-        chassert(canUpdateInputHeader());
-        input_headers = std::move(input_headers_);
-        updateOutputHeader();
-    }
-
-    void updateInputHeader(Header input_header) { updateInputHeaders(Headers{input_header}); }
-
-    void updateInputHeader(Header input_header, size_t idx)
-    {
-        chassert(canUpdateInputHeader() && idx < input_headers.size());
-        input_headers[idx] = input_header;
-        updateOutputHeader();
-    }
-
-    virtual bool canUpdateInputHeader() const { return false; }
+    /// (e.g. you correctly remove / add columns).
+    void updateInputHeaders(Headers input_headers_);
+    void updateInputHeader(Header input_header, size_t idx = 0);
 
 protected:
-    virtual void updateOutputHeader() { throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Not implemented"); }
+    virtual void updateOutputHeader() = 0;
 
     Headers input_headers;
     std::optional<Header> output_header;
diff --git a/src/Processors/QueryPlan/ISourceStep.h b/src/Processors/QueryPlan/ISourceStep.h
index 142d97fecab..d1aa900bdbe 100644
--- a/src/Processors/QueryPlan/ISourceStep.h
+++ b/src/Processors/QueryPlan/ISourceStep.h
@@ -15,6 +15,9 @@ public:
     virtual void initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings & settings) = 0;
 
     void describePipeline(FormatSettings & settings) const override;
+
+protected:
+    void updateOutputHeader() override {}
 };
 
 }
diff --git a/src/Processors/QueryPlan/ITransformingStep.h b/src/Processors/QueryPlan/ITransformingStep.h
index f27fc189dcd..5c7a03ad575 100644
--- a/src/Processors/QueryPlan/ITransformingStep.h
+++ b/src/Processors/QueryPlan/ITransformingStep.h
@@ -66,8 +66,6 @@ public:
         throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Not implemented");
     }
 
-    bool canUpdateInputHeader() const override { return true; }
-
 protected:
     TransformTraits transform_traits;
 
diff --git a/src/Processors/QueryPlan/IntersectOrExceptStep.cpp b/src/Processors/QueryPlan/IntersectOrExceptStep.cpp
index 48bf5dfa192..d38f1535d86 100644
--- a/src/Processors/QueryPlan/IntersectOrExceptStep.cpp
+++ b/src/Processors/QueryPlan/IntersectOrExceptStep.cpp
@@ -34,7 +34,11 @@ IntersectOrExceptStep::IntersectOrExceptStep(
     : current_operator(operator_)
     , max_threads(max_threads_)
 {
-    input_headers = std::move(input_headers_);
+    updateInputHeaders(std::move(input_headers_));
+}
+
+void IntersectOrExceptStep::updateOutputHeader()
+{
     output_header = checkHeaders(input_headers);
 }
 
diff --git a/src/Processors/QueryPlan/IntersectOrExceptStep.h b/src/Processors/QueryPlan/IntersectOrExceptStep.h
index a1e85e847da..cc1d6059e04 100644
--- a/src/Processors/QueryPlan/IntersectOrExceptStep.h
+++ b/src/Processors/QueryPlan/IntersectOrExceptStep.h
@@ -21,6 +21,8 @@ public:
     void describePipeline(FormatSettings & settings) const override;
 
 private:
+    void updateOutputHeader() override;
+
     Operator current_operator;
     size_t max_threads;
 };
diff --git a/src/Processors/QueryPlan/JoinStep.h b/src/Processors/QueryPlan/JoinStep.h
index 6ede6771b08..2793784d633 100644
--- a/src/Processors/QueryPlan/JoinStep.h
+++ b/src/Processors/QueryPlan/JoinStep.h
@@ -34,8 +34,6 @@ public:
     void setJoin(JoinPtr join_) { join = std::move(join_); }
     bool allowPushDownToRight() const;
 
-    bool canUpdateInputHeader() const override { return true; }
-
 private:
     void updateOutputHeader() override;
 
diff --git a/src/Processors/QueryPlan/Optimizations/filterPushDown.cpp b/src/Processors/QueryPlan/Optimizations/filterPushDown.cpp
index 524baae2859..ba5ff89a653 100644
--- a/src/Processors/QueryPlan/Optimizations/filterPushDown.cpp
+++ b/src/Processors/QueryPlan/Optimizations/filterPushDown.cpp
@@ -152,20 +152,10 @@ addNewFilterStepOrThrow(QueryPlan::Node * parent_node, QueryPlan::Nodes & nodes,
     node.step = std::make_unique<FilterStep>(
         node.children.at(0)->step->getOutputHeader(), std::move(split_filter), std::move(split_filter_column_name), can_remove_filter);
 
-    if (auto * transforming_step = dynamic_cast<ITransformingStep *>(child.get()))
-    {
-        transforming_step->updateInputHeader(node.step->getOutputHeader());
-    }
+    if (auto * join = typeid_cast<JoinStep *>(child.get()))
+        join->updateInputHeader(node.step->getOutputHeader(), child_idx);
     else
-    {
-        if (auto * join = typeid_cast<JoinStep *>(child.get()))
-        {
-            join->updateInputHeader(node.step->getOutputHeader(), child_idx);
-        }
-        else
-            throw Exception(
-                ErrorCodes::LOGICAL_ERROR, "We are trying to push down a filter through a step for which we cannot update input stream");
-    }
+        child->updateInputHeader(node.step->getOutputHeader());
 
     if (update_parent_filter)
     {
diff --git a/src/Processors/QueryPlan/QueryPlan.cpp b/src/Processors/QueryPlan/QueryPlan.cpp
index 5c294093151..fd523b184e4 100644
--- a/src/Processors/QueryPlan/QueryPlan.cpp
+++ b/src/Processors/QueryPlan/QueryPlan.cpp
@@ -457,39 +457,6 @@ void QueryPlan::explainPipeline(WriteBuffer & buffer, const ExplainPipelineOptio
     }
 }
 
-static void updateDataStreams(QueryPlan::Node & root)
-{
-    class UpdateDataStreams : public QueryPlanVisitor<UpdateDataStreams, false>
-    {
-    public:
-        explicit UpdateDataStreams(QueryPlan::Node * root_) : QueryPlanVisitor<UpdateDataStreams, false>(root_) { }
-
-        static bool visitTopDownImpl(QueryPlan::Node * /*current_node*/, QueryPlan::Node * /*parent_node*/) { return true; }
-
-        static void visitBottomUpImpl(QueryPlan::Node * current_node, QueryPlan::Node * /*parent_node*/)
-        {
-            auto & current_step = *current_node->step;
-            if (!current_step.canUpdateInputHeader() || current_node->children.empty())
-                return;
-
-            for (const auto * child : current_node->children)
-            {
-                if (!child->step->hasOutputHeader())
-                    return;
-            }
-
-            Headers headers;
-            headers.reserve(current_node->children.size());
-            for (const auto * child : current_node->children)
-                headers.emplace_back(child->step->getOutputHeader());
-
-            current_step.updateInputHeaders(std::move(headers));
-        }
-    };
-
-    UpdateDataStreams(&root).visit();
-}
-
 void QueryPlan::optimize(const QueryPlanOptimizationSettings & optimization_settings)
 {
     /// optimization need to be applied before "mergeExpressions" optimization
@@ -502,8 +469,6 @@ void QueryPlan::optimize(const QueryPlanOptimizationSettings & optimization_sett
     QueryPlanOptimizations::optimizeTreeSecondPass(optimization_settings, *root, nodes);
     if (optimization_settings.build_sets)
         QueryPlanOptimizations::addStepsToBuildSets(*this, *root, nodes);
-
-    updateDataStreams(*root);
 }
 
 void QueryPlan::explainEstimate(MutableColumns & columns) const
diff --git a/src/Processors/QueryPlan/UnionStep.cpp b/src/Processors/QueryPlan/UnionStep.cpp
index b7a87b27be5..62d152c0fee 100644
--- a/src/Processors/QueryPlan/UnionStep.cpp
+++ b/src/Processors/QueryPlan/UnionStep.cpp
@@ -34,8 +34,7 @@ UnionStep::UnionStep(Headers input_headers_, size_t max_threads_)
 
 void UnionStep::updateOutputHeader()
 {
-    if (input_headers.size() == 1 || !output_header)
-        output_header = checkHeaders(input_headers);
+    output_header = checkHeaders(input_headers);
 }
 
 QueryPipelineBuilderPtr UnionStep::updatePipeline(QueryPipelineBuilders pipelines, const BuildQueryPipelineSettings &)
diff --git a/src/Processors/QueryPlan/UnionStep.h b/src/Processors/QueryPlan/UnionStep.h
index a98d2ef06f3..efb8f51c7a4 100644
--- a/src/Processors/QueryPlan/UnionStep.h
+++ b/src/Processors/QueryPlan/UnionStep.h
@@ -19,8 +19,6 @@ public:
 
     size_t getMaxThreads() const { return max_threads; }
 
-    bool canUpdateInputHeader() const override { return true; }
-
 private:
     void updateOutputHeader() override;
 