mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-25 00:52:02 +00:00
Cleanup.
This commit is contained in:
parent
6f00b49067
commit
a0cc03b175
@ -64,6 +64,7 @@ static ActionsAndName splitSingleAndFilter(ActionsDAG & dag, const ActionsDAG::N
|
|||||||
return ActionsAndName{std::move(split_result.first), std::move(name)};
|
return ActionsAndName{std::move(split_result.first), std::move(name)};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Try to split the left most AND atom to a separate DAG.
|
||||||
static std::optional<ActionsAndName> trySplitSingleAndFilter(ActionsDAG & dag, const std::string & filter_name)
|
static std::optional<ActionsAndName> trySplitSingleAndFilter(ActionsDAG & dag, const std::string & filter_name)
|
||||||
{
|
{
|
||||||
const auto * filter = &dag.findInOutputs(filter_name);
|
const auto * filter = &dag.findInOutputs(filter_name);
|
||||||
@ -83,6 +84,7 @@ static std::optional<ActionsAndName> trySplitSingleAndFilter(ActionsDAG & dag, c
|
|||||||
|
|
||||||
if (node->type == ActionsDAG::ActionType::FUNCTION && node->function_base->getName() == "and")
|
if (node->type == ActionsDAG::ActionType::FUNCTION && node->function_base->getName() == "and")
|
||||||
{
|
{
|
||||||
|
/// The order is important. We should take the left-most atom, so put conditions on stack in reverse order.
|
||||||
for (const auto * child : node->children | std::ranges::views::reverse)
|
for (const auto * child : node->children | std::ranges::views::reverse)
|
||||||
nodes.push(child);
|
nodes.push(child);
|
||||||
|
|
||||||
@ -141,6 +143,8 @@ void FilterStep::transformPipeline(QueryPipelineBuilder & pipeline, const BuildQ
|
|||||||
{
|
{
|
||||||
std::vector<ActionsAndName> and_atoms;
|
std::vector<ActionsAndName> and_atoms;
|
||||||
|
|
||||||
|
/// Spliting AND filter condition to steps under the setting, which is enabled with merge_filters optimization.
|
||||||
|
/// This is needed to support short-circuit properly.
|
||||||
if (settings.enable_multiple_filters_transforms_for_and_chain && !actions_dag.hasStatefulFunctions())
|
if (settings.enable_multiple_filters_transforms_for_and_chain && !actions_dag.hasStatefulFunctions())
|
||||||
and_atoms = splitAndChainIntoMultipleFilters(actions_dag, filter_column_name);
|
and_atoms = splitAndChainIntoMultipleFilters(actions_dag, filter_column_name);
|
||||||
|
|
||||||
@ -206,6 +210,19 @@ void FilterStep::describeActions(FormatSettings & settings) const
|
|||||||
|
|
||||||
void FilterStep::describeActions(JSONBuilder::JSONMap & map) const
|
void FilterStep::describeActions(JSONBuilder::JSONMap & map) const
|
||||||
{
|
{
|
||||||
|
auto cloned_dag = actions_dag.clone();
|
||||||
|
|
||||||
|
std::vector<ActionsAndName> and_atoms;
|
||||||
|
if (!actions_dag.hasStatefulFunctions())
|
||||||
|
and_atoms = splitAndChainIntoMultipleFilters(cloned_dag, filter_column_name);
|
||||||
|
|
||||||
|
for (auto & and_atom : and_atoms)
|
||||||
|
{
|
||||||
|
auto expression = std::make_shared<ExpressionActions>(std::move(and_atom.dag));
|
||||||
|
map.add("AND column", and_atom.name);
|
||||||
|
map.add("Expression", expression->toTree());
|
||||||
|
}
|
||||||
|
|
||||||
map.add("Filter Column", filter_column_name);
|
map.add("Filter Column", filter_column_name);
|
||||||
map.add("Removes Filter", remove_filter_column);
|
map.add("Removes Filter", remove_filter_column);
|
||||||
|
|
||||||
|
@ -175,6 +175,7 @@ namespace Setting
|
|||||||
extern const SettingsBool use_skip_indexes;
|
extern const SettingsBool use_skip_indexes;
|
||||||
extern const SettingsBool use_skip_indexes_if_final;
|
extern const SettingsBool use_skip_indexes_if_final;
|
||||||
extern const SettingsBool use_uncompressed_cache;
|
extern const SettingsBool use_uncompressed_cache;
|
||||||
|
extern const SettingsBool query_plan_merge_filters;
|
||||||
extern const SettingsUInt64 merge_tree_min_read_task_size;
|
extern const SettingsUInt64 merge_tree_min_read_task_size;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -206,6 +207,7 @@ static MergeTreeReaderSettings getMergeTreeReaderSettings(
|
|||||||
.use_asynchronous_read_from_pool = settings[Setting::allow_asynchronous_read_from_io_pool_for_merge_tree]
|
.use_asynchronous_read_from_pool = settings[Setting::allow_asynchronous_read_from_io_pool_for_merge_tree]
|
||||||
&& (settings[Setting::max_streams_to_max_threads_ratio] > 1 || settings[Setting::max_streams_for_merge_tree_reading] > 1),
|
&& (settings[Setting::max_streams_to_max_threads_ratio] > 1 || settings[Setting::max_streams_for_merge_tree_reading] > 1),
|
||||||
.enable_multiple_prewhere_read_steps = settings[Setting::enable_multiple_prewhere_read_steps],
|
.enable_multiple_prewhere_read_steps = settings[Setting::enable_multiple_prewhere_read_steps],
|
||||||
|
.force_shirt_circuit_execution = settings[Setting::query_plan_merge_filters]
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -330,7 +330,7 @@ MergeTreeReadTaskColumns getReadTaskColumns(
|
|||||||
auto prewhere_actions = MergeTreeSelectProcessor::getPrewhereActions(
|
auto prewhere_actions = MergeTreeSelectProcessor::getPrewhereActions(
|
||||||
prewhere_info,
|
prewhere_info,
|
||||||
actions_settings,
|
actions_settings,
|
||||||
reader_settings.enable_multiple_prewhere_read_steps);
|
reader_settings.enable_multiple_prewhere_read_steps, reader_settings.force_shirt_circuit_execution);
|
||||||
|
|
||||||
for (const auto & step : prewhere_actions.steps)
|
for (const auto & step : prewhere_actions.steps)
|
||||||
add_step(*step);
|
add_step(*step);
|
||||||
|
@ -45,6 +45,8 @@ struct MergeTreeReaderSettings
|
|||||||
bool use_asynchronous_read_from_pool = false;
|
bool use_asynchronous_read_from_pool = false;
|
||||||
/// If PREWHERE has multiple conditions combined with AND, execute them in separate read/filtering steps.
|
/// If PREWHERE has multiple conditions combined with AND, execute them in separate read/filtering steps.
|
||||||
bool enable_multiple_prewhere_read_steps = false;
|
bool enable_multiple_prewhere_read_steps = false;
|
||||||
|
/// In case of multiple prewhere steps, execute filtering earlier to support short-circuit properly.
|
||||||
|
bool force_shirt_circuit_execution = false;
|
||||||
/// If true, try to lower size of read buffer according to granule size and compressed block size.
|
/// If true, try to lower size of read buffer according to granule size and compressed block size.
|
||||||
bool adjust_read_buffer_size = true;
|
bool adjust_read_buffer_size = true;
|
||||||
/// If true, it's allowed to read the whole part without reading marks.
|
/// If true, it's allowed to read the whole part without reading marks.
|
||||||
|
@ -91,7 +91,7 @@ MergeTreeSelectProcessor::MergeTreeSelectProcessor(
|
|||||||
, algorithm(std::move(algorithm_))
|
, algorithm(std::move(algorithm_))
|
||||||
, prewhere_info(prewhere_info_)
|
, prewhere_info(prewhere_info_)
|
||||||
, actions_settings(actions_settings_)
|
, actions_settings(actions_settings_)
|
||||||
, prewhere_actions(getPrewhereActions(prewhere_info, actions_settings, reader_settings_.enable_multiple_prewhere_read_steps))
|
, prewhere_actions(getPrewhereActions(prewhere_info, actions_settings, reader_settings_.enable_multiple_prewhere_read_steps, reader_settings_.force_shirt_circuit_execution))
|
||||||
, reader_settings(reader_settings_)
|
, reader_settings(reader_settings_)
|
||||||
, result_header(transformHeader(pool->getHeader(), prewhere_info))
|
, result_header(transformHeader(pool->getHeader(), prewhere_info))
|
||||||
{
|
{
|
||||||
@ -124,9 +124,9 @@ String MergeTreeSelectProcessor::getName() const
|
|||||||
return fmt::format("MergeTreeSelect(pool: {}, algorithm: {})", pool->getName(), algorithm->getName());
|
return fmt::format("MergeTreeSelect(pool: {}, algorithm: {})", pool->getName(), algorithm->getName());
|
||||||
}
|
}
|
||||||
|
|
||||||
bool tryBuildPrewhereSteps(PrewhereInfoPtr prewhere_info, const ExpressionActionsSettings & actions_settings, PrewhereExprInfo & prewhere);
|
bool tryBuildPrewhereSteps(PrewhereInfoPtr prewhere_info, const ExpressionActionsSettings & actions_settings, PrewhereExprInfo & prewhere, bool force_shirt_circuit_execution);
|
||||||
|
|
||||||
PrewhereExprInfo MergeTreeSelectProcessor::getPrewhereActions(PrewhereInfoPtr prewhere_info, const ExpressionActionsSettings & actions_settings, bool enable_multiple_prewhere_read_steps)
|
PrewhereExprInfo MergeTreeSelectProcessor::getPrewhereActions(PrewhereInfoPtr prewhere_info, const ExpressionActionsSettings & actions_settings, bool enable_multiple_prewhere_read_steps, bool force_shirt_circuit_execution)
|
||||||
{
|
{
|
||||||
PrewhereExprInfo prewhere_actions;
|
PrewhereExprInfo prewhere_actions;
|
||||||
if (prewhere_info)
|
if (prewhere_info)
|
||||||
@ -147,7 +147,7 @@ PrewhereExprInfo MergeTreeSelectProcessor::getPrewhereActions(PrewhereInfoPtr pr
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (!enable_multiple_prewhere_read_steps ||
|
if (!enable_multiple_prewhere_read_steps ||
|
||||||
!tryBuildPrewhereSteps(prewhere_info, actions_settings, prewhere_actions))
|
!tryBuildPrewhereSteps(prewhere_info, actions_settings, prewhere_actions, force_shirt_circuit_execution))
|
||||||
{
|
{
|
||||||
PrewhereExprStep prewhere_step
|
PrewhereExprStep prewhere_step
|
||||||
{
|
{
|
||||||
|
@ -73,7 +73,8 @@ public:
|
|||||||
static PrewhereExprInfo getPrewhereActions(
|
static PrewhereExprInfo getPrewhereActions(
|
||||||
PrewhereInfoPtr prewhere_info,
|
PrewhereInfoPtr prewhere_info,
|
||||||
const ExpressionActionsSettings & actions_settings,
|
const ExpressionActionsSettings & actions_settings,
|
||||||
bool enable_multiple_prewhere_read_steps);
|
bool enable_multiple_prewhere_read_steps,
|
||||||
|
bool force_shirt_circuit_execution);
|
||||||
|
|
||||||
void addPartLevelToChunk(bool add_part_level_) { add_part_level = add_part_level_; }
|
void addPartLevelToChunk(bool add_part_level_) { add_part_level = add_part_level_; }
|
||||||
|
|
||||||
|
@ -50,6 +50,17 @@ void fillRequiredColumns(const ActionsDAG::Node * node, std::unordered_map<const
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// void addToOutputsIfNotAlreadyAdded(ActionsDAG & dag, const ActionsDAG::Node * node)
|
||||||
|
// {
|
||||||
|
// auto & outputs = dag.getOutputs();
|
||||||
|
// auto it = outputs.begin();
|
||||||
|
// while (it != outputs.end() && *it != node)
|
||||||
|
// ++it;
|
||||||
|
|
||||||
|
// if (it == outputs.end())
|
||||||
|
// outputs.push_back(node);
|
||||||
|
// }
|
||||||
|
|
||||||
/// Stores information about a node that has already been cloned or added to one of the new DAGs.
|
/// Stores information about a node that has already been cloned or added to one of the new DAGs.
|
||||||
/// This allows to avoid cloning the same sub-DAG into multiple step DAGs but reference previously cloned nodes from earlier steps.
|
/// This allows to avoid cloning the same sub-DAG into multiple step DAGs but reference previously cloned nodes from earlier steps.
|
||||||
struct DAGNodeRef
|
struct DAGNodeRef
|
||||||
@ -58,7 +69,7 @@ struct DAGNodeRef
|
|||||||
const ActionsDAG::Node * node;
|
const ActionsDAG::Node * node;
|
||||||
};
|
};
|
||||||
|
|
||||||
/// Result -> DAGNodeRef
|
/// ResultNode -> DAGNodeRef
|
||||||
using OriginalToNewNodeMap = std::unordered_map<const ActionsDAG::Node *, DAGNodeRef>;
|
using OriginalToNewNodeMap = std::unordered_map<const ActionsDAG::Node *, DAGNodeRef>;
|
||||||
using NodeNameToLastUsedStepMap = std::unordered_map<const ActionsDAG::Node *, size_t>;
|
using NodeNameToLastUsedStepMap = std::unordered_map<const ActionsDAG::Node *, size_t>;
|
||||||
|
|
||||||
@ -70,7 +81,6 @@ const ActionsDAG::Node & addClonedDAGToDAG(
|
|||||||
OriginalToNewNodeMap & node_remap,
|
OriginalToNewNodeMap & node_remap,
|
||||||
NodeNameToLastUsedStepMap & node_to_step_map)
|
NodeNameToLastUsedStepMap & node_to_step_map)
|
||||||
{
|
{
|
||||||
//const String & node_name = original_dag_node->result_name;
|
|
||||||
/// Look for the node in the map of already known nodes
|
/// Look for the node in the map of already known nodes
|
||||||
if (node_remap.contains(original_dag_node))
|
if (node_remap.contains(original_dag_node))
|
||||||
{
|
{
|
||||||
@ -82,9 +92,11 @@ const ActionsDAG::Node & addClonedDAGToDAG(
|
|||||||
/// If the node is known from the previous steps, add it as an input, except for constants
|
/// If the node is known from the previous steps, add it as an input, except for constants
|
||||||
if (original_dag_node->type != ActionsDAG::ActionType::COLUMN)
|
if (original_dag_node->type != ActionsDAG::ActionType::COLUMN)
|
||||||
{
|
{
|
||||||
node_ref.dag->addOrReplaceInOutputs(*node_ref.node);
|
// addToOutputsIfNotAlreadyAdded(*node_ref.dag, node_ref.node);
|
||||||
|
node_ref.dag->getOutputs().push_back(node_ref.node);
|
||||||
|
|
||||||
const auto & new_node = new_dag->addInput(node_ref.node->result_name, node_ref.node->result_type);
|
const auto & new_node = new_dag->addInput(node_ref.node->result_name, node_ref.node->result_type);
|
||||||
node_remap[original_dag_node] = {new_dag.get(), &new_node}; /// TODO: here we update the node reference. Is it always correct?
|
node_remap[original_dag_node] = {new_dag.get(), &new_node};
|
||||||
|
|
||||||
/// Remember the index of the last step which reuses this node.
|
/// Remember the index of the last step which reuses this node.
|
||||||
/// We cannot remove this node from the outputs before that step.
|
/// We cannot remove this node from the outputs before that step.
|
||||||
@ -207,7 +219,11 @@ const ActionsDAG::Node & addAndTrue(
|
|||||||
/// 6. Find all outputs of the original DAG
|
/// 6. Find all outputs of the original DAG
|
||||||
/// 7. Find all outputs that were computed in the already built DAGs, mark these nodes as outputs in the steps where they were computed
|
/// 7. Find all outputs that were computed in the already built DAGs, mark these nodes as outputs in the steps where they were computed
|
||||||
/// 8. Add computation of the remaining outputs to the last step with the procedure similar to 4
|
/// 8. Add computation of the remaining outputs to the last step with the procedure similar to 4
|
||||||
bool tryBuildPrewhereSteps(PrewhereInfoPtr prewhere_info, const ExpressionActionsSettings & actions_settings, PrewhereExprInfo & prewhere)
|
bool tryBuildPrewhereSteps(
|
||||||
|
PrewhereInfoPtr prewhere_info,
|
||||||
|
const ExpressionActionsSettings & actions_settings,
|
||||||
|
PrewhereExprInfo & prewhere,
|
||||||
|
bool force_shirt_circuit_execution)
|
||||||
{
|
{
|
||||||
if (!prewhere_info)
|
if (!prewhere_info)
|
||||||
return true;
|
return true;
|
||||||
@ -275,26 +291,16 @@ bool tryBuildPrewhereSteps(PrewhereInfoPtr prewhere_info, const ExpressionAction
|
|||||||
/// Add AND function to combine the conditions
|
/// Add AND function to combine the conditions
|
||||||
FunctionOverloadResolverPtr func_builder_and = std::make_unique<FunctionToOverloadResolverAdaptor>(std::make_shared<FunctionAnd>());
|
FunctionOverloadResolverPtr func_builder_and = std::make_unique<FunctionToOverloadResolverAdaptor>(std::make_shared<FunctionAnd>());
|
||||||
const auto & and_function_node = addFunction(step_dag, func_builder_and, new_condition_nodes); //, node_remap);
|
const auto & and_function_node = addFunction(step_dag, func_builder_and, new_condition_nodes); //, node_remap);
|
||||||
//step_dag->addOrReplaceInOutputs(and_function_node);
|
|
||||||
result_node = &and_function_node;
|
result_node = &and_function_node;
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
result_node = new_condition_nodes.front();
|
result_node = new_condition_nodes.front();
|
||||||
/// Check if explicit cast is needed for the condition to serve as a filter.
|
/// Check if explicit cast is needed for the condition to serve as a filter.
|
||||||
//const auto result_type_name = result_node->result_type->getName();
|
if (!isUInt8(removeNullable(removeLowCardinality(result_node->result_type))))
|
||||||
if (isUInt8(removeNullable(removeLowCardinality(result_node->result_type))))
|
|
||||||
{
|
|
||||||
/// No need to cast
|
|
||||||
//step_dag->addOrReplaceInOutputs(result_node);
|
|
||||||
//result_name = result_node.result_name;
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
{
|
||||||
/// Build "condition AND True" expression to "cast" the condition to UInt8 or Nullable(UInt8) depending on its type.
|
/// Build "condition AND True" expression to "cast" the condition to UInt8 or Nullable(UInt8) depending on its type.
|
||||||
result_node = &addAndTrue(step_dag, *result_node); //, node_remap);
|
result_node = &addAndTrue(step_dag, *result_node); //, node_remap);
|
||||||
//step_dag->addOrReplaceInOutputs(cast_node);
|
|
||||||
//result_name = &cast_node.result_name;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -334,7 +340,6 @@ bool tryBuildPrewhereSteps(PrewhereInfoPtr prewhere_info, const ExpressionAction
|
|||||||
const auto & cast_node = addCast(last_step_dag, and_node, output->result_type); //, node_remap);
|
const auto & cast_node = addCast(last_step_dag, and_node, output->result_type); //, node_remap);
|
||||||
/// Add alias for the result with the name of the PREWHERE column
|
/// Add alias for the result with the name of the PREWHERE column
|
||||||
const auto & prewhere_result_node = last_step_dag->addAlias(cast_node, output->result_name);
|
const auto & prewhere_result_node = last_step_dag->addAlias(cast_node, output->result_name);
|
||||||
//last_step_dag->addOrReplaceInOutputs(prewhere_result_node);
|
|
||||||
last_step_dag->getOutputs().push_back(&prewhere_result_node);
|
last_step_dag->getOutputs().push_back(&prewhere_result_node);
|
||||||
steps.back().result_node = &prewhere_result_node;
|
steps.back().result_node = &prewhere_result_node;
|
||||||
}
|
}
|
||||||
@ -358,7 +363,7 @@ bool tryBuildPrewhereSteps(PrewhereInfoPtr prewhere_info, const ExpressionAction
|
|||||||
/// Don't remove if it's in the list of original outputs
|
/// Don't remove if it's in the list of original outputs
|
||||||
.remove_filter_column =
|
.remove_filter_column =
|
||||||
step.original_node && !all_outputs.contains(step.original_node) && node_to_step[step.original_node] <= step_index,
|
step.original_node && !all_outputs.contains(step.original_node) && node_to_step[step.original_node] <= step_index,
|
||||||
.need_filter = true,
|
.need_filter = force_shirt_circuit_execution,
|
||||||
.perform_alter_conversions = true,
|
.perform_alter_conversions = true,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user