mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-13 01:41:59 +00:00
Refactor DAG splitting into steps
This commit is contained in:
parent
943f2ea212
commit
3ad0683de9
@ -17,6 +17,7 @@
|
||||
|
||||
/// For CAST to bool
|
||||
#include <Functions/CastOverloadResolver.h>
|
||||
#include <Functions/FunctionsLogical.h>
|
||||
#include <Planner/PlannerActionsVisitor.h>
|
||||
|
||||
#include <city.h>
|
||||
@ -108,6 +109,255 @@ static const ActionsDAG::Node & addCast(ActionsDAGPtr dag, const ActionsDAG::Nod
|
||||
return dag->addFunction(func_builder_cast, std::move(children), new_name);
|
||||
};
|
||||
|
||||
struct NodeInfo
|
||||
{
|
||||
NameSet required_columns;
|
||||
};
|
||||
|
||||
void fillRequiredColumns(const ActionsDAG::Node * node, std::unordered_map<const ActionsDAG::Node *, NodeInfo> & nodes_info)
|
||||
{
|
||||
if (nodes_info.contains(node))
|
||||
return;
|
||||
|
||||
auto & node_info = nodes_info[node];
|
||||
|
||||
if (node->type == ActionsDAG::ActionType::INPUT)
|
||||
{
|
||||
node_info.required_columns.insert(node->result_name);
|
||||
return;
|
||||
}
|
||||
|
||||
for (const auto & child : node->children)
|
||||
{
|
||||
fillRequiredColumns(child, nodes_info);
|
||||
const auto & child_info = nodes_info[child];
|
||||
node_info.required_columns.insert(child_info.required_columns.begin(), child_info.required_columns.end());
|
||||
}
|
||||
}
|
||||
|
||||
struct DAGNodeRef
|
||||
{
|
||||
ActionsDAGPtr dag;
|
||||
const ActionsDAG::Node * node;
|
||||
};
|
||||
|
||||
using OriginalToNewNodeMap = std::unordered_map<const ActionsDAG::Node *, DAGNodeRef>;
|
||||
|
||||
const ActionsDAG::Node & addClonedDAGToDAG(const ActionsDAG::Node * original_dag_node, ActionsDAGPtr new_dag, OriginalToNewNodeMap & node_remap)
|
||||
{
|
||||
if (node_remap.contains(original_dag_node))
|
||||
{
|
||||
/// If the node is already in the new DAG, return it
|
||||
const auto & node_ref = node_remap.at(original_dag_node);
|
||||
if (node_ref.dag == new_dag)
|
||||
return *node_ref.node;
|
||||
|
||||
/// If the node is known from the previous steps, add it as an input
|
||||
node_ref.dag->addOrReplaceInOutputs(*node_ref.node);
|
||||
const auto & new_node = new_dag->addInput(node_ref.node->result_name, node_ref.node->result_type);
|
||||
node_remap[original_dag_node] = {new_dag, &new_node}; /// TODO: here we update the node reference. Is ti always correct?
|
||||
return new_node;
|
||||
}
|
||||
|
||||
/// If the node is an input, add it as an input
|
||||
if (original_dag_node->type == ActionsDAG::ActionType::INPUT)
|
||||
{
|
||||
const auto & new_node = new_dag->addInput(original_dag_node->result_name, original_dag_node->result_type);
|
||||
node_remap[original_dag_node] = {new_dag, &new_node};
|
||||
return new_node;
|
||||
}
|
||||
|
||||
/// If the node is a column, add it as an input
|
||||
if (original_dag_node->type == ActionsDAG::ActionType::COLUMN)
|
||||
{
|
||||
const auto & new_node = new_dag->addColumn(
|
||||
ColumnWithTypeAndName(original_dag_node->column, original_dag_node->result_type, original_dag_node->result_name));
|
||||
node_remap[original_dag_node] = {new_dag, &new_node};
|
||||
return new_node;
|
||||
}
|
||||
|
||||
/// TODO: Alias node?
|
||||
|
||||
/// If the node is a function, add it as a function and add its children
|
||||
if (original_dag_node->type == ActionsDAG::ActionType::FUNCTION)
|
||||
{
|
||||
ActionsDAG::NodeRawConstPtrs new_children;
|
||||
for (const auto & child : original_dag_node->children)
|
||||
{
|
||||
const auto & new_child = addClonedDAGToDAG(child, new_dag, node_remap);
|
||||
new_children.push_back(&new_child);
|
||||
}
|
||||
|
||||
const auto & new_node = new_dag->addFunction(original_dag_node->function_base, new_children, original_dag_node->result_name);
|
||||
node_remap[original_dag_node] = {new_dag, &new_node};
|
||||
return new_node;
|
||||
}
|
||||
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected node type in PREWHERE actions: {}", original_dag_node->type);
|
||||
}
|
||||
|
||||
bool tryBuildPrewhereSteps(PrewhereInfoPtr prewhere_info, const ExpressionActionsSettings & actions_settings, PrewhereExprInfo & prewhere)
|
||||
{
|
||||
/// We want to build a sequence of steps that will compute parts of the prewhere condition.
|
||||
/// Each step reads some new columns and computes some new expressions and a filter condition.
|
||||
/// The last step computes the final filter condition and the remaining expressions that are required for the main query.
|
||||
/// The steps are built in the following way:
|
||||
/// 1. List all condition nodes that are combined with AND into PREWHERE condition
|
||||
/// 2. Collect the set of columns that are used in the condition
|
||||
/// 3. Sort condition nodes by the number of columns used in them and the overall size of those columns
|
||||
/// 4. Group conditions with the same set of columns into a single read/compute step
|
||||
/// 5. Build DAGs for each step:
|
||||
/// - DFS from the condition root node:
|
||||
/// - If the node was not computed yet, add it to the DAG and traverse its children
|
||||
/// - If the node was already computed by one of the previous steps, add it as output for that step and as input for the current step
|
||||
/// - If the node was already computed by the current step just stop traversing
|
||||
/// 6. Find all outputs of the original DAG
|
||||
/// 7. Find all outputs that were computed in the already built DAGs, mark these nodes as outputs in the steps where they were computed
|
||||
/// 8. Add computation of the remaining outputs to the last step with the procedure similar to 4
|
||||
|
||||
if (!prewhere_info || !prewhere_info->prewhere_actions)
|
||||
return true;
|
||||
|
||||
/// 1. List all condition nodes that are combined with AND into PREWHERE condition
|
||||
const auto & condition_root = prewhere_info->prewhere_actions->findInOutputs(prewhere_info->prewhere_column_name);
|
||||
const bool is_conjunction = (condition_root.type == ActionsDAG::ActionType::FUNCTION && condition_root.function_base->getName() == "and");
|
||||
if (!is_conjunction)
|
||||
return false;
|
||||
auto condition_nodes = condition_root.children;
|
||||
|
||||
/// 2. Collect the set of columns that are used in the condition
|
||||
std::unordered_map<const ActionsDAG::Node *, NodeInfo> nodes_info;
|
||||
for (const auto & node : condition_nodes)
|
||||
{
|
||||
fillRequiredColumns(node, nodes_info);
|
||||
}
|
||||
|
||||
/// 3. Sort condition nodes by the number of columns used in them and the overall size of those columns
|
||||
/// TODO: not sorting for now because the conditions are already sorted by Where Optimizer
|
||||
|
||||
/// 4. Group conditions with the same set of columns into a single read/compute step
|
||||
std::vector<std::vector<const ActionsDAG::Node *>> condition_groups;
|
||||
for (const auto & node : condition_nodes)
|
||||
{
|
||||
const auto & node_info = nodes_info[node];
|
||||
if (!condition_groups.empty() && nodes_info[condition_groups.back().back()].required_columns == node_info.required_columns)
|
||||
condition_groups.back().push_back(node); /// Add to the last group
|
||||
else
|
||||
condition_groups.push_back({node}); /// Start new group
|
||||
}
|
||||
|
||||
/// 5. Build DAGs for each step
|
||||
struct Step
|
||||
{
|
||||
ActionsDAGPtr actions;
|
||||
String column_name;
|
||||
};
|
||||
std::vector<Step> steps;
|
||||
|
||||
OriginalToNewNodeMap node_remap;
|
||||
|
||||
for (const auto & condition_group : condition_groups)
|
||||
{
|
||||
// std::cerr
|
||||
// << "Conditions: [";
|
||||
//
|
||||
// for (const auto & condition : condition_group)
|
||||
// std::cerr << " \"" << condition->result_name;
|
||||
//
|
||||
// std::cerr << "\" ] Columns: " << boost::algorithm::join(nodes_info[condition_group.front()].required_columns, " ")
|
||||
// << std::endl;
|
||||
|
||||
ActionsDAGPtr step_dag = std::make_shared<ActionsDAG>();
|
||||
String result_name;
|
||||
|
||||
std::vector<const ActionsDAG::Node *> new_condition_nodes;
|
||||
for (const auto * node : condition_group)
|
||||
{
|
||||
const auto & node_in_new_dag = addClonedDAGToDAG(node, step_dag, node_remap);
|
||||
new_condition_nodes.push_back(&node_in_new_dag);
|
||||
}
|
||||
|
||||
if (new_condition_nodes.size() > 1)
|
||||
{
|
||||
/// Add AND function to combine the conditions
|
||||
FunctionOverloadResolverPtr func_builder_and = std::make_unique<FunctionToOverloadResolverAdaptor>(std::make_shared<FunctionAnd>());
|
||||
const auto & and_function_node = step_dag->addFunction(func_builder_and, new_condition_nodes, "");
|
||||
step_dag->addOrReplaceInOutputs(and_function_node);
|
||||
result_name = and_function_node.result_name;
|
||||
}
|
||||
else
|
||||
{
|
||||
const auto & result_node = *new_condition_nodes.front();
|
||||
/// Add cast to UInt8 if needed
|
||||
if (result_node.result_type->getTypeId() == TypeIndex::UInt8)
|
||||
{
|
||||
step_dag->addOrReplaceInOutputs(result_node);
|
||||
result_name = result_node.result_name;
|
||||
}
|
||||
else
|
||||
{
|
||||
const auto & cast_node = addCast(step_dag, result_node, "UInt8");
|
||||
step_dag->addOrReplaceInOutputs(cast_node);
|
||||
result_name = cast_node.result_name;
|
||||
}
|
||||
}
|
||||
|
||||
// std::cerr << "Step DAG:\n" << step_dag->dumpDAG() << std::endl;
|
||||
|
||||
steps.push_back({step_dag, result_name});
|
||||
}
|
||||
|
||||
/// 6. Find all outputs of the original DAG
|
||||
auto original_outputs = prewhere_info->prewhere_actions->getOutputs();
|
||||
/// 7. Find all outputs that were computed in the already built DAGs, mark these nodes as outputs in the steps where they were computed
|
||||
/// 8. Add computation of the remaining outputs to the last step with the procedure similar to 4
|
||||
for (const auto * output : original_outputs)
|
||||
{
|
||||
// std::cerr << "Original output: " << output->result_name << std::endl;
|
||||
if (node_remap.contains(output))
|
||||
{
|
||||
const auto & new_node_info = node_remap[output];
|
||||
new_node_info.dag->addOrReplaceInOutputs(*new_node_info.node);
|
||||
}
|
||||
else if (output->result_name == prewhere_info->prewhere_column_name)
|
||||
{
|
||||
/// Special case for final PREWHERE column:
|
||||
/// "Rename" the last step result to the combined PREWHERE column name, because in fact it will be AND of all step results
|
||||
const auto & prewhere_result_node = addCast(
|
||||
steps.back().actions,
|
||||
steps.back().actions->findInOutputs(steps.back().column_name),
|
||||
output->result_type->getName(),
|
||||
prewhere_info->prewhere_column_name);
|
||||
|
||||
steps.back().actions->addOrReplaceInOutputs(prewhere_result_node);
|
||||
}
|
||||
else
|
||||
{
|
||||
const auto & node_in_new_dag = addClonedDAGToDAG(output, steps.back().actions, node_remap);
|
||||
steps.back().actions->addOrReplaceInOutputs(node_in_new_dag);
|
||||
}
|
||||
}
|
||||
|
||||
/// 9. Build PrewhereExprInfo
|
||||
{
|
||||
for (const auto & step : steps)
|
||||
{
|
||||
// std::cerr << "Step DAG:\n" << step.actions->dumpDAG() << std::endl;
|
||||
prewhere.steps.push_back(
|
||||
{
|
||||
.actions = std::make_shared<ExpressionActions>(step.actions, actions_settings),
|
||||
.column_name = step.column_name,
|
||||
.remove_column = true,
|
||||
.need_filter = false,
|
||||
});
|
||||
}
|
||||
prewhere.steps.back().remove_column = prewhere_info->remove_prewhere_column;
|
||||
prewhere.steps.back().need_filter = prewhere_info->need_filter;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
std::unique_ptr<PrewhereExprInfo> IMergeTreeSelectAlgorithm::getPrewhereActions(PrewhereInfoPtr prewhere_info, const ExpressionActionsSettings & actions_settings, bool enable_multiple_prewhere_read_steps)
|
||||
{
|
||||
std::unique_ptr<PrewhereExprInfo> prewhere_actions;
|
||||
@ -130,6 +380,22 @@ std::unique_ptr<PrewhereExprInfo> IMergeTreeSelectAlgorithm::getPrewhereActions(
|
||||
|
||||
// std::cerr << "ORIGINAL PREWHERE:\n" << prewhere_info->prewhere_actions->dumpDAG() << std::endl;
|
||||
|
||||
#if 1
|
||||
if (!enable_multiple_prewhere_read_steps || !tryBuildPrewhereSteps(prewhere_info, actions_settings, *prewhere_actions))
|
||||
{
|
||||
PrewhereExprStep prewhere_step
|
||||
{
|
||||
.actions = std::make_shared<ExpressionActions>(prewhere_info->prewhere_actions, actions_settings),
|
||||
.column_name = prewhere_info->prewhere_column_name,
|
||||
.remove_column = prewhere_info->remove_prewhere_column,
|
||||
.need_filter = prewhere_info->need_filter
|
||||
};
|
||||
|
||||
prewhere_actions->steps.emplace_back(std::move(prewhere_step));
|
||||
}
|
||||
|
||||
#else
|
||||
|
||||
struct Step
|
||||
{
|
||||
ActionsDAGPtr actions;
|
||||
@ -243,6 +509,7 @@ std::unique_ptr<PrewhereExprInfo> IMergeTreeSelectAlgorithm::getPrewhereActions(
|
||||
|
||||
prewhere_actions->steps.emplace_back(std::move(prewhere_step));
|
||||
}
|
||||
#endif
|
||||
}
|
||||
|
||||
return prewhere_actions;
|
||||
|
Loading…
Reference in New Issue
Block a user