Pure parallel replicas: JOIN support (#49544)

This commit is contained in:
Raúl Marín 2023-05-26 00:04:24 +01:00 committed by GitHub
parent a014b4d06f
commit b3a96de533
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 312 additions and 20 deletions

View File

@ -969,6 +969,15 @@ const ASTSelectQuery * ExpressionAnalyzer::getSelectQuery() const
return select_query;
}
bool ExpressionAnalyzer::isRemoteStorage() const
{
const Settings & csettings = getContext()->getSettingsRef();
// Consider any storage used in parallel replicas as remote, so the query is executed in multiple servers
const bool enable_parallel_processing_of_joins
= csettings.max_parallel_replicas > 1 && csettings.allow_experimental_parallel_reading_from_replicas > 0;
return syntax->is_remote_storage || enable_parallel_processing_of_joins;
}
const ASTSelectQuery * SelectQueryExpressionAnalyzer::getAggregatingQuery() const
{
if (!has_aggregation)

View File

@ -201,7 +201,7 @@ protected:
const ASTSelectQuery * getSelectQuery() const;
bool isRemoteStorage() const { return syntax->is_remote_storage; }
bool isRemoteStorage() const;
NamesAndTypesList getColumnsAfterArrayJoin(ActionsDAGPtr & actions, const NamesAndTypesList & src_columns);
NamesAndTypesList analyzeJoin(ActionsDAGPtr & actions, const NamesAndTypesList & src_columns);

View File

@ -205,10 +205,19 @@ public:
}
private:
static bool shouldBeExecutedGlobally(const Data & data)
{
const Settings & settings = data.getContext()->getSettingsRef();
/// For parallel replicas we reinterpret JOIN as GLOBAL JOIN as a way to broadcast data
const bool enable_parallel_processing_of_joins = data.getContext()->canUseParallelReplicasOnInitiator();
return settings.prefer_global_in_and_join || enable_parallel_processing_of_joins;
}
/// GLOBAL IN
static void visit(ASTFunction & func, ASTPtr &, Data & data)
{
if ((data.getContext()->getSettingsRef().prefer_global_in_and_join
if ((shouldBeExecutedGlobally(data)
&& (func.name == "in" || func.name == "notIn" || func.name == "nullIn" || func.name == "notNullIn"))
|| func.name == "globalIn" || func.name == "globalNotIn" || func.name == "globalNullIn" || func.name == "globalNotNullIn")
{
@ -238,8 +247,7 @@ private:
static void visit(ASTTablesInSelectQueryElement & table_elem, ASTPtr &, Data & data)
{
if (table_elem.table_join
&& (table_elem.table_join->as<ASTTableJoin &>().locality == JoinLocality::Global
|| data.getContext()->getSettingsRef().prefer_global_in_and_join))
&& (table_elem.table_join->as<ASTTableJoin &>().locality == JoinLocality::Global || shouldBeExecutedGlobally(data)))
{
data.addExternalStorage(table_elem.table_expression, true);
data.has_global_subqueries = true;

View File

@ -458,19 +458,11 @@ InterpreterSelectQuery::InterpreterSelectQuery(
}
}
/// Check support for JOINs for parallel replicas
if (joined_tables.tablesCount() > 1 && (!settings.parallel_replicas_custom_key.value.empty() || settings.allow_experimental_parallel_reading_from_replicas > 0))
/// Check support for JOIN for parallel replicas with custom key
if (joined_tables.tablesCount() > 1 && !settings.parallel_replicas_custom_key.value.empty())
{
if (settings.allow_experimental_parallel_reading_from_replicas == 1)
{
LOG_WARNING(log, "JOINs are not supported with parallel replicas. Query will be executed without using them.");
context->setSetting("allow_experimental_parallel_reading_from_replicas", Field(0));
context->setSetting("parallel_replicas_custom_key", String{""});
}
else if (settings.allow_experimental_parallel_reading_from_replicas == 2)
{
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "JOINs are not supported with parallel replicas");
}
LOG_WARNING(log, "JOINs are not supported with parallel_replicas_custom_key. Query will be executed without using them.");
context->setSetting("parallel_replicas_custom_key", String{""});
}
/// Check support for FINAL for parallel replicas
@ -489,6 +481,21 @@ InterpreterSelectQuery::InterpreterSelectQuery(
}
}
/// Check support for parallel replicas for non-replicated storage (plain MergeTree)
bool is_plain_merge_tree = storage && storage->isMergeTree() && !storage->supportsReplication();
if (is_plain_merge_tree && settings.allow_experimental_parallel_reading_from_replicas > 0 && !settings.parallel_replicas_for_non_replicated_merge_tree)
{
if (settings.allow_experimental_parallel_reading_from_replicas == 1)
{
LOG_WARNING(log, "To use parallel replicas with plain MergeTree tables please enable setting `parallel_replicas_for_non_replicated_merge_tree`. For now query will be executed without using them.");
context->setSetting("allow_experimental_parallel_reading_from_replicas", Field(0));
}
else if (settings.allow_experimental_parallel_reading_from_replicas == 2)
{
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "To use parallel replicas with plain MergeTree tables please enable setting `parallel_replicas_for_non_replicated_merge_tree`");
}
}
/// Rewrite JOINs
if (!has_input && joined_tables.tablesCount() > 1)
{

View File

@ -112,8 +112,6 @@ std::shared_ptr<InterpreterSelectWithUnionQuery> interpretSubquery(
subquery_options.removeDuplicates();
}
/// We don't want to execute reading for subqueries in parallel
subquery_context->setSetting("allow_experimental_parallel_reading_from_replicas", Field(0));
return std::make_shared<InterpreterSelectWithUnionQuery>(query, subquery_context, subquery_options, required_source_columns);
}

View File

@ -41,6 +41,6 @@ run_count_with_custom_key "y"
run_count_with_custom_key "cityHash64(y)"
run_count_with_custom_key "cityHash64(y) + 1"
$CLICKHOUSE_CLIENT --query="SELECT count() FROM cluster(test_cluster_one_shard_three_replicas_localhost, currentDatabase(), 02535_custom_key) as t1 JOIN 02535_custom_key USING y" --parallel_replicas_custom_key="y" --send_logs_level="trace" 2>&1 | grep -Fac "JOINs are not supported with parallel replicas"
$CLICKHOUSE_CLIENT --query="SELECT count() FROM cluster(test_cluster_one_shard_three_replicas_localhost, currentDatabase(), 02535_custom_key) as t1 JOIN 02535_custom_key USING y" --parallel_replicas_custom_key="y" --send_logs_level="trace" 2>&1 | grep -Fac "JOINs are not supported with"
$CLICKHOUSE_CLIENT --query="DROP TABLE 02535_custom_key"

View File

@ -1,3 +1,4 @@
CREATE TABLE IF NOT EXISTS t_02708(x DateTime) ENGINE = MergeTree ORDER BY tuple();
SET send_logs_level='error';
SELECT count() FROM t_02708 SETTINGS allow_experimental_parallel_reading_from_replicas=1;
DROP TABLE t_02708;

View File

@ -0,0 +1,44 @@
=============== INNER QUERY (NO PARALLEL) ===============
0 PJFiUe#J2O _s\' 14427935816175499794
1 >T%O ,z< 17537932797009027240
12 D[6,P #}Lmb[ ZzU 6394957109822140795
18 $_N- 24422838680427462
2 bX?}ix [ Ny]2 G 16242612901291874718
20 VE] Y 15120036904703536841
22 Ti~3)N)< A!( 3 18361093572663329113
23 Sx>b:^UG XpedE)Q: 7433019734386307503
29 2j&S)ba?XG QuQj 17163829389637435056
3 UlI+1 14144472852965836438
=============== INNER QUERY (PARALLEL) ===============
0 PJFiUe#J2O _s\' 14427935816175499794
1 >T%O ,z< 17537932797009027240
12 D[6,P #}Lmb[ ZzU 6394957109822140795
18 $_N- 24422838680427462
2 bX?}ix [ Ny]2 G 16242612901291874718
20 VE] Y 15120036904703536841
22 Ti~3)N)< A!( 3 18361093572663329113
23 Sx>b:^UG XpedE)Q: 7433019734386307503
29 2j&S)ba?XG QuQj 17163829389637435056
3 UlI+1 14144472852965836438
=============== QUERIES EXECUTED BY PARALLEL INNER QUERY ALONE ===============
0 3 SELECT `key`, `value1`, `value2`, toUInt64(min(`time`)) AS `start_ts` FROM `default`.`join_inner_table` PREWHERE (`id` = \'833c9e22-c245-4eb5-8745-117a9a1f26b1\') AND (`number` > toUInt64(\'1610517366120\')) GROUP BY `key`, `value1`, `value2` ORDER BY `key` ASC, `value1` ASC, `value2` ASC LIMIT 10
1 1 -- Parallel inner query alone\nSELECT\n key,\n value1,\n value2,\n toUInt64(min(time)) AS start_ts\nFROM join_inner_table\nPREWHERE (id = \'833c9e22-c245-4eb5-8745-117a9a1f26b1\') AND (number > toUInt64(\'1610517366120\'))\nGROUP BY key, value1, value2\nORDER BY key, value1, value2\nLIMIT 10\nSETTINGS allow_experimental_parallel_reading_from_replicas = 1;
=============== OUTER QUERY (NO PARALLEL) ===============
>T%O ,z< 10
NQTpY# W\\Xx4 10
PJFiUe#J2O _s\' 10
U c 10
UlI+1 10
bX?}ix [ Ny]2 G 10
t<iT X48q:Z]t0 10
=============== OUTER QUERY (PARALLEL) ===============
>T%O ,z< 10
NQTpY# W\\Xx4 10
PJFiUe#J2O _s\' 10
U c 10
UlI+1 10
bX?}ix [ Ny]2 G 10
t<iT X48q:Z]t0 10
0 3 SELECT `key`, `value1`, `value2`, toUInt64(min(`time`)) AS `start_ts` FROM `default`.`join_inner_table` PREWHERE (`id` = \'833c9e22-c245-4eb5-8745-117a9a1f26b1\') AND (`number` > toUInt64(\'1610517366120\')) GROUP BY `key`, `value1`, `value2`
0 3 SELECT `value1`, `value2`, count() AS `count` FROM `default`.`join_outer_table` ALL INNER JOIN `_data_11888098645495698704_17868075224240210014` USING (`key`) GROUP BY `key`, `value1`, `value2`
1 1 -- Parallel full query\nSELECT\n value1,\n value2,\n avg(count) AS avg\nFROM\n (\n SELECT\n key,\n value1,\n value2,\n count() AS count\n FROM join_outer_table\n INNER JOIN\n (\n SELECT\n key,\n value1,\n value2,\n toUInt64(min(time)) AS start_ts\n FROM join_inner_table\n PREWHERE (id = \'833c9e22-c245-4eb5-8745-117a9a1f26b1\') AND (number > toUInt64(\'1610517366120\'))\n GROUP BY key, value1, value2\n ) USING (key)\n GROUP BY key, value1, value2\n )\nGROUP BY value1, value2\nORDER BY value1, value2\nSETTINGS allow_experimental_parallel_reading_from_replicas = 1;

View File

@ -0,0 +1,182 @@
-- Tags: zookeeper
CREATE TABLE join_inner_table
(
id UUID,
key String,
number Int64,
value1 String,
value2 String,
time Int64
)
ENGINE=ReplicatedMergeTree('/clickhouse/tables/{database}/join_inner_table', 'r1')
ORDER BY (id, number, key);
INSERT INTO join_inner_table
SELECT
'833c9e22-c245-4eb5-8745-117a9a1f26b1'::UUID as id,
rowNumberInAllBlocks()::String as key,
* FROM generateRandom('number Int64, value1 String, value2 String, time Int64', 1, 10, 2)
LIMIT 100;
SET allow_experimental_analyzer = 0;
SET max_parallel_replicas = 3;
SET prefer_localhost_replica = 1;
SET cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost';
SET use_hedged_requests = 0;
SET joined_subquery_requires_alias = 0;
SELECT '=============== INNER QUERY (NO PARALLEL) ===============';
SELECT
key,
value1,
value2,
toUInt64(min(time)) AS start_ts
FROM join_inner_table
PREWHERE (id = '833c9e22-c245-4eb5-8745-117a9a1f26b1') AND (number > toUInt64('1610517366120'))
GROUP BY key, value1, value2
ORDER BY key, value1, value2
LIMIT 10;
SELECT '=============== INNER QUERY (PARALLEL) ===============';
-- Parallel inner query alone
SELECT
key,
value1,
value2,
toUInt64(min(time)) AS start_ts
FROM join_inner_table
PREWHERE (id = '833c9e22-c245-4eb5-8745-117a9a1f26b1') AND (number > toUInt64('1610517366120'))
GROUP BY key, value1, value2
ORDER BY key, value1, value2
LIMIT 10
SETTINGS allow_experimental_parallel_reading_from_replicas = 1;
SELECT '=============== QUERIES EXECUTED BY PARALLEL INNER QUERY ALONE ===============';
SYSTEM FLUSH LOGS;
-- There should be 4 queries. The main query as received by the initiator and the 3 equal queries sent to each replica
SELECT is_initial_query, count() as c, query,
FROM system.query_log
WHERE
event_date >= yesterday()
AND type = 'QueryFinish'
AND initial_query_id =
(
SELECT query_id
FROM system.query_log
WHERE
current_database = currentDatabase()
AND event_date >= yesterday()
AND type = 'QueryFinish'
AND query LIKE '-- Parallel inner query alone%'
)
GROUP BY is_initial_query, query
ORDER BY is_initial_query, c, query;
---- Query with JOIN
CREATE TABLE join_outer_table
(
id UUID,
key String,
otherValue1 String,
otherValue2 String,
time Int64
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{database}/join_outer_table', 'r1')
ORDER BY (id, time, key);
INSERT INTO join_outer_table
SELECT
'833c9e22-c245-4eb5-8745-117a9a1f26b1'::UUID as id,
(rowNumberInAllBlocks() % 10)::String as key,
* FROM generateRandom('otherValue1 String, otherValue2 String, time Int64', 1, 10, 2)
LIMIT 100;
SELECT '=============== OUTER QUERY (NO PARALLEL) ===============';
SELECT
value1,
value2,
avg(count) AS avg
FROM
(
SELECT
key,
value1,
value2,
count() AS count
FROM join_outer_table
INNER JOIN
(
SELECT
key,
value1,
value2,
toUInt64(min(time)) AS start_ts
FROM join_inner_table
PREWHERE (id = '833c9e22-c245-4eb5-8745-117a9a1f26b1') AND (number > toUInt64('1610517366120'))
GROUP BY key, value1, value2
) USING (key)
GROUP BY key, value1, value2
)
GROUP BY value1, value2
ORDER BY value1, value2;
SELECT '=============== OUTER QUERY (PARALLEL) ===============';
-- Parallel full query
SELECT
value1,
value2,
avg(count) AS avg
FROM
(
SELECT
key,
value1,
value2,
count() AS count
FROM join_outer_table
INNER JOIN
(
SELECT
key,
value1,
value2,
toUInt64(min(time)) AS start_ts
FROM join_inner_table
PREWHERE (id = '833c9e22-c245-4eb5-8745-117a9a1f26b1') AND (number > toUInt64('1610517366120'))
GROUP BY key, value1, value2
) USING (key)
GROUP BY key, value1, value2
)
GROUP BY value1, value2
ORDER BY value1, value2
SETTINGS allow_experimental_parallel_reading_from_replicas = 1;
SYSTEM FLUSH LOGS;
-- There should be 7 queries. The main query as received by the initiator, the 3 equal queries to execute the subquery
-- in the inner join and the 3 queries executing the whole query (but replacing the subquery with a temp table)
SELECT is_initial_query, count() as c, query,
FROM system.query_log
WHERE
event_date >= yesterday()
AND type = 'QueryFinish'
AND initial_query_id =
(
SELECT query_id
FROM system.query_log
WHERE
current_database = currentDatabase()
AND event_date >= yesterday()
AND type = 'QueryFinish'
AND query LIKE '-- Parallel full query%'
)
GROUP BY is_initial_query, query
ORDER BY is_initial_query, c, query;

View File

@ -0,0 +1,43 @@
CREATE TABLE join_inner_table__fuzz_1
(
`id` UUID,
`key` Nullable(Date),
`number` Int64,
`value1` LowCardinality(String),
`value2` LowCardinality(String),
`time` Int128
)
ENGINE = MergeTree
ORDER BY (id, number, key)
SETTINGS allow_nullable_key = 1;
INSERT INTO join_inner_table__fuzz_1 SELECT
CAST('833c9e22-c245-4eb5-8745-117a9a1f26b1', 'UUID') AS id,
CAST(rowNumberInAllBlocks(), 'String') AS key,
*
FROM generateRandom('number Int64, value1 String, value2 String, time Int64', 1, 10, 2)
LIMIT 100;
SET max_parallel_replicas = 3, prefer_localhost_replica = 1, use_hedged_requests = 0, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', allow_experimental_parallel_reading_from_replicas = 1;
-- SELECT query will write a Warning to the logs
SET send_logs_level='error';
SELECT
key,
value1,
value2,
toUInt64(min(time)) AS start_ts
FROM join_inner_table__fuzz_1
PREWHERE (id = '833c9e22-c245-4eb5-8745-117a9a1f26b1') AND (number > toUInt64('1610517366120'))
GROUP BY
key,
value1,
value2
WITH ROLLUP
ORDER BY
key ASC,
value1 ASC,
value2 ASC NULLS LAST
LIMIT 10
FORMAT Null;

View File

@ -2,7 +2,7 @@ CREATE TABLE IF NOT EXISTS parallel_replicas_plain (x String) ENGINE=MergeTree()
INSERT INTO parallel_replicas_plain SELECT toString(number) FROM numbers(10);
SET max_parallel_replicas=3, allow_experimental_parallel_reading_from_replicas=1, use_hedged_requests=0, cluster_for_parallel_replicas='parallel_replicas';
SET send_logs_level='error';
SET parallel_replicas_for_non_replicated_merge_tree = 0;
SELECT x FROM parallel_replicas_plain LIMIT 1 FORMAT Null;