Allow subqueries for IN with parallel replicas under a setting.

This commit is contained in:
Nikolai Kochetov 2024-03-06 16:48:03 +00:00
parent 4a32e9b01e
commit 2730f0b54f
8 changed files with 238 additions and 10 deletions

View File

@ -186,6 +186,7 @@ class IColumn;
\
M(String, cluster_for_parallel_replicas, "", "Cluster for a shard in which current server is located", 0) \
M(UInt64, allow_experimental_parallel_reading_from_replicas, 0, "Use all the replicas from a shard for SELECT query execution. Reading is parallelized and coordinated dynamically. 0 - disabled, 1 - enabled, silently disable them in case of failure, 2 - enabled, throw an exception in case of failure", 0) \
M(Bool, parallel_replicas_allow_subqueries_for_in, true, "If true, subquery for IN will be executed on every follower replica.", 0) \
M(Float, parallel_replicas_single_task_marks_count_multiplier, 2, "A multiplier which will be added during calculation for minimal number of marks to retrieve from coordinator. This will be applied only for remote replicas.", 0) \
M(Bool, parallel_replicas_for_non_replicated_merge_tree, false, "If true, ClickHouse will use parallel replicas algorithm also for non-replicated MergeTree tables", 0) \
M(UInt64, parallel_replicas_min_number_of_rows_per_replica, 0, "Limit the number of replicas used in a query to (estimated rows to read / min_number_of_rows_per_replica). The max is still limited by 'max_parallel_replicas'", 0) \

View File

