Allow update plan headers for all the steps.

This commit is contained in:
Nikolai Kochetov 2024-10-16 16:53:57 +00:00
parent 1dfc68aefc
commit 068d635b81
14 changed files with 49 additions and 77 deletions

View File

@ -589,8 +589,11 @@ AggregatingProjectionStep::AggregatingProjectionStep(
, merge_threads(merge_threads_)
, temporary_data_merge_threads(temporary_data_merge_threads_)
{
input_headers = std::move(input_headers_);
updateInputHeaders(std::move(input_headers_));
}
void AggregatingProjectionStep::updateOutputHeader()
{
if (input_headers.size() != 2)
throw Exception(
ErrorCodes::LOGICAL_ERROR,

View File

@ -123,6 +123,8 @@ public:
QueryPipelineBuilderPtr updatePipeline(QueryPipelineBuilders pipelines, const BuildQueryPipelineSettings &) override;
private:
void updateOutputHeader() override;
Aggregator::Params params;
bool final;
size_t merge_threads;

View File

@ -45,6 +45,9 @@ public:
QueryPipelineBuilderPtr updatePipeline(QueryPipelineBuilders pipelines, const BuildQueryPipelineSettings &) override;
void describePipeline(FormatSettings & settings) const override;
private:
void updateOutputHeader() override { output_header = getInputHeaders().front(); }
};
/// This is a temporary step which is converted to CreatingSetStep after plan optimization.
@ -64,6 +67,8 @@ public:
PreparedSets::Subqueries detachSets() { return std::move(subqueries); }
private:
void updateOutputHeader() override { output_header = getInputHeaders().front(); }
PreparedSets::Subqueries subqueries;
ContextPtr context;
};

View File

@ -10,6 +10,23 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
void IQueryPlanStep::updateInputHeaders(Headers input_headers_)
{
input_headers = std::move(input_headers_);
updateOutputHeader();
}
void IQueryPlanStep::updateInputHeader(Header input_header, size_t idx)
{
if (idx >= input_headers.size())
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Cannot update input header {} for step {} because it has only {} headers",
idx, getName(), input_headers.size());
input_headers[idx] = input_header;
updateOutputHeader();
}
const Header & IQueryPlanStep::getOutputHeader() const
{
if (!hasOutputHeader())

View File

@ -82,27 +82,12 @@ public:
/// Updates the input streams of the given step. Used during query plan optimizations.
/// It won't do any validation of new streams, so it is your responsibility to ensure that this update doesn't break anything
/// (e.g. you update data stream traits or correctly remove / add columns).
void updateInputHeaders(Headers input_headers_)
{
chassert(canUpdateInputHeader());
input_headers = std::move(input_headers_);
updateOutputHeader();
}
void updateInputHeader(Header input_header) { updateInputHeaders(Headers{input_header}); }
void updateInputHeader(Header input_header, size_t idx)
{
chassert(canUpdateInputHeader() && idx < input_headers.size());
input_headers[idx] = input_header;
updateOutputHeader();
}
virtual bool canUpdateInputHeader() const { return false; }
/// (e.g. you correctly remove / add columns).
void updateInputHeaders(Headers input_headers_);
void updateInputHeader(Header input_header, size_t idx = 0);
protected:
virtual void updateOutputHeader() { throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Not implemented"); }
virtual void updateOutputHeader() = 0;
Headers input_headers;
std::optional<Header> output_header;

View File

@ -15,6 +15,9 @@ public:
virtual void initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings & settings) = 0;
void describePipeline(FormatSettings & settings) const override;
protected:
void updateOutputHeader() override {}
};
}

View File

@ -66,8 +66,6 @@ public:
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Not implemented");
}
bool canUpdateInputHeader() const override { return true; }
protected:
TransformTraits transform_traits;

View File

@ -34,7 +34,13 @@ IntersectOrExceptStep::IntersectOrExceptStep(
: current_operator(operator_)
, max_threads(max_threads_)
{
input_headers = std::move(input_headers_);
updateInputHeaders(std::move(input_headers_));
// input_headers = std::move(input_headers_);
// output_header = checkHeaders(input_headers);
}
void IntersectOrExceptStep::updateOutputHeader()
{
output_header = checkHeaders(input_headers);
}

View File

@ -21,6 +21,8 @@ public:
void describePipeline(FormatSettings & settings) const override;
private:
void updateOutputHeader() override;
Operator current_operator;
size_t max_threads;
};

