Backport #71224 to 24.10: Fix WITH TOTALS in subquery with parallel replicas

This commit is contained in:
robot-clickhouse 2024-11-07 13:10:36 +00:00
parent 48c5da6638
commit 766e1379e0
8 changed files with 321 additions and 27 deletions

View File

@ -1555,10 +1555,7 @@ JoinTreeQueryPlan buildQueryPlanForJoinNode(const QueryTreeNodePtr & join_table_
SortingStep::Settings sort_settings(*query_context);
auto sorting_step = std::make_unique<SortingStep>(
plan.getCurrentHeader(),
std::move(sort_description),
0 /*limit*/,
sort_settings);
plan.getCurrentHeader(), std::move(sort_description), 0 /*limit*/, sort_settings, true /*is_sorting_for_merge_join*/);
sorting_step->setStepDescription(fmt::format("Sort {} before JOIN", join_table_side));
plan.addStep(std::move(sorting_step));
};

View File

@ -17,6 +17,7 @@
#include <Processors/QueryPlan/ExpressionStep.h>
#include <Processors/QueryPlan/FilterStep.h>
#include <Processors/QueryPlan/JoinStep.h>
#include <Processors/QueryPlan/SortingStep.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/StorageDummy.h>
#include <Storages/StorageMaterializedView.h>
@ -170,12 +171,25 @@ const QueryNode * findQueryForParallelReplicas(
const std::unordered_map<const QueryNode *, const QueryPlan::Node *> & mapping,
const Settings & settings)
{
const QueryPlan::Node * prev_checked_node = nullptr;
/// One entry of the plan traversal stack: a plan node together with the JOIN context it was reached in.
struct Frame
{
/// Plan node to inspect on this step of the traversal.
const QueryPlan::Node * node = nullptr;
/// Below we will check subqueries from `stack` to find the outermost subquery that could be executed remotely.
/// Currently the traversal algorithm considers only steps with 0 or 1 children, and JOIN specifically.
/// When we find some step that requires finalization on the initiator (e.g. GROUP BY) there are two options:
/// 1. If the plan looks like a single path (e.g. AggregatingStep -> ExpressionStep -> Reading) we can execute
/// the current subquery as a whole with replicas.
/// 2. If we are inside a JOIN we cannot offload the whole subquery to replicas because at least one side
/// of the JOIN needs to be finalized on the initiator.
/// So this flag is used to track which subquery to return once we hit a step that needs finalization.
/// It is set to true for every child pushed from a JOIN step and is inherited by single-child descendants.
bool inside_join = false;
};
const QueryNode * res = nullptr;
while (!stack.empty())
{
const QueryNode * subquery_node = stack.top();
const QueryNode * const subquery_node = stack.top();
stack.pop();
auto it = mapping.find(subquery_node);
@ -183,22 +197,21 @@ const QueryNode * findQueryForParallelReplicas(
if (it == mapping.end())
break;
const QueryPlan::Node * curr_node = it->second;
const QueryPlan::Node * next_node_to_check = curr_node;
std::stack<Frame> nodes_to_check;
nodes_to_check.push({.node = it->second, .inside_join = false});
bool can_distribute_full_node = true;
bool currently_inside_join = false;
while (next_node_to_check && next_node_to_check != prev_checked_node)
while (!nodes_to_check.empty())
{
/// Copy the frame out of the stack before popping it: `pop()` destroys the top element, so bindings
/// taken by reference from `top()` would dangle for the rest of this iteration (and the slot is reused
/// by the pushes below). The frame is just a pointer and a bool, so the copy is free.
const auto [next_node_to_check, inside_join] = nodes_to_check.top();
nodes_to_check.pop();
const auto & children = next_node_to_check->children;
auto * step = next_node_to_check->step.get();
if (children.empty())
{
/// Found a source step. This should be possible only in the first iteration.
if (prev_checked_node)
return nullptr;
next_node_to_check = nullptr;
/// Found a source step.
}
else if (children.size() == 1)
{
@ -206,12 +219,19 @@ const QueryNode * findQueryForParallelReplicas(
const auto * filter = typeid_cast<FilterStep *>(step);
const auto * creating_sets = typeid_cast<DelayedCreatingSetsStep *>(step);
bool allowed_creating_sets = settings[Setting::parallel_replicas_allow_in_with_subquery] && creating_sets;
const bool allowed_creating_sets = settings[Setting::parallel_replicas_allow_in_with_subquery] && creating_sets;
if (!expression && !filter && !allowed_creating_sets)
const auto * sorting = typeid_cast<SortingStep *>(step);
/// Sorting for merge join is supposed to be done locally before join itself, so it doesn't need finalization.
const bool allowed_sorting = sorting && sorting->isSortingForMergeJoin();
if (!expression && !filter && !allowed_creating_sets && !allowed_sorting)
{
can_distribute_full_node = false;
currently_inside_join = inside_join;
}
next_node_to_check = children.front();
nodes_to_check.push({.node = children.front(), .inside_join = inside_join});
}
else
{
@ -221,12 +241,11 @@ const QueryNode * findQueryForParallelReplicas(
if (!join)
return res;
next_node_to_check = children.front();
for (const auto & child : children)
nodes_to_check.push({.node = child, .inside_join = true});
}
}
/// Current node contains steps like GROUP BY / DISTINCT
/// Will try to execute query up to WithMergableStage
if (!can_distribute_full_node)
{
/// Current query node does not contain subqueries.
@ -234,12 +253,11 @@ const QueryNode * findQueryForParallelReplicas(
if (!res)
return nullptr;
return subquery_node;
return currently_inside_join ? res : subquery_node;
}
/// Query is simple enough to be fully distributed.
res = subquery_node;
prev_checked_node = curr_node;
}
return res;

View File

@ -77,13 +77,11 @@ static ITransformingStep::Traits getTraits(size_t limit)
}
SortingStep::SortingStep(
const Header & input_header,
SortDescription description_,
UInt64 limit_,
const Settings & settings_)
const Header & input_header, SortDescription description_, UInt64 limit_, const Settings & settings_, bool is_sorting_for_merge_join_)
: ITransformingStep(input_header, input_header, getTraits(limit_))
, type(Type::Full)
, result_description(std::move(description_))
, is_sorting_for_merge_join(is_sorting_for_merge_join_)
, limit(limit_)
, sort_settings(settings_)
{

View File

@ -39,7 +39,8 @@ public:
const Header & input_header,
SortDescription description_,
UInt64 limit_,
const Settings & settings_);
const Settings & settings_,
bool is_sorting_for_merge_join_ = false);
/// Full with partitioning
SortingStep(
@ -81,6 +82,8 @@ public:
bool hasPartitions() const { return !partition_by_description.empty(); }
bool isSortingForMergeJoin() const { return is_sorting_for_merge_join; }
void convertToFinishSorting(SortDescription prefix_description, bool use_buffering_);
Type getType() const { return type; }
@ -125,6 +128,9 @@ private:
SortDescription partition_by_description;
/// See `findQueryForParallelReplicas`
bool is_sorting_for_merge_join = false;
UInt64 limit;
bool always_read_till_end = false;
bool use_buffering = false;

View File

@ -0,0 +1,10 @@
1 1
1 1
0 0
-----
1 1
1 1
0 0
-----

View File

@ -0,0 +1,48 @@
#!/usr/bin/env bash
# Regression test for WITH TOTALS inside a JOINed subquery when parallel replicas are enabled.
# The same LEFT JOIN query is executed twice, with parallel replicas disabled (0) and enabled (1);
# each run is followed by a '-----' separator line.
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh
# NOTE(review): CLICKHOUSE_CLIENT is presumably defined by shell_config.sh sourced above - confirm.
# Set up a small MergeTree table holding two rows that share item_id = 1.
${CLICKHOUSE_CLIENT} --query="
CREATE TABLE t
(
item_id UInt64,
price_sold Float32,
date Date
)
ENGINE = MergeTree
ORDER BY item_id;
INSERT INTO t VALUES (1, 100, '1970-01-01'), (1, 200, '1970-01-02');
"
# The right side of the JOIN is a GROUP BY ... WITH TOTALS ... ORDER BY subquery; the left side is a plain read.
for enable_parallel_replicas in {0..1}; do
${CLICKHOUSE_CLIENT} --query="
--- Old analyzer uses different code path and it produces wrong result in this case.
set enable_analyzer=1;
set allow_experimental_parallel_reading_from_replicas=${enable_parallel_replicas}, cluster_for_parallel_replicas='parallel_replicas', max_parallel_replicas=100, parallel_replicas_for_non_replicated_merge_tree=1;
SELECT *
FROM
(
SELECT item_id
FROM t
) AS l
LEFT JOIN
(
SELECT item_id
FROM t
GROUP BY item_id
WITH TOTALS
ORDER BY item_id ASC
) AS r ON l.item_id = r.item_id;
SELECT '-----';
"
done
# Clean up the table created above.
${CLICKHOUSE_CLIENT} --query="
DROP TABLE t;
"

View File

@ -0,0 +1,116 @@
4999950000
4999950000
SELECT `__table1`.`item_id` AS `item_id` FROM `default`.`t` AS `__table1` GROUP BY `__table1`.`item_id`
SELECT `__table1`.`item_id` AS `item_id` FROM `default`.`t1` AS `__table1`
4999950000
4999950000
SELECT `__table1`.`item_id` AS `item_id` FROM `default`.`t` AS `__table1`
SELECT `__table1`.`item_id` AS `item_id` FROM `default`.`t1` AS `__table1` GROUP BY `__table1`.`item_id`
499950000
499960000
499970000
499980000
499990000
500000000
500010000
500020000
500030000
500040000
499950000
499960000
499970000
499980000
499990000
500000000
500010000
500020000
500030000
500040000
SELECT sum(`__table1`.`item_id`) AS `sum(item_id)` FROM (SELECT `__table2`.`item_id` AS `item_id`, `__table2`.`price_sold` AS `price_sold` FROM `default`.`t` AS `__table2`) AS `__table1` ALL LEFT JOIN (SELECT `__table4`.`item_id` AS `item_id` FROM `default`.`t1` AS `__table4`) AS `__table3` ON `__table1`.`item_id` = `__table3`.`item_id` GROUP BY `__table1`.`price_sold` ORDER BY `__table1`.`price_sold` ASC
4999950000
4999950000
SELECT `__table1`.`item_id` AS `item_id` FROM `default`.`t` AS `__table1` GROUP BY `__table1`.`item_id`
SELECT `__table1`.`item_id` AS `item_id` FROM `default`.`t1` AS `__table1`
4999950000
4999950000
SELECT `__table1`.`item_id` AS `item_id` FROM `default`.`t` AS `__table1`
SELECT `__table1`.`item_id` AS `item_id` FROM `default`.`t1` AS `__table1` GROUP BY `__table1`.`item_id`
499950000
499960000
499970000
499980000
499990000
500000000
500010000
500020000
500030000
500040000
499950000
499960000
499970000
499980000
499990000
500000000
500010000
500020000
500030000
500040000
SELECT sum(`__table1`.`item_id`) AS `sum(item_id)` FROM (SELECT `__table2`.`item_id` AS `item_id`, `__table2`.`price_sold` AS `price_sold` FROM `default`.`t` AS `__table2`) AS `__table1` ALL LEFT JOIN (SELECT `__table4`.`item_id` AS `item_id` FROM `default`.`t1` AS `__table4`) AS `__table3` ON `__table1`.`item_id` = `__table3`.`item_id` GROUP BY `__table1`.`price_sold` ORDER BY `__table1`.`price_sold` ASC
4999950000
4999950000
SELECT `__table1`.`item_id` AS `item_id` FROM `default`.`t` AS `__table1` GROUP BY `__table1`.`item_id`
SELECT `__table1`.`item_id` AS `item_id` FROM `default`.`t1` AS `__table1`
4999950000
4999950000
SELECT `__table1`.`item_id` AS `item_id` FROM `default`.`t` AS `__table1`
SELECT `__table1`.`item_id` AS `item_id` FROM `default`.`t1` AS `__table1` GROUP BY `__table1`.`item_id`
499950000
499960000
499970000
499980000
499990000
500000000
500010000
500020000
500030000
500040000
499950000
499960000
499970000
499980000
499990000
500000000
500010000
500020000
500030000
500040000
SELECT sum(`__table1`.`item_id`) AS `sum(item_id)` FROM (SELECT `__table2`.`item_id` AS `item_id`, `__table2`.`price_sold` AS `price_sold` FROM `default`.`t` AS `__table2`) AS `__table1` GLOBAL ALL LEFT JOIN `_data_x_y_` AS `__table3` ON `__table1`.`item_id` = `__table3`.`item_id` GROUP BY `__table1`.`price_sold` ORDER BY `__table1`.`price_sold` ASC
4999950000
4999950000
SELECT `__table1`.`item_id` AS `item_id` FROM `default`.`t` AS `__table1` GROUP BY `__table1`.`item_id`
SELECT `__table1`.`item_id` AS `item_id` FROM `default`.`t1` AS `__table1`
4999950000
4999950000
SELECT `__table1`.`item_id` AS `item_id` FROM `default`.`t` AS `__table1`
SELECT `__table1`.`item_id` AS `item_id` FROM `default`.`t1` AS `__table1` GROUP BY `__table1`.`item_id`
499950000
499960000
499970000
499980000
499990000
500000000
500010000
500020000
500030000
500040000
499950000
499960000
499970000
499980000
499990000
500000000
500010000
500020000
500030000
500040000
SELECT sum(`__table1`.`item_id`) AS `sum(item_id)` FROM (SELECT `__table2`.`item_id` AS `item_id`, `__table2`.`price_sold` AS `price_sold` FROM `default`.`t` AS `__table2`) AS `__table1` GLOBAL ALL LEFT JOIN `_data_x_y_` AS `__table3` ON `__table1`.`item_id` = `__table3`.`item_id` GROUP BY `__table1`.`price_sold` ORDER BY `__table1`.`price_sold` ASC

View File

@ -0,0 +1,101 @@
#!/usr/bin/env bash
# Tags: long, no-random-settings, no-random-merge-tree-settings
# Checks which part of a JOIN query is sent to parallel replicas, depending on where the GROUP BY sits.
# For every settings combination the script prints the query result followed by the remote query text
# extracted from the EXPLAIN output.
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
# NOTE(review): CLICKHOUSE_CLIENT is presumably defined by shell_config.sh sourced above - confirm.
# Create two MergeTree tables with the same schema and fill each with 100000 rows generated from numbers().
${CLICKHOUSE_CLIENT} --query="
CREATE TABLE t
(
item_id UInt64,
price_sold Float32,
date Date
)
ENGINE = MergeTree
ORDER BY item_id;
CREATE TABLE t1
(
item_id UInt64,
price_sold Float32,
date Date
)
ENGINE = MergeTree
ORDER BY item_id;
INSERT INTO t SELECT number, number % 10, toDate(number) FROM numbers(100000);
INSERT INTO t1 SELECT number, number % 10, toDate(number) FROM numbers(100000);
"
# query1: GROUP BY is inside the left subquery of the JOIN.
query1="
SELECT sum(item_id)
FROM
(
SELECT item_id
FROM t
GROUP BY item_id
) AS l
LEFT JOIN
(
SELECT item_id
FROM t1
) AS r ON l.item_id = r.item_id
"
# query2: GROUP BY is inside the right subquery of the JOIN.
query2="
SELECT sum(item_id)
FROM
(
SELECT item_id
FROM t
) AS l
LEFT JOIN
(
SELECT item_id
FROM t1
GROUP BY item_id
) AS r ON l.item_id = r.item_id
"
# query3: both subqueries are plain reads; GROUP BY / ORDER BY are applied on top of the JOIN.
query3="
SELECT sum(item_id)
FROM
(
SELECT item_id, price_sold
FROM t
) AS l
LEFT JOIN
(
SELECT item_id
FROM t1
) AS r ON l.item_id = r.item_id
GROUP BY price_sold
ORDER BY price_sold
"
# Run each query for every combination of parallel_replicas_prefer_local_join (1, 0),
# parallel_replicas_local_plan (0, 1) and parallel replicas disabled/enabled (0, 1).
# After the query itself, the EXPLAIN lines mentioning ParallelReplicas are reduced to the text between
# 'Query: ' and ' Replicas:', and numeric _data_<n>_<n> names are rewritten to _data_x_y_
# (presumably to keep the printed text independent of generated temporary table names).
for parallel_replicas_prefer_local_join in 1 0; do
for prefer_local_plan in {0..1}; do
for query in "${query1}" "${query2}" "${query3}"; do
for enable_parallel_replicas in {0..1}; do
${CLICKHOUSE_CLIENT} --query="
set enable_analyzer=1;
set parallel_replicas_prefer_local_join=${parallel_replicas_prefer_local_join};
set parallel_replicas_local_plan=${prefer_local_plan};
set allow_experimental_parallel_reading_from_replicas=${enable_parallel_replicas}, cluster_for_parallel_replicas='parallel_replicas', max_parallel_replicas=100, parallel_replicas_for_non_replicated_merge_tree=1;
--SELECT '----- enable_parallel_replicas=$enable_parallel_replicas prefer_local_plan=$prefer_local_plan parallel_replicas_prefer_local_join=$parallel_replicas_prefer_local_join -----';
${query};
SELECT replaceRegexpAll(replaceRegexpAll(explain, '.*Query: (.*) Replicas:.*', '\\1'), '(.*)_data_[\d]+_[\d]+(.*)', '\1_data_x_y_\2')
FROM
(
EXPLAIN actions=1 ${query}
)
WHERE explain LIKE '%ParallelReplicas%';
"
done
done
done
done