Split prewhere actions into separate conjuctive steps

This commit is contained in:
Alexander Gololobov 2023-02-07 20:45:40 +01:00
parent a7603aad69
commit 45ef2f6d60
6 changed files with 118 additions and 14 deletions

View File

@ -1017,6 +1017,9 @@ std::string ActionsDAG::dumpDAG() const
out << ' ' << map[node]; out << ' ' << map[node];
out << '\n'; out << '\n';
out << "Project input: " << project_input << '\n';
out << "Projected output: " << projected_output << '\n';
return out.str(); return out.str();
} }
@ -1660,20 +1663,20 @@ ActionsDAG::SplitResult ActionsDAG::splitActionsForFilter(const std::string & co
return res; return res;
} }
namespace //namespace
{ //{
//
struct ConjunctionNodes //struct ConjunctionNodes
{ //{
ActionsDAG::NodeRawConstPtrs allowed; // ActionsDAG::NodeRawConstPtrs allowed;
ActionsDAG::NodeRawConstPtrs rejected; // ActionsDAG::NodeRawConstPtrs rejected;
}; //};
/// Take a node which result is predicate. /// Take a node which result is predicate.
/// Assuming predicate is a conjunction (probably, trivial). /// Assuming predicate is a conjunction (probably, trivial).
/// Find separate conjunctions nodes. Split nodes into allowed and rejected sets. /// Find separate conjunctions nodes. Split nodes into allowed and rejected sets.
/// Allowed predicate is a predicate which can be calculated using only nodes from allowed_nodes set. /// Allowed predicate is a predicate which can be calculated using only nodes from allowed_nodes set.
ConjunctionNodes getConjunctionNodes(ActionsDAG::Node * predicate, std::unordered_set<const ActionsDAG::Node *> allowed_nodes) ConjunctionNodes getConjunctionNodes(const ActionsDAG::Node * predicate, std::unordered_set<const ActionsDAG::Node *> allowed_nodes)
{ {
ConjunctionNodes conjunction; ConjunctionNodes conjunction;
std::unordered_set<const ActionsDAG::Node *> allowed; std::unordered_set<const ActionsDAG::Node *> allowed;
@ -1795,7 +1798,7 @@ ColumnsWithTypeAndName prepareFunctionArguments(const ActionsDAG::NodeRawConstPt
return arguments; return arguments;
} }
} //}
/// Create actions which calculate conjunction of selected nodes. /// Create actions which calculate conjunction of selected nodes.
/// Assume conjunction nodes are predicates (and may be used as arguments of function AND). /// Assume conjunction nodes are predicates (and may be used as arguments of function AND).

View File

@ -363,6 +363,7 @@ private:
void compileFunctions(size_t min_count_to_compile_expression, const std::unordered_set<const Node *> & lazy_executed_nodes = {}); void compileFunctions(size_t min_count_to_compile_expression, const std::unordered_set<const Node *> & lazy_executed_nodes = {});
#endif #endif
public:
static ActionsDAGPtr cloneActionsForConjunction(NodeRawConstPtrs conjunction, const ColumnsWithTypeAndName & all_inputs); static ActionsDAGPtr cloneActionsForConjunction(NodeRawConstPtrs conjunction, const ColumnsWithTypeAndName & all_inputs);
}; };
@ -372,4 +373,17 @@ struct ActionDAGNodes
ActionsDAG::NodeRawConstPtrs nodes; ActionsDAG::NodeRawConstPtrs nodes;
}; };
struct ConjunctionNodes
{
ActionsDAG::NodeRawConstPtrs allowed;
ActionsDAG::NodeRawConstPtrs rejected;
};
/// Take a node which result is predicate.
/// Assuming predicate is a conjunction (probably, trivial).
/// Find separate conjunctions nodes. Split nodes into allowed and rejected sets.
/// Allowed predicate is a predicate which can be calculated using only nodes from allowed_nodes set.
ConjunctionNodes getConjunctionNodes(const ActionsDAG::Node * predicate, std::unordered_set<const ActionsDAG::Node *> allowed_nodes);
} }