View File

@ -34,8 +34,6 @@ public:
void setJoin(JoinPtr join_) { join = std::move(join_); }
bool allowPushDownToRight() const;
bool canUpdateInputHeader() const override { return true; }
private:
void updateOutputHeader() override;

View File

@ -152,20 +152,10 @@ addNewFilterStepOrThrow(QueryPlan::Node * parent_node, QueryPlan::Nodes & nodes,
node.step = std::make_unique<FilterStep>(
node.children.at(0)->step->getOutputHeader(), std::move(split_filter), std::move(split_filter_column_name), can_remove_filter);
if (auto * transforming_step = dynamic_cast<ITransformingStep *>(child.get()))
{
transforming_step->updateInputHeader(node.step->getOutputHeader());
}
if (auto * join = typeid_cast<JoinStep *>(child.get()))
join->updateInputHeader(node.step->getOutputHeader(), child_idx);
else
{
if (auto * join = typeid_cast<JoinStep *>(child.get()))
{
join->updateInputHeader(node.step->getOutputHeader(), child_idx);
}
else
throw Exception(
ErrorCodes::LOGICAL_ERROR, "We are trying to push down a filter through a step for which we cannot update input stream");
}
child->updateInputHeader(node.step->getOutputHeader());
if (update_parent_filter)
{

View File

@ -457,39 +457,6 @@ void QueryPlan::explainPipeline(WriteBuffer & buffer, const ExplainPipelineOptio
}
}
static void updateDataStreams(QueryPlan::Node & root)
{
class UpdateDataStreams : public QueryPlanVisitor<UpdateDataStreams, false>
{
public:
explicit UpdateDataStreams(QueryPlan::Node * root_) : QueryPlanVisitor<UpdateDataStreams, false>(root_) { }
static bool visitTopDownImpl(QueryPlan::Node * /*current_node*/, QueryPlan::Node * /*parent_node*/) { return true; }
static void visitBottomUpImpl(QueryPlan::Node * current_node, QueryPlan::Node * /*parent_node*/)
{
auto & current_step = *current_node->step;
if (!current_step.canUpdateInputHeader() || current_node->children.empty())
return;
for (const auto * child : current_node->children)
{
if (!child->step->hasOutputHeader())
return;
}
Headers headers;
headers.reserve(current_node->children.size());
for (const auto * child : current_node->children)
headers.emplace_back(child->step->getOutputHeader());
current_step.updateInputHeaders(std::move(headers));
}
};
UpdateDataStreams(&root).visit();
}
void QueryPlan::optimize(const QueryPlanOptimizationSettings & optimization_settings)
{
/// optimization need to be applied before "mergeExpressions" optimization
@ -502,8 +469,6 @@ void QueryPlan::optimize(const QueryPlanOptimizationSettings & optimization_sett
QueryPlanOptimizations::optimizeTreeSecondPass(optimization_settings, *root, nodes);
if (optimization_settings.build_sets)
QueryPlanOptimizations::addStepsToBuildSets(*this, *root, nodes);
updateDataStreams(*root);
}
void QueryPlan::explainEstimate(MutableColumns & columns) const

View File

@ -34,8 +34,8 @@ UnionStep::UnionStep(Headers input_headers_, size_t max_threads_)
void UnionStep::updateOutputHeader()
{
if (input_headers.size() == 1 || !output_header)
output_header = checkHeaders(input_headers);
//if (input_headers.size() == 1 || !output_header)
output_header = checkHeaders(input_headers);
}
QueryPipelineBuilderPtr UnionStep::updatePipeline(QueryPipelineBuilders pipelines, const BuildQueryPipelineSettings &)

View File

@ -19,8 +19,6 @@ public:
size_t getMaxThreads() const { return max_threads; }
bool canUpdateInputHeader() const override { return true; }
private:
void updateOutputHeader() override;