From 3ad0683de93691bc8c9745f622a373b8f958db3a Mon Sep 17 00:00:00 2001 From: Alexander Gololobov <440544+davenger@users.noreply.github.com> Date: Sun, 12 Feb 2023 21:42:24 +0100 Subject: [PATCH] Refactor DAG splitting into steps --- .../MergeTreeBaseSelectProcessor.cpp | 269 +++++++++++++++++- 1 file changed, 268 insertions(+), 1 deletion(-) diff --git a/src/Storages/MergeTree/MergeTreeBaseSelectProcessor.cpp b/src/Storages/MergeTree/MergeTreeBaseSelectProcessor.cpp index df7f77261af..5f40950c3b2 100644 --- a/src/Storages/MergeTree/MergeTreeBaseSelectProcessor.cpp +++ b/src/Storages/MergeTree/MergeTreeBaseSelectProcessor.cpp @@ -17,6 +17,7 @@ /// For CAST to bool #include +#include #include #include @@ -108,6 +109,255 @@ static const ActionsDAG::Node & addCast(ActionsDAGPtr dag, const ActionsDAG::Nod return dag->addFunction(func_builder_cast, std::move(children), new_name); }; +struct NodeInfo +{ + NameSet required_columns; +}; + +void fillRequiredColumns(const ActionsDAG::Node * node, std::unordered_map & nodes_info) +{ + if (nodes_info.contains(node)) + return; + + auto & node_info = nodes_info[node]; + + if (node->type == ActionsDAG::ActionType::INPUT) + { + node_info.required_columns.insert(node->result_name); + return; + } + + for (const auto & child : node->children) + { + fillRequiredColumns(child, nodes_info); + const auto & child_info = nodes_info[child]; + node_info.required_columns.insert(child_info.required_columns.begin(), child_info.required_columns.end()); + } +} + +struct DAGNodeRef +{ + ActionsDAGPtr dag; + const ActionsDAG::Node * node; +}; + +using OriginalToNewNodeMap = std::unordered_map; + +const ActionsDAG::Node & addClonedDAGToDAG(const ActionsDAG::Node * original_dag_node, ActionsDAGPtr new_dag, OriginalToNewNodeMap & node_remap) +{ + if (node_remap.contains(original_dag_node)) + { + /// If the node is already in the new DAG, return it + const auto & node_ref = node_remap.at(original_dag_node); + if (node_ref.dag == new_dag) + return 
*node_ref.node; + + /// If the node is known from the previous steps, add it as an input + node_ref.dag->addOrReplaceInOutputs(*node_ref.node); + const auto & new_node = new_dag->addInput(node_ref.node->result_name, node_ref.node->result_type); + node_remap[original_dag_node] = {new_dag, &new_node}; /// TODO: here we update the node reference. Is it always correct? + return new_node; + } + + /// If the node is an input, add it as an input + if (original_dag_node->type == ActionsDAG::ActionType::INPUT) + { + const auto & new_node = new_dag->addInput(original_dag_node->result_name, original_dag_node->result_type); + node_remap[original_dag_node] = {new_dag, &new_node}; + return new_node; + } + + /// If the node is a column, add it as a column + if (original_dag_node->type == ActionsDAG::ActionType::COLUMN) + { + const auto & new_node = new_dag->addColumn( + ColumnWithTypeAndName(original_dag_node->column, original_dag_node->result_type, original_dag_node->result_name)); + node_remap[original_dag_node] = {new_dag, &new_node}; + return new_node; + } + + /// TODO: Alias node? 
+ + /// If the node is a function, add it as a function and add its children + if (original_dag_node->type == ActionsDAG::ActionType::FUNCTION) + { + ActionsDAG::NodeRawConstPtrs new_children; + for (const auto & child : original_dag_node->children) + { + const auto & new_child = addClonedDAGToDAG(child, new_dag, node_remap); + new_children.push_back(&new_child); + } + + const auto & new_node = new_dag->addFunction(original_dag_node->function_base, new_children, original_dag_node->result_name); + node_remap[original_dag_node] = {new_dag, &new_node}; + return new_node; + } + + throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected node type in PREWHERE actions: {}", original_dag_node->type); +} + +bool tryBuildPrewhereSteps(PrewhereInfoPtr prewhere_info, const ExpressionActionsSettings & actions_settings, PrewhereExprInfo & prewhere) +{ + /// We want to build a sequence of steps that will compute parts of the prewhere condition. + /// Each step reads some new columns and computes some new expressions and a filter condition. + /// The last step computes the final filter condition and the remaining expressions that are required for the main query. + /// The steps are built in the following way: + /// 1. List all condition nodes that are combined with AND into PREWHERE condition + /// 2. Collect the set of columns that are used in the condition + /// 3. Sort condition nodes by the number of columns used in them and the overall size of those columns + /// 4. Group conditions with the same set of columns into a single read/compute step + /// 5. Build DAGs for each step: + /// - DFS from the condition root node: + /// - If the node was not computed yet, add it to the DAG and traverse its children + /// - If the node was already computed by one of the previous steps, add it as output for that step and as input for the current step + /// - If the node was already computed by the current step just stop traversing + /// 6. Find all outputs of the original DAG + /// 7. 
Find all outputs that were computed in the already built DAGs, mark these nodes as outputs in the steps where they were computed + /// 8. Add computation of the remaining outputs to the last step with the procedure similar to 5 + + if (!prewhere_info || !prewhere_info->prewhere_actions) + return true; + + /// 1. List all condition nodes that are combined with AND into PREWHERE condition + const auto & condition_root = prewhere_info->prewhere_actions->findInOutputs(prewhere_info->prewhere_column_name); + const bool is_conjunction = (condition_root.type == ActionsDAG::ActionType::FUNCTION && condition_root.function_base->getName() == "and"); + if (!is_conjunction) + return false; + auto condition_nodes = condition_root.children; + + /// 2. Collect the set of columns that are used in the condition + std::unordered_map nodes_info; + for (const auto & node : condition_nodes) + { + fillRequiredColumns(node, nodes_info); + } + + /// 3. Sort condition nodes by the number of columns used in them and the overall size of those columns + /// TODO: not sorting for now because the conditions are already sorted by Where Optimizer + + /// 4. Group conditions with the same set of columns into a single read/compute step + std::vector> condition_groups; + for (const auto & node : condition_nodes) + { + const auto & node_info = nodes_info[node]; + if (!condition_groups.empty() && nodes_info[condition_groups.back().back()].required_columns == node_info.required_columns) + condition_groups.back().push_back(node); /// Add to the last group + else + condition_groups.push_back({node}); /// Start new group + } + + /// 5. 
Build DAGs for each step + struct Step + { + ActionsDAGPtr actions; + String column_name; + }; + std::vector steps; + + OriginalToNewNodeMap node_remap; + + for (const auto & condition_group : condition_groups) + { +// std::cerr +// << "Conditions: ["; +// +// for (const auto & condition : condition_group) +// std::cerr << " \"" << condition->result_name; +// +// std::cerr << "\" ] Columns: " << boost::algorithm::join(nodes_info[condition_group.front()].required_columns, " ") +// << std::endl; + + ActionsDAGPtr step_dag = std::make_shared(); + String result_name; + + std::vector new_condition_nodes; + for (const auto * node : condition_group) + { + const auto & node_in_new_dag = addClonedDAGToDAG(node, step_dag, node_remap); + new_condition_nodes.push_back(&node_in_new_dag); + } + + if (new_condition_nodes.size() > 1) + { + /// Add AND function to combine the conditions + FunctionOverloadResolverPtr func_builder_and = std::make_unique(std::make_shared()); + const auto & and_function_node = step_dag->addFunction(func_builder_and, new_condition_nodes, ""); + step_dag->addOrReplaceInOutputs(and_function_node); + result_name = and_function_node.result_name; + } + else + { + const auto & result_node = *new_condition_nodes.front(); + /// Add cast to UInt8 if needed + if (result_node.result_type->getTypeId() == TypeIndex::UInt8) + { + step_dag->addOrReplaceInOutputs(result_node); + result_name = result_node.result_name; + } + else + { + const auto & cast_node = addCast(step_dag, result_node, "UInt8"); + step_dag->addOrReplaceInOutputs(cast_node); + result_name = cast_node.result_name; + } + } + +// std::cerr << "Step DAG:\n" << step_dag->dumpDAG() << std::endl; + + steps.push_back({step_dag, result_name}); + } + + /// 6. Find all outputs of the original DAG + auto original_outputs = prewhere_info->prewhere_actions->getOutputs(); + /// 7. 
Find all outputs that were computed in the already built DAGs, mark these nodes as outputs in the steps where they were computed + /// 8. Add computation of the remaining outputs to the last step with the procedure similar to 5 + for (const auto * output : original_outputs) + { +// std::cerr << "Original output: " << output->result_name << std::endl; + if (node_remap.contains(output)) + { + const auto & new_node_info = node_remap[output]; + new_node_info.dag->addOrReplaceInOutputs(*new_node_info.node); + } + else if (output->result_name == prewhere_info->prewhere_column_name) + { + /// Special case for final PREWHERE column: + /// "Rename" the last step result to the combined PREWHERE column name, because in fact it will be AND of all step results + const auto & prewhere_result_node = addCast( + steps.back().actions, + steps.back().actions->findInOutputs(steps.back().column_name), + output->result_type->getName(), + prewhere_info->prewhere_column_name); + + steps.back().actions->addOrReplaceInOutputs(prewhere_result_node); + } + else + { + const auto & node_in_new_dag = addClonedDAGToDAG(output, steps.back().actions, node_remap); + steps.back().actions->addOrReplaceInOutputs(node_in_new_dag); + } + } + + /// 9. 
Build PrewhereExprInfo + { + for (const auto & step : steps) + { +// std::cerr << "Step DAG:\n" << step.actions->dumpDAG() << std::endl; + prewhere.steps.push_back( + { + .actions = std::make_shared(step.actions, actions_settings), + .column_name = step.column_name, + .remove_column = true, + .need_filter = false, + }); + } + prewhere.steps.back().remove_column = prewhere_info->remove_prewhere_column; + prewhere.steps.back().need_filter = prewhere_info->need_filter; + } + + return true; +} + std::unique_ptr IMergeTreeSelectAlgorithm::getPrewhereActions(PrewhereInfoPtr prewhere_info, const ExpressionActionsSettings & actions_settings, bool enable_multiple_prewhere_read_steps) { std::unique_ptr prewhere_actions; @@ -128,7 +378,23 @@ std::unique_ptr IMergeTreeSelectAlgorithm::getPrewhereActions( prewhere_actions->steps.emplace_back(std::move(row_level_filter_step)); } - //std::cerr << "ORIGINAL PREWHERE:\n" << prewhere_info->prewhere_actions->dumpDAG() << std::endl; +// std::cerr << "ORIGINAL PREWHERE:\n" << prewhere_info->prewhere_actions->dumpDAG() << std::endl; + +#if 1 + if (!enable_multiple_prewhere_read_steps || !tryBuildPrewhereSteps(prewhere_info, actions_settings, *prewhere_actions)) + { + PrewhereExprStep prewhere_step + { + .actions = std::make_shared(prewhere_info->prewhere_actions, actions_settings), + .column_name = prewhere_info->prewhere_column_name, + .remove_column = prewhere_info->remove_prewhere_column, + .need_filter = prewhere_info->need_filter + }; + + prewhere_actions->steps.emplace_back(std::move(prewhere_step)); + } + +#else struct Step { @@ -243,6 +509,7 @@ std::unique_ptr IMergeTreeSelectAlgorithm::getPrewhereActions( prewhere_actions->steps.emplace_back(std::move(prewhere_step)); } +#endif } return prewhere_actions;