Trying to fix short-circuit for FilterStep.

This commit is contained in:
Nikolai Kochetov 2024-11-07 18:19:26 +00:00
parent 4ad8273e5f
commit 0ac6ce56bd

View File

@ -5,6 +5,9 @@
#include <Interpreters/ExpressionActions.h>
#include <IO/Operators.h>
#include <Common/JSONBuilder.h>
#include <Functions/IFunction.h>
#include <stack>
#include <ranges>
namespace DB
{
@ -24,6 +27,78 @@ static ITransformingStep::Traits getTraits()
};
}
/// Tells whether `node`, once any chain of ALIAS nodes on top of it is looked through,
/// is something other than a FUNCTION or an ARRAY_JOIN node.
/// Such nodes are treated as "trivial" by the AND-chain splitting code below.
static bool isTrivialSubtree(const ActionsDAG::Node * node)
{
    const ActionsDAG::Node * resolved = node;

    /// An alias only renames its single child, so the child decides the answer.
    for (; resolved->type == ActionsDAG::ActionType::ALIAS; resolved = resolved->children.at(0))
    {
    }

    const bool is_function = resolved->type == ActionsDAG::ActionType::FUNCTION;
    const bool is_array_join = resolved->type == ActionsDAG::ActionType::ARRAY_JOIN;
    return !(is_function || is_array_join);
}
/// One condition split off from a filter expression: the actions that compute it
/// together with the name of the column holding the condition result.
/// Member order matters: instances are brace-initialised as {dag, name}.
struct ActionsAndName
{
/// Actions computing the split-off condition.
ActionsDAG dag;
/// Result name of the condition node; used as the filter column name of the extra filter.
std::string name;
};
/// Cuts the subtree rooted at `filter_node` out of `dag`.
///
/// On return `dag` holds the second part of the split, and the returned value holds the
/// first part, with the counterpart of `filter_node` placed as its first output,
/// plus the result name of `filter_node`.
static ActionsAndName splitSingleAndFilter(ActionsDAG & dag, const ActionsDAG::Node * filter_node)
{
    /// Copy the name up front: `filter_node` belongs to `dag`, which is overwritten below.
    auto condition_name = filter_node->result_name;

    /// NOTE(review): the `true` flag presumably requests `split_nodes_mapping` to be filled — confirm against ActionsDAG::split.
    auto parts = dag.split({filter_node}, true);
    dag = std::move(parts.second);

    /// From here on `filter_node` is used only as a lookup key, it is not dereferenced.
    const auto * condition_in_first_part = parts.split_nodes_mapping[filter_node];
    auto & first_part_outputs = parts.first.getOutputs();
    first_part_outputs.emplace(first_part_outputs.begin(), condition_in_first_part);

    return ActionsAndName{std::move(parts.first), std::move(condition_name)};
}
/// Tries to split one condition off the AND chain behind the output column `filter_name`.
///
/// The filter node is resolved through aliases and must be a function named "and";
/// nested "and" functions are flattened while walking the arguments left to right.
/// The first non-trivial argument is split into its own DAG (modifying `dag`), but only
/// if at least one more non-trivial argument follows it.
/// Returns an empty optional when there is nothing to split.
static std::optional<ActionsAndName> trySplitSingleAndFilter(ActionsDAG & dag, const std::string & filter_name)
{
    const auto is_and_function = [](const ActionsDAG::Node * candidate)
    {
        return candidate->type == ActionsDAG::ActionType::FUNCTION && candidate->function_base->getName() == "and";
    };

    const ActionsDAG::Node * root = &dag.findInOutputs(filter_name);
    while (root->type == ActionsDAG::ActionType::ALIAS)
        root = root->children.at(0);

    if (!is_and_function(root))
        return std::nullopt;

    const ActionsDAG::Node * first_non_trivial = nullptr;

    std::stack<const ActionsDAG::Node *> pending;
    pending.push(root);

    while (!pending.empty())
    {
        const ActionsDAG::Node * current = pending.top();
        pending.pop();

        if (is_and_function(current))
        {
            /// Push arguments right to left so that they are popped left to right.
            for (auto arg = current->children.rbegin(); arg != current->children.rend(); ++arg)
                pending.push(*arg);
        }
        else if (!isTrivialSubtree(current))
        {
            /// Never split off the last non-trivial condition: the first one found
            /// is split only once another non-trivial condition shows up.
            if (first_non_trivial != nullptr)
                return splitSingleAndFilter(dag, first_non_trivial);

            first_non_trivial = current;
        }
    }

    return std::nullopt;
}
/// Splits the AND chain behind the output column `filter_name` into separate filters.
///
/// Conditions are split off `dag` one at a time until trySplitSingleAndFilter() finds
/// nothing more to split; they are returned in the order they were split.
/// `dag` is modified and afterwards holds whatever was not split off.
///
/// Declared `static` like the other helpers here: the return type ActionsAndName is
/// declared in this file, so the function does not need external linkage.
static std::vector<ActionsAndName> splitAndChainIntoMultipleFilters(ActionsDAG & dag, const std::string & filter_name)
{
    std::vector<ActionsAndName> res;

    while (auto condition = trySplitSingleAndFilter(dag, filter_name))
        res.push_back(std::move(*condition));

    return res;
}
FilterStep::FilterStep(
const Header & input_header_,
ActionsDAG actions_dag_,
@ -50,6 +125,17 @@ FilterStep::FilterStep(
void FilterStep::transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings & settings)
{
auto and_atoms = splitAndChainIntoMultipleFilters(actions_dag, filter_column_name);
for (auto & and_atom : and_atoms)
{
auto expression = std::make_shared<ExpressionActions>(std::move(and_atom.dag), settings.getActionsSettings());
pipeline.addSimpleTransform([&](const Block & header, QueryPipelineBuilder::StreamType stream_type)
{
bool on_totals = stream_type == QueryPipelineBuilder::StreamType::Totals;
return std::make_shared<FilterTransform>(header, expression, and_atom.name, true, on_totals);
});
}
auto expression = std::make_shared<ExpressionActions>(std::move(actions_dag), settings.getActionsSettings());
pipeline.addSimpleTransform([&](const Block & header, QueryPipelineBuilder::StreamType stream_type)
@ -76,13 +162,23 @@ void FilterStep::transformPipeline(QueryPipelineBuilder & pipeline, const BuildQ
/// Writes a text description of this step to `settings.out`.
///
/// For every condition split off the AND chain an "AND column: <name>" line is written,
/// followed by the description of the actions computing it. Then the
/// "Filter column: <name>" line (with " (removed)" when the filter column is removed)
/// and the description of the remaining actions are written.
void FilterStep::describeActions(FormatSettings & settings) const
{
    String prefix(settings.offset, settings.indent_char);

    /// Splitting modifies the DAG it is given, so work on a clone rather than on `actions_dag`.
    auto cloned_dag = actions_dag.clone();
    auto and_atoms = splitAndChainIntoMultipleFilters(cloned_dag, filter_column_name);

    for (auto & and_atom : and_atoms)
    {
        auto expression = std::make_shared<ExpressionActions>(std::move(and_atom.dag));
        /// End the line before the nested description, same as for "Filter column" below.
        settings.out << prefix << "AND column: " << and_atom.name << '\n';
        expression->describeActions(settings.out, prefix);
    }

    settings.out << prefix << "Filter column: " << filter_column_name;
    if (remove_filter_column)
        settings.out << " (removed)";
    settings.out << '\n';

    /// Describe what is left after the split, not the original DAG:
    /// the split-off conditions were already described above.
    auto expression = std::make_shared<ExpressionActions>(std::move(cloned_dag));
    expression->describeActions(settings.out, prefix);
}