View File

@ -7,6 +7,7 @@
#include <Columns/FilterDescription.h> #include <Columns/FilterDescription.h>
#include <Common/ElapsedTimeProfileEventIncrement.h> #include <Common/ElapsedTimeProfileEventIncrement.h>
#include <Common/typeid_cast.h> #include <Common/typeid_cast.h>
#include "Core/Names.h"
#include <DataTypes/DataTypeNothing.h> #include <DataTypes/DataTypeNothing.h>
#include <DataTypes/DataTypeNullable.h> #include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeUUID.h> #include <DataTypes/DataTypeUUID.h>
@ -102,6 +103,53 @@ std::unique_ptr<PrewhereExprInfo> IMergeTreeSelectAlgorithm::getPrewhereActions(
prewhere_actions->steps.emplace_back(std::move(row_level_filter_step)); prewhere_actions->steps.emplace_back(std::move(row_level_filter_step));
} }
#if 1
auto conjunctions = getConjunctionNodes(
prewhere_info->prewhere_actions->tryFindInOutputs(prewhere_info->prewhere_column_name),
{});
auto original_outputs = prewhere_info->prewhere_actions->getOutputs();
NameSet original_output_names;
for (const auto & output : original_outputs)
original_output_names.insert(output->result_name);
auto inputs = prewhere_info->prewhere_actions->getInputs();
ColumnsWithTypeAndName all_inputs;
for (const auto & input : inputs)
all_inputs.emplace_back(input->column, input->result_type, input->result_name);
ActionsDAG::NodeRawConstPtrs all_conjunctions = std::move(conjunctions.allowed);
all_conjunctions.insert(all_conjunctions.end(), conjunctions.rejected.begin(), conjunctions.rejected.end());
for (const auto & conjunction : all_conjunctions)
{
auto step_dag = ActionsDAG::cloneActionsForConjunction({conjunction}, all_inputs);
/// Return the condition columns
Names step_outputs{conjunction->result_name};
/// Preserve all the original outputs computed at this step
for (const auto & output : original_output_names)
if (step_dag->tryRestoreColumn(output))
step_outputs.emplace_back(output);
step_dag->removeUnusedActions(step_outputs, true, true);
//std::cerr << conjunction->result_name << "\n";
std::cerr << step_dag->dumpDAG() << "\n";
PrewhereExprStep prewhere_step
{
.actions = std::make_shared<ExpressionActions>(step_dag, actions_settings),
.column_name = conjunction->result_name,
.remove_column = false, // TODO: properly set this depending on whether the column is used in the next step
.need_filter = false
};
prewhere_actions->steps.emplace_back(std::move(prewhere_step));
}
//prewhere_actions->steps.back().remove_column = prewhere_info->remove_prewhere_column;
prewhere_actions->steps.back().need_filter = prewhere_info->need_filter;
#else
PrewhereExprStep prewhere_step PrewhereExprStep prewhere_step
{ {
.actions = std::make_shared<ExpressionActions>(prewhere_info->prewhere_actions, actions_settings), .actions = std::make_shared<ExpressionActions>(prewhere_info->prewhere_actions, actions_settings),
@ -111,6 +159,7 @@ std::unique_ptr<PrewhereExprInfo> IMergeTreeSelectAlgorithm::getPrewhereActions(
}; };
prewhere_actions->steps.emplace_back(std::move(prewhere_step)); prewhere_actions->steps.emplace_back(std::move(prewhere_step));
#endif
} }
return prewhere_actions; return prewhere_actions;

View File

