Parallel replicas feature is Beta

This commit is contained in:
Alexey Milovidov 2024-04-30 06:26:51 +02:00 committed by Nikita Mikhaylov
parent 61524aabb6
commit c459e5c3f6
63 changed files with 213 additions and 199 deletions

View File

@ -1623,7 +1623,7 @@ When used in conjuction with [parallel_replicas_custom_key_range_lower](#paralle
Note: This setting will not cause any additional data to be filtered during query processing, rather it changes the points at which the range filter breaks up the range `[0, INT_MAX]` for parallel processing.
## allow_experimental_parallel_reading_from_replicas
## use_parallel_replicas
Enables or disables sending SELECT queries to all replicas of a table (up to `max_parallel_replicas`). Reading is parallelized and coordinated dynamically. It will work for any kind of MergeTree table.

View File

@ -164,7 +164,7 @@ ContextMutablePtr updateSettingsAndClientInfoForCluster(const Cluster & cluster,
}
}
if (disable_parallel_replicas)
new_settings.allow_experimental_parallel_reading_from_replicas = 0;
new_settings.use_parallel_replicas = 0;
}
if (settings.max_execution_time_leaf.value > 0)
@ -288,9 +288,9 @@ void executeQuery(
auto cluster = query_info.getCluster();
auto new_context = updateSettingsAndClientInfoForCluster(*cluster, is_remote_function, context,
settings, main_table, query_info.additional_filter_ast, log, &distributed_settings);
if (context->getSettingsRef().allow_experimental_parallel_reading_from_replicas
&& context->getSettingsRef().allow_experimental_parallel_reading_from_replicas.value
!= new_context->getSettingsRef().allow_experimental_parallel_reading_from_replicas.value)
if (context->getSettingsRef().use_parallel_replicas
&& context->getSettingsRef().use_parallel_replicas.value
!= new_context->getSettingsRef().use_parallel_replicas.value)
{
LOG_TRACE(
log,
@ -462,14 +462,14 @@ void executeQueryWithParallelReplicas(
{
LOG_WARNING(
getLogger("executeQueryWithParallelReplicas"),
"Setting 'use_hedged_requests' explicitly with enabled 'allow_experimental_parallel_reading_from_replicas' has no effect. "
"Setting 'use_hedged_requests' explicitly with enabled 'use_parallel_replicas' has no effect. "
"Hedged connections are not used for parallel reading from replicas");
}
else
{
LOG_INFO(
getLogger("executeQueryWithParallelReplicas"),
"Disabling 'use_hedged_requests' in favor of 'allow_experimental_parallel_reading_from_replicas'. Hedged connections are "
"Disabling 'use_hedged_requests' in favor of 'use_parallel_replicas'. Hedged connections are "
"not used for parallel reading from replicas");
}

View File

@ -5633,7 +5633,7 @@ Context::ParallelReplicasMode Context::getParallelReplicasMode() const
if (!settings_ref.parallel_replicas_custom_key.value.empty())
return CUSTOM_KEY;
if (settings_ref.allow_experimental_parallel_reading_from_replicas > 0)
if (settings_ref.use_parallel_replicas > 0)
return READ_TASKS;
return SAMPLE_KEY;

View File

@ -219,13 +219,13 @@ private:
/// We don't enable parallel replicas for IN (subquery)
if (!settings.parallel_replicas_allow_in_with_subquery && ast->as<ASTSubquery>())
{
if (settings.allow_experimental_parallel_reading_from_replicas == 1)
if (settings.use_parallel_replicas == 1)
{
LOG_DEBUG(getLogger("GlobalSubqueriesMatcher"), "IN with subquery is not supported with parallel replicas");
data.getContext()->getQueryContext()->setSetting("allow_experimental_parallel_reading_from_replicas", Field(0));
data.getContext()->getQueryContext()->setSetting("use_parallel_replicas", Field(0));
return;
}
else if (settings.allow_experimental_parallel_reading_from_replicas >= 2)
else if (settings.use_parallel_replicas >= 2)
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "IN with subquery is not supported with parallel replicas");
}
}
@ -277,13 +277,13 @@ private:
if (!is_subquery)
{
if (settings.allow_experimental_parallel_reading_from_replicas == 1)
if (settings.use_parallel_replicas == 1)
{
LOG_DEBUG(getLogger("GlobalSubqueriesMatcher"), "JOIN with parallel replicas is only supported with subqueries");
data.getContext()->getQueryContext()->setSetting("allow_experimental_parallel_reading_from_replicas", Field(0));
data.getContext()->getQueryContext()->setSetting("use_parallel_replicas", Field(0));
return;
}
else if (settings.allow_experimental_parallel_reading_from_replicas >= 2)
else if (settings.use_parallel_replicas >= 2)
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "JOIN with parallel replicas is only supported with subqueries");
}
}

View File