@ -216,7 +216,7 @@ private:
if (enable_parallel_processing_of_joins)
{
/// We don't enable parallel replicas for IN (subquery) unless parallel_replicas_allow_subqueries_for_in is set
if (ast->as<ASTSubquery>())
if (!settings.parallel_replicas_allow_subqueries_for_in && ast->as<ASTSubquery>())
{
if (settings.allow_experimental_parallel_reading_from_replicas == 1)
{

View File

@ -1373,7 +1373,7 @@ void Planner::buildPlanForQueryNode()
const auto & settings = query_context->getSettingsRef();
if (query_context->canUseTaskBasedParallelReplicas())
{
if (planner_context->getPreparedSets().hasSubqueries())
if (!settings.parallel_replicas_allow_subqueries_for_in && planner_context->getPreparedSets().hasSubqueries())
{
if (settings.allow_experimental_parallel_reading_from_replicas >= 2)
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "IN with subquery is not supported with parallel replicas");

View File

@ -2,6 +2,7 @@
#include <Interpreters/ClusterProxy/SelectStreamFactory.h>
#include <Interpreters/InterpreterSelectQueryAnalyzer.h>
#include <Processors/QueryPlan/JoinStep.h>
#include <Processors/QueryPlan/CreatingSetsStep.h>
#include <Storages/buildQueryTreeForShard.h>
#include <Interpreters/ClusterProxy/executeQuery.h>
#include <Planner/PlannerJoinTree.h>
@ -156,7 +157,8 @@ QueryTreeNodePtr replaceTablesWithDummyTables(const QueryTreeNodePtr & query, co
/// Otherwise we can execute current query up to WithMergeableState only.
const QueryNode * findQueryForParallelReplicas(
std::stack<const QueryNode *> stack,
const std::unordered_map<const QueryNode *, const QueryPlan::Node *> & mapping)
const std::unordered_map<const QueryNode *, const QueryPlan::Node *> & mapping,
const Settings & settings)
{
const QueryPlan::Node * prev_checked_node = nullptr;
const QueryNode * res = nullptr;
@ -192,7 +194,11 @@ const QueryNode * findQueryForParallelReplicas(
{
const auto * expression = typeid_cast<ExpressionStep *>(step);
const auto * filter = typeid_cast<FilterStep *>(step);
if (!expression && !filter)
const auto * creating_sets = typeid_cast<DelayedCreatingSetsStep *>(step);
bool allowed_creating_sets = settings.parallel_replicas_allow_subqueries_for_in && creating_sets;
if (!expression && !filter && !allowed_creating_sets)
can_distribute_full_node = false;
next_node_to_check = children.front();
@ -274,7 +280,7 @@ const QueryNode * findQueryForParallelReplicas(const QueryTreeNodePtr & query_tr
/// So that we build a list of candidates again, and call findQueryForParallelReplicas for it.
auto new_stack = getSupportingParallelReplicasQuery(updated_query_tree.get());
const auto & mapping = planner.getQueryNodeToPlanStepMapping();
const auto * res = findQueryForParallelReplicas(new_stack, mapping);
const auto * res = findQueryForParallelReplicas(new_stack, mapping, context->getSettingsRef());
/// Now, return a query from initial stack.
if (res)

View File

@ -2,7 +2,12 @@
2 test2 8
3 test3 8
4 test4 1985
2 test2 8
3 test3 8
4 test4 1985
---
1 test1 42
1 test1 42
---
3 test3
3 test3

View File

@ -8,19 +8,23 @@ INSERT INTO merge_tree_in_subqueries VALUES(5, 'test5', 0);
SET max_parallel_replicas=3, cluster_for_parallel_replicas='test_cluster_one_shard_three_replicas_localhost', parallel_replicas_for_non_replicated_merge_tree=1;
SELECT * FROM merge_tree_in_subqueries WHERE id IN (SELECT * FROM system.numbers LIMIT 0) SETTINGS allow_experimental_parallel_reading_from_replicas=2; -- { serverError SUPPORT_IS_DISABLED }
SELECT * FROM merge_tree_in_subqueries WHERE id IN (SELECT * FROM system.numbers LIMIT 0) SETTINGS allow_experimental_parallel_reading_from_replicas=2, parallel_replicas_allow_subqueries_for_in=0; -- { serverError SUPPORT_IS_DISABLED }
SELECT * FROM merge_tree_in_subqueries WHERE id IN (SELECT * FROM system.numbers LIMIT 0) SETTINGS allow_experimental_parallel_reading_from_replicas=2, parallel_replicas_allow_subqueries_for_in=1;
SELECT * FROM merge_tree_in_subqueries WHERE id IN (SELECT * FROM system.numbers LIMIT 0) SETTINGS allow_experimental_parallel_reading_from_replicas=1;
SELECT '---';
SELECT * FROM merge_tree_in_subqueries WHERE id IN (SELECT * FROM system.numbers LIMIT 2, 3) ORDER BY id SETTINGS allow_experimental_parallel_reading_from_replicas=2; -- { serverError SUPPORT_IS_DISABLED };
SELECT * FROM merge_tree_in_subqueries WHERE id IN (SELECT * FROM system.numbers LIMIT 2, 3) ORDER BY id SETTINGS allow_experimental_parallel_reading_from_replicas=2, parallel_replicas_allow_subqueries_for_in=0; -- { serverError SUPPORT_IS_DISABLED };
SELECT * FROM merge_tree_in_subqueries WHERE id IN (SELECT * FROM system.numbers LIMIT 2, 3) ORDER BY id SETTINGS allow_experimental_parallel_reading_from_replicas=2, parallel_replicas_allow_subqueries_for_in=1;
SELECT * FROM merge_tree_in_subqueries WHERE id IN (SELECT * FROM system.numbers LIMIT 2, 3) ORDER BY id SETTINGS allow_experimental_parallel_reading_from_replicas=1;
SELECT '---';
SELECT * FROM merge_tree_in_subqueries WHERE id IN (SELECT 1) ORDER BY id SETTINGS allow_experimental_parallel_reading_from_replicas=2; -- { serverError SUPPORT_IS_DISABLED };
SELECT * FROM merge_tree_in_subqueries WHERE id IN (SELECT 1) ORDER BY id SETTINGS allow_experimental_parallel_reading_from_replicas=2, parallel_replicas_allow_subqueries_for_in=0; -- { serverError SUPPORT_IS_DISABLED };
SELECT * FROM merge_tree_in_subqueries WHERE id IN (SELECT 1) ORDER BY id SETTINGS allow_experimental_parallel_reading_from_replicas=2, parallel_replicas_allow_subqueries_for_in=1;
SELECT * FROM merge_tree_in_subqueries WHERE id IN (SELECT 1) ORDER BY id SETTINGS allow_experimental_parallel_reading_from_replicas=1;
-- IN with tuples is allowed
SELECT '---';
SELECT id, name FROM merge_tree_in_subqueries WHERE (id, name) IN (3, 'test3') SETTINGS allow_experimental_parallel_reading_from_replicas=2;
SELECT id, name FROM merge_tree_in_subqueries WHERE (id, name) IN (3, 'test3') SETTINGS allow_experimental_parallel_reading_from_replicas=2, parallel_replicas_allow_subqueries_for_in=0;
SELECT id, name FROM merge_tree_in_subqueries WHERE (id, name) IN (3, 'test3') SETTINGS allow_experimental_parallel_reading_from_replicas=2, parallel_replicas_allow_subqueries_for_in=1;
DROP TABLE IF EXISTS merge_tree_in_subqueries;

View File

@ -237,7 +237,7 @@ sub2 as (select y, z from tab2 where y != 4),
sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y = r.y),
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select z, a, x, y, r.y, ll.z from sub4 rr any right join sub3 ll on ll.z = rr.z)
select * from sub5 order by x SETTINGS allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', allow_experimental_analyzer=1;-- { echoOn }
select * from sub5 order by x SETTINGS allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', allow_experimental_analyzer=1;
Expression
Sorting
Expression
@ -250,6 +250,93 @@ Expression
ReadFromRemoteParallelReplicas
Expression
ReadFromRemoteParallelReplicas
--
-- Subqueries for IN allowed
with sub1 as (select x, y from tab1 where x in (select number from numbers(16) where number != 2)),
sub2 as (select y, z from tab2 where y != 4),
sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y = r.y),
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select x, y, r.y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z)
select * from sub5 order by x
SETTINGS allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', allow_experimental_analyzer=1;
0 0 0 0 0 0
1 1 0 0 0 0
3 3 0 0 0 0
4 4 0 0 0 0
5 5 0 0 0 0
6 6 6 6 0 0
7 7 0 0 0 0
8 8 8 8 0 0
9 9 0 0 0 0
10 10 10 10 0 0
11 11 0 0 0 0
12 12 12 12 12 12
13 13 0 0 0 0
14 14 14 14 0 0
15 15 0 0 0 0
explain description=0
with sub1 as (select x, y from tab1 where x in (select number from numbers(16) where number != 2)),
sub2 as (select y, z from tab2 where y != 4),
sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y = r.y),
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select x, y, r.y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z)
select * from sub5 order by x
SETTINGS allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', allow_experimental_analyzer=1;
Expression
Sorting
Expression
ReadFromRemoteParallelReplicas
--
-- Subqueries for IN are not allowed
with sub1 as (select x, y from tab1 where x in (select number from numbers(16) where number != 2)),
sub2 as (select y, z from tab2 where y != 4),
sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y = r.y),
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select x, y, r.y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z)
select * from sub5 order by x
SETTINGS allow_experimental_parallel_reading_from_replicas = 1, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', allow_experimental_analyzer=1, parallel_replicas_allow_subqueries_for_in=0;
0 0 0 0 0 0
1 1 0 0 0 0
3 3 0 0 0 0
4 4 0 0 0 0
5 5 0 0 0 0
6 6 6 6 0 0
7 7 0 0 0 0
8 8 8 8 0 0
9 9 0 0 0 0
10 10 10 10 0 0
11 11 0 0 0 0
12 12 12 12 12 12
13 13 0 0 0 0
14 14 14 14 0 0
15 15 0 0 0 0
explain description=0
with sub1 as (select x, y from tab1 where x in (select number from numbers(16) where number != 2)),
sub2 as (select y, z from tab2 where y != 4),
sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y = r.y),
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select x, y, r.y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z)
select * from sub5 order by x
SETTINGS allow_experimental_parallel_reading_from_replicas = 1, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', allow_experimental_analyzer=1, parallel_replicas_allow_subqueries_for_in=0;-- { echoOn }
Expression
Sorting
Expression
Join
Expression
Join
Expression
CreatingSets
Expression
Expression
ReadFromMergeTree
CreatingSet
Expression
Filter
ReadFromSystemNumbers
Expression
ReadFromRemoteParallelReplicas
Expression
ReadFromRemoteParallelReplicas
set parallel_replicas_prefer_local_join = 1;
-- A query with only INNER/LEFT joins is fully send to replicas. JOIN is executed in GLOBAL mode.
select x, y, r.y, z, rr.z, a from (select l.x, l.y, r.y, r.z as z from (select x, y from tab1 where x != 2) l any left join (select y, z from tab2 where y != 4) r on l.y = r.y) ll any left join (select z, a from tab3 where z != 8) rr on ll.z = rr.z order by x SETTINGS allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', allow_experimental_analyzer=1;
@ -500,3 +587,90 @@ Expression
ReadFromRemoteParallelReplicas
Expression
ReadFromRemoteParallelReplicas
--
-- Subqueries for IN allowed
with sub1 as (select x, y from tab1 where x in (select number from numbers(16) where number != 2)),
sub2 as (select y, z from tab2 where y != 4),
sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y = r.y),
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select x, y, r.y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z)
select * from sub5 order by x
SETTINGS allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', allow_experimental_analyzer=1;
0 0 0 0 0 0
1 1 0 0 0 0
3 3 0 0 0 0
4 4 0 0 0 0
5 5 0 0 0 0
6 6 6 6 0 0
7 7 0 0 0 0
8 8 8 8 0 0
9 9 0 0 0 0
10 10 10 10 0 0
11 11 0 0 0 0
12 12 12 12 12 12
13 13 0 0 0 0
14 14 14 14 0 0
15 15 0 0 0 0
explain description=0
with sub1 as (select x, y from tab1 where x in (select number from numbers(16) where number != 2)),
sub2 as (select y, z from tab2 where y != 4),
sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y = r.y),
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select x, y, r.y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z)
select * from sub5 order by x
SETTINGS allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', allow_experimental_analyzer=1;
Expression
Sorting
Expression
ReadFromRemoteParallelReplicas
--
-- Subqueries for IN are not allowed
with sub1 as (select x, y from tab1 where x in (select number from numbers(16) where number != 2)),
sub2 as (select y, z from tab2 where y != 4),
sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y = r.y),
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select x, y, r.y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z)
select * from sub5 order by x
SETTINGS allow_experimental_parallel_reading_from_replicas = 1, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', allow_experimental_analyzer=1, parallel_replicas_allow_subqueries_for_in=0;
0 0 0 0 0 0
1 1 0 0 0 0
3 3 0 0 0 0
4 4 0 0 0 0
5 5 0 0 0 0
6 6 6 6 0 0
7 7 0 0 0 0
8 8 8 8 0 0
9 9 0 0 0 0
10 10 10 10 0 0
11 11 0 0 0 0
12 12 12 12 12 12
13 13 0 0 0 0
14 14 14 14 0 0
15 15 0 0 0 0
explain description=0
with sub1 as (select x, y from tab1 where x in (select number from numbers(16) where number != 2)),
sub2 as (select y, z from tab2 where y != 4),
sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y = r.y),
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select x, y, r.y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z)
select * from sub5 order by x
SETTINGS allow_experimental_parallel_reading_from_replicas = 1, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', allow_experimental_analyzer=1, parallel_replicas_allow_subqueries_for_in=0;
Expression
Sorting
Expression
Join
Expression
Join
Expression
CreatingSets
Expression
Expression
ReadFromMergeTree
CreatingSet
Expression
Filter
ReadFromSystemNumbers
Expression
ReadFromRemoteParallelReplicas
Expression
ReadFromRemoteParallelReplicas