@ -101,8 +101,10 @@ protected:
static void static void
injectVirtualColumns(Block & block, size_t row_count, MergeTreeReadTask * task, const DataTypePtr & partition_value_type, const Names & virtual_columns); injectVirtualColumns(Block & block, size_t row_count, MergeTreeReadTask * task, const DataTypePtr & partition_value_type, const Names & virtual_columns);
public:
static std::unique_ptr<PrewhereExprInfo> getPrewhereActions(PrewhereInfoPtr prewhere_info, const ExpressionActionsSettings & actions_settings); static std::unique_ptr<PrewhereExprInfo> getPrewhereActions(PrewhereInfoPtr prewhere_info, const ExpressionActionsSettings & actions_settings);
protected:
static void initializeRangeReadersImpl( static void initializeRangeReadersImpl(
MergeTreeRangeReader & range_reader, MergeTreeRangeReader & range_reader,
std::deque<MergeTreeRangeReader> & pre_range_readers, std::deque<MergeTreeRangeReader> & pre_range_readers,

View File

@ -5,6 +5,7 @@
#include <Core/NamesAndTypes.h> #include <Core/NamesAndTypes.h>
#include <Common/checkStackSize.h> #include <Common/checkStackSize.h>
#include <Common/typeid_cast.h> #include <Common/typeid_cast.h>
#include "Storages/MergeTree/MergeTreeBaseSelectProcessor.h"
#include <Columns/ColumnConst.h> #include <Columns/ColumnConst.h>
#include <IO/WriteBufferFromString.h> #include <IO/WriteBufferFromString.h>
#include <IO/Operators.h> #include <IO/Operators.h>
@ -291,7 +292,6 @@ MergeTreeReadTaskColumns getReadTaskColumns(
bool with_subcolumns) bool with_subcolumns)
{ {
Names column_names = required_columns; Names column_names = required_columns;
Names pre_column_names;
/// Read system columns such as lightweight delete mask "_row_exists" if it is persisted in the part /// Read system columns such as lightweight delete mask "_row_exists" if it is persisted in the part
for (const auto & name : system_columns) for (const auto & name : system_columns)
@ -313,6 +313,40 @@ MergeTreeReadTaskColumns getReadTaskColumns(
if (prewhere_info) if (prewhere_info)
{ {
auto prewhere_actions = IMergeTreeSelectAlgorithm::getPrewhereActions(prewhere_info, {});
NameSet pre_name_set;
for (const auto & step : prewhere_actions->steps)
{
Names step_column_names = step.actions->getActionsDAG().getRequiredColumnsNames();
injectRequiredColumns(
data_part_info_for_reader, storage_snapshot, with_subcolumns, step_column_names);
Names new_step_column_names;
for (const auto & name : step_column_names)
{
if (pre_name_set.contains(name))
continue;
new_step_column_names.push_back(name);
pre_name_set.insert(name);
}
result.pre_columns.push_back(storage_snapshot->getColumnsByNames(options, new_step_column_names));
}
/// Remove prewhere columns from the list of columns to read
Names post_column_names;
for (const auto & name : column_names)
if (!pre_name_set.contains(name))
post_column_names.push_back(name);
column_names = post_column_names;
#if 0
NameSet pre_name_set; NameSet pre_name_set;
/// Add column reading steps: /// Add column reading steps:
@ -346,9 +380,11 @@ MergeTreeReadTaskColumns getReadTaskColumns(
post_column_names.push_back(name); post_column_names.push_back(name);
column_names = post_column_names; column_names = post_column_names;
#endif
} }
result.pre_columns.push_back(storage_snapshot->getColumnsByNames(options, pre_column_names)); // result.pre_columns.push_back(storage_snapshot->getColumnsByNames(options, pre_column_names));
/// 3. Rest of the requested columns /// 3. Rest of the requested columns
result.columns = storage_snapshot->getColumnsByNames(options, column_names); result.columns = storage_snapshot->getColumnsByNames(options, column_names);

View File

@ -274,7 +274,7 @@ void MergeTreeWhereOptimizer::optimize(ASTSelectQuery & select) const
if (!it->viable) if (!it->viable)
break; break;
#if 0
bool moved_enough = false; bool moved_enough = false;
if (total_size_of_queried_columns > 0) if (total_size_of_queried_columns > 0)
{ {
@ -292,7 +292,7 @@ void MergeTreeWhereOptimizer::optimize(ASTSelectQuery & select) const
if (moved_enough) if (moved_enough)
break; break;
#endif
move_condition(it); move_condition(it);
} }