@ -504,12 +504,12 @@ InterpreterSelectQuery::InterpreterSelectQuery(
bool is_query_with_final = isQueryWithFinal(query_info);
if (is_query_with_final && context->canUseTaskBasedParallelReplicas())
{
if (settings.allow_experimental_parallel_reading_from_replicas == 1)
if (settings.use_parallel_replicas == 1)
{
LOG_DEBUG(log, "FINAL modifier is not supported with parallel replicas. Query will be executed without using them.");
context->setSetting("allow_experimental_parallel_reading_from_replicas", Field(0));
context->setSetting("use_parallel_replicas", Field(0));
}
else if (settings.allow_experimental_parallel_reading_from_replicas >= 2)
else if (settings.use_parallel_replicas >= 2)
{
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "FINAL modifier is not supported with parallel replicas");
}
@ -517,14 +517,14 @@ InterpreterSelectQuery::InterpreterSelectQuery(
/// Check support for parallel replicas for non-replicated storage (plain MergeTree)
bool is_plain_merge_tree = storage && storage->isMergeTree() && !storage->supportsReplication();
if (is_plain_merge_tree && settings.allow_experimental_parallel_reading_from_replicas > 0 && !settings.parallel_replicas_for_non_replicated_merge_tree)
if (is_plain_merge_tree && settings.use_parallel_replicas > 0 && !settings.parallel_replicas_for_non_replicated_merge_tree)
{
if (settings.allow_experimental_parallel_reading_from_replicas == 1)
if (settings.use_parallel_replicas == 1)
{
LOG_DEBUG(log, "To use parallel replicas with plain MergeTree tables please enable setting `parallel_replicas_for_non_replicated_merge_tree`. For now query will be executed without using them.");
context->setSetting("allow_experimental_parallel_reading_from_replicas", Field(0));
context->setSetting("use_parallel_replicas", Field(0));
}
else if (settings.allow_experimental_parallel_reading_from_replicas >= 2)
else if (settings.use_parallel_replicas >= 2)
{
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "To use parallel replicas with plain MergeTree tables please enable setting `parallel_replicas_for_non_replicated_merge_tree`");
}
@ -814,10 +814,10 @@ InterpreterSelectQuery::InterpreterSelectQuery(
};
/// This is a hack to make sure we reanalyze if GlobalSubqueriesVisitor changed allow_experimental_parallel_reading_from_replicas
/// This is a hack to make sure we reanalyze if GlobalSubqueriesVisitor changed use_parallel_replicas
/// inside the query context (because it doesn't have write access to the main context)
UInt64 parallel_replicas_before_analysis
= context->hasQueryContext() ? context->getQueryContext()->getSettingsRef().allow_experimental_parallel_reading_from_replicas : 0;
= context->hasQueryContext() ? context->getQueryContext()->getSettingsRef().use_parallel_replicas : 0;
/// Conditionally support AST-based PREWHERE optimization.
analyze(shouldMoveToPrewhere() && (!settings.query_plan_optimize_prewhere || !settings.query_plan_enable_optimizations));
@ -829,10 +829,10 @@ InterpreterSelectQuery::InterpreterSelectQuery(
if (context->hasQueryContext())
{
/// As this query can't be executed with parallel replicas, we must reanalyze it
if (context->getQueryContext()->getSettingsRef().allow_experimental_parallel_reading_from_replicas
if (context->getQueryContext()->getSettingsRef().use_parallel_replicas
!= parallel_replicas_before_analysis)
{
context->setSetting("allow_experimental_parallel_reading_from_replicas", Field(0));
context->setSetting("use_parallel_replicas", Field(0));
context->setSetting("max_parallel_replicas", UInt64{1});
need_analyze_again = true;
}
@ -924,7 +924,7 @@ bool InterpreterSelectQuery::adjustParallelReplicasAfterAnalysis()
if (getTrivialCount(0).has_value())
{
/// The query could use trivial count if it didn't use parallel replicas, so let's disable it and reanalyze
context->setSetting("allow_experimental_parallel_reading_from_replicas", Field(0));
context->setSetting("use_parallel_replicas", Field(0));
context->setSetting("max_parallel_replicas", UInt64{1});
LOG_DEBUG(log, "Disabling parallel replicas to be able to use a trivial count optimization");
return true;
@ -980,7 +980,7 @@ bool InterpreterSelectQuery::adjustParallelReplicasAfterAnalysis()
if (number_of_replicas_to_use <= 1)
{
context->setSetting("allow_experimental_parallel_reading_from_replicas", Field(0));
context->setSetting("use_parallel_replicas", Field(0));
context->setSetting("max_parallel_replicas", UInt64{1});
LOG_DEBUG(log, "Disabling parallel replicas because there aren't enough rows to read");
return true;

View File

@ -1424,11 +1424,11 @@ void Planner::buildPlanForQueryNode()
{
if (!settings.parallel_replicas_allow_in_with_subquery && planner_context->getPreparedSets().hasSubqueries())
{
if (settings.allow_experimental_parallel_reading_from_replicas >= 2)
if (settings.use_parallel_replicas >= 2)
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "IN with subquery is not supported with parallel replicas");
auto & mutable_context = planner_context->getMutableQueryContext();
mutable_context->setSetting("allow_experimental_parallel_reading_from_replicas", Field(0));
mutable_context->setSetting("use_parallel_replicas", Field(0));
LOG_DEBUG(getLogger("Planner"), "Disabling parallel replicas to execute a query with IN with subquery");
}
}
@ -1463,7 +1463,7 @@ void Planner::buildPlanForQueryNode()
const auto & modifiers = table_node->getTableExpressionModifiers();
if (modifiers.has_value() && modifiers->hasFinal())
{
if (settings.allow_experimental_parallel_reading_from_replicas >= 2)
if (settings.use_parallel_replicas >= 2)
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "FINAL modifier is not supported with parallel replicas");
else
{
@ -1471,7 +1471,7 @@ void Planner::buildPlanForQueryNode()
getLogger("Planner"),
"FINAL modifier is not supported with parallel replicas. Query will be executed without using them.");
auto & mutable_context = planner_context->getMutableQueryContext();
mutable_context->setSetting("allow_experimental_parallel_reading_from_replicas", Field(0));
mutable_context->setSetting("use_parallel_replicas", Field(0));
}
}
}
@ -1482,7 +1482,7 @@ void Planner::buildPlanForQueryNode()
/// Check support for JOIN for parallel replicas with custom key
if (planner_context->getTableExpressionNodeToData().size() > 1)
{
if (settings.allow_experimental_parallel_reading_from_replicas >= 2)
if (settings.use_parallel_replicas >= 2)
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "JOINs are not supported with parallel replicas");
else
{
@ -1491,7 +1491,7 @@ void Planner::buildPlanForQueryNode()
"JOINs are not supported with parallel replicas. Query will be executed without using them.");
auto & mutable_context = planner_context->getMutableQueryContext();
mutable_context->setSetting("allow_experimental_parallel_reading_from_replicas", Field(0));
mutable_context->setSetting("use_parallel_replicas", Field(0));
mutable_context->setSetting("parallel_replicas_custom_key", String{""});
}
}

View File

@ -292,11 +292,11 @@ bool applyTrivialCountIfPossible(
if (settings.max_parallel_replicas > 1)
{
if (!settings.parallel_replicas_custom_key.value.empty() || settings.allow_experimental_parallel_reading_from_replicas == 0)
if (!settings.parallel_replicas_custom_key.value.empty() || settings.use_parallel_replicas == 0)
return false;
/// The query could use trivial count if it didn't use parallel replicas, so let's disable it
query_context->setSetting("allow_experimental_parallel_reading_from_replicas", Field(0));
query_context->setSetting("use_parallel_replicas", Field(0));
query_context->setSetting("max_parallel_replicas", UInt64{1});
LOG_TRACE(getLogger("Planner"), "Disabling parallel replicas to be able to use a trivial count optimization");

View File

@ -270,7 +270,7 @@ const QueryNode * findQueryForParallelReplicas(const QueryTreeNodePtr & query_tr
/// This is needed to avoid infinite recursion.
auto mutable_context = Context::createCopy(context);
mutable_context->setSetting("allow_experimental_parallel_reading_from_replicas", Field(0));
mutable_context->setSetting("use_parallel_replicas", Field(0));
/// Here we replace tables to dummy, in order to build a temporary query plan for parallel replicas analysis.
ResultReplacementMap replacement_map;

View File

@ -243,7 +243,7 @@ function run_tests()
)
if [[ -n "$USE_PARALLEL_REPLICAS" ]] && [[ "$USE_PARALLEL_REPLICAS" -eq 1 ]]; then
TEST_ARGS+=(
--client="clickhouse-client --allow_experimental_parallel_reading_from_replicas=1 --parallel_replicas_for_non_replicated_merge_tree=1 --max_parallel_replicas=100 --cluster_for_parallel_replicas='parallel_replicas'"
--client="clickhouse-client --use_parallel_replicas=1 --parallel_replicas_for_non_replicated_merge_tree=1 --max_parallel_replicas=100 --cluster_for_parallel_replicas='parallel_replicas'"
--no-parallel-replicas
)
fi

View File

@ -54,7 +54,7 @@ def _get_result_without_parallel_replicas(query):
return nodes[0].query(
query,
settings={
"allow_experimental_parallel_reading_from_replicas": 0,
"use_parallel_replicas": 0,
},
)
@ -65,7 +65,7 @@ def _get_result_with_parallel_replicas(
return nodes[0].query(
query,
settings={
"allow_experimental_parallel_reading_from_replicas": 2,
"use_parallel_replicas": 2,
"max_parallel_replicas": 6,
"cluster_for_parallel_replicas": f"{cluster_name}",
"parallel_replicas_mark_segment_size": parallel_replicas_mark_segment_size,

View File

@ -85,7 +85,7 @@ def test_skip_unavailable_shards(start_cluster, prefer_localhost_replica):
node1.query(
f"SELECT count(), min(key), max(key), sum(key) FROM {table_name}_d",
settings={
"allow_experimental_parallel_reading_from_replicas": 2,
"use_parallel_replicas": 2,
"max_parallel_replicas": 3,
"prefer_localhost_replica": prefer_localhost_replica,
"skip_unavailable_shards": 1,
@ -119,7 +119,7 @@ def test_error_on_unavailable_shards(start_cluster, prefer_localhost_replica):
node1.query(
f"SELECT count(), min(key), max(key), sum(key) FROM {table_name}_d",
settings={
"allow_experimental_parallel_reading_from_replicas": 2,
"use_parallel_replicas": 2,
"max_parallel_replicas": 3,
"prefer_localhost_replica": prefer_localhost_replica,
"skip_unavailable_shards": 0,
@ -154,7 +154,7 @@ def test_no_unavailable_shards(start_cluster, skip_unavailable_shards):
node1.query(
f"SELECT count(), min(key), max(key), sum(key) FROM {table_name}_d",
settings={
"allow_experimental_parallel_reading_from_replicas": 2,
"use_parallel_replicas": 2,
"max_parallel_replicas": 3,
"prefer_localhost_replica": 0,
"skip_unavailable_shards": skip_unavailable_shards,

View File

@ -80,7 +80,7 @@ def _get_result_with_parallel_replicas(
return nodes[0].query(
query,
settings={
"allow_experimental_parallel_reading_from_replicas": 2,
"use_parallel_replicas": 2,
"max_parallel_replicas": len(nodes),
"cluster_for_parallel_replicas": f"{cluster_name}",
"parallel_replicas_mark_segment_size": parallel_replicas_mark_segment_size,

View File

@ -137,7 +137,7 @@ def test_parallel_replicas_over_distributed(
node.query(
f"SELECT count(), min(key), max(key), sum(key) FROM {table_name}_d",
settings={
"allow_experimental_parallel_reading_from_replicas": 2,
"use_parallel_replicas": 2,
"prefer_localhost_replica": prefer_localhost_replica,
"max_parallel_replicas": max_parallel_replicas,
},
@ -150,7 +150,7 @@ def test_parallel_replicas_over_distributed(
node.query(
f"SELECT count(), min(key), max(key), sum(key) FROM {table_name}_d",
settings={
"allow_experimental_parallel_reading_from_replicas": 0,
"use_parallel_replicas": 0,
},
)
== expected_result

View File

@ -25,7 +25,7 @@ def test_skip_unavailable_shards(start_cluster):
node1.query(
"SELECT hostName() as h FROM clusterAllReplicas('two_shards', system.one) order by h",
settings={
"allow_experimental_parallel_reading_from_replicas": 0,
"use_parallel_replicas": 0,
"skip_unavailable_shards": 1,
},
)
@ -36,7 +36,7 @@ def test_skip_unavailable_shards(start_cluster):
node1.query(
"SELECT hostName() as h FROM clusterAllReplicas('two_shards', system.one) order by h",
settings={
"allow_experimental_parallel_reading_from_replicas": 2,
"use_parallel_replicas": 2,
"max_parallel_replicas": 3,
"skip_unavailable_shards": 1,
# "async_socket_for_remote" : 0,
@ -53,7 +53,7 @@ def test_error_on_unavailable_shards(start_cluster):
node1.query(
"SELECT hostName() as h FROM clusterAllReplicas('two_shards', system.one) order by h",
settings={
"allow_experimental_parallel_reading_from_replicas": 0,
"use_parallel_replicas": 0,
"skip_unavailable_shards": 0,
},
)
@ -62,7 +62,7 @@ def test_error_on_unavailable_shards(start_cluster):
node1.query(
"SELECT hostName() as h FROM clusterAllReplicas('two_shards', system.one) order by h",
settings={
"allow_experimental_parallel_reading_from_replicas": 2,
"use_parallel_replicas": 2,
"max_parallel_replicas": 3,
"skip_unavailable_shards": 0,
},

View File

@ -1,6 +1,6 @@
-- Tags: replica, distributed
SET allow_experimental_parallel_reading_from_replicas = 0;
SET use_parallel_replicas = 0;
SET max_parallel_replicas = 2;
DROP TABLE IF EXISTS report;

View File

@ -1,6 +1,6 @@
-- Tags: replica, distributed
set allow_experimental_parallel_reading_from_replicas=0;
set use_parallel_replicas=0;
drop table if exists test_max_parallel_replicas_lr;

View File

@ -1,6 +1,6 @@
-- Tags: distributed
set allow_experimental_parallel_reading_from_replicas = 0;
set use_parallel_replicas = 0;
drop table if exists sample_final;
create table sample_final (CounterID UInt32, EventDate Date, EventTime DateTime, UserID UInt64, Sign Int8) engine = CollapsingMergeTree(Sign) order by (CounterID, EventDate, intHash32(UserID), EventTime) sample by intHash32(UserID) SETTINGS index_granularity = 8192, index_granularity_bytes = '10Mi';

View File

@ -2,7 +2,7 @@
-- set distributed_foreground_insert = 1; -- see https://github.com/ClickHouse/ClickHouse/issues/18971
SET allow_experimental_parallel_reading_from_replicas = 0; -- see https://github.com/ClickHouse/ClickHouse/issues/34525
SET use_parallel_replicas = 0; -- see https://github.com/ClickHouse/ClickHouse/issues/34525
SET prefer_localhost_replica = 1;
DROP TABLE IF EXISTS local_01099_a;

View File

@ -1,16 +1,16 @@
-- Tags: distributed
SET allow_experimental_parallel_reading_from_replicas = 0;
SET use_parallel_replicas = 0;
DROP TABLE IF EXISTS test5346;
CREATE TABLE test5346 (`Id` String, `Timestamp` DateTime, `updated` DateTime)
CREATE TABLE test5346 (`Id` String, `Timestamp` DateTime, `updated` DateTime)
ENGINE = ReplacingMergeTree(updated) PARTITION BY tuple() ORDER BY (Timestamp, Id);
INSERT INTO test5346 VALUES('1',toDateTime('2020-01-01 00:00:00'),toDateTime('2020-01-01 00:00:00'));
SELECT Id, Timestamp
FROM remote('localhost,127.0.0.1,127.0.0.2',currentDatabase(),'test5346') FINAL
SELECT Id, Timestamp
FROM remote('localhost,127.0.0.1,127.0.0.2',currentDatabase(),'test5346') FINAL
ORDER BY Timestamp;
SELECT Id, Timestamp

View File

@ -1,6 +1,6 @@
-- Tags: replica
SET allow_experimental_parallel_reading_from_replicas=0;
SET use_parallel_replicas=0;
DROP TABLE IF EXISTS t;
CREATE TABLE t (x String) ENGINE = MergeTree ORDER BY x;

View File

@ -4,4 +4,4 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
. "$CURDIR"/../shell_config.sh
${CLICKHOUSE_CLIENT} --allow_experimental_parallel_reading_from_replicas=1 --parallel_replicas_for_non_replicated_merge_tree=1 -m < "$CURDIR"/01099_parallel_distributed_insert_select.sql > /dev/null
${CLICKHOUSE_CLIENT} --use_parallel_replicas=1 --parallel_replicas_for_non_replicated_merge_tree=1 -m < "$CURDIR"/01099_parallel_distributed_insert_select.sql > /dev/null

View File

@ -14,7 +14,7 @@ as select * from numbers(1);
#
# Logical error: 'Coordinator for parallel reading from replicas is not initialized'.
opts=(
--allow_experimental_parallel_reading_from_replicas 1
--use_parallel_replicas 1
--parallel_replicas_for_non_replicated_merge_tree 1
--max_parallel_replicas 3
--cluster_for_parallel_replicas parallel_replicas

View File

@ -63,7 +63,7 @@ drop table if exists pr_t;
create table pr_t(a UInt64, b UInt64) engine=MergeTree order by a;
insert into pr_t select number % 1000, number % 1000 from numbers_mt(1e6);
set allow_experimental_parallel_reading_from_replicas = 1;
set use_parallel_replicas = 1;
set parallel_replicas_for_non_replicated_merge_tree = 1;
set max_parallel_replicas = 3;
set cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost';

View File

@ -1,4 +1,4 @@
CREATE TABLE IF NOT EXISTS t_02708(x DateTime) ENGINE = MergeTree ORDER BY tuple();
SET send_logs_level='error';
SELECT count() FROM t_02708 SETTINGS allow_experimental_parallel_reading_from_replicas=1;
SELECT count() FROM t_02708 SETTINGS use_parallel_replicas=1;
DROP TABLE t_02708;

View File

@ -33,8 +33,8 @@
=============== QUERIES EXECUTED BY PARALLEL INNER QUERY ALONE ===============
0 2 SELECT `__table1`.`key` AS `key`, `__table1`.`value1` AS `value1`, `__table1`.`value2` AS `value2`, toUInt64(min(`__table1`.`time`)) AS `start_ts` FROM `default`.`join_inner_table` AS `__table1` PREWHERE (`__table1`.`id` = \'833c9e22-c245-4eb5-8745-117a9a1f26b1\') AND (`__table1`.`number` > _CAST(1610517366120, \'UInt64\')) GROUP BY `__table1`.`key`, `__table1`.`value1`, `__table1`.`value2` ORDER BY `__table1`.`key` ASC, `__table1`.`value1` ASC, `__table1`.`value2` ASC LIMIT _CAST(10, \'UInt64\')
0 3 SELECT `key`, `value1`, `value2`, toUInt64(min(`time`)) AS `start_ts` FROM `default`.`join_inner_table` PREWHERE (`id` = \'833c9e22-c245-4eb5-8745-117a9a1f26b1\') AND (`number` > toUInt64(\'1610517366120\')) GROUP BY `key`, `value1`, `value2` ORDER BY `key` ASC, `value1` ASC, `value2` ASC LIMIT 10
1 1 -- Parallel inner query alone\nSELECT\n key,\n value1,\n value2,\n toUInt64(min(time)) AS start_ts\nFROM join_inner_table\nPREWHERE (id = \'833c9e22-c245-4eb5-8745-117a9a1f26b1\') AND (number > toUInt64(\'1610517366120\'))\nGROUP BY key, value1, value2\nORDER BY key, value1, value2\nLIMIT 10\nSETTINGS allow_experimental_parallel_reading_from_replicas = 1, enable_analyzer=0;
1 1 -- Parallel inner query alone\nSELECT\n key,\n value1,\n value2,\n toUInt64(min(time)) AS start_ts\nFROM join_inner_table\nPREWHERE (id = \'833c9e22-c245-4eb5-8745-117a9a1f26b1\') AND (number > toUInt64(\'1610517366120\'))\nGROUP BY key, value1, value2\nORDER BY key, value1, value2\nLIMIT 10\nSETTINGS allow_experimental_parallel_reading_from_replicas = 1, enable_analyzer=1;
1 1 -- Parallel inner query alone\nSELECT\n key,\n value1,\n value2,\n toUInt64(min(time)) AS start_ts\nFROM join_inner_table\nPREWHERE (id = \'833c9e22-c245-4eb5-8745-117a9a1f26b1\') AND (number > toUInt64(\'1610517366120\'))\nGROUP BY key, value1, value2\nORDER BY key, value1, value2\nLIMIT 10\nSETTINGS use_parallel_replicas = 1, enable_analyzer=0;
1 1 -- Parallel inner query alone\nSELECT\n key,\n value1,\n value2,\n toUInt64(min(time)) AS start_ts\nFROM join_inner_table\nPREWHERE (id = \'833c9e22-c245-4eb5-8745-117a9a1f26b1\') AND (number > toUInt64(\'1610517366120\'))\nGROUP BY key, value1, value2\nORDER BY key, value1, value2\nLIMIT 10\nSETTINGS use_parallel_replicas = 1, enable_analyzer=1;
=============== OUTER QUERY (NO PARALLEL) ===============
>T%O ,z< 10
NQTpY# W\\Xx4 10
@ -61,5 +61,5 @@ t<iT X48q:Z]t0 10
0 2 SELECT `__table2`.`value1` AS `value1`, `__table2`.`value2` AS `value2`, count() AS `count` FROM `default`.`join_outer_table` AS `__table1` ALL INNER JOIN (SELECT `__table3`.`key` AS `key`, `__table3`.`value1` AS `value1`, `__table3`.`value2` AS `value2` FROM `default`.`join_inner_table` AS `__table3` PREWHERE (`__table3`.`id` = \'833c9e22-c245-4eb5-8745-117a9a1f26b1\') AND (`__table3`.`number` > _CAST(1610517366120, \'UInt64\')) GROUP BY `__table3`.`key`, `__table3`.`value1`, `__table3`.`value2`) AS `__table2` USING (`key`) GROUP BY `__table1`.`key`, `__table2`.`value1`, `__table2`.`value2`
0 3 SELECT `key`, `value1`, `value2` FROM `default`.`join_inner_table` PREWHERE (`id` = \'833c9e22-c245-4eb5-8745-117a9a1f26b1\') AND (`number` > toUInt64(\'1610517366120\')) GROUP BY `key`, `value1`, `value2`
0 3 SELECT `value1`, `value2`, count() AS `count` FROM `default`.`join_outer_table` ALL INNER JOIN `_data_` USING (`key`) GROUP BY `key`, `value1`, `value2`
1 1 -- Parallel full query\nSELECT\n value1,\n value2,\n avg(count) AS avg\nFROM\n (\n SELECT\n key,\n value1,\n value2,\n count() AS count\n FROM join_outer_table\n INNER JOIN\n (\n SELECT\n key,\n value1,\n value2,\n toUInt64(min(time)) AS start_ts\n FROM join_inner_table\n PREWHERE (id = \'833c9e22-c245-4eb5-8745-117a9a1f26b1\') AND (number > toUInt64(\'1610517366120\'))\n GROUP BY key, value1, value2\n ) USING (key)\n GROUP BY key, value1, value2\n )\nGROUP BY value1, value2\nORDER BY value1, value2\nSETTINGS allow_experimental_parallel_reading_from_replicas = 1, enable_analyzer=0;
1 1 -- Parallel full query\nSELECT\n value1,\n value2,\n avg(count) AS avg\nFROM\n (\n SELECT\n key,\n value1,\n value2,\n count() AS count\n FROM join_outer_table\n INNER JOIN\n (\n SELECT\n key,\n value1,\n value2,\n toUInt64(min(time)) AS start_ts\n FROM join_inner_table\n PREWHERE (id = \'833c9e22-c245-4eb5-8745-117a9a1f26b1\') AND (number > toUInt64(\'1610517366120\'))\n GROUP BY key, value1, value2\n ) USING (key)\n GROUP BY key, value1, value2\n )\nGROUP BY value1, value2\nORDER BY value1, value2\nSETTINGS allow_experimental_parallel_reading_from_replicas = 1, enable_analyzer=1;
1 1 -- Parallel full query\nSELECT\n value1,\n value2,\n avg(count) AS avg\nFROM\n (\n SELECT\n key,\n value1,\n value2,\n count() AS count\n FROM join_outer_table\n INNER JOIN\n (\n SELECT\n key,\n value1,\n value2,\n toUInt64(min(time)) AS start_ts\n FROM join_inner_table\n PREWHERE (id = \'833c9e22-c245-4eb5-8745-117a9a1f26b1\') AND (number > toUInt64(\'1610517366120\'))\n GROUP BY key, value1, value2\n ) USING (key)\n GROUP BY key, value1, value2\n )\nGROUP BY value1, value2\nORDER BY value1, value2\nSETTINGS use_parallel_replicas = 1, enable_analyzer=0;
1 1 -- Parallel full query\nSELECT\n value1,\n value2,\n avg(count) AS avg\nFROM\n (\n SELECT\n key,\n value1,\n value2,\n count() AS count\n FROM join_outer_table\n INNER JOIN\n (\n SELECT\n key,\n value1,\n value2,\n toUInt64(min(time)) AS start_ts\n FROM join_inner_table\n PREWHERE (id = \'833c9e22-c245-4eb5-8745-117a9a1f26b1\') AND (number > toUInt64(\'1610517366120\'))\n GROUP BY key, value1, value2\n ) USING (key)\n GROUP BY key, value1, value2\n )\nGROUP BY value1, value2\nORDER BY value1, value2\nSETTINGS use_parallel_replicas = 1, enable_analyzer=1;

View File

@ -64,7 +64,7 @@ PREWHERE (id = '833c9e22-c245-4eb5-8745-117a9a1f26b1') AND (number > toUInt64('1
GROUP BY key, value1, value2
ORDER BY key, value1, value2
LIMIT 10
SETTINGS allow_experimental_parallel_reading_from_replicas = 1, enable_analyzer=0;
SETTINGS use_parallel_replicas = 1, enable_analyzer=0;
-- Parallel inner query alone
SELECT
@ -77,7 +77,7 @@ PREWHERE (id = '833c9e22-c245-4eb5-8745-117a9a1f26b1') AND (number > toUInt64('1
GROUP BY key, value1, value2
ORDER BY key, value1, value2
LIMIT 10
SETTINGS allow_experimental_parallel_reading_from_replicas = 1, enable_analyzer=1;
SETTINGS use_parallel_replicas = 1, enable_analyzer=1;
SELECT '=============== QUERIES EXECUTED BY PARALLEL INNER QUERY ALONE ===============';
@ -184,7 +184,7 @@ FROM
)
GROUP BY value1, value2
ORDER BY value1, value2
SETTINGS allow_experimental_parallel_reading_from_replicas = 1, enable_analyzer=0;
SETTINGS use_parallel_replicas = 1, enable_analyzer=0;
-- Parallel full query
SELECT
@ -214,7 +214,7 @@ FROM
)
GROUP BY value1, value2
ORDER BY value1, value2
SETTINGS allow_experimental_parallel_reading_from_replicas = 1, enable_analyzer=1;
SETTINGS use_parallel_replicas = 1, enable_analyzer=1;
SYSTEM FLUSH LOGS;

View File

@ -18,7 +18,7 @@ INSERT INTO join_inner_table__fuzz_1 SELECT
FROM generateRandom('number Int64, value1 String, value2 String, time Int64', 1, 10, 2)
LIMIT 100;
SET max_parallel_replicas = 3, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', allow_experimental_parallel_reading_from_replicas = 1, parallel_replicas_for_non_replicated_merge_tree=1;
SET max_parallel_replicas = 3, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', use_parallel_replicas = 1, parallel_replicas_for_non_replicated_merge_tree=1;
-- SELECT query will write a Warning to the logs
SET send_logs_level='error';

View File

@ -2,7 +2,7 @@ DROP TABLE IF EXISTS parallel_replicas_plain;
CREATE TABLE parallel_replicas_plain (x String) ENGINE=MergeTree() ORDER BY x;
INSERT INTO parallel_replicas_plain SELECT toString(number) FROM numbers(10);
SET max_parallel_replicas=3, allow_experimental_parallel_reading_from_replicas=1, cluster_for_parallel_replicas='parallel_replicas';
SET max_parallel_replicas=3, use_parallel_replicas=1, cluster_for_parallel_replicas='parallel_replicas';
SET send_logs_level='error';
SET parallel_replicas_for_non_replicated_merge_tree = 0;

View File

@ -2,12 +2,12 @@ CREATE TABLE IF NOT EXISTS parallel_replicas_final (x String) ENGINE=ReplacingMe
INSERT INTO parallel_replicas_final SELECT toString(number) FROM numbers(10);
SET max_parallel_replicas=3, allow_experimental_parallel_reading_from_replicas=1, cluster_for_parallel_replicas='parallel_replicas';
SET max_parallel_replicas=3, use_parallel_replicas=1, cluster_for_parallel_replicas='parallel_replicas';
SET parallel_replicas_for_non_replicated_merge_tree = 1;
SELECT * FROM parallel_replicas_final FINAL FORMAT Null;
SET allow_experimental_parallel_reading_from_replicas=2;
SET use_parallel_replicas=2;
SELECT * FROM parallel_replicas_final FINAL FORMAT Null; -- { serverError SUPPORT_IS_DISABLED }

View File

@ -2,7 +2,7 @@ DROP TABLE IF EXISTS test_parallel_replicas_unavailable_shards;
CREATE TABLE test_parallel_replicas_unavailable_shards (n UInt64) ENGINE=MergeTree() ORDER BY tuple();
INSERT INTO test_parallel_replicas_unavailable_shards SELECT * FROM numbers(10);
SET allow_experimental_parallel_reading_from_replicas=2, max_parallel_replicas=11, cluster_for_parallel_replicas='parallel_replicas', parallel_replicas_for_non_replicated_merge_tree=1;
SET use_parallel_replicas=2, max_parallel_replicas=11, cluster_for_parallel_replicas='parallel_replicas', parallel_replicas_for_non_replicated_merge_tree=1;
SET send_logs_level='error';
-- with local plan for initiator, the query can be executed fast on initator, we can simply not come to the point where unavailable replica can be detected
-- therefore disable local plan for now
@ -10,7 +10,7 @@ SELECT count() FROM test_parallel_replicas_unavailable_shards WHERE NOT ignore(*
SYSTEM FLUSH LOGS;
SET allow_experimental_parallel_reading_from_replicas=0;
SET use_parallel_replicas=0;
SELECT ProfileEvents['ParallelReplicasUnavailableCount'] FROM system.query_log WHERE yesterday() <= event_date AND query_id in (select query_id from system.query_log where log_comment = '02769_7b513191-5082-4073-8568-53b86a49da79' and current_database = currentDatabase()) and type = 'QueryFinish' and query_id == initial_query_id;
DROP TABLE test_parallel_replicas_unavailable_shards;

View File

@ -8,5 +8,10 @@
5935810273536892891
7885388429666205427
8124171311239967992
<<<<<<< HEAD
1 1 -- Simple query with analyzer and pure parallel replicas\nSELECT number\nFROM join_inner_table__fuzz_146_replicated\n SETTINGS\n enable_analyzer = 1,\n max_parallel_replicas = 2,\n cluster_for_parallel_replicas = \'test_cluster_one_shard_three_replicas_localhost\',\n allow_experimental_parallel_reading_from_replicas = 1;
0 1 SELECT `__table1`.`number` AS `number` FROM `default`.`join_inner_table__fuzz_146_replicated` AS `__table1`
=======
1 1 -- Simple query with analyzer and pure parallel replicas\nSELECT number\nFROM join_inner_table__fuzz_146_replicated\n SETTINGS\n enable_analyzer = 1,\n max_parallel_replicas = 2,\n cluster_for_parallel_replicas = \'test_cluster_one_shard_three_replicas_localhost\',\n use_parallel_replicas = 1;
0 2 SELECT `__table1`.`number` AS `number` FROM `default`.`join_inner_table__fuzz_146_replicated` AS `__table1`
>>>>>>> Parallel replicas feature is Beta

View File

@ -26,7 +26,7 @@ FROM join_inner_table__fuzz_146_replicated
enable_analyzer = 1,
max_parallel_replicas = 2,
cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost',
allow_experimental_parallel_reading_from_replicas = 1;
use_parallel_replicas = 1;
SYSTEM FLUSH LOGS;
-- There should be 2 different queries

View File

@ -33,7 +33,7 @@ function run_query_with_pure_parallel_replicas () {
--max_parallel_replicas 3 \
--prefer_localhost_replica 1 \
--cluster_for_parallel_replicas 'test_cluster_one_shard_three_replicas_localhost' \
--allow_experimental_parallel_reading_from_replicas 1 \
--use_parallel_replicas 1 \
--enable_analyzer 0
$CLICKHOUSE_CLIENT \
@ -42,7 +42,7 @@ function run_query_with_pure_parallel_replicas () {
--max_parallel_replicas 3 \
--prefer_localhost_replica 1 \
--cluster_for_parallel_replicas 'test_cluster_one_shard_three_replicas_localhost' \
--allow_experimental_parallel_reading_from_replicas 1 \
--use_parallel_replicas 1 \
--enable_analyzer 1
}

View File

@ -50,7 +50,7 @@ function run_query_with_pure_parallel_replicas () {
--query_id "${1}_pure" \
--max_parallel_replicas 3 \
--cluster_for_parallel_replicas "parallel_replicas" \
--allow_experimental_parallel_reading_from_replicas 1 \
--use_parallel_replicas 1 \
--parallel_replicas_for_non_replicated_merge_tree 1 \
--parallel_replicas_min_number_of_rows_per_replica "$2" \
--max_threads 5 \

View File

@ -67,7 +67,7 @@ function run_query_with_pure_parallel_replicas () {
--prefer_localhost_replica 1 \
--parallel_replicas_prefer_local_join 0 \
--cluster_for_parallel_replicas "parallel_replicas" \
--allow_experimental_parallel_reading_from_replicas 1 \
--use_parallel_replicas 1 \
--parallel_replicas_for_non_replicated_merge_tree 1 \
--parallel_replicas_min_number_of_rows_per_replica "$2" \
|& grep "It is enough work for" | awk '{ print substr($7, 2, length($7) - 2) "\t" $20 " estimated parallel replicas" }' | sort -n -k2 -b | grep -Pv "\t0 estimated parallel replicas"

View File

@ -10,7 +10,8 @@ SELECT count() FROM users PREWHERE uid > 2000;
-- enable parallel replicas but with high rows threshold
SET
allow_experimental_parallel_reading_from_replicas=1,
skip_unavailable_shards=1,
use_parallel_replicas=1,
max_parallel_replicas=3,
cluster_for_parallel_replicas='parallel_replicas',
parallel_replicas_for_non_replicated_merge_tree=1,

View File

@ -14,13 +14,13 @@ insert into test select *, today() from numbers(100);
SELECT count(), min(id), max(id), avg(id)
FROM test_d
SETTINGS allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 3, prefer_localhost_replica = 0, parallel_replicas_for_non_replicated_merge_tree=1;
SETTINGS use_parallel_replicas = 2, max_parallel_replicas = 3, prefer_localhost_replica = 0, parallel_replicas_for_non_replicated_merge_tree=1;
insert into test select *, today() from numbers(100);
SELECT count(), min(id), max(id), avg(id)
FROM test_d
SETTINGS allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 3, prefer_localhost_replica = 0, parallel_replicas_for_non_replicated_merge_tree=1;
SETTINGS use_parallel_replicas = 2, max_parallel_replicas = 3, prefer_localhost_replica = 0, parallel_replicas_for_non_replicated_merge_tree=1;
-- 2 shards
@ -38,10 +38,10 @@ insert into test2 select *, today() from numbers(100);
SELECT count(), min(id), max(id), avg(id)
FROM test2_d
SETTINGS allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 3, prefer_localhost_replica = 0, parallel_replicas_for_non_replicated_merge_tree=1;
SETTINGS use_parallel_replicas = 2, max_parallel_replicas = 3, prefer_localhost_replica = 0, parallel_replicas_for_non_replicated_merge_tree=1;
insert into test2 select *, today() from numbers(100);
SELECT count(), min(id), max(id), avg(id)
FROM test2_d
SETTINGS allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 3, prefer_localhost_replica = 0, parallel_replicas_for_non_replicated_merge_tree=1;
SETTINGS use_parallel_replicas = 2, max_parallel_replicas = 3, prefer_localhost_replica = 0, parallel_replicas_for_non_replicated_merge_tree=1;

View File

@ -35,7 +35,7 @@ echo "
SETTINGS
max_parallel_replicas = 2,
cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost',
allow_experimental_parallel_reading_from_replicas = 2,
use_parallel_replicas = 2,
parallel_replicas_for_non_replicated_merge_tree = 1,
interactive_delay=0,
parallel_replicas_local_plan=0
@ -50,7 +50,7 @@ echo "
SETTINGS
max_parallel_replicas = 2,
cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost',
allow_experimental_parallel_reading_from_replicas = 2,
use_parallel_replicas = 2,
parallel_replicas_for_non_replicated_merge_tree = 1,
interactive_delay=99999999999,
parallel_replicas_local_plan=0

View File

@ -2,5 +2,5 @@
DROP TABLE IF EXISTS set_index__fuzz_41;
CREATE TABLE set_index__fuzz_41 (`a` Date, `b` Nullable(DateTime64(3)), INDEX b_set b TYPE set(0) GRANULARITY 1) ENGINE = MergeTree ORDER BY tuple();
INSERT INTO set_index__fuzz_41 (a) VALUES (today());
SELECT b FROM set_index__fuzz_41 WHERE and(b = 256) SETTINGS force_data_skipping_indices = 'b_set', optimize_move_to_prewhere = 0, max_parallel_replicas=2, parallel_replicas_for_non_replicated_merge_tree=1, allow_experimental_parallel_reading_from_replicas=2; -- { serverError TOO_FEW_ARGUMENTS_FOR_FUNCTION }
SELECT b FROM set_index__fuzz_41 WHERE and(b = 256) SETTINGS force_data_skipping_indices = 'b_set', optimize_move_to_prewhere = 0, max_parallel_replicas=2, parallel_replicas_for_non_replicated_merge_tree=1, use_parallel_replicas=2; -- { serverError TOO_FEW_ARGUMENTS_FOR_FUNCTION }
DROP TABLE set_index__fuzz_41;

View File

@ -24,5 +24,5 @@ system sync replica t3;
SELECT count(), min(k), max(k), avg(k)
FROM t1
SETTINGS allow_experimental_parallel_reading_from_replicas = 1, max_parallel_replicas = 3, prefer_localhost_replica = 0,
SETTINGS use_parallel_replicas = 1, max_parallel_replicas = 3, prefer_localhost_replica = 0,
cluster_for_parallel_replicas='test_cluster_one_shard_three_replicas_localhost', parallel_replicas_single_task_marks_count_multiplier = 0.001;

View File

@ -2,12 +2,12 @@ DROP TABLE IF EXISTS tt;
CREATE TABLE tt (n UInt64) ENGINE=MergeTree() ORDER BY tuple();
INSERT INTO tt SELECT * FROM numbers(10);
SET allow_experimental_parallel_reading_from_replicas=1, max_parallel_replicas=3, parallel_replicas_for_non_replicated_merge_tree=1;
SET use_parallel_replicas=1, max_parallel_replicas=3, parallel_replicas_for_non_replicated_merge_tree=1;
SELECT count() FROM clusterAllReplicas('test_cluster_two_shard_three_replicas_localhost', currentDatabase(), tt) settings log_comment='02875_190aed82-2423-413b-ad4c-24dcca50f65b';
SYSTEM FLUSH LOGS;
SET allow_experimental_parallel_reading_from_replicas=0;
SET use_parallel_replicas=0;
SELECT count() > 0 FROM system.text_log
WHERE query_id in (select query_id from system.query_log where current_database = currentDatabase() AND log_comment = '02875_190aed82-2423-413b-ad4c-24dcca50f65b')
AND message LIKE '%Parallel reading from replicas is disabled for cluster%';

View File

@ -2,12 +2,12 @@ DROP TABLE IF EXISTS tt;
CREATE TABLE tt (n UInt64) ENGINE=MergeTree() ORDER BY tuple();
INSERT INTO tt SELECT * FROM numbers(10);
SET allow_experimental_parallel_reading_from_replicas=1, max_parallel_replicas=3, parallel_replicas_for_non_replicated_merge_tree=1;
SET use_parallel_replicas=1, max_parallel_replicas=3, parallel_replicas_for_non_replicated_merge_tree=1;
SELECT count() FROM remote('127.0.0.{1..6}', currentDatabase(), tt) settings log_comment='02875_89f3c39b-1919-48cb-b66e-ef9904e73146';
SYSTEM FLUSH LOGS;
SET allow_experimental_parallel_reading_from_replicas=0;
SET use_parallel_replicas=0;
SELECT count() > 0 FROM system.text_log
WHERE query_id in (select query_id from system.query_log where current_database = currentDatabase() AND log_comment = '02875_89f3c39b-1919-48cb-b66e-ef9904e73146')
AND message LIKE '%Parallel reading from replicas is disabled for cluster%';

View File

@ -14,7 +14,7 @@ system sync replica t1;
system sync replica t2;
system sync replica t3;
SET allow_experimental_parallel_reading_from_replicas=1, max_parallel_replicas=3, cluster_for_parallel_replicas='test_cluster_one_shard_three_replicas_localhost';
SET use_parallel_replicas=1, max_parallel_replicas=3, cluster_for_parallel_replicas='test_cluster_one_shard_three_replicas_localhost';
-- default coordinator
SELECT count(), min(k), max(k), avg(k) FROM t1 SETTINGS log_comment='02898_default_190aed82-2423-413b-ad4c-24dcca50f65b';
@ -23,7 +23,7 @@ SELECT count(), min(k), max(k), avg(k) FROM t1 SETTINGS log_comment='02898_defau
SYSTEM FLUSH LOGS;
SELECT count() > 0 FROM system.text_log
WHERE query_id in (select query_id from system.query_log where current_database = currentDatabase() AND log_comment='02898_default_190aed82-2423-413b-ad4c-24dcca50f65b')
AND message LIKE '%Total rows to read: 3000%' SETTINGS allow_experimental_parallel_reading_from_replicas=0;
AND message LIKE '%Total rows to read: 3000%' SETTINGS use_parallel_replicas=0;
-- reading in order coordinator
-- disable parallel_replicas_local_plan since the test relay on traces which only present in case of no local plan
@ -32,7 +32,7 @@ SELECT k, sipHash64(v) FROM t1 order by k limit 5 offset 998 SETTINGS optimize_r
SYSTEM FLUSH LOGS;
SELECT count() > 0 FROM system.text_log
WHERE query_id in (select query_id from system.query_log where current_database = currentDatabase() AND log_comment='02898_inorder_190aed82-2423-413b-ad4c-24dcca50f65b')
AND message LIKE '%Updated total rows to read: added % rows, total 3000 rows%' SETTINGS allow_experimental_parallel_reading_from_replicas=0;
AND message LIKE '%Updated total rows to read: added % rows, total 3000 rows%' SETTINGS use_parallel_replicas=0;
DROP TABLE t1 SYNC;
DROP TABLE t2 SYNC;

View File

@ -30,7 +30,7 @@ $CLICKHOUSE_CLIENT \
--max_parallel_replicas 3 \
--prefer_localhost_replica 1 \
--cluster_for_parallel_replicas "test_cluster_one_shard_three_replicas_localhost" \
--allow_experimental_parallel_reading_from_replicas 1 \
--use_parallel_replicas 1 \
--parallel_replicas_for_non_replicated_merge_tree 1 \
--parallel_replicas_min_number_of_rows_per_replica 0 \
--query "
@ -68,7 +68,7 @@ $CLICKHOUSE_CLIENT \
--max_parallel_replicas 3 \
--prefer_localhost_replica 1 \
--cluster_for_parallel_replicas "test_cluster_one_shard_three_replicas_localhost" \
--allow_experimental_parallel_reading_from_replicas 1 \
--use_parallel_replicas 1 \
--parallel_replicas_for_non_replicated_merge_tree 1 \
--parallel_replicas_min_number_of_rows_per_replica 0 \
--query "SELECT * FROM (SELECT year, month, day, count(*) FROM days GROUP BY year, month, day WITH ROLLUP) ORDER BY 1, 2, 3";

View File

@ -88,7 +88,7 @@ CREATE TABLE t1 (`n` UInt64) ENGINE = MergeTree ORDER BY tuple();
INSERT INTO t1 SELECT * FROM numbers(10);
SET
allow_experimental_parallel_reading_from_replicas=1,
use_parallel_replicas=1,
max_parallel_replicas=2,
use_hedged_requests=0,
cluster_for_parallel_replicas='test_cluster_one_shard_three_replicas_localhost',

View File

@ -9,7 +9,7 @@ SELECT key, value1, value2, toUInt64(min(time)) AS start_ts FROM join_inner_tabl
max_parallel_replicas = 3,
prefer_localhost_replica = 1,
cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost',
allow_experimental_parallel_reading_from_replicas = 1,
use_parallel_replicas = 1,
use_hedged_requests = 0;
@ -22,7 +22,7 @@ ORDER BY nan DESC, [0, NULL, NULL, NULL, NULL] DESC
FORMAT Null
SETTINGS
max_parallel_replicas = 3,
allow_experimental_parallel_reading_from_replicas = 1,
use_parallel_replicas = 1,
use_hedged_requests = 0,
cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost';
@ -35,4 +35,4 @@ ORDER BY
nan DESC,
_CAST([0, NULL, NULL, NULL, NULL], 'Array(Nullable(UInt8))') DESC
FORMAT Null
SETTINGS receive_timeout = 10., receive_data_timeout_ms = 10000, use_hedged_requests = 0, allow_suspicious_low_cardinality_types = 1, max_parallel_replicas = 3, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', allow_experimental_parallel_reading_from_replicas = 1, parallel_replicas_for_non_replicated_merge_tree = 1, log_queries = 1, table_function_remote_max_addresses = 200, enable_analyzer = 1;
SETTINGS receive_timeout = 10., receive_data_timeout_ms = 10000, use_hedged_requests = 0, allow_suspicious_low_cardinality_types = 1, max_parallel_replicas = 3, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', use_parallel_replicas = 1, parallel_replicas_for_non_replicated_merge_tree = 1, log_queries = 1, table_function_remote_max_addresses = 200, enable_analyzer = 1;

View File

@ -2,7 +2,7 @@ DROP TABLE IF EXISTS test_parallel_replicas_settings;
CREATE TABLE test_parallel_replicas_settings (n UInt64) ENGINE=MergeTree() ORDER BY tuple();
INSERT INTO test_parallel_replicas_settings SELECT * FROM numbers(10);
SET allow_experimental_parallel_reading_from_replicas=2, max_parallel_replicas=3, parallel_replicas_for_non_replicated_merge_tree=1;
SET use_parallel_replicas=2, max_parallel_replicas=3, parallel_replicas_for_non_replicated_merge_tree=1;
SET cluster_for_parallel_replicas='';
SELECT count() FROM test_parallel_replicas_settings WHERE NOT ignore(*); -- { serverError CLUSTER_DOESNT_EXIST }
@ -16,20 +16,20 @@ SELECT count() > 0 FROM system.text_log
WHERE yesterday() <= event_date
AND query_id in (select query_id from system.query_log where current_database=currentDatabase() AND log_comment='0_f621c4f2-4da7-4a7c-bb6d-052c442d0f7f')
AND level = 'Information'
AND message ILIKE '%Disabling ''use_hedged_requests'' in favor of ''allow_experimental_parallel_reading_from_replicas''%'
SETTINGS allow_experimental_parallel_reading_from_replicas=0;
AND message ILIKE '%Disabling ''use_hedged_requests'' in favor of ''use_parallel_replicas''%'
SETTINGS use_parallel_replicas=0;
SET use_hedged_requests=1;
SELECT count() FROM test_parallel_replicas_settings WHERE NOT ignore(*) settings log_comment='1_f621c4f2-4da7-4a7c-bb6d-052c442d0f7f';
SYSTEM FLUSH LOGS;
SET allow_experimental_parallel_reading_from_replicas=0;
SET use_parallel_replicas=0;
SELECT count() > 0 FROM system.text_log
WHERE yesterday() <= event_date
AND query_id in (select query_id from system.query_log where current_database = currentDatabase() AND log_comment = '1_f621c4f2-4da7-4a7c-bb6d-052c442d0f7f')
AND level = 'Warning'
AND message ILIKE '%Setting ''use_hedged_requests'' explicitly with enabled ''allow_experimental_parallel_reading_from_replicas'' has no effect%'
SETTINGS allow_experimental_parallel_reading_from_replicas=0;
AND message ILIKE '%Setting ''use_hedged_requests'' explicitly with enabled ''use_parallel_replicas'' has no effect%'
SETTINGS use_parallel_replicas=0;
DROP TABLE test_parallel_replicas_settings;

View File

@ -11,7 +11,7 @@ ENGINE = Distributed(test_cluster_one_shard_three_replicas_localhost, currentDat
SELECT count(), sum(id)
FROM test_d
SETTINGS allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 3, prefer_localhost_replica = 0, parallel_replicas_for_non_replicated_merge_tree=1;
SETTINGS use_parallel_replicas = 2, max_parallel_replicas = 3, prefer_localhost_replica = 0, parallel_replicas_for_non_replicated_merge_tree=1;
DROP TABLE test_d;
DROP TABLE test;

View File

@ -22,10 +22,10 @@ FROM t1
WHERE k > 0
GROUP BY k
ORDER BY k
SETTINGS force_primary_key = 1, allow_experimental_parallel_reading_from_replicas = 0;
SETTINGS force_primary_key = 1, use_parallel_replicas = 0;
-- parallel replicas, primary key is used
SET allow_experimental_parallel_reading_from_replicas=1, max_parallel_replicas=3, cluster_for_parallel_replicas='test_cluster_one_shard_three_replicas_localhost';
SET use_parallel_replicas=1, max_parallel_replicas=3, cluster_for_parallel_replicas='test_cluster_one_shard_three_replicas_localhost';
SELECT
k,
count()

View File

@ -7,6 +7,6 @@ AS select *, '2023-12-25' from numbers(100);
SELECT count(), sum(id)
FROM remote('127.0.0.1|127.0.0.2|127.0.0.3|127.0.0.4', currentDatabase(), test)
SETTINGS allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 4, prefer_localhost_replica = 0, parallel_replicas_for_non_replicated_merge_tree = 1; -- { serverError CLUSTER_DOESNT_EXIST }
SETTINGS use_parallel_replicas = 2, max_parallel_replicas = 4, prefer_localhost_replica = 0, parallel_replicas_for_non_replicated_merge_tree = 1; -- { serverError CLUSTER_DOESNT_EXIST }
DROP TABLE test;

View File

@ -8,23 +8,23 @@ INSERT INTO merge_tree_in_subqueries VALUES(5, 'test5', 0);
SET max_parallel_replicas=3, cluster_for_parallel_replicas='test_cluster_one_shard_three_replicas_localhost', parallel_replicas_for_non_replicated_merge_tree=1;
SELECT * FROM merge_tree_in_subqueries WHERE id IN (SELECT * FROM system.numbers LIMIT 0) SETTINGS allow_experimental_parallel_reading_from_replicas=2, parallel_replicas_allow_in_with_subquery=0; -- { serverError SUPPORT_IS_DISABLED }
SELECT * FROM merge_tree_in_subqueries WHERE id IN (SELECT * FROM system.numbers LIMIT 0) SETTINGS allow_experimental_parallel_reading_from_replicas=2, parallel_replicas_allow_in_with_subquery=1;
SELECT * FROM merge_tree_in_subqueries WHERE id IN (SELECT * FROM system.numbers LIMIT 0) SETTINGS allow_experimental_parallel_reading_from_replicas=1;
SELECT * FROM merge_tree_in_subqueries WHERE id IN (SELECT * FROM system.numbers LIMIT 0) SETTINGS use_parallel_replicas=2, parallel_replicas_allow_in_with_subquery=0; -- { serverError SUPPORT_IS_DISABLED }
SELECT * FROM merge_tree_in_subqueries WHERE id IN (SELECT * FROM system.numbers LIMIT 0) SETTINGS use_parallel_replicas=2, parallel_replicas_allow_in_with_subquery=1;
SELECT * FROM merge_tree_in_subqueries WHERE id IN (SELECT * FROM system.numbers LIMIT 0) SETTINGS use_parallel_replicas=1;
SELECT '---';
SELECT * FROM merge_tree_in_subqueries WHERE id IN (SELECT * FROM system.numbers LIMIT 2, 3) ORDER BY id SETTINGS allow_experimental_parallel_reading_from_replicas=2, parallel_replicas_allow_in_with_subquery=0; -- { serverError SUPPORT_IS_DISABLED };
SELECT * FROM merge_tree_in_subqueries WHERE id IN (SELECT * FROM system.numbers LIMIT 2, 3) ORDER BY id SETTINGS allow_experimental_parallel_reading_from_replicas=2, parallel_replicas_allow_in_with_subquery=1;
SELECT * FROM merge_tree_in_subqueries WHERE id IN (SELECT * FROM system.numbers LIMIT 2, 3) ORDER BY id SETTINGS allow_experimental_parallel_reading_from_replicas=1;
SELECT * FROM merge_tree_in_subqueries WHERE id IN (SELECT * FROM system.numbers LIMIT 2, 3) ORDER BY id SETTINGS use_parallel_replicas=2, parallel_replicas_allow_in_with_subquery=0; -- { serverError SUPPORT_IS_DISABLED };
SELECT * FROM merge_tree_in_subqueries WHERE id IN (SELECT * FROM system.numbers LIMIT 2, 3) ORDER BY id SETTINGS use_parallel_replicas=2, parallel_replicas_allow_in_with_subquery=1;
SELECT * FROM merge_tree_in_subqueries WHERE id IN (SELECT * FROM system.numbers LIMIT 2, 3) ORDER BY id SETTINGS use_parallel_replicas=1;
SELECT '---';
SELECT * FROM merge_tree_in_subqueries WHERE id IN (SELECT 1) ORDER BY id SETTINGS allow_experimental_parallel_reading_from_replicas=2, parallel_replicas_allow_in_with_subquery=0; -- { serverError SUPPORT_IS_DISABLED };
SELECT * FROM merge_tree_in_subqueries WHERE id IN (SELECT 1) ORDER BY id SETTINGS allow_experimental_parallel_reading_from_replicas=2, parallel_replicas_allow_in_with_subquery=1;
SELECT * FROM merge_tree_in_subqueries WHERE id IN (SELECT 1) ORDER BY id SETTINGS allow_experimental_parallel_reading_from_replicas=1;
SELECT * FROM merge_tree_in_subqueries WHERE id IN (SELECT 1) ORDER BY id SETTINGS use_parallel_replicas=2, parallel_replicas_allow_in_with_subquery=0; -- { serverError SUPPORT_IS_DISABLED };
SELECT * FROM merge_tree_in_subqueries WHERE id IN (SELECT 1) ORDER BY id SETTINGS use_parallel_replicas=2, parallel_replicas_allow_in_with_subquery=1;
SELECT * FROM merge_tree_in_subqueries WHERE id IN (SELECT 1) ORDER BY id SETTINGS use_parallel_replicas=1;
-- IN with tuples is allowed
SELECT '---';
SELECT id, name FROM merge_tree_in_subqueries WHERE (id, name) IN (3, 'test3') SETTINGS allow_experimental_parallel_reading_from_replicas=2, parallel_replicas_allow_in_with_subquery=0;
SELECT id, name FROM merge_tree_in_subqueries WHERE (id, name) IN (3, 'test3') SETTINGS allow_experimental_parallel_reading_from_replicas=2, parallel_replicas_allow_in_with_subquery=1;
SELECT id, name FROM merge_tree_in_subqueries WHERE (id, name) IN (3, 'test3') SETTINGS use_parallel_replicas=2, parallel_replicas_allow_in_with_subquery=0;
SELECT id, name FROM merge_tree_in_subqueries WHERE (id, name) IN (3, 'test3') SETTINGS use_parallel_replicas=2, parallel_replicas_allow_in_with_subquery=1;
DROP TABLE IF EXISTS merge_tree_in_subqueries;

View File

@ -2,7 +2,7 @@ DROP TABLE IF EXISTS test;
CREATE TABLE test (x UInt8) ENGINE = MergeTree ORDER BY x;
INSERT INTO test VALUES (1), (2), (3);
SET allow_experimental_parallel_reading_from_replicas = 1, max_parallel_replicas = 2, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', prefer_localhost_replica = 0, parallel_replicas_for_non_replicated_merge_tree = 1;
SET use_parallel_replicas = 1, max_parallel_replicas = 2, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', prefer_localhost_replica = 0, parallel_replicas_for_non_replicated_merge_tree = 1;
WITH (SELECT '111111111111111111111111111111111111111'::UInt128) AS v SELECT sum(x), max(v) FROM test;

View File

@ -7,7 +7,7 @@ SETTINGS index_granularity=1;
INSERT INTO test SELECT number, toString(number) FROM numbers(10_000);
SET allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 3, parallel_replicas_for_non_replicated_merge_tree=1, cluster_for_parallel_replicas='test_cluster_one_shard_three_replicas_localhost';
SET use_parallel_replicas = 2, max_parallel_replicas = 3, parallel_replicas_for_non_replicated_merge_tree=1, cluster_for_parallel_replicas='test_cluster_one_shard_three_replicas_localhost';
-- default coordinator
SELECT count(), sum(k)
@ -15,18 +15,18 @@ FROM test
SETTINGS log_comment = '02950_parallel_replicas_used_replicas_count';
SYSTEM FLUSH LOGS;
SELECT ProfileEvents['ParallelReplicasUsedCount'] > 0 FROM system.query_log WHERE type = 'QueryFinish' AND query_id IN (SELECT query_id FROM system.query_log WHERE current_database = currentDatabase() AND log_comment = '02950_parallel_replicas_used_replicas_count' AND type = 'QueryFinish' AND initial_query_id = query_id) SETTINGS allow_experimental_parallel_reading_from_replicas=0;
SELECT ProfileEvents['ParallelReplicasUsedCount'] > 0 FROM system.query_log WHERE type = 'QueryFinish' AND query_id IN (SELECT query_id FROM system.query_log WHERE current_database = currentDatabase() AND log_comment = '02950_parallel_replicas_used_replicas_count' AND type = 'QueryFinish' AND initial_query_id = query_id) SETTINGS use_parallel_replicas=0;
-- In order coordinator
SELECT k FROM test order by k limit 5 offset 89 SETTINGS optimize_read_in_order=1, log_comment='02950_parallel_replicas_used_replicas_count_2', merge_tree_min_rows_for_concurrent_read=1, max_threads=1;
SYSTEM FLUSH LOGS;
SELECT ProfileEvents['ParallelReplicasUsedCount'] > 0 FROM system.query_log WHERE type = 'QueryFinish' AND query_id IN (SELECT query_id FROM system.query_log WHERE current_database = currentDatabase() AND log_comment = '02950_parallel_replicas_used_replicas_count_2' AND type = 'QueryFinish' AND initial_query_id = query_id) SETTINGS allow_experimental_parallel_reading_from_replicas=0;
SELECT ProfileEvents['ParallelReplicasUsedCount'] > 0 FROM system.query_log WHERE type = 'QueryFinish' AND query_id IN (SELECT query_id FROM system.query_log WHERE current_database = currentDatabase() AND log_comment = '02950_parallel_replicas_used_replicas_count_2' AND type = 'QueryFinish' AND initial_query_id = query_id) SETTINGS use_parallel_replicas=0;
-- In reverse order coordinator
SELECT k FROM test order by k desc limit 5 offset 9906 SETTINGS optimize_read_in_order=1, log_comment='02950_parallel_replicas_used_replicas_count_3', merge_tree_min_rows_for_concurrent_read=1, max_threads=1;
SYSTEM FLUSH LOGS;
SELECT ProfileEvents['ParallelReplicasUsedCount'] > 0 FROM system.query_log WHERE type = 'QueryFinish' AND query_id IN (SELECT query_id FROM system.query_log WHERE current_database = currentDatabase() AND log_comment = '02950_parallel_replicas_used_replicas_count_3' AND type = 'QueryFinish' AND initial_query_id = query_id) SETTINGS allow_experimental_parallel_reading_from_replicas=0;
SELECT ProfileEvents['ParallelReplicasUsedCount'] > 0 FROM system.query_log WHERE type = 'QueryFinish' AND query_id IN (SELECT query_id FROM system.query_log WHERE current_database = currentDatabase() AND log_comment = '02950_parallel_replicas_used_replicas_count_3' AND type = 'QueryFinish' AND initial_query_id = query_id) SETTINGS use_parallel_replicas=0;
DROP TABLE test;

View File

@ -17,7 +17,7 @@ insert into num_1 select number * 2, toString(number * 2) from numbers(1e7);
insert into num_2 select number * 3, -number from numbers(1.5e6);
"
PARALLEL_REPLICAS_SETTINGS="allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', parallel_replicas_local_plan=1"
PARALLEL_REPLICAS_SETTINGS="use_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', parallel_replicas_local_plan=1"
##############
echo
@ -30,6 +30,7 @@ inner join (select key, value from num_2 inner join
on l.key = r.key order by l.key limit 10 offset 10000
SETTINGS enable_analyzer=1, $PARALLEL_REPLICAS_SETTINGS, parallel_replicas_prefer_local_join=0"
$CLICKHOUSE_CLIENT --max_rows_in_set_to_optimize_join 0 -q "
select * from (select key, value from num_1) l
inner join (select key, value from num_2 inner join

View File

@ -2,7 +2,7 @@
set parallel_replicas_prefer_local_join = 0;
-- A query with only INNER/LEFT joins is fully send to replicas. JOIN is executed in GLOBAL mode.
select x, y, r.y, z, rr.z, a from (select l.x, l.y, r.y, r.z as z from (select x, y from tab1 where x != 2) l any left join (select y, z from tab2 where y != 4) r on l.y = r.y) ll any left join (select z, a from tab3 where z != 8) rr on ll.z = rr.z order by x SETTINGS allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
select x, y, r.y, z, rr.z, a from (select l.x, l.y, r.y, r.z as z from (select x, y from tab1 where x != 2) l any left join (select y, z from tab2 where y != 4) r on l.y = r.y) ll any left join (select z, a from tab3 where z != 8) rr on ll.z = rr.z order by x SETTINGS use_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
0 0 0 0 0 0
1 1 0 0 0 0
3 3 0 0 0 0
@ -18,7 +18,7 @@ select x, y, r.y, z, rr.z, a from (select l.x, l.y, r.y, r.z as z from (select x
13 13 0 0 0 0
14 14 14 14 0 0
15 15 0 0 0 0
explain description=0 select x, y, r.y, z, rr.z, a from (select l.x, l.y, r.y, r.z as z from (select x, y from tab1 where x != 2) l any left join (select y, z from tab2 where y != 4) r on l.y = r.y) ll any left join (select z, a from tab3 where z != 8) rr on ll.z = rr.z SETTINGS allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
explain description=0 select x, y, r.y, z, rr.z, a from (select l.x, l.y, r.y, r.z as z from (select x, y from tab1 where x != 2) l any left join (select y, z from tab2 where y != 4) r on l.y = r.y) ll any left join (select z, a from tab3 where z != 8) rr on ll.z = rr.z SETTINGS use_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
Union
Expression
Join
@ -33,6 +33,7 @@ Union
ReadFromMemoryStorage
Expression
ReadFromRemoteParallelReplicas
--
-- The same query with cte;
with sub1 as (select x, y from tab1 where x != 2),
@ -41,7 +42,7 @@ sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select x, y, r.y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z)
select * from sub5 order by x
SETTINGS allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
SETTINGS use_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
0 0 0 0 0 0
1 1 0 0 0 0
3 3 0 0 0 0
@ -64,7 +65,7 @@ sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select x, y, r.y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z)
select * from sub5 order by x
SETTINGS allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
SETTINGS use_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
Expression
Sorting
Union
@ -91,7 +92,7 @@ sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select sum(x), sum(y), sum(r.y), sum(z), sum(rr.z), sum(a), key from sub3 ll any left join sub4 rr on ll.z = rr.z group by x % 2 as key)
select * from sub5 order by key
SETTINGS allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
SETTINGS use_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
54 54 50 50 12 12 0
64 64 0 0 0 0 1
explain description=0
@ -101,7 +102,7 @@ sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select sum(x), sum(y), sum(r.y), sum(z), sum(rr.z), sum(a), key from sub3 ll any left join sub4 rr on ll.z = rr.z group by x % 2 as key)
select * from sub5 order by key
SETTINGS allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
SETTINGS use_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
Expression
Sorting
Expression
@ -130,7 +131,7 @@ sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select x, y, r.y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z)
select * from sub5 order by x
SETTINGS allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
SETTINGS use_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
0 0 0 0 0 0
1 1 0 0 0 0
3 3 0 0 0 0
@ -153,7 +154,7 @@ sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select x, y, r.y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z)
select * from sub5 order by x
SETTINGS allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
SETTINGS use_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
Expression
Sorting
Expression
@ -182,7 +183,7 @@ sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select x, y, r.y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z)
select * from sub5 order by x
SETTINGS allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
SETTINGS use_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
0 0 0 0 0 0
1 1 0 0 0 0
3 3 0 0 0 0
@ -205,7 +206,7 @@ sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select x, y, r.y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z)
select * from sub5 order by x
SETTINGS allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
SETTINGS use_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
Expression
Sorting
Expression
@ -238,7 +239,7 @@ sub3 as (select l.x, l.y, r.y, r.z as z from sub2 r any right join sub1 l on l.y
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select x, l.y, y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z)
select * from sub5
SETTINGS allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
SETTINGS use_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
0 0 0 0 0 0
6 6 6 6 0 0
8 8 8 8 0 0
@ -261,7 +262,7 @@ sub3 as (select l.x, l.y, r.y, r.z as z from sub2 r any right join sub1 l on l.y
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select x, l.y, y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z)
select * from sub5
SETTINGS allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
SETTINGS use_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
Expression
Join
Expression
@ -291,7 +292,7 @@ sub2 as (select y, z from tab2 where y != 4),
sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y = r.y),
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select z, a, x, y, r.y, ll.z from sub4 rr any right join sub3 ll on ll.z = rr.z)
select * from sub5 order by x SETTINGS allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
select * from sub5 order by x SETTINGS use_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
0 0 0 0 0 0
0 0 1 1 0 0
0 0 3 3 0 0
@ -313,7 +314,7 @@ sub2 as (select y, z from tab2 where y != 4),
sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y = r.y),
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select z, a, x, y, r.y, ll.z from sub4 rr any right join sub3 ll on ll.z = rr.z)
select * from sub5 order by x SETTINGS allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
select * from sub5 order by x SETTINGS use_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
Expression
Sorting
Expression
@ -346,7 +347,7 @@ sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select x, y, r.y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z)
select * from sub5 order by x
SETTINGS allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
SETTINGS use_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
0 0 0 0 0 0
1 1 0 0 0 0
3 3 0 0 0 0
@ -369,7 +370,7 @@ sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select x, y, r.y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z)
select * from sub5 order by x
SETTINGS allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
SETTINGS use_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
Expression
Sorting
Union
@ -402,7 +403,7 @@ sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select x, y, r.y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z)
select * from sub5 order by x
SETTINGS allow_experimental_parallel_reading_from_replicas = 1, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1, parallel_replicas_allow_in_with_subquery=0;
SETTINGS use_parallel_replicas = 1, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1, parallel_replicas_allow_in_with_subquery=0;
0 0 0 0 0 0
1 1 0 0 0 0
3 3 0 0 0 0
@ -425,7 +426,7 @@ sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select x, y, r.y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z)
select * from sub5 order by x
SETTINGS allow_experimental_parallel_reading_from_replicas = 1, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1, parallel_replicas_allow_in_with_subquery=0;-- { echoOn }
SETTINGS use_parallel_replicas = 1, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1, parallel_replicas_allow_in_with_subquery=0;-- { echoOn }
Expression
Sorting
Expression
@ -455,7 +456,7 @@ Expression
ReadFromRemoteParallelReplicas
set parallel_replicas_prefer_local_join = 1;
-- A query with only INNER/LEFT joins is fully send to replicas. JOIN is executed in GLOBAL mode.
select x, y, r.y, z, rr.z, a from (select l.x, l.y, r.y, r.z as z from (select x, y from tab1 where x != 2) l any left join (select y, z from tab2 where y != 4) r on l.y = r.y) ll any left join (select z, a from tab3 where z != 8) rr on ll.z = rr.z order by x SETTINGS allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
select x, y, r.y, z, rr.z, a from (select l.x, l.y, r.y, r.z as z from (select x, y from tab1 where x != 2) l any left join (select y, z from tab2 where y != 4) r on l.y = r.y) ll any left join (select z, a from tab3 where z != 8) rr on ll.z = rr.z order by x SETTINGS use_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
0 0 0 0 0 0
1 1 0 0 0 0
3 3 0 0 0 0
@ -471,6 +472,7 @@ select x, y, r.y, z, rr.z, a from (select l.x, l.y, r.y, r.z as z from (select x
13 13 0 0 0 0
14 14 14 14 0 0
15 15 0 0 0 0
<<<<<<< HEAD
explain description=0 select x, y, r.y, z, rr.z, a from (select l.x, l.y, r.y, r.z as z from (select x, y from tab1 where x != 2) l any left join (select y, z from tab2 where y != 4) r on l.y = r.y) ll any left join (select z, a from tab3 where z != 8) rr on ll.z = rr.z SETTINGS allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
Union
Expression
@ -488,6 +490,11 @@ Union
ReadFromMergeTree
Expression
ReadFromRemoteParallelReplicas
=======
explain description=0 select x, y, r.y, z, rr.z, a from (select l.x, l.y, r.y, r.z as z from (select x, y from tab1 where x != 2) l any left join (select y, z from tab2 where y != 4) r on l.y = r.y) ll any left join (select z, a from tab3 where z != 8) rr on ll.z = rr.z SETTINGS use_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
Expression
ReadFromRemoteParallelReplicas
>>>>>>> Parallel replicas feature is Beta
--
-- The same query with cte;
with sub1 as (select x, y from tab1 where x != 2),
@ -496,7 +503,7 @@ sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select x, y, r.y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z)
select * from sub5 order by x
SETTINGS allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
SETTINGS use_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
0 0 0 0 0 0
1 1 0 0 0 0
3 3 0 0 0 0
@ -519,7 +526,7 @@ sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select x, y, r.y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z)
select * from sub5 order by x
SETTINGS allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
SETTINGS use_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
Expression
Sorting
Union
@ -548,7 +555,7 @@ sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select sum(x), sum(y), sum(r.y), sum(z), sum(rr.z), sum(a), key from sub3 ll any left join sub4 rr on ll.z = rr.z group by x % 2 as key)
select * from sub5 order by key
SETTINGS allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
SETTINGS use_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
54 54 50 50 12 12 0
64 64 0 0 0 0 1
explain description=0
@ -558,7 +565,7 @@ sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select sum(x), sum(y), sum(r.y), sum(z), sum(rr.z), sum(a), key from sub3 ll any left join sub4 rr on ll.z = rr.z group by x % 2 as key)
select * from sub5 order by key
SETTINGS allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
SETTINGS use_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
Expression
Sorting
Expression
@ -589,7 +596,7 @@ sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select x, y, r.y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z)
select * from sub5 order by x
SETTINGS allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
SETTINGS use_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
0 0 0 0 0 0
1 1 0 0 0 0
3 3 0 0 0 0
@ -612,7 +619,7 @@ sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select x, y, r.y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z)
select * from sub5 order by x
SETTINGS allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
SETTINGS use_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
Expression
Sorting
Expression
@ -642,7 +649,7 @@ sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select x, y, r.y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z)
select * from sub5 order by x
SETTINGS allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
SETTINGS use_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
0 0 0 0 0 0
1 1 0 0 0 0
3 3 0 0 0 0
@ -665,7 +672,7 @@ sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select x, y, r.y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z)
select * from sub5 order by x
SETTINGS allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
SETTINGS use_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
Expression
Sorting
Expression
@ -698,7 +705,7 @@ sub3 as (select l.x, l.y, r.y, r.z as z from sub2 r any right join sub1 l on l.y
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select x, l.y, y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z)
select * from sub5
SETTINGS allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
SETTINGS use_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
0 0 0 0 0 0
6 6 6 6 0 0
8 8 8 8 0 0
@ -721,7 +728,7 @@ sub3 as (select l.x, l.y, r.y, r.z as z from sub2 r any right join sub1 l on l.y
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select x, l.y, y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z)
select * from sub5
SETTINGS allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
SETTINGS use_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
Expression
Join
Expression
@ -751,7 +758,7 @@ sub2 as (select y, z from tab2 where y != 4),
sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y = r.y),
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select z, a, x, y, r.y, ll.z from sub4 rr any right join sub3 ll on ll.z = rr.z)
select * from sub5 order by x SETTINGS allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
select * from sub5 order by x SETTINGS use_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
0 0 0 0 0 0
0 0 1 1 0 0
0 0 3 3 0 0
@ -773,7 +780,7 @@ sub2 as (select y, z from tab2 where y != 4),
sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y = r.y),
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select z, a, x, y, r.y, ll.z from sub4 rr any right join sub3 ll on ll.z = rr.z)
select * from sub5 order by x SETTINGS allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
select * from sub5 order by x SETTINGS use_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
Expression
Sorting
Expression
@ -806,7 +813,7 @@ sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select x, y, r.y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z)
select * from sub5 order by x
SETTINGS allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
SETTINGS use_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
0 0 0 0 0 0
1 1 0 0 0 0
3 3 0 0 0 0
@ -829,7 +836,7 @@ sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select x, y, r.y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z)
select * from sub5 order by x
SETTINGS allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
SETTINGS use_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
Expression
Sorting
Union
@ -864,7 +871,7 @@ sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select x, y, r.y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z)
select * from sub5 order by x
SETTINGS allow_experimental_parallel_reading_from_replicas = 1, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1, parallel_replicas_allow_in_with_subquery=0;
SETTINGS use_parallel_replicas = 1, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1, parallel_replicas_allow_in_with_subquery=0;
0 0 0 0 0 0
1 1 0 0 0 0
3 3 0 0 0 0
@ -887,7 +894,7 @@ sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select x, y, r.y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z)
select * from sub5 order by x
SETTINGS allow_experimental_parallel_reading_from_replicas = 1, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1, parallel_replicas_allow_in_with_subquery=0;
SETTINGS use_parallel_replicas = 1, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1, parallel_replicas_allow_in_with_subquery=0;
Expression
Sorting
Expression

View File

@ -19,8 +19,8 @@ set parallel_replicas_local_plan=1;
set parallel_replicas_prefer_local_join = {{use_global_in}};
-- A query with only INNER/LEFT joins is fully send to replicas. JOIN is executed in GLOBAL mode.
select x, y, r.y, z, rr.z, a from (select l.x, l.y, r.y, r.z as z from (select x, y from tab1 where x != 2) l any left join (select y, z from tab2 where y != 4) r on l.y = r.y) ll any left join (select z, a from tab3 where z != 8) rr on ll.z = rr.z order by x SETTINGS allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
explain description=0 select x, y, r.y, z, rr.z, a from (select l.x, l.y, r.y, r.z as z from (select x, y from tab1 where x != 2) l any left join (select y, z from tab2 where y != 4) r on l.y = r.y) ll any left join (select z, a from tab3 where z != 8) rr on ll.z = rr.z SETTINGS allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
select x, y, r.y, z, rr.z, a from (select l.x, l.y, r.y, r.z as z from (select x, y from tab1 where x != 2) l any left join (select y, z from tab2 where y != 4) r on l.y = r.y) ll any left join (select z, a from tab3 where z != 8) rr on ll.z = rr.z order by x SETTINGS use_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
explain description=0 select x, y, r.y, z, rr.z, a from (select l.x, l.y, r.y, r.z as z from (select x, y from tab1 where x != 2) l any left join (select y, z from tab2 where y != 4) r on l.y = r.y) ll any left join (select z, a from tab3 where z != 8) rr on ll.z = rr.z SETTINGS use_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
--
-- The same query with cte;
with sub1 as (select x, y from tab1 where x != 2),
@ -29,7 +29,7 @@ sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select x, y, r.y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z)
select * from sub5 order by x
SETTINGS allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
SETTINGS use_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
explain description=0
with sub1 as (select x, y from tab1 where x != 2),
@ -38,7 +38,7 @@ sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select x, y, r.y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z)
select * from sub5 order by x
SETTINGS allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
SETTINGS use_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
--
-- GROUP BY should work up to WithMergableStage
with sub1 as (select x, y from tab1 where x != 2),
@ -47,7 +47,7 @@ sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select sum(x), sum(y), sum(r.y), sum(z), sum(rr.z), sum(a), key from sub3 ll any left join sub4 rr on ll.z = rr.z group by x % 2 as key)
select * from sub5 order by key
SETTINGS allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
SETTINGS use_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
explain description=0
with sub1 as (select x, y from tab1 where x != 2),
@ -56,7 +56,7 @@ sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select sum(x), sum(y), sum(r.y), sum(z), sum(rr.z), sum(a), key from sub3 ll any left join sub4 rr on ll.z = rr.z group by x % 2 as key)
select * from sub5 order by key
SETTINGS allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
SETTINGS use_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
--
-- ORDER BY in sub3 : sub1 is fully pushed, sub3 -> WithMergableStage
with sub1 as (select x, y from tab1 where x != 2),
@ -65,7 +65,7 @@ sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select x, y, r.y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z)
select * from sub5 order by x
SETTINGS allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
SETTINGS use_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
explain description=0
with sub1 as (select x, y from tab1 where x != 2),
@ -74,7 +74,7 @@ sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select x, y, r.y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z)
select * from sub5 order by x
SETTINGS allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
SETTINGS use_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
--
-- ORDER BY in sub1 : sub1 -> WithMergableStage
with sub1 as (select x, y from tab1 where x != 2 order by y),
@ -83,7 +83,7 @@ sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select x, y, r.y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z)
select * from sub5 order by x
SETTINGS allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
SETTINGS use_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
explain description=0
with sub1 as (select x, y from tab1 where x != 2 order by y),
@ -92,7 +92,7 @@ sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select x, y, r.y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z)
select * from sub5 order by x
SETTINGS allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
SETTINGS use_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
--
-- RIGHT JOIN in sub3: sub3 -> WithMergableStage
with sub1 as (select x, y from tab1 where x != 2),
@ -101,7 +101,7 @@ sub3 as (select l.x, l.y, r.y, r.z as z from sub2 r any right join sub1 l on l.y
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select x, l.y, y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z)
select * from sub5
SETTINGS allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
SETTINGS use_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
explain description=0
with sub1 as (select x, y from tab1 where x != 2),
@ -110,7 +110,7 @@ sub3 as (select l.x, l.y, r.y, r.z as z from sub2 r any right join sub1 l on l.y
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select x, l.y, y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z)
select * from sub5
SETTINGS allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
SETTINGS use_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
--
-- RIGHT JOIN in sub5: sub5 -> WithMergableStage
with sub1 as (select x, y from tab1 where x != 2),
@ -118,7 +118,7 @@ sub2 as (select y, z from tab2 where y != 4),
sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y = r.y),
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select z, a, x, y, r.y, ll.z from sub4 rr any right join sub3 ll on ll.z = rr.z)
select * from sub5 order by x SETTINGS allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
select * from sub5 order by x SETTINGS use_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
explain description=0
with sub1 as (select x, y from tab1 where x != 2),
@ -126,7 +126,7 @@ sub2 as (select y, z from tab2 where y != 4),
sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y = r.y),
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select z, a, x, y, r.y, ll.z from sub4 rr any right join sub3 ll on ll.z = rr.z)
select * from sub5 order by x SETTINGS allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
select * from sub5 order by x SETTINGS use_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
--
-- Subqueries for IN allowed
@ -136,7 +136,7 @@ sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select x, y, r.y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z)
select * from sub5 order by x
SETTINGS allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
SETTINGS use_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
explain description=0
with sub1 as (select x, y from tab1 where x in (select number from numbers(16) where number != 2)),
@ -145,7 +145,7 @@ sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select x, y, r.y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z)
select * from sub5 order by x
SETTINGS allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
SETTINGS use_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
--
-- Subqueries for IN are not allowed
@ -155,7 +155,7 @@ sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select x, y, r.y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z)
select * from sub5 order by x
SETTINGS allow_experimental_parallel_reading_from_replicas = 1, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1, parallel_replicas_allow_in_with_subquery=0;
SETTINGS use_parallel_replicas = 1, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1, parallel_replicas_allow_in_with_subquery=0;
explain description=0
with sub1 as (select x, y from tab1 where x in (select number from numbers(16) where number != 2)),
@ -164,6 +164,6 @@ sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select x, y, r.y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z)
select * from sub5 order by x
SETTINGS allow_experimental_parallel_reading_from_replicas = 1, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1, parallel_replicas_allow_in_with_subquery=0;
SETTINGS use_parallel_replicas = 1, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1, parallel_replicas_allow_in_with_subquery=0;
{%- endfor %}

View File

@ -13,21 +13,21 @@ SELECT count() FROM pr_2 INNER JOIN filtered_groups ON pr_2.a = filtered_groups.
WITH filtered_groups AS (SELECT a FROM pr_1 WHERE a >= 100)
SELECT count() FROM pr_2 INNER JOIN filtered_groups ON pr_2.a = filtered_groups.a
SETTINGS allow_experimental_parallel_reading_from_replicas = 1, parallel_replicas_for_non_replicated_merge_tree = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', max_parallel_replicas = 3;
SETTINGS use_parallel_replicas = 1, parallel_replicas_for_non_replicated_merge_tree = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', max_parallel_replicas = 3;
-- Testing that it is disabled for enable_analyzer=0. With analyzer it will be supported (with correct result)
WITH filtered_groups AS (SELECT a FROM pr_1 WHERE a >= 100)
SELECT count() FROM pr_2 INNER JOIN filtered_groups ON pr_2.a = filtered_groups.a
SETTINGS enable_analyzer = 0, allow_experimental_parallel_reading_from_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', max_parallel_replicas = 3; -- { serverError SUPPORT_IS_DISABLED }
SETTINGS enable_analyzer = 0, use_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', max_parallel_replicas = 3; -- { serverError SUPPORT_IS_DISABLED }
-- Disabled for any value of allow_experimental_parallel_reading_from_replicas != 1, not just 2
-- Disabled for any value of use_parallel_replicas != 1, not just 2
WITH filtered_groups AS (SELECT a FROM pr_1 WHERE a >= 100)
SELECT count() FROM pr_2 INNER JOIN filtered_groups ON pr_2.a = filtered_groups.a
SETTINGS enable_analyzer = 0, allow_experimental_parallel_reading_from_replicas = 512, parallel_replicas_for_non_replicated_merge_tree = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', max_parallel_replicas = 3; -- { serverError SUPPORT_IS_DISABLED }
SETTINGS enable_analyzer = 0, use_parallel_replicas = 512, parallel_replicas_for_non_replicated_merge_tree = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', max_parallel_replicas = 3; -- { serverError SUPPORT_IS_DISABLED }
-- Sanitizer
SELECT count() FROM pr_2 JOIN numbers(10) as pr_1 ON pr_2.a = pr_1.number
SETTINGS allow_experimental_parallel_reading_from_replicas = 1, parallel_replicas_for_non_replicated_merge_tree = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', max_parallel_replicas = 3;
SETTINGS use_parallel_replicas = 1, parallel_replicas_for_non_replicated_merge_tree = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', max_parallel_replicas = 3;
-- Parallel replicas detection should work inside subqueries
SELECT *
@ -36,7 +36,7 @@ FROM
WITH filtered_groups AS (SELECT a FROM pr_1 WHERE a >= 100)
SELECT count() FROM pr_2 INNER JOIN filtered_groups ON pr_2.a = filtered_groups.a
)
SETTINGS allow_experimental_parallel_reading_from_replicas = 1, parallel_replicas_for_non_replicated_merge_tree = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', max_parallel_replicas = 3;
SETTINGS use_parallel_replicas = 1, parallel_replicas_for_non_replicated_merge_tree = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', max_parallel_replicas = 3;
-- Subquery + subquery
SELECT count()
@ -49,7 +49,7 @@ FROM
SELECT count() as c FROM pr_2 INNER JOIN filtered_groups ON pr_2.a = filtered_groups.a
)
)
SETTINGS allow_experimental_parallel_reading_from_replicas = 1, parallel_replicas_for_non_replicated_merge_tree = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', max_parallel_replicas = 3;
SETTINGS use_parallel_replicas = 1, parallel_replicas_for_non_replicated_merge_tree = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', max_parallel_replicas = 3;
CREATE TABLE numbers_1e3
(
@ -74,7 +74,7 @@ WITH
)
SELECT count()
FROM cte2
SETTINGS allow_experimental_parallel_reading_from_replicas = 1, parallel_replicas_for_non_replicated_merge_tree = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', max_parallel_replicas = 3;
SETTINGS use_parallel_replicas = 1, parallel_replicas_for_non_replicated_merge_tree = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', max_parallel_replicas = 3;
DROP TABLE IF EXISTS numbers_1e6;
DROP TABLE IF EXISTS pr_1;

View File

@ -49,7 +49,7 @@ ${CLICKHOUSE_CLIENT} --max_insert_threads "$max_insert_threads" --parallel_distr
""" | grep -v EmptySink | grep -c Sink
echo "inserting into a remote table from remote (reading with parallel replicas) with concurrency max_insert_threads"
${CLICKHOUSE_CLIENT} --max_insert_threads "$max_insert_threads" --allow_experimental_parallel_reading_from_replicas 2 --cluster_for_parallel_replicas 'parallel_replicas' --max_parallel_replicas 3 -q """
${CLICKHOUSE_CLIENT} --max_insert_threads "$max_insert_threads" --use_parallel_replicas 2 --cluster_for_parallel_replicas 'parallel_replicas' --max_parallel_replicas 3 -q """
EXPLAIN PIPELINE
INSERT INTO t3_dist
SELECT * FROM t4_pr;

View File

@ -2,7 +2,7 @@ DROP TABLE IF EXISTS test_unexpected_cluster;
CREATE TABLE test_unexpected_cluster (n UInt64) ENGINE=MergeTree() ORDER BY tuple();
INSERT INTO test_unexpected_cluster SELECT * FROM numbers(10);
SET allow_experimental_parallel_reading_from_replicas=2, max_parallel_replicas=2, cluster_for_parallel_replicas='test_cluster_two_shards', parallel_replicas_for_non_replicated_merge_tree=1;
SET use_parallel_replicas=2, max_parallel_replicas=2, cluster_for_parallel_replicas='test_cluster_two_shards', parallel_replicas_for_non_replicated_merge_tree=1;
SELECT count() FROM test_unexpected_cluster WHERE NOT ignore(*); -- { serverError UNEXPECTED_CLUSTER }
DROP TABLE test_unexpected_cluster;

View File

@ -21,7 +21,7 @@ AS SELECT *
FROM numbers(10);
SET enable_analyzer = 1;
SET allow_experimental_parallel_reading_from_replicas = 1, parallel_replicas_for_non_replicated_merge_tree = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', max_parallel_replicas = 3, parallel_replicas_min_number_of_rows_per_replica=0;
SET use_parallel_replicas = 1, parallel_replicas_for_non_replicated_merge_tree = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', max_parallel_replicas = 3, parallel_replicas_min_number_of_rows_per_replica=0;
EXPLAIN SYNTAX
WITH

View File

@ -21,7 +21,7 @@ SELECT
FROM numbers(130000)
SETTINGS max_insert_block_size = 200000;
SET max_block_size = 1048576, max_threads = 1, allow_experimental_parallel_reading_from_replicas = 1, parallel_replicas_for_non_replicated_merge_tree = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', max_parallel_replicas = 3, parallel_replicas_min_number_of_rows_per_replica=10000;
SET max_block_size = 1048576, max_threads = 1, use_parallel_replicas = 1, parallel_replicas_for_non_replicated_merge_tree = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', max_parallel_replicas = 3, parallel_replicas_min_number_of_rows_per_replica=10000;
EXPLAIN ESTIMATE
SELECT count()

View File

@ -3,7 +3,7 @@ set max_threads = 16;
set use_hedged_requests = 0;
set max_parallel_replicas = 3;
set cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost';
set allow_experimental_parallel_reading_from_replicas = 1;
set use_parallel_replicas = 1;
set parallel_replicas_for_non_replicated_merge_tree = 1;
set allow_aggregate_partitions_independently = 1;

View File

@ -31,7 +31,7 @@ test1() {
GROUP BY CounterID, URL, EventDate
ORDER BY URL, EventDate
LIMIT 5 OFFSET 10
SETTINGS optimize_aggregation_in_order = 1, enable_memory_bound_merging_of_aggregation_results = 1, allow_experimental_parallel_reading_from_replicas = 1, parallel_replicas_for_non_replicated_merge_tree = 1, max_parallel_replicas = 3"
SETTINGS optimize_aggregation_in_order = 1, enable_memory_bound_merging_of_aggregation_results = 1, use_parallel_replicas = 1, parallel_replicas_for_non_replicated_merge_tree = 1, max_parallel_replicas = 3"
check_replicas_read_in_order $query_id
}
@ -48,7 +48,7 @@ test2() {
GROUP BY URL, EventDate
ORDER BY URL, EventDate
LIMIT 5 OFFSET 10
SETTINGS optimize_aggregation_in_order = 1, enable_memory_bound_merging_of_aggregation_results = 1, allow_experimental_parallel_reading_from_replicas = 1, parallel_replicas_for_non_replicated_merge_tree = 1, max_parallel_replicas = 3, query_plan_aggregation_in_order = 1"
SETTINGS optimize_aggregation_in_order = 1, enable_memory_bound_merging_of_aggregation_results = 1, use_parallel_replicas = 1, parallel_replicas_for_non_replicated_merge_tree = 1, max_parallel_replicas = 3, query_plan_aggregation_in_order = 1"
check_replicas_read_in_order $query_id
}
@ -64,7 +64,7 @@ test3() {
FROM test.hits
WHERE CounterID = 1704509 AND UserID = 4322253409885123546
GROUP BY URL, EventDate
SETTINGS optimize_aggregation_in_order = 1, enable_memory_bound_merging_of_aggregation_results = 1, allow_experimental_parallel_reading_from_replicas = 1, parallel_replicas_for_non_replicated_merge_tree = 1, max_parallel_replicas = 3, parallel_replicas_local_plan=1
SETTINGS optimize_aggregation_in_order = 1, enable_memory_bound_merging_of_aggregation_results = 1, use_parallel_replicas = 1, parallel_replicas_for_non_replicated_merge_tree = 1, max_parallel_replicas = 3, parallel_replicas_local_plan=1
)
WHERE explain LIKE '%Aggr%Transform%' OR explain LIKE '%InOrder%'"
}