From 3dfa9ab8e96c1beb7b258ac37f801c2c57231407 Mon Sep 17 00:00:00 2001 From: robot-clickhouse Date: Fri, 15 Nov 2024 15:09:32 +0000 Subject: [PATCH] Backport #71162 to 24.10: Fix right JOINS with parallel replicas --- src/Interpreters/IJoin.h | 1 - src/Planner/Planner.cpp | 2 +- src/Planner/PlannerJoinTree.cpp | 73 +++-- src/Planner/findParallelReplicasQuery.cpp | 95 ++++-- src/Planner/findQueryForParallelReplicas.h | 4 +- .../QueryPlan/ParallelReplicasLocalPlan.cpp | 12 +- src/Storages/SelectQueryInfo.h | 2 - src/Storages/StorageMergeTree.cpp | 4 +- src/Storages/StorageReplicatedMergeTree.cpp | 5 +- src/Storages/buildQueryTreeForShard.cpp | 61 ++-- .../02771_parallel_replicas_analyzer.sql | 4 +- ...llel_replicas_joins_and_analyzer.reference | 207 +++++-------- ...arallel_replicas_joins_and_analyzer.sql.j2 | 69 ++--- .../03080_incorrect_join_with_merge.sql | 5 + .../03173_parallel_replicas_join_bug.sh | 3 + .../03254_pr_join_on_dups.reference | 273 ++++++++++++++++++ .../0_stateless/03254_pr_join_on_dups.sql | 73 +++++ .../03261_pr_semi_anti_join.reference | 16 + .../0_stateless/03261_pr_semi_anti_join.sql | 26 ++ 19 files changed, 683 insertions(+), 252 deletions(-) create mode 100644 tests/queries/0_stateless/03254_pr_join_on_dups.reference create mode 100644 tests/queries/0_stateless/03254_pr_join_on_dups.sql create mode 100644 tests/queries/0_stateless/03261_pr_semi_anti_join.reference create mode 100644 tests/queries/0_stateless/03261_pr_semi_anti_join.sql diff --git a/src/Interpreters/IJoin.h b/src/Interpreters/IJoin.h index 8f648de2538..5a83137ca2a 100644 --- a/src/Interpreters/IJoin.h +++ b/src/Interpreters/IJoin.h @@ -1,7 +1,6 @@ #pragma once #include -#include #include #include diff --git a/src/Planner/Planner.cpp b/src/Planner/Planner.cpp index 8d3c75fdabb..17277dfe8cd 100644 --- a/src/Planner/Planner.cpp +++ b/src/Planner/Planner.cpp @@ -274,7 +274,7 @@ FiltersForTableExpressionMap collectFiltersForAnalysis(const QueryTreeNodePtr & return res; } -FiltersForTableExpressionMap collectFiltersForAnalysis(const QueryTreeNodePtr & query_tree_node, SelectQueryOptions & select_query_options) +FiltersForTableExpressionMap collectFiltersForAnalysis(const QueryTreeNodePtr & query_tree_node, const SelectQueryOptions & select_query_options) { if (select_query_options.only_analyze) return {}; diff --git a/src/Planner/PlannerJoinTree.cpp b/src/Planner/PlannerJoinTree.cpp index 5c153f6db39..0ff0c0d0901 100644 --- a/src/Planner/PlannerJoinTree.cpp +++ b/src/Planner/PlannerJoinTree.cpp @@ -659,6 +659,7 @@ std::unique_ptr createComputeAliasColumnsStep( } JoinTreeQueryPlan buildQueryPlanForTableExpression(QueryTreeNodePtr table_expression, + const QueryTreeNodePtr & parent_join_tree, const SelectQueryInfo & select_query_info, const SelectQueryOptions & select_query_options, PlannerContextPtr & planner_context, @@ -696,8 +697,6 @@ JoinTreeQueryPlan buildQueryPlanForTableExpression(QueryTreeNodePtr table_expres table_expression_query_info.table_expression = table_expression; if (const auto & filter_actions = table_expression_data.getFilterActions()) table_expression_query_info.filter_actions_dag = std::make_shared(filter_actions->clone()); - table_expression_query_info.current_table_chosen_for_reading_with_parallel_replicas - = table_node == planner_context->getGlobalPlannerContext()->parallel_replicas_table; size_t max_streams = settings[Setting::max_threads]; size_t max_threads_execute_query = settings[Setting::max_threads]; @@ -912,21 +911,35 @@ JoinTreeQueryPlan buildQueryPlanForTableExpression(QueryTreeNodePtr table_expres /// It is just a safety check needed until we have a proper sending plan to replicas. /// If we have a non-trivial storage like View it might create its own Planner inside read(), run findTableForParallelReplicas() /// and find some other table that might be used for reading with parallel replicas. It will lead to errors. - const bool other_table_already_chosen_for_reading_with_parallel_replicas - = planner_context->getGlobalPlannerContext()->parallel_replicas_table - && !table_expression_query_info.current_table_chosen_for_reading_with_parallel_replicas; - if (other_table_already_chosen_for_reading_with_parallel_replicas) - planner_context->getMutableQueryContext()->setSetting("allow_experimental_parallel_reading_from_replicas", Field(0)); - - storage->read( - query_plan, - columns_names, - storage_snapshot, - table_expression_query_info, - query_context, - from_stage, - max_block_size, - max_streams); + const bool no_tables_or_another_table_chosen_for_reading_with_parallel_replicas_mode + = query_context->canUseParallelReplicasOnFollower() + && table_node != planner_context->getGlobalPlannerContext()->parallel_replicas_table; + if (no_tables_or_another_table_chosen_for_reading_with_parallel_replicas_mode) + { + auto mutable_context = Context::createCopy(query_context); + mutable_context->setSetting("allow_experimental_parallel_reading_from_replicas", Field(0)); + storage->read( + query_plan, + columns_names, + storage_snapshot, + table_expression_query_info, + std::move(mutable_context), + from_stage, + max_block_size, + max_streams); + } + else + { + storage->read( + query_plan, + columns_names, + storage_snapshot, + table_expression_query_info, + query_context, + from_stage, + max_block_size, + max_streams); + } auto parallel_replicas_enabled_for_storage = [](const StoragePtr & table, const Settings & query_settings) { @@ -942,6 +955,19 @@ JoinTreeQueryPlan buildQueryPlanForTableExpression(QueryTreeNodePtr table_expres /// query_plan can be empty if there is nothing to read if (query_plan.isInitialized() && parallel_replicas_enabled_for_storage(storage, settings)) { + const bool allow_parallel_replicas_for_table_expression = [](const QueryTreeNodePtr & join_tree_node) + { + const JoinNode * join_node = join_tree_node->as(); + if (!join_node) + return true; + + const auto join_kind = join_node->getKind(); + if (join_kind == JoinKind::Left || join_kind == JoinKind::Right || join_kind == JoinKind::Inner) + return true; + + return false; + }(parent_join_tree); + if (query_context->canUseParallelReplicasCustomKey() && query_context->getClientInfo().distributed_depth == 0) { if (auto cluster = query_context->getClusterForParallelReplicas(); @@ -964,7 +990,7 @@ JoinTreeQueryPlan buildQueryPlanForTableExpression(QueryTreeNodePtr table_expres query_plan = std::move(query_plan_parallel_replicas); } } - else if (ClusterProxy::canUseParallelReplicasOnInitiator(query_context)) + else if (ClusterProxy::canUseParallelReplicasOnInitiator(query_context) && allow_parallel_replicas_for_table_expression) { // (1) find read step QueryPlan::Node * node = query_plan.getRootNode(); @@ -1794,7 +1820,8 @@ JoinTreeQueryPlan buildJoinTreeQueryPlan(const QueryTreeNodePtr & query_node, const ColumnIdentifierSet & outer_scope_columns, PlannerContextPtr & planner_context) { - auto table_expressions_stack = buildTableExpressionsStack(query_node->as().getJoinTree()); + const QueryTreeNodePtr & join_tree_node = query_node->as().getJoinTree(); + auto table_expressions_stack = buildTableExpressionsStack(join_tree_node); size_t table_expressions_stack_size = table_expressions_stack.size(); bool is_single_table_expression = table_expressions_stack_size == 1; @@ -1829,7 +1856,9 @@ JoinTreeQueryPlan buildJoinTreeQueryPlan(const QueryTreeNodePtr & query_node, * Examples: Distributed, LiveView, Merge storages. */ auto left_table_expression = table_expressions_stack.front(); - auto left_table_expression_query_plan = buildQueryPlanForTableExpression(left_table_expression, + auto left_table_expression_query_plan = buildQueryPlanForTableExpression( + left_table_expression, + join_tree_node, select_query_info, select_query_options, planner_context, @@ -1902,7 +1931,9 @@ JoinTreeQueryPlan buildJoinTreeQueryPlan(const QueryTreeNodePtr & query_node, * table expression in subquery. */ bool is_remote = planner_context->getTableExpressionDataOrThrow(table_expression).isRemote(); - query_plans_stack.push_back(buildQueryPlanForTableExpression(table_expression, + query_plans_stack.push_back(buildQueryPlanForTableExpression( + table_expression, + join_tree_node, select_query_info, select_query_options, planner_context, diff --git a/src/Planner/findParallelReplicasQuery.cpp b/src/Planner/findParallelReplicasQuery.cpp index fce86a6cda0..75655428d8c 100644 --- a/src/Planner/findParallelReplicasQuery.cpp +++ b/src/Planner/findParallelReplicasQuery.cpp @@ -23,6 +23,8 @@ #include #include +#include + namespace DB { namespace Setting @@ -38,12 +40,12 @@ namespace ErrorCodes /// Returns a list of (sub)queries (candidates) which may support parallel replicas. /// The rule is : -/// subquery has only LEFT or ALL INNER JOIN (or none), and left part is MergeTree table or subquery candidate as well. +/// subquery has only LEFT / RIGHT / ALL INNER JOIN (or none), and left / right part is MergeTree table or subquery candidate as well. /// /// Additional checks are required, so we return many candidates. The innermost subquery is on top. -std::stack getSupportingParallelReplicasQuery(const IQueryTreeNode * query_tree_node) +std::vector getSupportingParallelReplicasQuery(const IQueryTreeNode * query_tree_node) { - std::stack res; + std::vector res; while (query_tree_node) { @@ -75,7 +77,7 @@ std::stack getSupportingParallelReplicasQuery(const IQueryTre { const auto & query_node_to_process = query_tree_node->as(); query_tree_node = query_node_to_process.getJoinTree().get(); - res.push(&query_node_to_process); + res.push_back(&query_node_to_process); break; } case QueryTreeNodeType::UNION: @@ -98,17 +100,16 @@ std::stack getSupportingParallelReplicasQuery(const IQueryTre case QueryTreeNodeType::JOIN: { const auto & join_node = query_tree_node->as(); - auto join_kind = join_node.getKind(); - auto join_strictness = join_node.getStrictness(); + const auto join_kind = join_node.getKind(); + const auto join_strictness = join_node.getStrictness(); - bool can_parallelize_join = - join_kind == JoinKind::Left - || (join_kind == JoinKind::Inner && join_strictness == JoinStrictness::All); - - if (!can_parallelize_join) + if (join_kind == JoinKind::Left || (join_kind == JoinKind::Inner && join_strictness == JoinStrictness::All)) + query_tree_node = join_node.getLeftTableExpression().get(); + else if (join_kind == JoinKind::Right) + query_tree_node = join_node.getRightTableExpression().get(); + else return {}; - query_tree_node = join_node.getLeftTableExpression().get(); break; } default: @@ -163,14 +164,27 @@ QueryTreeNodePtr replaceTablesWithDummyTables(QueryTreeNodePtr query, const Cont return query->cloneAndReplace(visitor.replacement_map); } +#ifdef DUMP_PARALLEL_REPLICAS_QUERY_CANDIDATES +static void dumpStack(const std::vector & stack) +{ + std::ranges::reverse_view rv{stack}; + for (const auto * node : rv) + LOG_DEBUG(getLogger(__PRETTY_FUNCTION__), "{}\n{}", CityHash_v1_0_2::Hash128to64(node->getTreeHash()), node->dumpTree()); +} +#endif + /// Find the best candidate for parallel replicas execution by verifying query plan. -/// If query plan has only Expression, Filter of Join steps, we can execute it fully remotely and check the next query. +/// If query plan has only Expression, Filter or Join steps, we can execute it fully remotely and check the next query. /// Otherwise we can execute current query up to WithMergableStage only. const QueryNode * findQueryForParallelReplicas( - std::stack stack, + std::vector stack, const std::unordered_map & mapping, const Settings & settings) { +#ifdef DUMP_PARALLEL_REPLICAS_QUERY_CANDIDATES + dumpStack(stack); +#endif + struct Frame { const QueryPlan::Node * node = nullptr; @@ -189,8 +203,8 @@ const QueryNode * findQueryForParallelReplicas( while (!stack.empty()) { - const QueryNode * const subquery_node = stack.top(); - stack.pop(); + const QueryNode * const subquery_node = stack.back(); + stack.pop_back(); auto it = mapping.find(subquery_node); /// This should not happen ideally. @@ -236,7 +250,7 @@ const QueryNode * findQueryForParallelReplicas( else { const auto * join = typeid_cast(step); - /// We've checked that JOIN is INNER/LEFT in query tree. + /// We've checked that JOIN is INNER/LEFT/RIGHT on query tree level before. /// Don't distribute UNION node. if (!join) return res; @@ -263,7 +277,7 @@ const QueryNode * findQueryForParallelReplicas( return res; } -const QueryNode * findQueryForParallelReplicas(const QueryTreeNodePtr & query_tree_node, SelectQueryOptions & select_query_options) +const QueryNode * findQueryForParallelReplicas(const QueryTreeNodePtr & query_tree_node, const SelectQueryOptions & select_query_options) { if (select_query_options.only_analyze) return nullptr; @@ -287,7 +301,7 @@ const QueryNode * findQueryForParallelReplicas(const QueryTreeNodePtr & query_tr return nullptr; /// We don't have any subquery and storage can process parallel replicas by itself. - if (stack.top() == query_tree_node.get()) + if (stack.back() == query_tree_node.get()) return nullptr; /// This is needed to avoid infinite recursion. @@ -310,31 +324,33 @@ const QueryNode * findQueryForParallelReplicas(const QueryTreeNodePtr & query_tr const auto & mapping = planner.getQueryNodeToPlanStepMapping(); const auto * res = findQueryForParallelReplicas(new_stack, mapping, context->getSettingsRef()); - /// Now, return a query from initial stack. if (res) { + // find query in initial stack while (!new_stack.empty()) { - if (res == new_stack.top()) - return stack.top(); + if (res == new_stack.back()) + { + res = stack.back(); + break; + } - stack.pop(); - new_stack.pop(); + stack.pop_back(); + new_stack.pop_back(); } } - return res; } static const TableNode * findTableForParallelReplicas(const IQueryTreeNode * query_tree_node) { - std::stack right_join_nodes; - while (query_tree_node || !right_join_nodes.empty()) + std::stack join_nodes; + while (query_tree_node || !join_nodes.empty()) { if (!query_tree_node) { - query_tree_node = right_join_nodes.top(); - right_join_nodes.pop(); + query_tree_node = join_nodes.top(); + join_nodes.pop(); } auto join_tree_node_type = query_tree_node->getNodeType(); @@ -383,8 +399,23 @@ static const TableNode * findTableForParallelReplicas(const IQueryTreeNode * que case QueryTreeNodeType::JOIN: { const auto & join_node = query_tree_node->as(); - query_tree_node = join_node.getLeftTableExpression().get(); - right_join_nodes.push(join_node.getRightTableExpression().get()); + const auto join_kind = join_node.getKind(); + const auto join_strictness = join_node.getStrictness(); + + if (join_kind == JoinKind::Left || (join_kind == JoinKind::Inner and join_strictness == JoinStrictness::All)) + { + query_tree_node = join_node.getLeftTableExpression().get(); + join_nodes.push(join_node.getRightTableExpression().get()); + } + else if (join_kind == JoinKind::Right) + { + query_tree_node = join_node.getRightTableExpression().get(); + join_nodes.push(join_node.getLeftTableExpression().get()); + } + else + { + return nullptr; + } break; } default: @@ -400,7 +431,7 @@ static const TableNode * findTableForParallelReplicas(const IQueryTreeNode * que return nullptr; } -const TableNode * findTableForParallelReplicas(const QueryTreeNodePtr & query_tree_node, SelectQueryOptions & select_query_options) +const TableNode * findTableForParallelReplicas(const QueryTreeNodePtr & query_tree_node, const SelectQueryOptions & select_query_options) { if (select_query_options.only_analyze) return nullptr; diff --git a/src/Planner/findQueryForParallelReplicas.h b/src/Planner/findQueryForParallelReplicas.h index cdce4ad0b47..83aa11c8c64 100644 --- a/src/Planner/findQueryForParallelReplicas.h +++ b/src/Planner/findQueryForParallelReplicas.h @@ -15,10 +15,10 @@ struct SelectQueryOptions; /// Find a query which can be executed with parallel replicas up to WithMergableStage. /// Returned query will always contain some (>1) subqueries, possibly with joins. -const QueryNode * findQueryForParallelReplicas(const QueryTreeNodePtr & query_tree_node, SelectQueryOptions & select_query_options); +const QueryNode * findQueryForParallelReplicas(const QueryTreeNodePtr & query_tree_node, const SelectQueryOptions & select_query_options); /// Find a table from which we should read on follower replica. It's the left-most table within all JOINs and UNIONs. -const TableNode * findTableForParallelReplicas(const QueryTreeNodePtr & query_tree_node, SelectQueryOptions & select_query_options); +const TableNode * findTableForParallelReplicas(const QueryTreeNodePtr & query_tree_node, const SelectQueryOptions & select_query_options); struct JoinTreeQueryPlan; diff --git a/src/Processors/QueryPlan/ParallelReplicasLocalPlan.cpp b/src/Processors/QueryPlan/ParallelReplicasLocalPlan.cpp index 050044edd3a..6704095ca82 100644 --- a/src/Processors/QueryPlan/ParallelReplicasLocalPlan.cpp +++ b/src/Processors/QueryPlan/ParallelReplicasLocalPlan.cpp @@ -3,12 +3,15 @@ #include #include #include +#include #include #include +#include #include #include #include #include +#include #include #include #include @@ -62,7 +65,14 @@ std::pair, bool> createLocalPlanForParallelReplicas( break; if (!node->children.empty()) - node = node->children.at(0); + { + // in case of RIGHT JOIN, - reading from right table is parallelized among replicas + const JoinStep * join = typeid_cast(node->step.get()); + if (join && join->getJoin()->getTableJoin().kind() == JoinKind::Right) + node = node->children.at(1); + else + node = node->children.at(0); + } else node = nullptr; } diff --git a/src/Storages/SelectQueryInfo.h b/src/Storages/SelectQueryInfo.h index 7ad6a733c6f..f67274f227a 100644 --- a/src/Storages/SelectQueryInfo.h +++ b/src/Storages/SelectQueryInfo.h @@ -162,8 +162,6 @@ struct SelectQueryInfo /// It's guaranteed to be present in JOIN TREE of `query_tree` QueryTreeNodePtr table_expression; - bool current_table_chosen_for_reading_with_parallel_replicas = false; - /// Table expression modifiers for storage std::optional table_expression_modifiers; diff --git a/src/Storages/StorageMergeTree.cpp b/src/Storages/StorageMergeTree.cpp index abc66df0d8b..4bb2de642a2 100644 --- a/src/Storages/StorageMergeTree.cpp +++ b/src/Storages/StorageMergeTree.cpp @@ -274,9 +274,7 @@ void StorageMergeTree::read( } const bool enable_parallel_reading = local_context->canUseParallelReplicasOnFollower() - && local_context->getSettingsRef()[Setting::parallel_replicas_for_non_replicated_merge_tree] - && (!local_context->getSettingsRef()[Setting::allow_experimental_analyzer] - || query_info.current_table_chosen_for_reading_with_parallel_replicas); + && local_context->getSettingsRef()[Setting::parallel_replicas_for_non_replicated_merge_tree]; if (auto plan = reader.read( column_names, diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index 850623157a1..5f02b92198b 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -5631,10 +5631,7 @@ void StorageReplicatedMergeTree::readLocalImpl( const size_t max_block_size, const size_t num_streams) { - const bool enable_parallel_reading = local_context->canUseParallelReplicasOnFollower() - && (!local_context->getSettingsRef()[Setting::allow_experimental_analyzer] - || query_info.current_table_chosen_for_reading_with_parallel_replicas); - + const bool enable_parallel_reading = local_context->canUseParallelReplicasOnFollower(); auto plan = reader.read( column_names, storage_snapshot, query_info, local_context, max_block_size, num_streams, diff --git a/src/Storages/buildQueryTreeForShard.cpp b/src/Storages/buildQueryTreeForShard.cpp index bbf32c68d19..bce30260954 100644 --- a/src/Storages/buildQueryTreeForShard.cpp +++ b/src/Storages/buildQueryTreeForShard.cpp @@ -314,6 +314,35 @@ TableNodePtr executeSubqueryNode(const QueryTreeNodePtr & subquery_node, return temporary_table_expression_node; } +QueryTreeNodePtr getSubqueryFromTableExpression( + const QueryTreeNodePtr & join_table_expression, + const std::unordered_map & column_source_to_columns, + const ContextPtr & context) +{ + auto join_table_expression_node_type = join_table_expression->getNodeType(); + QueryTreeNodePtr subquery_node; + + if (join_table_expression_node_type == QueryTreeNodeType::QUERY || join_table_expression_node_type == QueryTreeNodeType::UNION) + { + subquery_node = join_table_expression; + } + else if ( + join_table_expression_node_type == QueryTreeNodeType::TABLE || join_table_expression_node_type == QueryTreeNodeType::TABLE_FUNCTION) + { + const auto & columns = column_source_to_columns.at(join_table_expression).columns; + subquery_node = buildSubqueryToReadColumnsFromTableExpression(columns, join_table_expression, context); + } + else + { + throw Exception( + ErrorCodes::LOGICAL_ERROR, + "Expected JOIN table expression to be table, table function, query or union node. Actual {}", + join_table_expression->formatASTForErrorMessage()); + } + + return subquery_node; +} + } QueryTreeNodePtr buildQueryTreeForShard(const PlannerContextPtr & planner_context, QueryTreeNodePtr query_tree_to_modify) @@ -335,37 +364,31 @@ QueryTreeNodePtr buildQueryTreeForShard(const PlannerContextPtr & planner_contex { if (auto * join_node = global_in_or_join_node.query_node->as()) { - auto join_right_table_expression = join_node->getRightTableExpression(); - auto join_right_table_expression_node_type = join_right_table_expression->getNodeType(); - - QueryTreeNodePtr subquery_node; - - if (join_right_table_expression_node_type == QueryTreeNodeType::QUERY || - join_right_table_expression_node_type == QueryTreeNodeType::UNION) + QueryTreeNodePtr join_table_expression; + const auto join_kind = join_node->getKind(); + if (join_kind == JoinKind::Left || join_kind == JoinKind::Inner) { - subquery_node = join_right_table_expression; + join_table_expression = join_node->getRightTableExpression(); } - else if (join_right_table_expression_node_type == QueryTreeNodeType::TABLE || - join_right_table_expression_node_type == QueryTreeNodeType::TABLE_FUNCTION) + else if (join_kind == JoinKind::Right) { - const auto & columns = column_source_to_columns.at(join_right_table_expression).columns; - subquery_node = buildSubqueryToReadColumnsFromTableExpression(columns, - join_right_table_expression, - planner_context->getQueryContext()); + join_table_expression = join_node->getLeftTableExpression(); } else { - throw Exception(ErrorCodes::LOGICAL_ERROR, - "Expected JOIN right table expression to be table, table function, query or union node. Actual {}", - join_right_table_expression->formatASTForErrorMessage()); + throw Exception( + ErrorCodes::LOGICAL_ERROR, "Unexpected join kind: {}", join_kind); } + auto subquery_node + = getSubqueryFromTableExpression(join_table_expression, column_source_to_columns, planner_context->getQueryContext()); + auto temporary_table_expression_node = executeSubqueryNode(subquery_node, planner_context->getMutableQueryContext(), global_in_or_join_node.subquery_depth); - temporary_table_expression_node->setAlias(join_right_table_expression->getAlias()); + temporary_table_expression_node->setAlias(join_table_expression->getAlias()); - replacement_map.emplace(join_right_table_expression.get(), std::move(temporary_table_expression_node)); + replacement_map.emplace(join_table_expression.get(), std::move(temporary_table_expression_node)); continue; } if (auto * in_function_node = global_in_or_join_node.query_node->as()) diff --git a/tests/queries/0_stateless/02771_parallel_replicas_analyzer.sql b/tests/queries/0_stateless/02771_parallel_replicas_analyzer.sql index 081077ba460..a2d26a8fc78 100644 --- a/tests/queries/0_stateless/02771_parallel_replicas_analyzer.sql +++ b/tests/queries/0_stateless/02771_parallel_replicas_analyzer.sql @@ -1,5 +1,5 @@ -- Tags: zookeeper -DROP TABLE IF EXISTS join_inner_table__fuzz_146_replicated; +DROP TABLE IF EXISTS join_inner_table__fuzz_146_replicated SYNC; CREATE TABLE join_inner_table__fuzz_146_replicated ( `id` UUID, @@ -52,4 +52,4 @@ WHERE GROUP BY is_initial_query, query ORDER BY is_initial_query DESC, c, query; -DROP TABLE join_inner_table__fuzz_146_replicated; +DROP TABLE join_inner_table__fuzz_146_replicated SYNC; diff --git a/tests/queries/0_stateless/02967_parallel_replicas_joins_and_analyzer.reference b/tests/queries/0_stateless/02967_parallel_replicas_joins_and_analyzer.reference index 93003b6cf6d..765847c0607 100644 --- a/tests/queries/0_stateless/02967_parallel_replicas_joins_and_analyzer.reference +++ b/tests/queries/0_stateless/02967_parallel_replicas_joins_and_analyzer.reference @@ -2,7 +2,7 @@ set parallel_replicas_prefer_local_join = 0; -- A query with only INNER/LEFT joins is fully send to replicas. JOIN is executed in GLOBAL mode. -select x, y, r.y, z, rr.z, a from (select l.x, l.y, r.y, r.z as z from (select x, y from tab1 where x != 2) l any left join (select y, z from tab2 where y != 4) r on l.y = r.y) ll any left join (select z, a from tab3 where z != 8) rr on ll.z = rr.z order by x SETTINGS enable_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1; +select x, y, r.y, z, rr.z, a from (select l.x, l.y, r.y, r.z as z from (select x, y from tab1 where x != 2) l any left join (select y, z from tab2 where y != 4) r on l.y = r.y) ll any left join (select z, a from tab3 where z != 8) rr on ll.z = rr.z order by x; 0 0 0 0 0 0 1 1 0 0 0 0 3 3 0 0 0 0 @@ -18,7 +18,7 @@ select x, y, r.y, z, rr.z, a from (select l.x, l.y, r.y, r.z as z from (select x 13 13 0 0 0 0 14 14 14 14 0 0 15 15 0 0 0 0 -explain description=0 select x, y, r.y, z, rr.z, a from (select l.x, l.y, r.y, r.z as z from (select x, y from tab1 where x != 2) l any left join (select y, z from tab2 where y != 4) r on l.y = r.y) ll any left join (select z, a from tab3 where z != 8) rr on ll.z = rr.z SETTINGS enable_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1; +explain description=0 select x, y, r.y, z, rr.z, a from (select l.x, l.y, r.y, r.z as z from (select x, y from tab1 where x != 2) l any left join (select y, z from tab2 where y != 4) r on l.y = r.y) ll any left join (select z, a from tab3 where z != 8) rr on ll.z = rr.z; Union Expression Join @@ -40,8 +40,7 @@ sub2 as (select y, z from tab2 where y != 4), sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y = r.y), sub4 as (select z, a from tab3 where z != 8), sub5 as (select x, y, r.y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z) -select * from sub5 order by x -SETTINGS enable_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1; +select * from sub5 order by x; 0 0 0 0 0 0 1 1 0 0 0 0 3 3 0 0 0 0 @@ -63,8 +62,7 @@ sub2 as (select y, z from tab2 where y != 4), sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y = r.y), sub4 as (select z, a from tab3 where z != 8), sub5 as (select x, y, r.y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z) -select * from sub5 order by x -SETTINGS enable_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1; +select * from sub5 order by x; Expression Sorting Union @@ -90,8 +88,7 @@ sub2 as (select y, z from tab2 where y != 4), sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y = r.y), sub4 as (select z, a from tab3 where z != 8), sub5 as (select sum(x), sum(y), sum(r.y), sum(z), sum(rr.z), sum(a), key from sub3 ll any left join sub4 rr on ll.z = rr.z group by x % 2 as key) -select * from sub5 order by key -SETTINGS enable_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1; +select * from sub5 order by key; 54 54 50 50 12 12 0 64 64 0 0 0 0 1 explain description=0 @@ -100,8 +97,7 @@ sub2 as (select y, z from tab2 where y != 4), sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y = r.y), sub4 as (select z, a from tab3 where z != 8), sub5 as (select sum(x), sum(y), sum(r.y), sum(z), sum(rr.z), sum(a), key from sub3 ll any left join sub4 rr on ll.z = rr.z group by x % 2 as key) -select * from sub5 order by key -SETTINGS enable_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1; +select * from sub5 order by key; Expression Sorting Expression @@ -129,8 +125,7 @@ sub2 as (select y, z from tab2 where y != 4), sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y = r.y order by l.x), sub4 as (select z, a from tab3 where z != 8), sub5 as (select x, y, r.y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z) -select * from sub5 order by x -SETTINGS enable_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1; +select * from sub5 order by x; 0 0 0 0 0 0 1 1 0 0 0 0 3 3 0 0 0 0 @@ -152,8 +147,7 @@ sub2 as (select y, z from tab2 where y != 4), sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y = r.y order by l.x), sub4 as (select z, a from tab3 where z != 8), sub5 as (select x, y, r.y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z) -select * from sub5 order by x -SETTINGS enable_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1; +select * from sub5 order by x; Expression Sorting Expression @@ -181,8 +175,7 @@ sub2 as (select y, z from tab2 where y != 4), sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y = r.y), sub4 as (select z, a from tab3 where z != 8), sub5 as (select x, y, r.y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z) -select * from sub5 order by x -SETTINGS enable_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1; +select * from sub5 order by x; 0 0 0 0 0 0 1 1 0 0 0 0 3 3 0 0 0 0 @@ -204,8 +197,7 @@ sub2 as (select y, z from tab2 where y != 4), sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y = r.y), sub4 as (select z, a from tab3 where z != 8), sub5 as (select x, y, r.y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z) -select * from sub5 order by x -SETTINGS enable_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1; +select * from sub5 order by x; Expression Sorting Expression @@ -237,8 +229,7 @@ sub2 as (select y, z from tab2 where y != 4), sub3 as (select l.x, l.y, r.y, r.z as z from sub2 r any right join sub1 l on l.y = r.y), sub4 as (select z, a from tab3 where z != 8), sub5 as (select x, l.y, y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z) -select * from sub5 -SETTINGS enable_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1; +select * from sub5; 0 0 0 0 0 0 6 6 6 6 0 0 8 8 8 8 0 0 @@ -260,30 +251,21 @@ sub2 as (select y, z from tab2 where y != 4), sub3 as (select l.x, l.y, r.y, r.z as z from sub2 r any right join sub1 l on l.y = r.y), sub4 as (select z, a from tab3 where z != 8), sub5 as (select x, l.y, y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z) -select * from sub5 -SETTINGS enable_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1; -Expression - Join - Expression - Join - Union +select * from sub5; +Union + Expression + Join + Expression + Join + Expression + ReadFromMemoryStorage Expression Expression ReadFromMergeTree - Expression - ReadFromRemoteParallelReplicas - Union - Expression - Expression - ReadFromMergeTree - Expression - ReadFromRemoteParallelReplicas - Union Expression - Expression - ReadFromMergeTree - Expression - ReadFromRemoteParallelReplicas + ReadFromMemoryStorage + Expression + ReadFromRemoteParallelReplicas -- -- RIGHT JOIN in sub5: sub5 -> WithMergableStage with sub1 as (select x, y from tab1 where x != 2), @@ -291,7 +273,7 @@ sub2 as (select y, z from tab2 where y != 4), sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y = r.y), sub4 as (select z, a from tab3 where z != 8), sub5 as (select z, a, x, y, r.y, ll.z from sub4 rr any right join sub3 ll on ll.z = rr.z) -select * from sub5 order by x SETTINGS enable_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1; +select * from sub5 order by x; 0 0 0 0 0 0 0 0 1 1 0 0 0 0 3 3 0 0 @@ -313,31 +295,26 @@ sub2 as (select y, z from tab2 where y != 4), sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y = r.y), sub4 as (select z, a from tab3 where z != 8), sub5 as (select z, a, x, y, r.y, ll.z from sub4 rr any right join sub3 ll on ll.z = rr.z) -select * from sub5 order by x SETTINGS enable_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1; +select * from sub5 order by x; Expression Sorting - Expression - Join - Union + Union + Expression + Sorting Expression - Expression - ReadFromMergeTree - Expression - ReadFromRemoteParallelReplicas - Expression - Join - Union + Join Expression - Expression - ReadFromMergeTree + ReadFromMemoryStorage Expression - ReadFromRemoteParallelReplicas - Union - Expression - Expression - ReadFromMergeTree - Expression - ReadFromRemoteParallelReplicas + Join + Expression + Expression + ReadFromMergeTree + Expression + Expression + ReadFromMergeTree + Expression + ReadFromRemoteParallelReplicas -- -- Subqueries for IN allowed with sub1 as (select x, y from tab1 where x in (select number from numbers(16) where number != 2)), @@ -345,8 +322,7 @@ sub2 as (select y, z from tab2 where y != 4), sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y = r.y), sub4 as (select z, a from tab3 where z != 8), sub5 as (select x, y, r.y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z) -select * from sub5 order by x -SETTINGS enable_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1; +select * from sub5 order by x; 0 0 0 0 0 0 1 1 0 0 0 0 3 3 0 0 0 0 @@ -368,8 +344,7 @@ sub2 as (select y, z from tab2 where y != 4), sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y = r.y), sub4 as (select z, a from tab3 where z != 8), sub5 as (select x, y, r.y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z) -select * from sub5 order by x -SETTINGS enable_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1; +select * from sub5 order by x; Expression Sorting Union @@ -402,7 +377,7 @@ sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y sub4 as (select z, a from tab3 where z != 8), sub5 as (select x, y, r.y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z) select * from sub5 order by x -SETTINGS enable_parallel_replicas = 1, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1, parallel_replicas_allow_in_with_subquery=0; +SETTINGS enable_parallel_replicas = 1, parallel_replicas_allow_in_with_subquery = 0; 0 0 0 0 0 0 1 1 0 0 0 0 3 3 0 0 0 0 @@ -425,7 +400,7 @@ sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y sub4 as (select z, a from tab3 where z != 8), sub5 as (select x, y, r.y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z) select * from sub5 order by x -SETTINGS enable_parallel_replicas = 1, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1, parallel_replicas_allow_in_with_subquery=0;-- { echoOn } +SETTINGS enable_parallel_replicas = 1, parallel_replicas_allow_in_with_subquery = 0;-- { echoOn } Expression Sorting Expression @@ -455,7 +430,7 @@ Expression ReadFromRemoteParallelReplicas set parallel_replicas_prefer_local_join = 1; -- A query with only INNER/LEFT joins is fully send to replicas. JOIN is executed in GLOBAL mode. -select x, y, r.y, z, rr.z, a from (select l.x, l.y, r.y, r.z as z from (select x, y from tab1 where x != 2) l any left join (select y, z from tab2 where y != 4) r on l.y = r.y) ll any left join (select z, a from tab3 where z != 8) rr on ll.z = rr.z order by x SETTINGS enable_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1; +select x, y, r.y, z, rr.z, a from (select l.x, l.y, r.y, r.z as z from (select x, y from tab1 where x != 2) l any left join (select y, z from tab2 where y != 4) r on l.y = r.y) ll any left join (select z, a from tab3 where z != 8) rr on ll.z = rr.z order by x; 0 0 0 0 0 0 1 1 0 0 0 0 3 3 0 0 0 0 @@ -471,7 +446,7 @@ select x, y, r.y, z, rr.z, a from (select l.x, l.y, r.y, r.z as z from (select x 13 13 0 0 0 0 14 14 14 14 0 0 15 15 0 0 0 0 -explain description=0 select x, y, r.y, z, rr.z, a from (select l.x, l.y, r.y, r.z as z from (select x, y from tab1 where x != 2) l any left join (select y, z from tab2 where y != 4) r on l.y = r.y) ll any left join (select z, a from tab3 where z != 8) rr on ll.z = rr.z SETTINGS enable_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1; +explain description=0 select x, y, r.y, z, rr.z, a from (select l.x, l.y, r.y, r.z as z from (select x, y from tab1 where x != 2) l any left join (select y, z from tab2 where y != 4) r on l.y = r.y) ll any left join (select z, a from tab3 where z != 8) rr on ll.z = rr.z; Union Expression Join @@ -495,8 +470,7 @@ sub2 as (select y, z from tab2 where y != 4), sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y = r.y), sub4 as (select z, a from tab3 where z != 8), sub5 as (select x, y, r.y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z) -select * from sub5 order by x -SETTINGS enable_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1; +select * from sub5 order by x; 0 0 0 0 0 0 1 1 0 0 0 0 3 3 0 0 0 0 @@ -518,8 +492,7 @@ sub2 as (select y, z from tab2 where y != 4), sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y = r.y), sub4 as (select z, a from tab3 where z != 8), sub5 as (select x, y, r.y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z) -select * from sub5 order by x -SETTINGS enable_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1; +select * from sub5 order by x; Expression Sorting Union @@ -547,8 +520,7 @@ sub2 as (select y, z from tab2 where y != 4), sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y = r.y), sub4 as (select z, a from tab3 where z != 8), sub5 as (select sum(x), sum(y), sum(r.y), sum(z), sum(rr.z), sum(a), key from sub3 ll any left join sub4 rr on ll.z = rr.z group by x % 2 as key) -select * from sub5 order by key -SETTINGS enable_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1; +select * from sub5 order by key; 54 54 50 50 12 12 0 64 64 0 0 0 0 1 explain description=0 @@ -557,8 +529,7 @@ sub2 as (select y, z from tab2 where y != 4), sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y = r.y), sub4 as (select z, a from tab3 where z != 8), sub5 as (select sum(x), sum(y), sum(r.y), sum(z), sum(rr.z), sum(a), key from sub3 ll any left join sub4 rr on ll.z = rr.z group by x % 2 as key) -select * from sub5 order by key -SETTINGS enable_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1; +select * from sub5 order by key; Expression Sorting Expression @@ -588,8 +559,7 @@ sub2 as (select y, z from tab2 where y != 4), sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y = r.y order by l.x), sub4 as (select z, a from tab3 where z != 8), sub5 as (select x, y, r.y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z) -select * from sub5 order by x -SETTINGS enable_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1; +select * from sub5 order by x; 0 0 0 0 0 0 1 1 0 0 0 0 3 3 0 0 0 0 @@ -611,8 +581,7 @@ sub2 as (select y, z from tab2 where y != 4), sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y = r.y order by l.x), sub4 as (select z, a from tab3 where z != 8), sub5 as (select x, y, r.y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z) -select * from sub5 order by x -SETTINGS enable_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1; +select * from sub5 order by x; Expression Sorting Expression @@ -641,8 +610,7 @@ sub2 as (select y, z from tab2 where y != 4), sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y = r.y), sub4 as (select z, a from tab3 where z != 8), sub5 as (select x, y, r.y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z) -select * from sub5 order by x -SETTINGS enable_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1; +select * from sub5 order by x; 0 0 0 0 0 0 1 1 0 0 0 0 3 3 0 0 0 0 @@ -664,8 +632,7 @@ sub2 as (select y, z from tab2 where y != 4), sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y = r.y), sub4 as (select z, a from tab3 where z != 8), sub5 as (select x, y, r.y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z) -select * from sub5 order by x -SETTINGS enable_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1; +select * from sub5 order by x; Expression Sorting Expression @@ -697,8 +664,7 @@ sub2 as (select y, z from tab2 where y != 4), sub3 as (select l.x, l.y, r.y, r.z as z from sub2 r any right join sub1 l on l.y = r.y), sub4 as (select z, a from tab3 where z != 8), sub5 as (select x, l.y, y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z) -select * from sub5 -SETTINGS enable_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1; +select * from sub5; 0 0 0 0 0 0 6 6 6 6 0 0 8 8 8 8 0 0 @@ -720,30 +686,23 @@ sub2 as (select y, z from tab2 where y != 4), sub3 as (select l.x, l.y, r.y, r.z as z from sub2 r any right join sub1 l on l.y = r.y), sub4 as (select z, a from tab3 where z != 8), sub5 as (select x, l.y, y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z) -select * from sub5 -SETTINGS enable_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1; -Expression - Join - Expression - Join - Union +select * from sub5; +Union + Expression + Join + Expression + Join Expression Expression ReadFromMergeTree - Expression - ReadFromRemoteParallelReplicas - Union Expression Expression ReadFromMergeTree - Expression - ReadFromRemoteParallelReplicas - Union Expression Expression ReadFromMergeTree - Expression - ReadFromRemoteParallelReplicas + Expression + ReadFromRemoteParallelReplicas -- -- RIGHT JOIN in sub5: sub5 -> WithMergableStage with sub1 as (select x, y from tab1 where x != 2), @@ -751,7 +710,7 @@ sub2 as (select y, z from tab2 where y != 4), sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y = r.y), sub4 as (select z, a from tab3 where z != 8), sub5 as (select z, a, x, y, r.y, ll.z from sub4 rr any right join sub3 ll on ll.z = rr.z) -select * from sub5 order by x SETTINGS enable_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1; +select * from sub5 order by x; 0 0 0 0 0 0 0 0 1 1 0 0 0 0 3 3 0 0 @@ -773,31 +732,27 @@ sub2 as (select y, z from tab2 where y != 4), sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y = r.y), sub4 as (select z, a from tab3 where z != 8), sub5 as (select z, a, x, y, r.y, ll.z from sub4 rr any right join sub3 ll on ll.z = rr.z) -select * from sub5 order by x SETTINGS enable_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1; +select * from sub5 order by x; Expression Sorting - Expression - Join - Union + Union + Expression + Sorting Expression - Expression - ReadFromMergeTree - Expression - ReadFromRemoteParallelReplicas - Expression - Join - Union + Join Expression Expression ReadFromMergeTree Expression - ReadFromRemoteParallelReplicas - Union - Expression - Expression - ReadFromMergeTree - Expression - ReadFromRemoteParallelReplicas + Join + Expression + Expression + ReadFromMergeTree + Expression + Expression + ReadFromMergeTree + Expression + ReadFromRemoteParallelReplicas -- -- Subqueries for IN allowed with sub1 as (select x, y from tab1 where x in (select number from numbers(16) where number != 2)), @@ -805,8 +760,7 @@ sub2 as (select y, z from tab2 where y != 4), sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y = r.y), sub4 as (select z, a from tab3 where z != 8), sub5 as (select x, y, r.y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z) -select * from sub5 order by x -SETTINGS enable_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1; +select * from sub5 order by x; 0 0 0 0 0 0 1 1 0 0 0 0 3 3 0 0 0 0 @@ -828,8 +782,7 @@ sub2 as (select y, z from tab2 where y != 4), sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y = r.y), sub4 as (select z, a from tab3 where z != 8), sub5 as (select x, y, r.y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z) -select * from sub5 order by x -SETTINGS enable_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1; +select * from sub5 order by x; Expression Sorting Union @@ -864,7 +817,7 @@ sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y sub4 as (select z, a from tab3 where z != 8), sub5 as (select x, y, r.y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z) select * from sub5 order by x -SETTINGS enable_parallel_replicas = 1, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1, parallel_replicas_allow_in_with_subquery=0; +SETTINGS enable_parallel_replicas = 1, parallel_replicas_allow_in_with_subquery = 0; 0 0 0 0 0 0 1 1 0 0 0 0 3 3 0 0 0 0 @@ -887,7 +840,7 @@ sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y sub4 as (select z, a from tab3 where z != 8), sub5 as (select x, y, r.y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z) select * from sub5 order by x -SETTINGS enable_parallel_replicas = 1, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1, parallel_replicas_allow_in_with_subquery=0; +SETTINGS enable_parallel_replicas = 1, parallel_replicas_allow_in_with_subquery = 0; Expression Sorting Expression diff --git a/tests/queries/0_stateless/02967_parallel_replicas_joins_and_analyzer.sql.j2 b/tests/queries/0_stateless/02967_parallel_replicas_joins_and_analyzer.sql.j2 index 23291881eb4..31cb0b735ae 100644 --- a/tests/queries/0_stateless/02967_parallel_replicas_joins_and_analyzer.sql.j2 +++ b/tests/queries/0_stateless/02967_parallel_replicas_joins_and_analyzer.sql.j2 @@ -1,16 +1,17 @@ -drop table if exists tab1; -drop table if exists tab2; -drop table if exists tab3; +drop table if exists tab1 sync; +drop table if exists tab2 sync; +drop table if exists tab3 sync; -create table tab1 (x UInt32, y UInt32, shard UInt32) engine = MergeTree order by shard; -create table tab2 (y UInt32, z UInt32) engine = MergeTree order by tuple(); -create table tab3 (z UInt32, a UInt32) engine = MergeTree order by tuple(); +create table tab1 (x UInt32, y UInt32, shard UInt32) engine = ReplicatedMergeTree('/clickhouse/tables/{database}/test_02967/tab1', 'r1') order by shard; +create table tab2 (y UInt32, z UInt32) engine = ReplicatedMergeTree('/clickhouse/tables/{database}/test_02967/tab2', 'r1') order by tuple(); +create table tab3 (z UInt32, a UInt32) engine = ReplicatedMergeTree('/clickhouse/tables/{database}/test_02967/tab3', 'r1') order by tuple(); insert into tab1 select number, number, number from numbers(16); insert into tab2 select number * 2, number * 2 from numbers(8); insert into tab3 select number * 4, number * 4 from numbers(4); -set parallel_replicas_local_plan=1; +set enable_analyzer = 1; +set enable_parallel_replicas = 2, max_parallel_replicas = 2, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', parallel_replicas_local_plan = 1; {% for use_global_in in [0, 1] -%} @@ -19,8 +20,9 @@ set parallel_replicas_local_plan=1; set parallel_replicas_prefer_local_join = {{use_global_in}}; -- A query with only INNER/LEFT joins is fully send to replicas. JOIN is executed in GLOBAL mode. -select x, y, r.y, z, rr.z, a from (select l.x, l.y, r.y, r.z as z from (select x, y from tab1 where x != 2) l any left join (select y, z from tab2 where y != 4) r on l.y = r.y) ll any left join (select z, a from tab3 where z != 8) rr on ll.z = rr.z order by x SETTINGS enable_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1; -explain description=0 select x, y, r.y, z, rr.z, a from (select l.x, l.y, r.y, r.z as z from (select x, y from tab1 where x != 2) l any left join (select y, z from tab2 where y != 4) r on l.y = r.y) ll any left join (select z, a from tab3 where z != 8) rr on ll.z = rr.z SETTINGS enable_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1; +select x, y, r.y, z, rr.z, a from (select l.x, l.y, r.y, r.z as z from (select x, y from tab1 where x != 2) l any left join (select y, z from tab2 where y != 4) r on l.y = r.y) ll any left join (select z, a from tab3 where z != 8) rr on ll.z = rr.z order by x; +explain description=0 select x, y, r.y, z, rr.z, a from (select l.x, l.y, r.y, r.z as z from (select x, y from tab1 where x != 2) l any left join (select y, z from tab2 where y != 4) r on l.y = r.y) ll any left join (select z, a from tab3 where z != 8) rr on ll.z = rr.z; + -- -- The same query with cte; with sub1 as (select x, y from tab1 where x != 2), @@ -28,8 +30,7 @@ sub2 as (select y, z from tab2 where y != 4), sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y = r.y), sub4 as (select z, a from tab3 where z != 8), sub5 as (select x, y, r.y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z) -select * from sub5 order by x -SETTINGS enable_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1; +select * from sub5 order by x; explain description=0 with sub1 as (select x, y from tab1 where x != 2), @@ -37,8 +38,8 @@ sub2 as (select y, z from tab2 where y != 4), sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y = r.y), sub4 as (select z, a from tab3 where z != 8), sub5 as (select x, y, r.y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z) -select * from sub5 order by x -SETTINGS enable_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1; +select * from sub5 order by x; + -- -- GROUP BY should work up to WithMergableStage with sub1 as (select x, y from tab1 where x != 2), @@ -46,8 +47,7 @@ sub2 as (select y, z from tab2 where y != 4), sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y = r.y), sub4 as (select z, a from tab3 where z != 8), sub5 as (select sum(x), sum(y), sum(r.y), sum(z), sum(rr.z), sum(a), key from sub3 ll any left join sub4 rr on ll.z = rr.z group by x % 2 as key) -select * from sub5 order by key -SETTINGS enable_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1; +select * from sub5 order by key; explain description=0 with sub1 as (select x, y from tab1 where x != 2), @@ -55,8 +55,8 @@ sub2 as (select y, z from tab2 where y != 4), sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y = r.y), sub4 as (select z, a from tab3 where z != 8), sub5 as (select sum(x), sum(y), sum(r.y), sum(z), sum(rr.z), sum(a), key from sub3 ll any left join sub4 rr on ll.z = rr.z group by x % 2 as key) -select * from sub5 order by key -SETTINGS enable_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1; +select * from sub5 order by key; + -- -- ORDER BY in sub3 : sub1 is fully pushed, sub3 -> WithMergableStage with sub1 as (select x, y from tab1 where x != 2), @@ -64,8 +64,7 @@ sub2 as (select y, z from tab2 where y != 4), sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y = r.y order by l.x), sub4 as (select z, a from tab3 where z != 8), sub5 as (select x, y, r.y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z) -select * from sub5 order by x -SETTINGS enable_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1; +select * from sub5 order by x; explain description=0 with sub1 as (select x, y from tab1 where x != 2), @@ -73,8 +72,8 @@ sub2 as (select y, z from tab2 where y != 4), sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y = r.y order by l.x), sub4 as (select z, a from tab3 where z != 8), sub5 as (select x, y, r.y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z) -select * from sub5 order by x -SETTINGS enable_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1; +select * from sub5 order by x; + -- -- ORDER BY in sub1 : sub1 -> WithMergableStage with sub1 as (select x, y from tab1 where x != 2 order by y), @@ -82,8 +81,7 @@ sub2 as (select y, z from tab2 where y != 4), sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y = r.y), sub4 as (select z, a from tab3 where z != 8), sub5 as (select x, y, r.y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z) -select * from sub5 order by x -SETTINGS enable_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1; +select * from sub5 order by x; explain description=0 with sub1 as (select x, y from tab1 where x != 2 order by y), @@ -91,8 +89,8 @@ sub2 as (select y, z from tab2 where y != 4), sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y = r.y), sub4 as (select z, a from tab3 where z != 8), sub5 as (select x, y, r.y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z) -select * from sub5 order by x -SETTINGS enable_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1; +select * from sub5 order by x; + -- -- RIGHT JOIN in sub3: sub3 -> WithMergableStage with sub1 as (select x, y from tab1 where x != 2), @@ -100,8 +98,7 @@ sub2 as (select y, z from tab2 where y != 4), sub3 as (select l.x, l.y, r.y, r.z as z from sub2 r any right join sub1 l on l.y = r.y), sub4 as (select z, a from tab3 where z != 8), sub5 as (select x, l.y, y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z) -select * from sub5 -SETTINGS enable_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1; +select * from sub5; explain description=0 with sub1 as (select x, y from tab1 where x != 2), @@ -109,8 +106,8 @@ sub2 as (select y, z from tab2 where y != 4), sub3 as (select l.x, l.y, r.y, r.z as z from sub2 r any right join sub1 l on l.y = r.y), sub4 as (select z, a from tab3 where z != 8), sub5 as (select x, l.y, y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z) -select * from sub5 -SETTINGS enable_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1; +select * from sub5; + -- -- RIGHT JOIN in sub5: sub5 -> WithMergableStage with sub1 as (select x, y from tab1 where x != 2), @@ -118,7 +115,7 @@ sub2 as (select y, z from tab2 where y != 4), sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y = r.y), sub4 as (select z, a from tab3 where z != 8), sub5 as (select z, a, x, y, r.y, ll.z from sub4 rr any right join sub3 ll on ll.z = rr.z) -select * from sub5 order by x SETTINGS enable_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1; +select * from sub5 order by x; explain description=0 with sub1 as (select x, y from tab1 where x != 2), @@ -126,7 +123,7 @@ sub2 as (select y, z from tab2 where y != 4), sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y = r.y), sub4 as (select z, a from tab3 where z != 8), sub5 as (select z, a, x, y, r.y, ll.z from sub4 rr any right join sub3 ll on ll.z = rr.z) -select * from sub5 order by x SETTINGS enable_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1; +select * from sub5 order by x; -- -- Subqueries for IN allowed @@ -135,8 +132,7 @@ sub2 as (select y, z from tab2 where y != 4), sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y = r.y), sub4 as (select z, a from tab3 where z != 8), sub5 as (select x, y, r.y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z) -select * from sub5 order by x -SETTINGS enable_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1; +select * from sub5 order by x; explain description=0 with sub1 as (select x, y from tab1 where x in (select number from numbers(16) where number != 2)), @@ -144,8 +140,7 @@ sub2 as (select y, z from tab2 where y != 4), sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y = r.y), sub4 as (select z, a from tab3 where z != 8), sub5 as (select x, y, r.y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z) -select * from sub5 order by x -SETTINGS enable_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1; +select * from sub5 order by x; -- -- Subqueries for IN are not allowed @@ -155,7 +150,7 @@ sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y sub4 as (select z, a from tab3 where z != 8), sub5 as (select x, y, r.y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z) select * from sub5 order by x -SETTINGS enable_parallel_replicas = 1, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1, parallel_replicas_allow_in_with_subquery=0; +SETTINGS enable_parallel_replicas = 1, parallel_replicas_allow_in_with_subquery = 0; explain description=0 with sub1 as (select x, y from tab1 where x in (select number from numbers(16) where number != 2)), @@ -164,6 +159,6 @@ sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y sub4 as (select z, a from tab3 where z != 8), sub5 as (select x, y, r.y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z) select * from sub5 order by x -SETTINGS enable_parallel_replicas = 1, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1, parallel_replicas_allow_in_with_subquery=0; +SETTINGS enable_parallel_replicas = 1, parallel_replicas_allow_in_with_subquery = 0; {%- endfor %} diff --git a/tests/queries/0_stateless/03080_incorrect_join_with_merge.sql b/tests/queries/0_stateless/03080_incorrect_join_with_merge.sql index a34c71a44e2..a743c5bdffb 100644 --- a/tests/queries/0_stateless/03080_incorrect_join_with_merge.sql +++ b/tests/queries/0_stateless/03080_incorrect_join_with_merge.sql @@ -2,6 +2,7 @@ SET enable_analyzer=1; SET distributed_foreground_insert=1; +DROP TABLE IF EXISTS first_table_lr SYNC; CREATE TABLE first_table_lr ( id String, @@ -11,6 +12,7 @@ ENGINE = ReplicatedMergeTree('/clickhouse/tables/{database}/test_03080/alter', ' ORDER BY id; +DROP TABLE IF EXISTS first_table; CREATE TABLE first_table ( id String, @@ -19,6 +21,7 @@ CREATE TABLE first_table ENGINE = Distributed('test_shard_localhost', currentDatabase(), 'first_table_lr'); +DROP TABLE IF EXISTS second_table_lr; CREATE TABLE second_table_lr ( id String, @@ -26,6 +29,7 @@ CREATE TABLE second_table_lr ) ENGINE = MergeTree() ORDER BY id; +DROP TABLE IF EXISTS second_table; CREATE TABLE second_table ( id String, @@ -36,6 +40,7 @@ ENGINE = Distributed('test_shard_localhost', currentDatabase(), 'second_table_lr INSERT INTO first_table VALUES ('1', '2'), ('3', '4'); INSERT INTO second_table VALUES ('1', '2'), ('3', '4'); +DROP TABLE IF EXISTS two_tables; CREATE TABLE two_tables ( id String, diff --git a/tests/queries/0_stateless/03173_parallel_replicas_join_bug.sh b/tests/queries/0_stateless/03173_parallel_replicas_join_bug.sh index 289a49c72f4..1ee3d729cb4 100755 --- a/tests/queries/0_stateless/03173_parallel_replicas_join_bug.sh +++ b/tests/queries/0_stateless/03173_parallel_replicas_join_bug.sh @@ -6,12 +6,15 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) $CLICKHOUSE_CLIENT -q " + DROP TABLE IF EXISTS ids; CREATE TABLE ids (id UUID, whatever String) Engine=MergeTree ORDER BY tuple(); INSERT INTO ids VALUES ('a1451105-722e-4fe7-bfaa-65ad2ae249c2', 'whatever'); + DROP TABLE IF EXISTS data; CREATE TABLE data (id UUID, event_time DateTime, status String) Engine=MergeTree ORDER BY tuple(); INSERT INTO data VALUES ('a1451105-722e-4fe7-bfaa-65ad2ae249c2', '2000-01-01', 'CREATED'); + DROP TABLE IF EXISTS data2; CREATE TABLE data2 (id UUID, event_time DateTime, status String) Engine=MergeTree ORDER BY tuple(); INSERT INTO data2 VALUES ('a1451105-722e-4fe7-bfaa-65ad2ae249c2', '2000-01-02', 'CREATED'); " diff --git a/tests/queries/0_stateless/03254_pr_join_on_dups.reference b/tests/queries/0_stateless/03254_pr_join_on_dups.reference new file mode 100644 index 00000000000..58602bafb5d --- /dev/null +++ b/tests/queries/0_stateless/03254_pr_join_on_dups.reference @@ -0,0 +1,273 @@ +inner +1 l1 1 1 r1 \N +1 l1 1 1 r2 \N +2 l2 2 2 r3 \N +2 l3 3 2 r3 \N +3 l4 4 3 r4 \N +3 l4 4 3 r5 \N +4 l5 \N 4 r6 nr6 +4 l6 \N 4 r6 nr6 +9 l9 \N 9 r9 nr9 +inner subs +1 l1 1 1 r1 \N +1 l1 1 1 r2 \N +2 l2 2 2 r3 \N +2 l3 3 2 r3 \N +3 l4 4 3 r4 \N +3 l4 4 3 r5 \N +4 l5 \N 4 r6 nr6 +4 l6 \N 4 r6 nr6 +9 l9 \N 9 r9 nr9 +inner expr +1 l1 1 1 r1 \N +1 l1 1 1 r2 \N +2 l2 2 2 r3 \N +2 l3 3 2 r3 \N +3 l4 4 3 r4 \N +3 l4 4 3 r5 \N +4 l5 \N 4 r6 nr6 +4 l6 \N 4 r6 nr6 +9 l9 \N 9 r9 nr9 +left +1 l1 1 1 r1 \N +1 l1 1 1 r2 \N +2 l2 2 2 r3 \N +2 l3 3 2 r3 \N +3 l4 4 3 r4 \N +3 l4 4 3 r5 \N +4 l5 \N 4 r6 nr6 +4 l6 \N 4 r6 nr6 +5 l7 \N 0 \N +8 l8 \N 0 \N +9 l9 \N 9 r9 nr9 +left subs +1 l1 1 1 r1 \N +1 l1 1 1 r2 \N +2 l2 2 2 r3 \N +2 l3 3 2 r3 \N +3 l4 4 3 r4 \N +3 l4 4 3 r5 \N +4 l5 \N 4 r6 nr6 +4 l6 \N 4 r6 nr6 +5 l7 \N 0 \N +8 l8 \N 0 \N +9 l9 \N 9 r9 nr9 +left expr +1 l1 1 1 r1 \N +1 l1 1 1 r2 \N +2 l2 2 2 r3 \N +2 l3 3 2 r3 \N +3 l4 4 3 r4 \N +3 l4 4 3 r5 \N +4 l5 \N 4 r6 nr6 +4 l6 \N 4 r6 nr6 +5 l7 \N 0 \N +8 l8 \N 0 \N +9 l9 \N 9 r9 nr9 +right +0 \N 6 r7 nr7 +0 \N 7 r8 nr8 +1 l1 1 1 r1 \N +1 l1 1 1 r2 \N +2 l2 2 2 r3 \N +2 l3 3 2 r3 \N +3 l4 4 3 r4 \N +3 l4 4 3 r5 \N +4 l5 \N 4 r6 nr6 +4 l6 \N 4 r6 nr6 +9 l9 \N 9 r9 nr9 +right subs +0 \N 6 r7 nr7 +0 \N 7 r8 nr8 +1 l1 1 1 r1 \N +1 l1 1 1 r2 \N +2 l2 2 2 r3 \N +2 l3 3 2 r3 \N +3 l4 4 3 r4 \N +3 l4 4 3 r5 \N +4 l5 \N 4 r6 nr6 +4 l6 \N 4 r6 nr6 +9 l9 \N 9 r9 nr9 +full +0 \N 6 r7 nr7 +0 \N 7 r8 nr8 +1 l1 1 1 r1 \N +1 l1 1 1 r2 \N +2 l2 2 2 r3 \N +2 l3 3 2 r3 \N +3 l4 4 3 r4 \N +3 l4 4 3 r5 \N +4 l5 \N 4 r6 nr6 +4 l6 \N 4 r6 nr6 +5 l7 \N 0 \N +8 l8 \N 0 \N +9 l9 \N 9 r9 nr9 +full subs +0 \N 6 r7 nr7 +0 \N 7 r8 nr8 +1 l1 1 1 r1 \N +1 l1 1 1 r2 \N +2 l2 2 2 r3 \N +2 l3 3 2 r3 \N +3 l4 4 3 r4 \N +3 l4 4 3 r5 \N +4 l5 \N 4 r6 nr6 +4 l6 \N 4 r6 nr6 +5 l7 \N 0 \N +8 l8 \N 0 \N +9 l9 \N 9 r9 nr9 +self inner +1 l1 1 1 l1 1 +2 l2 2 2 l2 2 +2 l2 2 2 l3 3 +2 l3 3 2 l2 2 +2 l3 3 2 l3 3 +3 l4 4 3 l4 4 +4 l5 \N 4 l5 \N +4 l5 \N 4 l6 \N +4 l6 \N 4 l5 \N +4 l6 \N 4 l6 \N +5 l7 \N 5 l7 \N +8 l8 \N 8 l8 \N +9 l9 \N 9 l9 \N +self inner nullable +1 l1 1 1 l1 1 +2 l2 2 2 l2 2 +2 l3 3 2 l3 3 +3 l4 4 3 l4 4 +self inner nullable vs not nullable +1 l1 1 1 l1 1 +2 l2 2 2 l2 2 +2 l3 3 2 l2 2 +3 l4 4 2 l3 3 +4 l5 \N 3 l4 4 +4 l6 \N 3 l4 4 +self inner nullable vs not nullable 2 +4 r6 nr6 4 r6 nr6 +6 r7 nr7 6 r7 nr7 +7 r8 nr8 7 r8 nr8 +9 r9 nr9 9 r9 nr9 +self left +1 l1 1 1 l1 1 +2 l2 2 2 l2 2 +2 l2 2 2 l3 3 +2 l3 3 2 l2 2 +2 l3 3 2 l3 3 +3 l4 4 3 l4 4 +4 l5 \N 4 l5 \N +4 l5 \N 4 l6 \N +4 l6 \N 4 l5 \N +4 l6 \N 4 l6 \N +5 l7 \N 5 l7 \N +8 l8 \N 8 l8 \N +9 l9 \N 9 l9 \N +self left nullable +1 l1 1 1 l1 1 +2 l2 2 2 l2 2 +2 l3 3 2 l3 3 +3 l4 4 3 l4 4 +4 l5 \N 0 \N +4 l6 \N 0 \N +5 l7 \N 0 \N +8 l8 \N 0 \N +9 l9 \N 0 \N +self left nullable vs not nullable +1 l1 1 1 l1 1 +2 l2 2 2 l2 2 +2 l3 3 2 l2 2 +3 l4 4 2 l3 3 +4 l5 \N 3 l4 4 +4 l6 \N 3 l4 4 +5 l7 \N 0 \N +8 l8 \N 0 \N +9 l9 \N 0 \N +self left nullable vs not nullable 2 +1 r1 \N 0 \N +1 r2 \N 0 \N +2 r3 \N 0 \N +3 r4 \N 0 \N +3 r5 \N 0 \N +4 r6 nr6 4 r6 nr6 +6 r7 nr7 6 r7 nr7 +7 r8 nr8 7 r8 nr8 +9 r9 nr9 9 r9 nr9 +self right +1 l1 1 1 l1 1 +2 l2 2 2 l2 2 +2 l2 2 2 l3 3 +2 l3 3 2 l2 2 +2 l3 3 2 l3 3 +3 l4 4 3 l4 4 +4 l5 \N 4 l5 \N +4 l5 \N 4 l6 \N +4 l6 \N 4 l5 \N +4 l6 \N 4 l6 \N +5 l7 \N 5 l7 \N +8 l8 \N 8 l8 \N +9 l9 \N 9 l9 \N +self right nullable +0 \N 4 l5 \N +0 \N 4 l6 \N +0 \N 5 l7 \N +0 \N 8 l8 \N +0 \N 9 l9 \N +1 l1 1 1 l1 1 +2 l2 2 2 l2 2 +2 l3 3 2 l3 3 +3 l4 4 3 l4 4 +self right nullable vs not nullable +0 \N 4 l5 \N +0 \N 4 l6 \N +0 \N 5 l7 \N +0 \N 8 l8 \N +0 \N 9 l9 \N +1 l1 1 1 l1 1 +2 l2 2 2 l2 2 +2 l3 3 2 l2 2 +3 l4 4 2 l3 3 +4 l5 \N 3 l4 4 +4 l6 \N 3 l4 4 +self full +1 l1 1 1 l1 1 +2 l2 2 2 l2 2 +2 l2 2 2 l3 3 +2 l3 3 2 l2 2 +2 l3 3 2 l3 3 +3 l4 4 3 l4 4 +4 l5 \N 4 l5 \N +4 l5 \N 4 l6 \N +4 l6 \N 4 l5 \N +4 l6 \N 4 l6 \N +5 l7 \N 5 l7 \N +8 l8 \N 8 l8 \N +9 l9 \N 9 l9 \N +self full nullable +0 \N 4 l5 \N +0 \N 4 l6 \N +0 \N 5 l7 \N +0 \N 8 l8 \N +0 \N 9 l9 \N +1 l1 1 1 l1 1 +2 l2 2 2 l2 2 +2 l3 3 2 l3 3 +3 l4 4 3 l4 4 +4 l5 \N 0 \N +4 l6 \N 0 \N +5 l7 \N 0 \N +8 l8 \N 0 \N +9 l9 \N 0 \N +self full nullable vs not nullable +0 \N 4 l5 \N +0 \N 4 l6 \N +0 \N 5 l7 \N +0 \N 8 l8 \N +0 \N 9 l9 \N +1 l1 1 1 l1 1 +2 l2 2 2 l2 2 +2 l3 3 2 l2 2 +3 l4 4 2 l3 3 +4 l5 \N 3 l4 4 +4 l6 \N 3 l4 4 +5 l7 \N 0 \N +8 l8 \N 0 \N +9 l9 \N 0 \N diff --git a/tests/queries/0_stateless/03254_pr_join_on_dups.sql b/tests/queries/0_stateless/03254_pr_join_on_dups.sql new file mode 100644 index 00000000000..aca4fc6b6c3 --- /dev/null +++ b/tests/queries/0_stateless/03254_pr_join_on_dups.sql @@ -0,0 +1,73 @@ +drop table if exists X sync; +drop table if exists Y sync; + +set min_bytes_to_use_direct_io = 0; -- min_bytes_to_use_direct_io > 0 is broken and leads to unexpected results, https://github.com/ClickHouse/ClickHouse/issues/65690 + +create table X (id Int32, x_a String, x_b Nullable(Int32)) engine ReplicatedMergeTree('/clickhouse/{database}/X', '1') order by tuple(); +create table Y (id Int32, y_a String, y_b Nullable(String)) engine ReplicatedMergeTree('/clickhouse/{database}/Y', '1') order by tuple(); + +insert into X (id, x_a, x_b) values (1, 'l1', 1), (2, 'l2', 2), (2, 'l3', 3), (3, 'l4', 4); +insert into X (id, x_a) values (4, 'l5'), (4, 'l6'), (5, 'l7'), (8, 'l8'), (9, 'l9'); +insert into Y (id, y_a) values (1, 'r1'), (1, 'r2'), (2, 'r3'), (3, 'r4'), (3, 'r5'); +insert into Y (id, y_a, y_b) values (4, 'r6', 'nr6'), (6, 'r7', 'nr7'), (7, 'r8', 'nr8'), (9, 'r9', 'nr9'); + +set enable_analyzer = 1, enable_parallel_replicas = 1, max_parallel_replicas = 3, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost'; + +select 'inner'; +select X.*, Y.* from X inner join Y on X.id = Y.id order by X.id, X.x_a, X.x_b, Y.id, Y.y_a, Y.y_b; +select 'inner subs'; +select s.*, j.* from (select * from X) as s inner join (select * from Y) as j on s.id = j.id order by s.id, s.x_a, s.x_b, j.id, j.y_a, j.y_b; +select 'inner expr'; +select X.*, Y.* from X inner join Y on (X.id + 1) = (Y.id + 1) order by X.id, X.x_a, X.x_b, Y.id, Y.y_a, Y.y_b; + +select 'left'; +select X.*, Y.* from X left join Y on X.id = Y.id order by X.id, X.x_a, X.x_b, Y.id, Y.y_a, Y.y_b; +select 'left subs'; +select s.*, j.* from (select * from X) as s left join (select * from Y) as j on s.id = j.id order by s.id, s.x_a, s.x_b, j.id, j.y_a, j.y_b; +select 'left expr'; +select X.*, Y.* from X left join Y on (X.id + 1) = (Y.id + 1) order by X.id, X.x_a, X.x_b, Y.id, Y.y_a, Y.y_b; + +select 'right'; +select X.*, Y.* from X right join Y on X.id = Y.id order by X.id, X.x_a, X.x_b, Y.id, Y.y_a, Y.y_b; +select 'right subs'; +select s.*, j.* from (select * from X) as s right join (select * from Y) as j on s.id = j.id order by s.id, s.x_a, s.x_b, j.id, j.y_a, j.y_b; + +select 'full'; +select X.*, Y.* from X full join Y on X.id = Y.id order by X.id, X.x_a, X.x_b, Y.id, Y.y_a, Y.y_b; +select 'full subs'; +select s.*, j.* from (select * from X) as s full join (select * from Y) as j on s.id = j.id order by s.id, s.x_a, s.x_b, j.id, j.y_a, j.y_b; + +select 'self inner'; +select X.*, s.* from X inner join (select * from X) as s on X.id = s.id order by X.id, X.x_a, X.x_b, s.id, s.x_a, s.x_b; +select 'self inner nullable'; +select X.*, s.* from X inner join (select * from X) as s on X.x_b = s.x_b order by X.id, X.x_a, X.x_b, s.id, s.x_a, s.x_b; +select 'self inner nullable vs not nullable'; +select X.*, s.* from X inner join (select * from X) as s on X.id = s.x_b order by X.id, X.x_a, X.x_b, s.id, s.x_a, s.x_b; +select 'self inner nullable vs not nullable 2'; +select Y.*, s.* from Y inner join (select * from Y) as s on concat('n', Y.y_a) = s.y_b order by Y.id, Y.y_a, Y.y_b, s.id, s.y_a, s.y_b; + +select 'self left'; +select X.*, s.* from X left join (select * from X) as s on X.id = s.id order by X.id, X.x_a, X.x_b, s.id, s.x_a, s.x_b; +select 'self left nullable'; +select X.*, s.* from X left join (select * from X) as s on X.x_b = s.x_b order by X.id, X.x_a, X.x_b, s.id, s.x_a, s.x_b; +select 'self left nullable vs not nullable'; +select X.*, s.* from X left join (select * from X) as s on X.id = s.x_b order by X.id, X.x_a, X.x_b, s.id, s.x_a, s.x_b; +select 'self left nullable vs not nullable 2'; +select Y.*, s.* from Y left join (select * from Y) as s on concat('n', Y.y_a) = s.y_b order by Y.id, Y.y_a, Y.y_b, s.id, s.y_a, s.y_b; + +select 'self right'; +select X.*, s.* from X right join (select * from X) as s on X.id = s.id order by X.id, X.x_a, X.x_b, s.id, s.x_a, s.x_b; +select 'self right nullable'; +select X.*, s.* from X right join (select * from X) as s on X.x_b = s.x_b order by X.id, X.x_a, X.x_b, s.id, s.x_a, s.x_b; +select 'self right nullable vs not nullable'; +select X.*, s.* from X right join (select * from X) as s on X.id = s.x_b order by X.id, X.x_a, X.x_b, s.id, s.x_a, s.x_b; + +select 'self full'; +select X.*, s.* from X full join (select * from X) as s on X.id = s.id order by X.id, X.x_a, X.x_b, s.id, s.x_a, s.x_b; +select 'self full nullable'; +select X.*, s.* from X full join (select * from X) as s on X.x_b = s.x_b order by X.id, X.x_a, X.x_b, s.id, s.x_a, s.x_b; +select 'self full nullable vs not nullable'; +select X.*, s.* from X full join (select * from X) as s on X.id = s.x_b order by X.id, X.x_a, X.x_b, s.id, s.x_a, s.x_b; + +drop table X sync; +drop table Y sync; diff --git a/tests/queries/0_stateless/03261_pr_semi_anti_join.reference b/tests/queries/0_stateless/03261_pr_semi_anti_join.reference new file mode 100644 index 00000000000..782147f1f6f --- /dev/null +++ b/tests/queries/0_stateless/03261_pr_semi_anti_join.reference @@ -0,0 +1,16 @@ +semi left +2 a3 2 b1 +2 a6 2 b1 +4 a5 4 b3 +semi right +2 a3 2 b1 +2 a3 2 b2 +4 a5 4 b3 +4 a5 4 b4 +4 a5 4 b5 +anti left +0 a1 0 +1 a2 1 +3 a4 3 +anti right +0 5 b6 diff --git a/tests/queries/0_stateless/03261_pr_semi_anti_join.sql b/tests/queries/0_stateless/03261_pr_semi_anti_join.sql new file mode 100644 index 00000000000..2d671756d6e --- /dev/null +++ b/tests/queries/0_stateless/03261_pr_semi_anti_join.sql @@ -0,0 +1,26 @@ +DROP TABLE IF EXISTS t1 SYNC; +DROP TABLE IF EXISTS t2 SYNC; + +CREATE TABLE t1 (x UInt32, s String) engine ReplicatedMergeTree('/clickhouse/{database}/t1', '1') order by tuple(); +CREATE TABLE t2 (x UInt32, s String) engine ReplicatedMergeTree('/clickhouse/{database}/t2', '1') order by tuple(); + +INSERT INTO t1 (x, s) VALUES (0, 'a1'), (1, 'a2'), (2, 'a3'), (3, 'a4'), (4, 'a5'), (2, 'a6'); +INSERT INTO t2 (x, s) VALUES (2, 'b1'), (2, 'b2'), (4, 'b3'), (4, 'b4'), (4, 'b5'), (5, 'b6'); + +SET join_use_nulls = 0; +set enable_analyzer = 1, enable_parallel_replicas = 1, max_parallel_replicas = 3, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost'; + +SELECT 'semi left'; +SELECT t1.*, t2.* FROM t1 SEMI LEFT JOIN t2 USING(x) ORDER BY t1.x, t2.x, t1.s, t2.s; + +SELECT 'semi right'; +SELECT t1.*, t2.* FROM t1 SEMI RIGHT JOIN t2 USING(x) ORDER BY t1.x, t2.x, t1.s, t2.s; + +SELECT 'anti left'; +SELECT t1.*, t2.* FROM t1 ANTI LEFT JOIN t2 USING(x) ORDER BY t1.x, t2.x, t1.s, t2.s; + +SELECT 'anti right'; +SELECT t1.*, t2.* FROM t1 ANTI RIGHT JOIN t2 USING(x) ORDER BY t1.x, t2.x, t1.s, t2.s; + +DROP TABLE t1 SYNC; +DROP TABLE t2 SYNC;