Refactor optimizeReadInOrder a bit.

This commit is contained in:
Nikolai Kochetov 2024-09-17 12:51:43 +00:00
parent a3cd1ba197
commit 80056bef7b

View File

@ -34,7 +34,10 @@
namespace DB::QueryPlanOptimizations
{
static ISourceStep * checkSupportedReadingStep(IQueryPlanStep * step, bool allow_existing_order)
namespace
{
ISourceStep * checkSupportedReadingStep(IQueryPlanStep * step, bool allow_existing_order)
{
if (auto * reading = typeid_cast<ReadFromMergeTree *>(step))
{
@ -75,7 +78,7 @@ static ISourceStep * checkSupportedReadingStep(IQueryPlanStep * step, bool allow
using StepStack = std::vector<IQueryPlanStep*>;
static QueryPlan::Node * findReadingStep(QueryPlan::Node & node, bool allow_existing_order)
QueryPlan::Node * findReadingStep(QueryPlan::Node & node, bool allow_existing_order)
{
IQueryPlanStep * step = node.step.get();
if (auto * reading = checkSupportedReadingStep(step, allow_existing_order))
@ -100,7 +103,7 @@ using FixedColumns = std::unordered_set<const ActionsDAG::Node *>;
/// Right now we find only simple cases like 'and(..., and(..., and(column = value, ...), ...'
/// Injective functions are supported here. For a condition 'injectiveFunction(x) = 5' column 'x' is fixed.
static void appendFixedColumnsFromFilterExpression(const ActionsDAG::Node & filter_expression, FixedColumns & fixed_columns)
void appendFixedColumnsFromFilterExpression(const ActionsDAG::Node & filter_expression, FixedColumns & fixed_columns)
{
std::stack<const ActionsDAG::Node *> stack;
stack.push(&filter_expression);
@ -149,7 +152,7 @@ static void appendFixedColumnsFromFilterExpression(const ActionsDAG::Node & filt
}
}
static void appendExpression(std::optional<ActionsDAG> & dag, const ActionsDAG & expression)
void appendExpression(std::optional<ActionsDAG> & dag, const ActionsDAG & expression)
{
if (dag)
dag->mergeInplace(expression.clone());
@ -309,7 +312,8 @@ void enrichFixedColumns(const ActionsDAG & dag, FixedColumns & fixed_columns)
}
}
InputOrderInfoPtr buildInputOrderInfo(
/// For the case when the order of keys is important (ORDER BY keys).
InputOrderInfoPtr buildInputOrderFromSortDescription(
const FixedColumns & fixed_columns,
const std::optional<ActionsDAG> & dag,
const SortDescription & description,
@ -466,7 +470,7 @@ InputOrderInfoPtr buildInputOrderInfo(
return std::make_shared<InputOrderInfo>(order_key_prefix_descr, next_sort_key, read_direction, limit);
}
/// We really need three different sort descriptions here.
/// We may need a few different sort descriptions here.
/// For example:
///
/// create table tab (a Int32, b Int32, c Int32, d Int32) engine = MergeTree order by (a, b, c);
@ -477,19 +481,21 @@ InputOrderInfoPtr buildInputOrderInfo(
/// (a, c) - a sort description for merging (an input of AggregatingInOrderTransfrom is sorted by this GROUP BY keys)
/// (a, c, d) - a group by sort description (an input of FinishAggregatingInOrderTransform is sorted by all GROUP BY keys)
///
/// The group by sort description is filled in optimizeAggregationInOrder now.
///
/// Sort description from input_order is not actually used. ReadFromMergeTree reads only PK prefix size.
/// We should remove it later.
struct AggregationInputOrder
struct InputOrder
{
InputOrderInfoPtr input_order;
SortDescription sort_description_for_merging;
SortDescription group_by_sort_description;
SortDescription sort_description;
};
AggregationInputOrder buildInputOrderInfo(
/// For the case when the order of keys is not important (GROUP BY / DISTINCT)
InputOrder buildInputOrderFromUnorderedKeys(
const FixedColumns & fixed_columns,
const std::optional<ActionsDAG> & dag,
const Names & group_by_keys,
const Names & unordered_keys,
const ActionsDAG & sorting_key_dag,
const Names & sorting_key_columns)
{
@ -549,15 +555,15 @@ AggregationInputOrder buildInputOrderInfo(
/// So far, 0 means any direction is possible. It is ok for constant prefix.
int read_direction = 0;
size_t next_sort_key = 0;
std::unordered_set<std::string_view> not_matched_group_by_keys(group_by_keys.begin(), group_by_keys.end());
std::unordered_set<std::string_view> not_matched_keys(unordered_keys.begin(), unordered_keys.end());
SortDescription group_by_sort_description;
group_by_sort_description.reserve(group_by_keys.size());
SortDescription sort_description;
sort_description.reserve(unordered_keys.size());
SortDescription order_key_prefix_descr;
order_key_prefix_descr.reserve(sorting_key_columns.size());
while (!not_matched_group_by_keys.empty() && next_sort_key < sorting_key_columns.size())
while (!not_matched_keys.empty() && next_sort_key < sorting_key_columns.size())
{
const auto & sorting_key_column = sorting_key_columns[next_sort_key];
@ -579,8 +585,8 @@ AggregationInputOrder buildInputOrderInfo(
if (sort_column_node->type != ActionsDAG::ActionType::INPUT)
break;
group_by_key_it = not_matched_group_by_keys.find(sorting_key_column);
if (group_by_key_it == not_matched_group_by_keys.end())
group_by_key_it = not_matched_keys.find(sorting_key_column);
if (group_by_key_it == not_matched_keys.end())
break;
current_direction = 1;
@ -601,9 +607,9 @@ AggregationInputOrder buildInputOrderInfo(
//std::cerr << "====== Finding match for " << sort_column_node->result_name << ' ' << static_cast<const void *>(sort_column_node) << std::endl;
if (match && match->node)
group_by_key_it = not_matched_group_by_keys.find(group_by_key_node->result_name);
group_by_key_it = not_matched_keys.find(group_by_key_node->result_name);
if (match && match->node && group_by_key_it != not_matched_group_by_keys.end())
if (match && match->node && group_by_key_it != not_matched_keys.end())
{
//std::cerr << "====== Found direct match" << std::endl;
@ -644,9 +650,9 @@ AggregationInputOrder buildInputOrderInfo(
/// Prefix sort description for reading will be (negate(y) DESC, negate(x) DESC),
/// Sort description for GROUP BY will be (negate(y) DESC, negate(x) DESC, z).
//std::cerr << "---- adding " << std::string(*group_by_key_it) << std::endl;
group_by_sort_description.emplace_back(SortColumnDescription(std::string(*group_by_key_it), current_direction));
sort_description.emplace_back(SortColumnDescription(std::string(*group_by_key_it), current_direction));
order_key_prefix_descr.emplace_back(SortColumnDescription(std::string(*group_by_key_it), current_direction));
not_matched_group_by_keys.erase(group_by_key_it);
not_matched_keys.erase(group_by_key_it);
}
else
{
@ -659,19 +665,19 @@ AggregationInputOrder buildInputOrderInfo(
break;
}
if (read_direction == 0 || group_by_sort_description.empty())
if (read_direction == 0 || sort_description.empty())
return {};
SortDescription sort_description_for_merging = group_by_sort_description;
// SortDescription sort_description_for_merging = group_by_sort_description;
for (const auto & key : not_matched_group_by_keys)
group_by_sort_description.emplace_back(SortColumnDescription(std::string(key)));
// for (const auto & key : not_matched_group_by_keys)
// group_by_sort_description.emplace_back(SortColumnDescription(std::string(key)));
auto input_order = std::make_shared<InputOrderInfo>(order_key_prefix_descr, next_sort_key, /*read_direction*/ 1, /* limit */ 0);
return { std::move(input_order), std::move(sort_description_for_merging), std::move(group_by_sort_description) };
return { std::move(input_order), std::move(sort_description) }; // std::move(group_by_sort_description) };
}
InputOrderInfoPtr buildInputOrderInfo(
InputOrderInfoPtr buildInputOrderFromSortDescription(
const ReadFromMergeTree * reading,
const FixedColumns & fixed_columns,
const std::optional<ActionsDAG> & dag,
@ -680,14 +686,14 @@ InputOrderInfoPtr buildInputOrderInfo(
{
const auto & sorting_key = reading->getStorageMetadata()->getSortingKey();
return buildInputOrderInfo(
return buildInputOrderFromSortDescription(
fixed_columns,
dag, description,
sorting_key,
limit);
}
InputOrderInfoPtr buildInputOrderInfo(
InputOrderInfoPtr buildInputOrderFromSortDescription(
ReadFromMerge * merge,
const FixedColumns & fixed_columns,
const std::optional<ActionsDAG> & dag,
@ -705,7 +711,7 @@ InputOrderInfoPtr buildInputOrderInfo(
if (sorting_key.column_names.empty())
return nullptr;
auto table_order_info = buildInputOrderInfo(
auto table_order_info = buildInputOrderFromSortDescription(
fixed_columns,
dag, description,
sorting_key,
@ -723,30 +729,30 @@ InputOrderInfoPtr buildInputOrderInfo(
return order_info;
}
AggregationInputOrder buildInputOrderInfo(
InputOrder buildInputOrderFromUnorderedKeys(
ReadFromMergeTree * reading,
const FixedColumns & fixed_columns,
const std::optional<ActionsDAG> & dag,
const Names & group_by_keys)
const Names & unordered_keys)
{
const auto & sorting_key = reading->getStorageMetadata()->getSortingKey();
const auto & sorting_key_columns = sorting_key.column_names;
return buildInputOrderInfo(
return buildInputOrderFromUnorderedKeys(
fixed_columns,
dag, group_by_keys,
dag, unordered_keys,
sorting_key.expression->getActionsDAG(), sorting_key_columns);
}
AggregationInputOrder buildInputOrderInfo(
InputOrder buildInputOrderFromUnorderedKeys(
ReadFromMerge * merge,
const FixedColumns & fixed_columns,
const std::optional<ActionsDAG> & dag,
const Names & group_by_keys)
const Names & unordered_keys)
{
const auto & tables = merge->getSelectedTables();
AggregationInputOrder order_info;
InputOrder order_info;
for (const auto & table : tables)
{
auto storage = std::get<StoragePtr>(table);
@ -756,9 +762,9 @@ AggregationInputOrder buildInputOrderInfo(
if (sorting_key_columns.empty())
return {};
auto table_order_info = buildInputOrderInfo(
auto table_order_info = buildInputOrderFromUnorderedKeys(
fixed_columns,
dag, group_by_keys,
dag, unordered_keys,
sorting_key.expression->getActionsDAG(), sorting_key_columns);
if (!table_order_info.input_order)
@ -791,7 +797,7 @@ InputOrderInfoPtr buildInputOrderInfo(SortingStep & sorting, QueryPlan::Node & n
if (auto * reading = typeid_cast<ReadFromMergeTree *>(reading_node->step.get()))
{
auto order_info = buildInputOrderInfo(
auto order_info = buildInputOrderFromSortDescription(
reading,
fixed_columns,
dag, description,
@ -808,7 +814,7 @@ InputOrderInfoPtr buildInputOrderInfo(SortingStep & sorting, QueryPlan::Node & n
}
else if (auto * merge = typeid_cast<ReadFromMerge *>(reading_node->step.get()))
{
auto order_info = buildInputOrderInfo(
auto order_info = buildInputOrderFromSortDescription(
merge,
fixed_columns,
dag, description,
@ -827,7 +833,7 @@ InputOrderInfoPtr buildInputOrderInfo(SortingStep & sorting, QueryPlan::Node & n
return nullptr;
}
AggregationInputOrder buildInputOrderInfo(AggregatingStep & aggregating, QueryPlan::Node & node)
InputOrder buildInputOrderInfo(AggregatingStep & aggregating, QueryPlan::Node & node)
{
QueryPlan::Node * reading_node = findReadingStep(node, false);
if (!reading_node)
@ -845,7 +851,7 @@ AggregationInputOrder buildInputOrderInfo(AggregatingStep & aggregating, QueryPl
if (auto * reading = typeid_cast<ReadFromMergeTree *>(reading_node->step.get()))
{
auto order_info = buildInputOrderInfo(
auto order_info = buildInputOrderFromUnorderedKeys(
reading,
fixed_columns,
dag, keys);
@ -864,7 +870,7 @@ AggregationInputOrder buildInputOrderInfo(AggregatingStep & aggregating, QueryPl
}
else if (auto * merge = typeid_cast<ReadFromMerge *>(reading_node->step.get()))
{
auto order_info = buildInputOrderInfo(
auto order_info = buildInputOrderFromUnorderedKeys(
merge,
fixed_columns,
dag, keys);
@ -882,7 +888,7 @@ AggregationInputOrder buildInputOrderInfo(AggregatingStep & aggregating, QueryPl
return {};
}
bool canImproveOrderForDistinct(AggregationInputOrder & required_order, const InputOrderInfoPtr & existing_order)
bool canImproveOrderForDistinct(InputOrder & required_order, const InputOrderInfoPtr & existing_order)
{
if (!required_order.input_order)
return false;
@ -898,7 +904,7 @@ bool canImproveOrderForDistinct(AggregationInputOrder & required_order, const In
if (existing_order && required_order.input_order->direction != existing_order->direction)
{
/// Take read direction from existing order.
for (auto & column : required_order.sort_description_for_merging)
for (auto & column : required_order.sort_description)
column.direction *= -1;
required_order.input_order = std::make_shared<InputOrderInfo>(
@ -911,7 +917,7 @@ bool canImproveOrderForDistinct(AggregationInputOrder & required_order, const In
return true;
}
AggregationInputOrder buildInputOrderInfo(DistinctStep & distinct, QueryPlan::Node & node)
InputOrder buildInputOrderInfo(DistinctStep & distinct, QueryPlan::Node & node)
{
/// Here we allow improving existing in-order optimization.
/// Example: SELECT DISTINCT a, b FROM t ORDER BY a; -- sorting key: a, b
@ -933,7 +939,7 @@ AggregationInputOrder buildInputOrderInfo(DistinctStep & distinct, QueryPlan::No
if (auto * reading = typeid_cast<ReadFromMergeTree *>(reading_node->step.get()))
{
auto order_info = buildInputOrderInfo(
auto order_info = buildInputOrderFromUnorderedKeys(
reading,
fixed_columns,
dag, keys);
@ -951,7 +957,7 @@ AggregationInputOrder buildInputOrderInfo(DistinctStep & distinct, QueryPlan::No
}
else if (auto * merge = typeid_cast<ReadFromMerge *>(reading_node->step.get()))
{
auto order_info = buildInputOrderInfo(
auto order_info = buildInputOrderFromUnorderedKeys(
merge,
fixed_columns,
dag, keys);
@ -968,7 +974,7 @@ AggregationInputOrder buildInputOrderInfo(DistinctStep & distinct, QueryPlan::No
return {};
}
static bool readingFromParallelReplicas(const QueryPlan::Node * node)
bool readingFromParallelReplicas(const QueryPlan::Node * node)
{
IQueryPlanStep * step = node->step.get();
while (!node->children.empty())
@ -980,6 +986,8 @@ static bool readingFromParallelReplicas(const QueryPlan::Node * node)
return typeid_cast<const ReadFromParallelRemoteReplicasStep *>(step);
}
}
void optimizeReadInOrder(QueryPlan::Node & node, QueryPlan::Nodes & nodes)
{
if (node.children.size() != 1)
@ -1098,7 +1106,19 @@ void optimizeAggregationInOrder(QueryPlan::Node & node, QueryPlan::Nodes &)
/// TODO: maybe add support for UNION later.
auto order_info = buildInputOrderInfo(*aggregating, *node.children.front());
if (order_info.input_order)
aggregating->applyOrder(std::move(order_info.sort_description_for_merging), std::move(order_info.group_by_sort_description));
{
std::unordered_set<std::string_view> used_keys;
for (const auto & desc : order_info.sort_description)
used_keys.insert(desc.column_name);
/// Append other GROUP BY keys to sort description.
SortDescription group_by_sort_description = order_info.sort_description;
for (const auto & key : aggregating->getParams().keys)
if (used_keys.emplace(key).second)
group_by_sort_description.push_back(SortColumnDescription(std::string(key)));
aggregating->applyOrder(std::move(order_info.sort_description), std::move(group_by_sort_description));
}
}
void optimizeDistinctInOrder(QueryPlan::Node & node, QueryPlan::Nodes &)
@ -1118,7 +1138,7 @@ void optimizeDistinctInOrder(QueryPlan::Node & node, QueryPlan::Nodes &)
auto order_info = buildInputOrderInfo(*distinct, *node.children.front());
if (order_info.input_order)
distinct->applyOrder(std::move(order_info.sort_description_for_merging));
distinct->applyOrder(std::move(order_info.sort_description));
}
/// This optimization is obsolete and will be removed.