2023-02-13 14:28:54 +00:00
|
|
|
#include <Functions/CastOverloadResolver.h>
|
|
|
|
#include <Functions/FunctionsLogical.h>
|
|
|
|
#include <Storages/SelectQueryInfo.h>
|
|
|
|
#include <Storages/MergeTree/MergeTreeRangeReader.h>
|
|
|
|
#include <Interpreters/ExpressionActions.h>
|
|
|
|
|
|
|
|
namespace DB
|
|
|
|
{
|
|
|
|
|
|
|
|
namespace ErrorCodes
|
|
|
|
{
|
|
|
|
extern const int LOGICAL_ERROR;
|
|
|
|
}
|
|
|
|
|
|
|
|
namespace
|
|
|
|
{
|
|
|
|
|
2023-02-15 21:37:18 +00:00
|
|
|
/// Stores the list of columns required to compute a node in the DAG.
|
2023-02-13 14:28:54 +00:00
|
|
|
struct NodeInfo
|
|
|
|
{
|
|
|
|
NameSet required_columns;
|
|
|
|
};
|
|
|
|
|
|
|
|
/// Fills the list of required columns for a node in the DAG.
|
|
|
|
void fillRequiredColumns(const ActionsDAG::Node * node, std::unordered_map<const ActionsDAG::Node *, NodeInfo> & nodes_info)
|
|
|
|
{
|
|
|
|
if (nodes_info.contains(node))
|
|
|
|
return;
|
|
|
|
|
|
|
|
auto & node_info = nodes_info[node];
|
|
|
|
|
|
|
|
if (node->type == ActionsDAG::ActionType::INPUT)
|
|
|
|
{
|
|
|
|
node_info.required_columns.insert(node->result_name);
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
|
|
|
|
for (const auto & child : node->children)
|
|
|
|
{
|
|
|
|
fillRequiredColumns(child, nodes_info);
|
|
|
|
const auto & child_info = nodes_info[child];
|
|
|
|
node_info.required_columns.insert(child_info.required_columns.begin(), child_info.required_columns.end());
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2023-02-13 22:14:18 +00:00
|
|
|
/// Stores information about a node that has already been cloned or added to one of the new DAGs.
|
2023-02-15 21:37:04 +00:00
|
|
|
/// This allows to avoid cloning the same sub-DAG into multiple step DAGs but reference previously cloned nodes from earlier steps.
|
2023-02-13 14:28:54 +00:00
|
|
|
struct DAGNodeRef
|
|
|
|
{
|
|
|
|
ActionsDAGPtr dag;
|
|
|
|
const ActionsDAG::Node * node;
|
|
|
|
};
|
|
|
|
|
2023-02-13 22:14:18 +00:00
|
|
|
/// Result name -> DAGNodeRef
|
|
|
|
using OriginalToNewNodeMap = std::unordered_map<String, DAGNodeRef>;
|
2023-02-13 14:28:54 +00:00
|
|
|
|
|
|
|
/// Clones the part of original DAG responsible for computing the original_dag_node and adds it to the new DAG.
|
|
|
|
const ActionsDAG::Node & addClonedDAGToDAG(const ActionsDAG::Node * original_dag_node, ActionsDAGPtr new_dag, OriginalToNewNodeMap & node_remap)
|
|
|
|
{
|
2023-02-13 22:14:18 +00:00
|
|
|
const String & node_name = original_dag_node->result_name;
|
2023-02-13 14:28:54 +00:00
|
|
|
/// Look for the node in the map of already known nodes
|
2023-02-13 22:14:18 +00:00
|
|
|
if (node_remap.contains(node_name))
|
2023-02-13 14:28:54 +00:00
|
|
|
{
|
|
|
|
/// If the node is already in the new DAG, return it
|
2023-02-13 22:14:18 +00:00
|
|
|
const auto & node_ref = node_remap.at(node_name);
|
2023-02-13 14:28:54 +00:00
|
|
|
if (node_ref.dag == new_dag)
|
|
|
|
return *node_ref.node;
|
|
|
|
|
|
|
|
/// If the node is known from the previous steps, add it as an input, except for constants
|
|
|
|
if (original_dag_node->type != ActionsDAG::ActionType::COLUMN)
|
|
|
|
{
|
|
|
|
node_ref.dag->addOrReplaceInOutputs(*node_ref.node);
|
|
|
|
const auto & new_node = new_dag->addInput(node_ref.node->result_name, node_ref.node->result_type);
|
2023-02-13 22:14:18 +00:00
|
|
|
node_remap[node_name] = {new_dag, &new_node}; /// TODO: here we update the node reference. Is it always correct?
|
2023-02-13 14:28:54 +00:00
|
|
|
return new_node;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/// If the node is an input, add it as an input
|
|
|
|
if (original_dag_node->type == ActionsDAG::ActionType::INPUT)
|
|
|
|
{
|
|
|
|
const auto & new_node = new_dag->addInput(original_dag_node->result_name, original_dag_node->result_type);
|
2023-02-13 22:14:18 +00:00
|
|
|
node_remap[node_name] = {new_dag, &new_node};
|
2023-02-13 14:28:54 +00:00
|
|
|
return new_node;
|
|
|
|
}
|
|
|
|
|
|
|
|
/// If the node is a column, add it as an input
|
|
|
|
if (original_dag_node->type == ActionsDAG::ActionType::COLUMN)
|
|
|
|
{
|
|
|
|
const auto & new_node = new_dag->addColumn(
|
|
|
|
ColumnWithTypeAndName(original_dag_node->column, original_dag_node->result_type, original_dag_node->result_name));
|
2023-02-13 22:14:18 +00:00
|
|
|
node_remap[node_name] = {new_dag, &new_node};
|
2023-02-13 14:28:54 +00:00
|
|
|
return new_node;
|
|
|
|
}
|
|
|
|
|
2023-03-20 10:29:27 +00:00
|
|
|
if (original_dag_node->type == ActionsDAG::ActionType::ALIAS)
|
|
|
|
{
|
|
|
|
const auto & alias_child = addClonedDAGToDAG(original_dag_node->children[0], new_dag, node_remap);
|
|
|
|
const auto & new_node = new_dag->addAlias(alias_child, original_dag_node->result_name);
|
|
|
|
node_remap[node_name] = {new_dag, &new_node};
|
|
|
|
return new_node;
|
|
|
|
}
|
2023-02-13 14:28:54 +00:00
|
|
|
|
|
|
|
/// If the node is a function, add it as a function and add its children
|
|
|
|
if (original_dag_node->type == ActionsDAG::ActionType::FUNCTION)
|
|
|
|
{
|
|
|
|
ActionsDAG::NodeRawConstPtrs new_children;
|
|
|
|
for (const auto & child : original_dag_node->children)
|
|
|
|
{
|
|
|
|
const auto & new_child = addClonedDAGToDAG(child, new_dag, node_remap);
|
|
|
|
new_children.push_back(&new_child);
|
|
|
|
}
|
|
|
|
|
|
|
|
const auto & new_node = new_dag->addFunction(original_dag_node->function_base, new_children, original_dag_node->result_name);
|
2023-02-13 22:14:18 +00:00
|
|
|
node_remap[node_name] = {new_dag, &new_node};
|
2023-02-13 14:28:54 +00:00
|
|
|
return new_node;
|
|
|
|
}
|
|
|
|
|
|
|
|
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected node type in PREWHERE actions: {}", original_dag_node->type);
|
|
|
|
}
|
|
|
|
|
2023-02-13 22:14:18 +00:00
|
|
|
/// Adds a function node to `new_dag` and registers it in `node_remap` under the node's result name,
/// so that later steps can find and reference it.
/// @param new_dag    DAG of the step being built.
/// @param function   Overload resolver of the function to add.
/// @param children   Argument nodes of the function (taken by value and handed over to the DAG).
/// @param node_remap Map of already materialized nodes; updated with the new node.
/// @return The newly created node.
const ActionsDAG::Node & addFunction(
    ActionsDAGPtr new_dag,
    const FunctionOverloadResolverPtr & function,
    ActionsDAG::NodeRawConstPtrs children,
    OriginalToNewNodeMap & node_remap)
{
    /// `children` is our own copy and is not used afterwards, so move it instead of copying it once more.
    /// An empty result name leaves the naming of the node to ActionsDAG.
    const auto & new_node = new_dag->addFunction(function, std::move(children), "");
    node_remap[new_node.result_name] = {new_dag, &new_node};
    return new_node;
}
|
|
|
|
|
2023-02-13 14:28:54 +00:00
|
|
|
/// Adds a CAST node with the regular name ("CAST(...)") or with the provided name.
|
|
|
|
/// This is different from ActionsDAG::addCast() because it set the name equal to the original name effectively hiding the value before cast,
|
|
|
|
/// but it might be required for further steps with its original uncasted type.
|
2023-02-13 22:14:18 +00:00
|
|
|
const ActionsDAG::Node & addCast(
|
|
|
|
ActionsDAGPtr dag,
|
|
|
|
const ActionsDAG::Node & node_to_cast,
|
|
|
|
const String & type_name,
|
|
|
|
OriginalToNewNodeMap & node_remap)
|
2023-02-13 14:28:54 +00:00
|
|
|
{
|
2023-02-18 17:29:16 +00:00
|
|
|
if (node_to_cast.result_type->getName() == type_name)
|
|
|
|
return node_to_cast;
|
|
|
|
|
2023-02-13 14:28:54 +00:00
|
|
|
Field cast_type_constant_value(type_name);
|
|
|
|
|
|
|
|
ColumnWithTypeAndName column;
|
|
|
|
column.column = DataTypeString().createColumnConst(0, cast_type_constant_value);
|
|
|
|
column.type = std::make_shared<DataTypeString>();
|
|
|
|
|
|
|
|
const auto * cast_type_constant_node = &dag->addColumn(std::move(column));
|
|
|
|
ActionsDAG::NodeRawConstPtrs children = {&node_to_cast, cast_type_constant_node};
|
|
|
|
FunctionOverloadResolverPtr func_builder_cast = CastInternalOverloadResolver<CastType::nonAccurate>::createImpl();
|
|
|
|
|
2023-02-13 22:14:18 +00:00
|
|
|
return addFunction(dag, func_builder_cast, std::move(children), node_remap);
|
2023-02-13 14:28:54 +00:00
|
|
|
}
|
|
|
|
|
2023-02-18 17:45:38 +00:00
|
|
|
/// Normalizes the filter node by adding AND with a constant true.
|
|
|
|
/// This:
|
|
|
|
/// 1. produces a result with the proper Nullable or non-Nullable UInt8 type and
|
|
|
|
/// 2. makes sure that the result contains only 0 or 1 values even if the source column contains non-boolean values.
|
|
|
|
const ActionsDAG::Node & addAndTrue(
|
|
|
|
ActionsDAGPtr dag,
|
|
|
|
const ActionsDAG::Node & filter_node_to_normalize,
|
|
|
|
OriginalToNewNodeMap & node_remap)
|
|
|
|
{
|
|
|
|
Field const_true_value(true);
|
|
|
|
|
|
|
|
ColumnWithTypeAndName const_true_column;
|
|
|
|
const_true_column.column = DataTypeUInt8().createColumnConst(0, const_true_value);
|
|
|
|
const_true_column.type = std::make_shared<DataTypeUInt8>();
|
|
|
|
|
|
|
|
const auto * const_true_node = &dag->addColumn(std::move(const_true_column));
|
|
|
|
ActionsDAG::NodeRawConstPtrs children = {&filter_node_to_normalize, const_true_node};
|
|
|
|
FunctionOverloadResolverPtr func_builder_and = std::make_unique<FunctionToOverloadResolverAdaptor>(std::make_shared<FunctionAnd>());
|
|
|
|
return addFunction(dag, func_builder_and, children, node_remap);
|
|
|
|
}
|
|
|
|
|
2023-02-13 14:28:54 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
/// We want to build a sequence of steps that will compute parts of the prewhere condition.
|
|
|
|
/// Each step reads some new columns and computes some new expressions and a filter condition.
|
|
|
|
/// The last step computes the final filter condition and the remaining expressions that are required for the main query.
|
|
|
|
/// The goal of this is to, when it is possible, filter out many rows in early steps so that the remaining steps will
|
|
|
|
/// read less data from the storage.
|
|
|
|
/// NOTE: The result of executing the steps is exactly the same as if we would execute the original DAG in single step.
|
|
|
|
///
|
|
|
|
/// The steps are built in the following way:
|
|
|
|
/// 1. List all condition nodes that are combined with AND into PREWHERE condition
|
|
|
|
/// 2. Collect the set of columns that are used in each condition
|
|
|
|
/// 3. Sort condition nodes by the number of columns used in them and the overall size of those columns
|
|
|
|
/// 4. Group conditions with the same set of columns into a single read/compute step
|
|
|
|
/// 5. Build DAGs for each step:
|
|
|
|
/// - DFS from the condition root node:
|
|
|
|
/// - If the node was not computed yet, add it to the DAG and traverse its children
|
|
|
|
/// - If the node was already computed by one of the previous steps, add it as output for that step and as input for the current step
|
|
|
|
/// - If the node was already computed by the current step just stop traversing
|
|
|
|
/// 6. Find all outputs of the original DAG
|
|
|
|
/// 7. Find all outputs that were computed in the already built DAGs, mark these nodes as outputs in the steps where they were computed
|
|
|
|
/// 8. Add computation of the remaining outputs to the last step with the procedure similar to 4
|
|
|
|
bool tryBuildPrewhereSteps(PrewhereInfoPtr prewhere_info, const ExpressionActionsSettings & actions_settings, PrewhereExprInfo & prewhere)
|
|
|
|
{
|
|
|
|
if (!prewhere_info || !prewhere_info->prewhere_actions)
|
|
|
|
return true;
|
|
|
|
|
|
|
|
/// 1. List all condition nodes that are combined with AND into PREWHERE condition
|
|
|
|
const auto & condition_root = prewhere_info->prewhere_actions->findInOutputs(prewhere_info->prewhere_column_name);
|
|
|
|
const bool is_conjunction = (condition_root.type == ActionsDAG::ActionType::FUNCTION && condition_root.function_base->getName() == "and");
|
|
|
|
if (!is_conjunction)
|
|
|
|
return false;
|
|
|
|
auto condition_nodes = condition_root.children;
|
|
|
|
|
|
|
|
/// 2. Collect the set of columns that are used in the condition
|
|
|
|
std::unordered_map<const ActionsDAG::Node *, NodeInfo> nodes_info;
|
|
|
|
for (const auto & node : condition_nodes)
|
|
|
|
{
|
|
|
|
fillRequiredColumns(node, nodes_info);
|
|
|
|
}
|
|
|
|
|
|
|
|
/// 3. Sort condition nodes by the number of columns used in them and the overall size of those columns
|
|
|
|
/// TODO: not sorting for now because the conditions are already sorted by Where Optimizer
|
|
|
|
|
|
|
|
/// 4. Group conditions with the same set of columns into a single read/compute step
|
|
|
|
std::vector<std::vector<const ActionsDAG::Node *>> condition_groups;
|
|
|
|
for (const auto & node : condition_nodes)
|
|
|
|
{
|
|
|
|
const auto & node_info = nodes_info[node];
|
|
|
|
if (!condition_groups.empty() && nodes_info[condition_groups.back().back()].required_columns == node_info.required_columns)
|
|
|
|
condition_groups.back().push_back(node); /// Add to the last group
|
|
|
|
else
|
|
|
|
condition_groups.push_back({node}); /// Start new group
|
|
|
|
}
|
|
|
|
|
|
|
|
/// 5. Build DAGs for each step
|
|
|
|
struct Step
|
|
|
|
{
|
|
|
|
ActionsDAGPtr actions;
|
|
|
|
String column_name;
|
|
|
|
};
|
|
|
|
std::vector<Step> steps;
|
|
|
|
|
|
|
|
OriginalToNewNodeMap node_remap;
|
|
|
|
|
|
|
|
for (const auto & condition_group : condition_groups)
|
|
|
|
{
|
|
|
|
ActionsDAGPtr step_dag = std::make_shared<ActionsDAG>();
|
|
|
|
String result_name;
|
|
|
|
|
|
|
|
std::vector<const ActionsDAG::Node *> new_condition_nodes;
|
|
|
|
for (const auto * node : condition_group)
|
|
|
|
{
|
|
|
|
const auto & node_in_new_dag = addClonedDAGToDAG(node, step_dag, node_remap);
|
|
|
|
new_condition_nodes.push_back(&node_in_new_dag);
|
|
|
|
}
|
|
|
|
|
|
|
|
if (new_condition_nodes.size() > 1)
|
|
|
|
{
|
|
|
|
/// Add AND function to combine the conditions
|
|
|
|
FunctionOverloadResolverPtr func_builder_and = std::make_unique<FunctionToOverloadResolverAdaptor>(std::make_shared<FunctionAnd>());
|
2023-02-13 22:14:18 +00:00
|
|
|
const auto & and_function_node = addFunction(step_dag, func_builder_and, new_condition_nodes, node_remap);
|
2023-02-13 14:28:54 +00:00
|
|
|
step_dag->addOrReplaceInOutputs(and_function_node);
|
|
|
|
result_name = and_function_node.result_name;
|
|
|
|
}
|
|
|
|
else
|
|
|
|
{
|
|
|
|
const auto & result_node = *new_condition_nodes.front();
|
2023-02-18 17:45:38 +00:00
|
|
|
/// Check if explicit cast is needed for the condition to serve as a filter.
|
|
|
|
const auto result_type_name = result_node.result_type->getName();
|
|
|
|
if (result_type_name == "UInt8" ||
|
|
|
|
result_type_name == "Nullable(UInt8)" ||
|
|
|
|
result_type_name == "LowCardinality(UInt8)" ||
|
|
|
|
result_type_name == "LowCardinality(Nullable(UInt8))")
|
2023-02-13 14:28:54 +00:00
|
|
|
{
|
2023-02-18 17:45:38 +00:00
|
|
|
/// No need to cast
|
2023-02-13 14:28:54 +00:00
|
|
|
step_dag->addOrReplaceInOutputs(result_node);
|
|
|
|
result_name = result_node.result_name;
|
|
|
|
}
|
|
|
|
else
|
|
|
|
{
|
2023-02-18 17:45:38 +00:00
|
|
|
/// Build "condition AND True" expression to "cast" the condition to UInt8 or Nullable(UInt8) depending on its type.
|
|
|
|
const auto & cast_node = addAndTrue(step_dag, result_node, node_remap);
|
2023-02-13 14:28:54 +00:00
|
|
|
step_dag->addOrReplaceInOutputs(cast_node);
|
|
|
|
result_name = cast_node.result_name;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
steps.push_back({step_dag, result_name});
|
|
|
|
}
|
|
|
|
|
|
|
|
/// 6. Find all outputs of the original DAG
|
|
|
|
auto original_outputs = prewhere_info->prewhere_actions->getOutputs();
|
|
|
|
/// 7. Find all outputs that were computed in the already built DAGs, mark these nodes as outputs in the steps where they were computed
|
|
|
|
/// 8. Add computation of the remaining outputs to the last step with the procedure similar to 4
|
|
|
|
NameSet all_output_names;
|
|
|
|
for (const auto * output : original_outputs)
|
|
|
|
{
|
|
|
|
all_output_names.insert(output->result_name);
|
2023-02-13 22:14:18 +00:00
|
|
|
if (node_remap.contains(output->result_name))
|
2023-02-13 14:28:54 +00:00
|
|
|
{
|
2023-02-13 22:14:18 +00:00
|
|
|
const auto & new_node_info = node_remap[output->result_name];
|
2023-02-13 14:28:54 +00:00
|
|
|
new_node_info.dag->addOrReplaceInOutputs(*new_node_info.node);
|
|
|
|
}
|
|
|
|
else if (output->result_name == prewhere_info->prewhere_column_name)
|
|
|
|
{
|
|
|
|
/// Special case for final PREWHERE column: it is an AND combination of all conditions,
|
2023-02-18 17:29:16 +00:00
|
|
|
/// but we have only the condition for the last step here. We know that the combined filter is equivalent to
|
|
|
|
/// to the last condition after filters from previous steps are applied. We just need to CAST the last condition
|
|
|
|
/// to the type of combined filter. We do this in 2 steps:
|
|
|
|
/// 1. AND the last condition with constant True. This is needed to make sure that in the last step filter has UInt8 type
|
2023-02-18 17:45:38 +00:00
|
|
|
/// but contains values other than 0 and 1 (e.g. if it is (number%5) it contains 2,3,4)
|
2023-02-18 17:29:16 +00:00
|
|
|
/// 2. CAST the result to the exact type of the PREWHERE column from the original DAG
|
|
|
|
const auto & last_step_result_node_info = node_remap[steps.back().column_name];
|
|
|
|
auto & last_step_dag = steps.back().actions;
|
|
|
|
/// Build AND(last_step_result_node, true)
|
2023-02-18 17:45:38 +00:00
|
|
|
const auto & and_node = addAndTrue(last_step_dag, *last_step_result_node_info.node, node_remap);
|
2023-02-18 17:29:16 +00:00
|
|
|
/// Build CAST(and_node, type of PREWHERE column)
|
|
|
|
const auto & cast_node = addCast(last_step_dag, and_node, output->result_type->getName(), node_remap);
|
|
|
|
/// Add alias for the result with the name of the PREWHERE column
|
|
|
|
const auto & prewhere_result_node = last_step_dag->addAlias(cast_node, output->result_name);
|
|
|
|
last_step_dag->addOrReplaceInOutputs(prewhere_result_node);
|
2023-02-13 14:28:54 +00:00
|
|
|
}
|
|
|
|
else
|
|
|
|
{
|
|
|
|
const auto & node_in_new_dag = addClonedDAGToDAG(output, steps.back().actions, node_remap);
|
|
|
|
steps.back().actions->addOrReplaceInOutputs(node_in_new_dag);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/// 9. Build PrewhereExprInfo
|
|
|
|
{
|
|
|
|
for (const auto & step : steps)
|
|
|
|
{
|
|
|
|
prewhere.steps.push_back(
|
|
|
|
{
|
|
|
|
.actions = std::make_shared<ExpressionActions>(step.actions, actions_settings),
|
|
|
|
.column_name = step.column_name,
|
|
|
|
.remove_column = !all_output_names.contains(step.column_name), /// Don't remove if it's in the list of original outputs
|
|
|
|
.need_filter = false,
|
|
|
|
});
|
|
|
|
}
|
|
|
|
prewhere.steps.back().need_filter = prewhere_info->need_filter;
|
|
|
|
}
|
|
|
|
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|