View File

@ -126,4 +126,42 @@ sub4 as (select z, a from tab3 where z != 8),
sub5 as (select z, a, x, y, r.y, ll.z from sub4 rr any right join sub3 ll on ll.z = rr.z)
select * from sub5 order by x SETTINGS allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', allow_experimental_analyzer=1;
--
-- Subqueries for IN allowed
with sub1 as (select x, y from tab1 where x in (select number from numbers(16) where number != 2)),
sub2 as (select y, z from tab2 where y != 4),
sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y = r.y),
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select x, y, r.y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z)
select * from sub5 order by x
SETTINGS allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', allow_experimental_analyzer=1;
explain description=0
with sub1 as (select x, y from tab1 where x in (select number from numbers(16) where number != 2)),
sub2 as (select y, z from tab2 where y != 4),
sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y = r.y),
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select x, y, r.y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z)
select * from sub5 order by x
SETTINGS allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', allow_experimental_analyzer=1;
--
-- Subqueries for IN are not allowed
with sub1 as (select x, y from tab1 where x in (select number from numbers(16) where number != 2)),
sub2 as (select y, z from tab2 where y != 4),
sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y = r.y),
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select x, y, r.y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z)
select * from sub5 order by x
SETTINGS allow_experimental_parallel_reading_from_replicas = 1, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', allow_experimental_analyzer=1, parallel_replicas_allow_subqueries_for_in=0;
explain description=0
with sub1 as (select x, y from tab1 where x in (select number from numbers(16) where number != 2)),
sub2 as (select y, z from tab2 where y != 4),
sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y = r.y),
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select x, y, r.y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z)
select * from sub5 order by x
SETTINGS allow_experimental_parallel_reading_from_replicas = 1, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', allow_experimental_analyzer=1, parallel_replicas_allow_subqueries_for_in=0;
{%- endfor %}