Merge pull request #59076 from ClickHouse/revert-59059-revert-58838-allow-parallel-replicas-for-join-with-analyzer

Revert "Revert "Allow parallel replicas for JOIN with analyzer [part 1].""
This commit is contained in:
Nikolai Kochetov 2024-01-23 17:01:19 +01:00 committed by GitHub
commit a26f8f5425
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
16 changed files with 146 additions and 34 deletions

View File

@ -322,7 +322,6 @@ void executeQuery(
void executeQueryWithParallelReplicas(
QueryPlan & query_plan,
const StorageID & main_table,
SelectStreamFactory & stream_factory,
const ASTPtr & query_ast,
ContextPtr context,
@ -414,7 +413,6 @@ void executeQueryWithParallelReplicas(
std::move(coordinator),
stream_factory.header,
stream_factory.processed_stage,
main_table,
new_context,
getThrottler(new_context),
std::move(scalars),

View File

@ -70,7 +70,6 @@ void executeQuery(
void executeQueryWithParallelReplicas(
QueryPlan & query_plan,
const StorageID & main_table,
SelectStreamFactory & stream_factory,
const ASTPtr & query_ast,
ContextPtr context,

View File

@ -1391,7 +1391,7 @@ void Planner::buildPlanForQueryNode()
}
}
if (query_context->canUseTaskBasedParallelReplicas() || !settings.parallel_replicas_custom_key.value.empty())
if (!settings.parallel_replicas_custom_key.value.empty())
{
/// Check support for JOIN for parallel replicas with custom key
if (planner_context->getTableExpressionNodeToData().size() > 1)

View File

@ -357,7 +357,6 @@ ReadFromParallelRemoteReplicasStep::ReadFromParallelRemoteReplicasStep(
ParallelReplicasReadingCoordinatorPtr coordinator_,
Block header_,
QueryProcessingStage::Enum stage_,
StorageID main_table_,
ContextMutablePtr context_,
ThrottlerPtr throttler_,
Scalars scalars_,
@ -369,7 +368,6 @@ ReadFromParallelRemoteReplicasStep::ReadFromParallelRemoteReplicasStep(
, query_ast(query_ast_)
, coordinator(std::move(coordinator_))
, stage(std::move(stage_))
, main_table(std::move(main_table_))
, context(context_)
, throttler(throttler_)
, scalars(scalars_)

View File

@ -76,7 +76,6 @@ public:
ParallelReplicasReadingCoordinatorPtr coordinator_,
Block header_,
QueryProcessingStage::Enum stage_,
StorageID main_table_,
ContextMutablePtr context_,
ThrottlerPtr throttler_,
Scalars scalars_,
@ -99,7 +98,6 @@ private:
ASTPtr query_ast;
ParallelReplicasReadingCoordinatorPtr coordinator;
QueryProcessingStage::Enum stage;
StorageID main_table;
ContextMutablePtr context;
ThrottlerPtr throttler;
Scalars scalars;

View File

@ -213,16 +213,25 @@ void StorageMergeTree::read(
{
if (local_context->canUseParallelReplicasOnInitiator() && local_context->getSettingsRef().parallel_replicas_for_non_replicated_merge_tree)
{
const auto table_id = getStorageID();
const auto & modified_query_ast = ClusterProxy::rewriteSelectQuery(
local_context, query_info.query,
table_id.database_name, table_id.table_name, /*remote_table_function_ptr*/nullptr);
ASTPtr modified_query_ast;
Block header;
if (local_context->getSettingsRef().allow_experimental_analyzer)
header = InterpreterSelectQueryAnalyzer::getSampleBlock(modified_query_ast, local_context, SelectQueryOptions(processed_stage).analyze());
{
QueryTreeNodePtr modified_query_tree = query_info.query_tree->clone();
rewriteJoinToGlobalJoin(modified_query_tree);
modified_query_tree = buildQueryTreeForShard(query_info, modified_query_tree);
header = InterpreterSelectQueryAnalyzer::getSampleBlock(
modified_query_tree, local_context, SelectQueryOptions(processed_stage).analyze());
modified_query_ast = queryNodeToSelectQuery(modified_query_tree);
}
else
header = InterpreterSelectQuery(modified_query_ast, local_context, SelectQueryOptions(processed_stage).analyze()).getSampleBlock();
{
const auto table_id = getStorageID();
modified_query_ast = ClusterProxy::rewriteSelectQuery(local_context, query_info.query,
table_id.database_name, table_id.table_name, /*remote_table_function_ptr*/nullptr);
header
= InterpreterSelectQuery(modified_query_ast, local_context, SelectQueryOptions(processed_stage).analyze()).getSampleBlock();
}
ClusterProxy::SelectStreamFactory select_stream_factory =
ClusterProxy::SelectStreamFactory(
@ -233,7 +242,6 @@ void StorageMergeTree::read(
ClusterProxy::executeQueryWithParallelReplicas(
query_plan,
getStorageID(),
select_stream_factory,
modified_query_ast,
local_context,

View File

@ -5385,7 +5385,9 @@ void StorageReplicatedMergeTree::readParallelReplicasImpl(
if (local_context->getSettingsRef().allow_experimental_analyzer)
{
auto modified_query_tree = buildQueryTreeForShard(query_info, query_info.query_tree);
QueryTreeNodePtr modified_query_tree = query_info.query_tree->clone();
rewriteJoinToGlobalJoin(modified_query_tree);
modified_query_tree = buildQueryTreeForShard(query_info, modified_query_tree);
header = InterpreterSelectQueryAnalyzer::getSampleBlock(
modified_query_tree, local_context, SelectQueryOptions(processed_stage).analyze());
@ -5408,7 +5410,6 @@ void StorageReplicatedMergeTree::readParallelReplicasImpl(
ClusterProxy::executeQueryWithParallelReplicas(
query_plan,
getStorageID(),
select_stream_factory,
modified_query_ast,
local_context,

View File

@ -373,11 +373,37 @@ QueryTreeNodePtr buildQueryTreeForShard(SelectQueryInfo & query_info, QueryTreeN
removeGroupingFunctionSpecializations(query_tree_to_modify);
// std::cerr << "====================== build 1 \n" << query_tree_to_modify->dumpTree() << std::endl;
createUniqueTableAliases(query_tree_to_modify, nullptr, planner_context->getQueryContext());
// std::cerr << "====================== build 2 \n" << query_tree_to_modify->dumpTree() << std::endl;
return query_tree_to_modify;
}
class RewriteJoinToGlobalJoinVisitor : public InDepthQueryTreeVisitor<RewriteJoinToGlobalJoinVisitor>
{
public:
using Base = InDepthQueryTreeVisitor<RewriteJoinToGlobalJoinVisitor>;
using Base::Base;
void visitImpl(QueryTreeNodePtr & node)
{
if (auto * join_node = node->as<JoinNode>())
join_node->setLocality(JoinLocality::Global);
}
static bool needChildVisit(QueryTreeNodePtr & parent, QueryTreeNodePtr & child)
{
auto * join_node = parent->as<JoinNode>();
if (join_node && join_node->getRightTableExpression() == child)
return false;
return true;
}
};
void rewriteJoinToGlobalJoin(QueryTreeNodePtr query_tree_to_modify)
{
RewriteJoinToGlobalJoinVisitor visitor;
visitor.visit(query_tree_to_modify);
}
}

View File

@ -12,4 +12,6 @@ using QueryTreeNodePtr = std::shared_ptr<IQueryTreeNode>;
QueryTreeNodePtr buildQueryTreeForShard(SelectQueryInfo & query_info, QueryTreeNodePtr query_tree_to_modify);
void rewriteJoinToGlobalJoin(QueryTreeNodePtr query_tree_to_modify);
}

View File

@ -27,8 +27,9 @@
00917_multiple_joins_denny_crane
02725_agg_projection_resprect_PK
02763_row_policy_storage_merge_alias
02784_parallel_replicas_automatic_decision_join
02818_parameterized_view_with_cte_multiple_usage
# Check after constants refactoring
02901_parallel_replicas_rollup
# Flaky. Please don't delete them without fixing them:
01287_max_execution_speed
02003_WithMergeableStateAfterAggregationAndLimit_LIMIT_BY_LIMIT_OFFSET

View File

@ -20,9 +20,21 @@
23 Sx>b:^UG XpedE)Q: 7433019734386307503
29 2j&S)ba?XG QuQj 17163829389637435056
3 UlI+1 14144472852965836438
0 PJFiUe#J2O _s\' 14427935816175499794
1 >T%O ,z< 17537932797009027240
12 D[6,P #}Lmb[ ZzU 6394957109822140795
18 $_N- 24422838680427462
2 bX?}ix [ Ny]2 G 16242612901291874718
20 VE] Y 15120036904703536841
22 Ti~3)N)< A!( 3 18361093572663329113
23 Sx>b:^UG XpedE)Q: 7433019734386307503
29 2j&S)ba?XG QuQj 17163829389637435056
3 UlI+1 14144472852965836438
=============== QUERIES EXECUTED BY PARALLEL INNER QUERY ALONE ===============
0 3 SELECT `__table1`.`key` AS `key`, `__table1`.`value1` AS `value1`, `__table1`.`value2` AS `value2`, toUInt64(min(`__table1`.`time`)) AS `start_ts` FROM `default`.`join_inner_table` AS `__table1` PREWHERE (`__table1`.`id` = \'833c9e22-c245-4eb5-8745-117a9a1f26b1\') AND (`__table1`.`number` > 1610517366120) GROUP BY `__table1`.`key`, `__table1`.`value1`, `__table1`.`value2` ORDER BY `__table1`.`key` ASC, `__table1`.`value1` ASC, `__table1`.`value2` ASC LIMIT _CAST(10, \'UInt64\') SETTINGS allow_experimental_parallel_reading_from_replicas = 1, allow_experimental_analyzer = 1
0 3 SELECT `key`, `value1`, `value2`, toUInt64(min(`time`)) AS `start_ts` FROM `default`.`join_inner_table` PREWHERE (`id` = \'833c9e22-c245-4eb5-8745-117a9a1f26b1\') AND (`number` > toUInt64(\'1610517366120\')) GROUP BY `key`, `value1`, `value2` ORDER BY `key` ASC, `value1` ASC, `value2` ASC LIMIT 10
1 1 -- Parallel inner query alone\nSELECT\n key,\n value1,\n value2,\n toUInt64(min(time)) AS start_ts\nFROM join_inner_table\nPREWHERE (id = \'833c9e22-c245-4eb5-8745-117a9a1f26b1\') AND (number > toUInt64(\'1610517366120\'))\nGROUP BY key, value1, value2\nORDER BY key, value1, value2\nLIMIT 10\nSETTINGS allow_experimental_parallel_reading_from_replicas = 1;
1 1 -- Parallel inner query alone\nSELECT\n key,\n value1,\n value2,\n toUInt64(min(time)) AS start_ts\nFROM join_inner_table\nPREWHERE (id = \'833c9e22-c245-4eb5-8745-117a9a1f26b1\') AND (number > toUInt64(\'1610517366120\'))\nGROUP BY key, value1, value2\nORDER BY key, value1, value2\nLIMIT 10\nSETTINGS allow_experimental_parallel_reading_from_replicas = 1, allow_experimental_analyzer=0;
1 1 -- Parallel inner query alone\nSELECT\n key,\n value1,\n value2,\n toUInt64(min(time)) AS start_ts\nFROM join_inner_table\nPREWHERE (id = \'833c9e22-c245-4eb5-8745-117a9a1f26b1\') AND (number > toUInt64(\'1610517366120\'))\nGROUP BY key, value1, value2\nORDER BY key, value1, value2\nLIMIT 10\nSETTINGS allow_experimental_parallel_reading_from_replicas = 1, allow_experimental_analyzer=1;
=============== OUTER QUERY (NO PARALLEL) ===============
>T%O ,z< 10
NQTpY# W\\Xx4 10
@ -39,6 +51,16 @@ U c 10
UlI+1 10
bX?}ix [ Ny]2 G 10
t<iT X48q:Z]t0 10
>T%O ,z< 10
NQTpY# W\\Xx4 10
PJFiUe#J2O _s\' 10
U c 10
UlI+1 10
bX?}ix [ Ny]2 G 10
t<iT X48q:Z]t0 10
0 3 SELECT `__table1`.`key` AS `key`, `__table1`.`value1` AS `value1`, `__table1`.`value2` AS `value2` FROM `default`.`join_inner_table` AS `__table1` PREWHERE (`__table1`.`id` = \'833c9e22-c245-4eb5-8745-117a9a1f26b1\') AND (`__table1`.`number` > 1610517366120) GROUP BY `__table1`.`key`, `__table1`.`value1`, `__table1`.`value2`
0 3 SELECT `__table2`.`value1` AS `value1`, `__table2`.`value2` AS `value2`, count() AS `count` FROM `default`.`join_outer_table` AS `__table1` GLOBAL ALL INNER JOIN `_data_` AS `__table2` USING (`key`) GROUP BY `__table1`.`key`, `__table2`.`value1`, `__table2`.`value2`
0 3 SELECT `key`, `value1`, `value2` FROM `default`.`join_inner_table` PREWHERE (`id` = \'833c9e22-c245-4eb5-8745-117a9a1f26b1\') AND (`number` > toUInt64(\'1610517366120\')) GROUP BY `key`, `value1`, `value2`
0 3 SELECT `value1`, `value2`, count() AS `count` FROM `default`.`join_outer_table` ALL INNER JOIN `_data_7105554115296635472_12427301373021079614` USING (`key`) GROUP BY `key`, `value1`, `value2`
1 1 -- Parallel full query\nSELECT\n value1,\n value2,\n avg(count) AS avg\nFROM\n (\n SELECT\n key,\n value1,\n value2,\n count() AS count\n FROM join_outer_table\n INNER JOIN\n (\n SELECT\n key,\n value1,\n value2,\n toUInt64(min(time)) AS start_ts\n FROM join_inner_table\n PREWHERE (id = \'833c9e22-c245-4eb5-8745-117a9a1f26b1\') AND (number > toUInt64(\'1610517366120\'))\n GROUP BY key, value1, value2\n ) USING (key)\n GROUP BY key, value1, value2\n )\nGROUP BY value1, value2\nORDER BY value1, value2\nSETTINGS allow_experimental_parallel_reading_from_replicas = 1;
0 3 SELECT `value1`, `value2`, count() AS `count` FROM `default`.`join_outer_table` ALL INNER JOIN `_data_` USING (`key`) GROUP BY `key`, `value1`, `value2`
1 1 -- Parallel full query\nSELECT\n value1,\n value2,\n avg(count) AS avg\nFROM\n (\n SELECT\n key,\n value1,\n value2,\n count() AS count\n FROM join_outer_table\n INNER JOIN\n (\n SELECT\n key,\n value1,\n value2,\n toUInt64(min(time)) AS start_ts\n FROM join_inner_table\n PREWHERE (id = \'833c9e22-c245-4eb5-8745-117a9a1f26b1\') AND (number > toUInt64(\'1610517366120\'))\n GROUP BY key, value1, value2\n ) USING (key)\n GROUP BY key, value1, value2\n )\nGROUP BY value1, value2\nORDER BY value1, value2\nSETTINGS allow_experimental_parallel_reading_from_replicas = 1, allow_experimental_analyzer=0;
1 1 -- Parallel full query\nSELECT\n value1,\n value2,\n avg(count) AS avg\nFROM\n (\n SELECT\n key,\n value1,\n value2,\n count() AS count\n FROM join_outer_table\n INNER JOIN\n (\n SELECT\n key,\n value1,\n value2,\n toUInt64(min(time)) AS start_ts\n FROM join_inner_table\n PREWHERE (id = \'833c9e22-c245-4eb5-8745-117a9a1f26b1\') AND (number > toUInt64(\'1610517366120\'))\n GROUP BY key, value1, value2\n ) USING (key)\n GROUP BY key, value1, value2\n )\nGROUP BY value1, value2\nORDER BY value1, value2\nSETTINGS allow_experimental_parallel_reading_from_replicas = 1, allow_experimental_analyzer=1;

View File

@ -21,7 +21,6 @@ SELECT
* FROM generateRandom('number Int64, value1 String, value2 String, time Int64', 1, 10, 2)
LIMIT 100;
SET allow_experimental_analyzer = 0;
SET max_parallel_replicas = 3;
SET prefer_localhost_replica = 1;
SET cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost';
@ -39,6 +38,18 @@ FROM join_inner_table
GROUP BY key, value1, value2
ORDER BY key, value1, value2
LIMIT 10;
-- settings allow_experimental_analyzer=0;
-- SELECT
-- key,
-- value1,
-- value2,
-- toUInt64(min(time)) AS start_ts
-- FROM join_inner_table
-- PREWHERE (id = '833c9e22-c245-4eb5-8745-117a9a1f26b1') AND (number > toUInt64('1610517366120'))
-- GROUP BY key, value1, value2
-- ORDER BY key, value1, value2
-- LIMIT 10 settings allow_experimental_analyzer=1;
SELECT '=============== INNER QUERY (PARALLEL) ===============';
@ -53,18 +64,31 @@ PREWHERE (id = '833c9e22-c245-4eb5-8745-117a9a1f26b1') AND (number > toUInt64('1
GROUP BY key, value1, value2
ORDER BY key, value1, value2
LIMIT 10
SETTINGS allow_experimental_parallel_reading_from_replicas = 1;
SETTINGS allow_experimental_parallel_reading_from_replicas = 1, allow_experimental_analyzer=0;
-- Parallel inner query alone
SELECT
key,
value1,
value2,
toUInt64(min(time)) AS start_ts
FROM join_inner_table
PREWHERE (id = '833c9e22-c245-4eb5-8745-117a9a1f26b1') AND (number > toUInt64('1610517366120'))
GROUP BY key, value1, value2
ORDER BY key, value1, value2
LIMIT 10
SETTINGS allow_experimental_parallel_reading_from_replicas = 1, allow_experimental_analyzer=1;
SELECT '=============== QUERIES EXECUTED BY PARALLEL INNER QUERY ALONE ===============';
SYSTEM FLUSH LOGS;
-- There should be 4 queries. The main query as received by the initiator and the 3 equal queries sent to each replica
SELECT is_initial_query, count() as c, query,
SELECT is_initial_query, count() as c, replaceRegexpAll(query, '_data_(\d+)_(\d+)', '_data_') as query
FROM system.query_log
WHERE
event_date >= yesterday()
AND type = 'QueryFinish'
AND initial_query_id =
AND initial_query_id IN
(
SELECT query_id
FROM system.query_log
@ -160,18 +184,48 @@ FROM
)
GROUP BY value1, value2
ORDER BY value1, value2
SETTINGS allow_experimental_parallel_reading_from_replicas = 1;
SETTINGS allow_experimental_parallel_reading_from_replicas = 1, allow_experimental_analyzer=0;
-- Parallel full query
SELECT
value1,
value2,
avg(count) AS avg
FROM
(
SELECT
key,
value1,
value2,
count() AS count
FROM join_outer_table
INNER JOIN
(
SELECT
key,
value1,
value2,
toUInt64(min(time)) AS start_ts
FROM join_inner_table
PREWHERE (id = '833c9e22-c245-4eb5-8745-117a9a1f26b1') AND (number > toUInt64('1610517366120'))
GROUP BY key, value1, value2
) USING (key)
GROUP BY key, value1, value2
)
GROUP BY value1, value2
ORDER BY value1, value2
SETTINGS allow_experimental_parallel_reading_from_replicas = 1, allow_experimental_analyzer=1;
SYSTEM FLUSH LOGS;
-- There should be 7 queries. The main query as received by the initiator, the 3 equal queries to execute the subquery
-- in the inner join and the 3 queries executing the whole query (but replacing the subquery with a temp table)
SELECT is_initial_query, count() as c, query,
SELECT is_initial_query, count() as c, replaceRegexpAll(query, '_data_(\d+)_(\d+)', '_data_') as query
FROM system.query_log
WHERE
event_date >= yesterday()
AND type = 'QueryFinish'
AND initial_query_id =
AND initial_query_id IN
(
SELECT query_id
FROM system.query_log

View File

@ -1,6 +1,4 @@
02784_automatic_parallel_replicas_join-default_simple_join_10M_pure 0 estimated parallel replicas
02784_automatic_parallel_replicas_join-default_simple_join_10M_pure 1 estimated parallel replicas
02784_automatic_parallel_replicas_join-default_simple_join_5M_pure 0 estimated parallel replicas
02784_automatic_parallel_replicas_join-default_simple_join_5M_pure 2 estimated parallel replicas
02784_automatic_parallel_replicas_join-default_simple_join_1M_pure 1 estimated parallel replicas
02784_automatic_parallel_replicas_join-default_simple_join_1M_pure 10 estimated parallel replicas

View File

@ -68,7 +68,7 @@ function run_query_with_pure_parallel_replicas () {
--allow_experimental_parallel_reading_from_replicas 1 \
--parallel_replicas_for_non_replicated_merge_tree 1 \
--parallel_replicas_min_number_of_rows_per_replica "$2" \
|& grep "It is enough work for" | awk '{ print substr($7, 2, length($7) - 2) "\t" $20 " estimated parallel replicas" }'
|& grep "It is enough work for" | awk '{ print substr($7, 2, length($7) - 2) "\t" $20 " estimated parallel replicas" }' | sort -n -k2 -b | grep -Pv "\t0 estimated parallel replicas"
}
query_id_base="02784_automatic_parallel_replicas_join-$CLICKHOUSE_DATABASE"

View File

@ -1,5 +1,7 @@
1
02901_parallel_replicas_rollup-default Used parallel replicas: true
Distributed query with analyzer
1
0 0 0 6
2019 0 0 2
2019 1 0 2

View File

@ -39,6 +39,11 @@ $CLICKHOUSE_CLIENT \
ORDER BY max((SELECT 1 WHERE 0));
";
were_parallel_replicas_used $query_id
# It was a bug in analyzer distributed header.
echo "Distributed query with analyzer"
$CLICKHOUSE_CLIENT --query "SELECT 1 FROM remote('127.0.0.{2,3}', currentDatabase(), nested) GROUP BY 1 WITH ROLLUP ORDER BY max((SELECT 1 WHERE 0))"
$CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS nested"