Merge pull request #73246 from ClickHouse/backport/24.10/72510

Backport #72510 to 24.10: Fix: threesome joins with parallel replicas
robot-ch-test-poll 2024-12-13 00:06:32 +01:00 committed by GitHub
commit e0b3d4c122
10 changed files with 31336 additions and 7 deletions
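In short: for queries that join three or more tables, the planner change below counts the JOIN nodes on the table-expression stack, turns off enable_parallel_replicas when such an n-way join involves a FULL JOIN, and passes the nearest parent join node (instead of the whole join tree) into buildQueryPlanForTableExpression(). A minimal sketch of the query shape this targets, assembled from the tests added in this commit (the table names, cluster name, and settings come from those test files; the actual result depends on the rows inserted there):

SET enable_analyzer = 1;
SET enable_parallel_replicas = 1, max_parallel_replicas = 2, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost';

-- Three-way join; with a FULL JOIN in the chain the planner now falls back to
-- regular reading instead of the parallel-replicas reading step.
SELECT t1.id, t1.value, t2.id, t2.value, t3.id, t3.value
FROM test_table_join_1 AS t1
FULL OUTER JOIN test_table_join_2 AS t2 USING (id)
INNER JOIN test_table_join_3 AS t3 USING (id)
ORDER BY ALL ASC;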


@@ -955,7 +955,7 @@ JoinTreeQueryPlan buildQueryPlanForTableExpression(QueryTreeNodePtr table_expres
     /// query_plan can be empty if there is nothing to read
     if (query_plan.isInitialized() && parallel_replicas_enabled_for_storage(storage, settings))
     {
-        const bool allow_parallel_replicas_for_table_expression = [](const QueryTreeNodePtr & join_tree_node)
+        auto allow_parallel_replicas_for_table_expression = [](const QueryTreeNodePtr & join_tree_node)
         {
             const JoinNode * join_node = join_tree_node->as<JoinNode>();
             if (!join_node)
@@ -968,7 +968,7 @@ JoinTreeQueryPlan buildQueryPlanForTableExpression(QueryTreeNodePtr table_expres
                 return true;

             return false;
-        }(parent_join_tree);
+        };

         if (query_context->canUseParallelReplicasCustomKey() && query_context->getClientInfo().distributed_depth == 0)
         {
@@ -992,7 +992,9 @@ JoinTreeQueryPlan buildQueryPlanForTableExpression(QueryTreeNodePtr table_expres
                 query_plan = std::move(query_plan_parallel_replicas);
             }
         }
-        else if (ClusterProxy::canUseParallelReplicasOnInitiator(query_context) && allow_parallel_replicas_for_table_expression)
+        else if (
+            ClusterProxy::canUseParallelReplicasOnInitiator(query_context)
+            && allow_parallel_replicas_for_table_expression(parent_join_tree))
         {
             // (1) find read step
             QueryPlan::Node * node = query_plan.getRootNode();
@@ -1840,18 +1842,45 @@ JoinTreeQueryPlan buildJoinTreeQueryPlan(const QueryTreeNodePtr & query_node,
         }
     }

+    size_t joins_count = 0;
+    bool is_full_join = false;
     /// For each table, table function, query, union table expressions prepare before query plan build
     for (size_t i = 0; i < table_expressions_stack_size; ++i)
     {
         const auto & table_expression = table_expressions_stack[i];
         auto table_expression_type = table_expression->getNodeType();
-        if (table_expression_type == QueryTreeNodeType::JOIN ||
-            table_expression_type == QueryTreeNodeType::ARRAY_JOIN)
+        if (table_expression_type == QueryTreeNodeType::ARRAY_JOIN)
             continue;

+        if (table_expression_type == QueryTreeNodeType::JOIN)
+        {
+            ++joins_count;
+            const auto & join_node = table_expression->as<const JoinNode &>();
+            if (join_node.getKind() == JoinKind::Full)
+                is_full_join = true;
+
+            continue;
+        }
+
         prepareBuildQueryPlanForTableExpression(table_expression, planner_context);
     }

+    /// disable parallel replicas for n-way join with FULL JOIN involved
+    if (joins_count > 1 && is_full_join)
+        planner_context->getMutableQueryContext()->setSetting("enable_parallel_replicas", Field{0});
+
+    // in case of n-way JOINs the table expression stack contains several join nodes
+    // so, we need to find right parent node for a table expression to pass into buildQueryPlanForTableExpression()
+    QueryTreeNodePtr parent_join_tree = join_tree_node;
+    for (const auto & node : table_expressions_stack)
+    {
+        if (node->getNodeType() == QueryTreeNodeType::JOIN || node->getNodeType() == QueryTreeNodeType::ARRAY_JOIN)
+        {
+            parent_join_tree = node;
+            break;
+        }
+    }
+
     /** If left most table expression query plan is planned to stage that is not equal to fetch columns,
       * then left most table expression is responsible for providing valid JOIN TREE part of final query plan.
       *
@@ -1860,7 +1889,7 @@ JoinTreeQueryPlan buildJoinTreeQueryPlan(const QueryTreeNodePtr & query_node,
     auto left_table_expression = table_expressions_stack.front();
     auto left_table_expression_query_plan = buildQueryPlanForTableExpression(
         left_table_expression,
-        join_tree_node,
+        parent_join_tree,
         select_query_info,
         select_query_options,
         planner_context,
@@ -1929,6 +1958,18 @@ JoinTreeQueryPlan buildJoinTreeQueryPlan(const QueryTreeNodePtr & query_node,
             continue;
         }

+        // find parent join node
+        parent_join_tree.reset();
+        for (size_t j = i + 1; j < table_expressions_stack.size(); ++j)
+        {
+            const auto & node = table_expressions_stack[j];
+            if (node->getNodeType() == QueryTreeNodeType::JOIN || node->getNodeType() == QueryTreeNodeType::ARRAY_JOIN)
+            {
+                parent_join_tree = node;
+                break;
+            }
+        }
+
         /** If table expression is remote and it is not left most table expression, we wrap read columns from such
           * table expression in subquery.
           */


@@ -5594,7 +5594,8 @@ void StorageReplicatedMergeTree::read(
             cluster->getName());
     }

-    readLocalImpl(query_plan, column_names, storage_snapshot, query_info, local_context, max_block_size, num_streams); }
+    readLocalImpl(query_plan, column_names, storage_snapshot, query_info, local_context, max_block_size, num_streams);
+}

 void StorageReplicatedMergeTree::readLocalSequentialConsistencyImpl(
     QueryPlan & query_plan,


@@ -0,0 +1,186 @@
-- { echoOn }
SELECT 'First JOIN FULL second JOIN INNER';
First JOIN FULL second JOIN INNER
SELECT count()
FROM
(
SELECT *
FROM viewExplain('EXPLAIN PIPELINE', '', (
SELECT
t1.id,
t1.value,
t2.id,
t2.value,
t3.id,
t3.value
FROM test_table_join_1 AS t1
FULL OUTER JOIN test_table_join_2 AS t2 USING(id)
INNER JOIN test_table_join_3 AS t3 USING(id)
ORDER BY ALL ASC
SETTINGS parallel_replicas_local_plan=0, max_threads=1
))
)
WHERE explain ILIKE '%ReadPoolParallelReplicas%';
0
SELECT 'First JOIN FULL second JOIN LEFT';
First JOIN FULL second JOIN LEFT
SELECT count()
FROM
(
SELECT *
FROM viewExplain('EXPLAIN PIPELINE', '', (
SELECT
t1.id,
t1.value,
t2.id,
t2.value,
t3.id,
t3.value
FROM test_table_join_1 AS t1
FULL OUTER JOIN test_table_join_2 AS t2 USING(id)
INNER JOIN test_table_join_3 AS t3 USING(id)
ORDER BY ALL ASC
SETTINGS parallel_replicas_local_plan=0, max_threads=1
))
)
WHERE explain ILIKE '%ReadPoolParallelReplicas%';
0
SELECT 'First JOIN FULL second JOIN RIGHT';
First JOIN FULL second JOIN RIGHT
SELECT count()
FROM
(
SELECT *
FROM viewExplain('EXPLAIN PIPELINE', '', (
SELECT
t1.id,
t1.value,
t2.id,
t2.value,
t3.id,
t3.value
FROM test_table_join_1 AS t1
FULL OUTER JOIN test_table_join_2 AS t2 USING(id)
INNER JOIN test_table_join_3 AS t3 USING(id)
ORDER BY ALL ASC
SETTINGS parallel_replicas_local_plan=0, max_threads=1
))
)
WHERE explain ILIKE '%ReadPoolParallelReplicas%';
0
SELECT 'First JOIN FULL second JOIN FULL';
First JOIN FULL second JOIN FULL
SELECT count()
FROM
(
SELECT *
FROM viewExplain('EXPLAIN PIPELINE', '', (
SELECT
t1.id,
t1.value,
t2.id,
t2.value,
t3.id,
t3.value
FROM test_table_join_1 AS t1
FULL OUTER JOIN test_table_join_2 AS t2 USING(id)
INNER JOIN test_table_join_3 AS t3 USING(id)
ORDER BY ALL ASC
SETTINGS parallel_replicas_local_plan=0, max_threads=1
))
)
WHERE explain ILIKE '%ReadPoolParallelReplicas%';
0
SELECT 'First JOIN FULL second JOIN INNER';
First JOIN FULL second JOIN INNER
SELECT count()
FROM
(
SELECT *
FROM viewExplain('EXPLAIN PIPELINE', '', (
SELECT
t1.id,
t1.value,
t2.id,
t2.value,
t3.id,
t3.value
FROM test_table_join_1 AS t1
FULL OUTER JOIN test_table_join_2 AS t2 USING(id)
INNER JOIN test_table_join_3 AS t3 USING(id)
ORDER BY ALL ASC
SETTINGS parallel_replicas_local_plan=1, max_threads=1
))
)
WHERE explain ILIKE '%ReadPoolParallelReplicas%';
0
SELECT 'First JOIN FULL second JOIN LEFT';
First JOIN FULL second JOIN LEFT
SELECT count()
FROM
(
SELECT *
FROM viewExplain('EXPLAIN PIPELINE', '', (
SELECT
t1.id,
t1.value,
t2.id,
t2.value,
t3.id,
t3.value
FROM test_table_join_1 AS t1
FULL OUTER JOIN test_table_join_2 AS t2 USING(id)
INNER JOIN test_table_join_3 AS t3 USING(id)
ORDER BY ALL ASC
SETTINGS parallel_replicas_local_plan=1, max_threads=1
))
)
WHERE explain ILIKE '%ReadPoolParallelReplicas%';
0
SELECT 'First JOIN FULL second JOIN RIGHT';
First JOIN FULL second JOIN RIGHT
SELECT count()
FROM
(
SELECT *
FROM viewExplain('EXPLAIN PIPELINE', '', (
SELECT
t1.id,
t1.value,
t2.id,
t2.value,
t3.id,
t3.value
FROM test_table_join_1 AS t1
FULL OUTER JOIN test_table_join_2 AS t2 USING(id)
INNER JOIN test_table_join_3 AS t3 USING(id)
ORDER BY ALL ASC
SETTINGS parallel_replicas_local_plan=1, max_threads=1
))
)
WHERE explain ILIKE '%ReadPoolParallelReplicas%';
0
SELECT 'First JOIN FULL second JOIN FULL';
First JOIN FULL second JOIN FULL
SELECT count()
FROM
(
SELECT *
FROM viewExplain('EXPLAIN PIPELINE', '', (
SELECT
t1.id,
t1.value,
t2.id,
t2.value,
t3.id,
t3.value
FROM test_table_join_1 AS t1
FULL OUTER JOIN test_table_join_2 AS t2 USING(id)
INNER JOIN test_table_join_3 AS t3 USING(id)
ORDER BY ALL ASC
SETTINGS parallel_replicas_local_plan=1, max_threads=1
))
)
WHERE explain ILIKE '%ReadPoolParallelReplicas%';
0


@@ -0,0 +1,73 @@
SET enable_analyzer = 1;
SET enable_parallel_replicas = 1, max_parallel_replicas = 2, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost';
SET parallel_replicas_mark_segment_size = 1;
DROP TABLE IF EXISTS test_table_join_1 SYNC;
CREATE TABLE test_table_join_1
(
id UInt8,
value String
) ENGINE = ReplicatedMergeTree('/clickhouse/{database}/test_table_join_1', 'r1') ORDER BY id SETTINGS index_granularity=1;
DROP TABLE IF EXISTS test_table_join_2 SYNC;
CREATE TABLE test_table_join_2
(
id UInt16,
value String
) ENGINE = ReplicatedMergeTree('/clickhouse/{database}/test_table_join_2', 'r1') ORDER BY id SETTINGS index_granularity=1;
DROP TABLE IF EXISTS test_table_join_3 SYNC;
CREATE TABLE test_table_join_3
(
id UInt64,
value String
) ENGINE = ReplicatedMergeTree('/clickhouse/{database}/test_table_join_3', 'r1') ORDER BY id SETTINGS index_granularity=1;
INSERT INTO test_table_join_1 SELECT number, concat('t1_value_', number) from numbers(500);
INSERT INTO test_table_join_1 SELECT number, concat('t1_value_', number) from numbers(500, 400);
INSERT INTO test_table_join_2 SELECT number, concat('t2_value_', number) from numbers(100, 500);
INSERT INTO test_table_join_2 SELECT number, concat('t2_value_', number) from numbers(500, 500);
INSERT INTO test_table_join_3 SELECT number, concat('t3_value_', number) from numbers(500);
INSERT INTO test_table_join_3 SELECT number, concat('t3_value_', number) from numbers(600, 400);
-- { echoOn }
{% for parallel_replicas_local_plan in ['0', '1'] -%}
{% for first_join_type in ['FULL'] -%}
{% for second_join_type in ['INNER', 'LEFT', 'RIGHT', 'FULL'] -%}
SELECT 'First JOIN {{ first_join_type }} second JOIN {{ second_join_type }}';
SELECT count()
FROM
(
SELECT *
FROM viewExplain('EXPLAIN PIPELINE', '', (
SELECT
t1.id,
t1.value,
t2.id,
t2.value,
t3.id,
t3.value
FROM test_table_join_1 AS t1
FULL OUTER JOIN test_table_join_2 AS t2 USING(id)
INNER JOIN test_table_join_3 AS t3 USING(id)
ORDER BY ALL ASC
SETTINGS parallel_replicas_local_plan={{ parallel_replicas_local_plan }}, max_threads=1
))
)
WHERE explain ILIKE '%ReadPoolParallelReplicas%';
{% endfor %}
{% endfor %}
{% endfor %}
-- { echoOff }
DROP TABLE test_table_join_1 SYNC;
DROP TABLE test_table_join_2 SYNC;
DROP TABLE test_table_join_3 SYNC;

File diff suppressed because it is too large


@@ -0,0 +1,65 @@
SET enable_analyzer = 1;
SET enable_parallel_replicas = 1, max_parallel_replicas = 2, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost';
SET parallel_replicas_mark_segment_size = 1;
DROP TABLE IF EXISTS test_table_join_1 SYNC;
CREATE TABLE test_table_join_1
(
id UInt8,
value String
) ENGINE = ReplicatedMergeTree('/clickhouse/{database}/test_table_join_1', 'r1') ORDER BY id SETTINGS index_granularity=1;
DROP TABLE IF EXISTS test_table_join_2 SYNC;
CREATE TABLE test_table_join_2
(
id UInt16,
value String
) ENGINE = ReplicatedMergeTree('/clickhouse/{database}/test_table_join_2', 'r1') ORDER BY id SETTINGS index_granularity=1;
DROP TABLE IF EXISTS test_table_join_3 SYNC;
CREATE TABLE test_table_join_3
(
id UInt64,
value String
) ENGINE = ReplicatedMergeTree('/clickhouse/{database}/test_table_join_3', 'r1') ORDER BY id SETTINGS index_granularity=1;
INSERT INTO test_table_join_1 SELECT number, concat('t1_value_', number) from numbers(500);
INSERT INTO test_table_join_1 SELECT number, concat('t1_value_', number) from numbers(500, 400);
INSERT INTO test_table_join_2 SELECT number, concat('t2_value_', number) from numbers(100, 500);
INSERT INTO test_table_join_2 SELECT number, concat('t2_value_', number) from numbers(500, 500);
INSERT INTO test_table_join_3 SELECT number, concat('t3_value_', number) from numbers(500);
INSERT INTO test_table_join_3 SELECT number, concat('t3_value_', number) from numbers(600, 400);
-- { echoOn }
{% for parallel_replicas_local_plan in ['0', '1'] -%}
{% for first_join_type in ['INNER'] -%}
{% for second_join_type in ['INNER', 'LEFT', 'RIGHT', 'FULL'] -%}
SELECT 'First JOIN {{ first_join_type }} second JOIN {{ second_join_type }}';
SELECT
id AS using_id,
t1.id AS t1_id,
t1.value AS t1_value,
t2.id AS t2_id,
t2.value AS t2_value,
t3.id AS t3_id,
t3.value AS t3_value
FROM test_table_join_1 AS t1
{{ first_join_type }} JOIN test_table_join_2 AS t2 USING (id)
{{ second_join_type }} JOIN test_table_join_3 AS t3 USING (id)
ORDER BY ALL ASC
SETTINGS parallel_replicas_local_plan={{ parallel_replicas_local_plan }};
{% endfor %}
{% endfor %}
{% endfor %}
-- { echoOff }
DROP TABLE test_table_join_1 SYNC;
DROP TABLE test_table_join_2 SYNC;
DROP TABLE test_table_join_3 SYNC;

File diff suppressed because it is too large


@@ -0,0 +1,65 @@
SET enable_analyzer = 1;
SET enable_parallel_replicas = 1, max_parallel_replicas = 2, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost';
SET parallel_replicas_mark_segment_size = 1;
DROP TABLE IF EXISTS test_table_join_1 SYNC;
CREATE TABLE test_table_join_1
(
id UInt8,
value String
) ENGINE = ReplicatedMergeTree('/clickhouse/{database}/test_table_join_1', 'r1') ORDER BY id SETTINGS index_granularity=1;
DROP TABLE IF EXISTS test_table_join_2 SYNC;
CREATE TABLE test_table_join_2
(
id UInt16,
value String
) ENGINE = ReplicatedMergeTree('/clickhouse/{database}/test_table_join_2', 'r1') ORDER BY id SETTINGS index_granularity=1;
DROP TABLE IF EXISTS test_table_join_3 SYNC;
CREATE TABLE test_table_join_3
(
id UInt64,
value String
) ENGINE = ReplicatedMergeTree('/clickhouse/{database}/test_table_join_3', 'r1') ORDER BY id SETTINGS index_granularity=1;
INSERT INTO test_table_join_1 SELECT number, concat('t1_value_', number) from numbers(500);
INSERT INTO test_table_join_1 SELECT number, concat('t1_value_', number) from numbers(500, 400);
INSERT INTO test_table_join_2 SELECT number, concat('t2_value_', number) from numbers(100, 500);
INSERT INTO test_table_join_2 SELECT number, concat('t2_value_', number) from numbers(500, 500);
INSERT INTO test_table_join_3 SELECT number, concat('t3_value_', number) from numbers(500);
INSERT INTO test_table_join_3 SELECT number, concat('t3_value_', number) from numbers(600, 400);
-- { echoOn }
{% for parallel_replicas_local_plan in ['0', '1'] -%}
{% for first_join_type in ['LEFT'] -%}
{% for second_join_type in ['INNER', 'LEFT', 'RIGHT', 'FULL'] -%}
SELECT 'First JOIN {{ first_join_type }} second JOIN {{ second_join_type }}';
SELECT
id AS using_id,
t1.id AS t1_id,
t1.value AS t1_value,
t2.id AS t2_id,
t2.value AS t2_value,
t3.id AS t3_id,
t3.value AS t3_value
FROM test_table_join_1 AS t1
{{ first_join_type }} JOIN test_table_join_2 AS t2 USING (id)
{{ second_join_type }} JOIN test_table_join_3 AS t3 USING (id)
ORDER BY ALL ASC
SETTINGS parallel_replicas_local_plan={{ parallel_replicas_local_plan }};
{% endfor %}
{% endfor %}
{% endfor %}
-- { echoOff }
DROP TABLE test_table_join_1 SYNC;
DROP TABLE test_table_join_2 SYNC;
DROP TABLE test_table_join_3 SYNC;

File diff suppressed because it is too large


@@ -0,0 +1,64 @@
SET enable_analyzer = 1;
SET enable_parallel_replicas = 1, max_parallel_replicas = 2, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost';
SET parallel_replicas_mark_segment_size = 1;
DROP TABLE IF EXISTS test_table_join_1 SYNC;
CREATE TABLE test_table_join_1
(
id_1 UInt8,
value_1 String
) ENGINE = ReplicatedMergeTree('/clickhouse/{database}/test_table_join_1', 'r1') ORDER BY id_1 SETTINGS index_granularity=1;
DROP TABLE IF EXISTS test_table_join_2 SYNC;
CREATE TABLE test_table_join_2
(
id_2 UInt16,
value_2 String
) ENGINE = ReplicatedMergeTree('/clickhouse/{database}/test_table_join_2', 'r1') ORDER BY id_2 SETTINGS index_granularity=1;
DROP TABLE IF EXISTS test_table_join_3 SYNC;
CREATE TABLE test_table_join_3
(
id_3 UInt64,
value_3 String
) ENGINE = ReplicatedMergeTree('/clickhouse/{database}/test_table_join_3', 'r1') ORDER BY id_3 SETTINGS index_granularity=1;
INSERT INTO test_table_join_1 SELECT number, concat('t1_value_', number) from numbers(500);
INSERT INTO test_table_join_1 SELECT number, concat('t1_value_', number) from numbers(500, 400);
INSERT INTO test_table_join_2 SELECT number, concat('t2_value_', number) from numbers(100, 500);
INSERT INTO test_table_join_2 SELECT number, concat('t2_value_', number) from numbers(500, 500);
INSERT INTO test_table_join_3 SELECT number, concat('t3_value_', number) from numbers(500);
INSERT INTO test_table_join_3 SELECT number, concat('t3_value_', number) from numbers(600, 400);
-- { echoOn }
{% for parallel_replicas_local_plan in ['0', '1'] -%}
{% for first_join_type in ['RIGHT'] -%}
{% for second_join_type in ['INNER', 'LEFT', 'RIGHT', 'FULL'] -%}
SELECT 'First JOIN {{ first_join_type }} second JOIN {{ second_join_type }}';
SELECT
t1.id_1,
t1.value_1,
t2.id_2,
t2.value_2,
t3.id_3,
t3.value_3
FROM test_table_join_1 AS t1
{{ first_join_type }} JOIN test_table_join_2 AS t2 ON t1.id_1 == t2.id_2
{{ second_join_type }} JOIN test_table_join_3 AS t3 ON t1.id_1 == t3.id_3
ORDER BY ALL ASC
SETTINGS parallel_replicas_local_plan={{ parallel_replicas_local_plan }};
{% endfor %}
{% endfor %}
{% endfor %}
-- { echoOff }
DROP TABLE test_table_join_1 SYNC;
DROP TABLE test_table_join_2 SYNC;
DROP TABLE test_table_join_3 SYNC;